Files
ragflow/web/src/hooks/use-send-message.ts
balibabu 8e1f8a0c48 Feat: Fixed the issue where the begin operator parameters could not be submitted during debugging #3221 (#8539)
### What problem does this PR solve?

Feat: Fixed the issue where the begin operator parameters could not be
submitted during debugging #3221

### Type of change


- [x] New Feature (non-breaking change which adds functionality)
2025-06-27 18:53:13 +08:00

142 lines
3.5 KiB
TypeScript

import { Authorization } from '@/constants/authorization';
import api from '@/utils/api';
import { getAuthorization } from '@/utils/authorization-util';
import { EventSourceParserStream } from 'eventsource-parser/stream';
import { useCallback, useRef, useState } from 'react';
export enum MessageEventType {
WorkflowStarted = 'workflow_started',
NodeStarted = 'node_started',
NodeFinished = 'node_finished',
Message = 'message',
MessageEnd = 'message_end',
WorkflowFinished = 'workflow_finished',
UserInputs = 'user_inputs',
}
export interface IAnswerEvent<T> {
event: MessageEventType;
message_id: string;
created_at: number;
task_id: string;
data: T;
}
export interface INodeData {
inputs: Record<string, any>;
outputs: Record<string, any>;
component_id: string;
error: null | string;
elapsed_time: number;
created_at: number;
}
export interface IMessageData {
content: string;
}
export type INodeEvent = IAnswerEvent<INodeData>;
export type IMessageEvent = IAnswerEvent<IMessageData>;
export type IChatEvent = INodeEvent | IMessageEvent;
export type IEventList = Array<IChatEvent>;
export const useSendMessageBySSE = (url: string = api.completeConversation) => {
const [answerList, setAnswerList] = useState<IEventList>([]);
const [done, setDone] = useState(true);
const timer = useRef<any>();
const sseRef = useRef<AbortController>();
const initializeSseRef = useCallback(() => {
sseRef.current = new AbortController();
}, []);
const resetAnswerList = useCallback(() => {
if (timer.current) {
clearTimeout(timer.current);
}
timer.current = setTimeout(() => {
setAnswerList([]);
clearTimeout(timer.current);
}, 1000);
}, []);
const send = useCallback(
async (
body: any,
controller?: AbortController,
): Promise<{ response: Response; data: ResponseType } | undefined> => {
initializeSseRef();
try {
setDone(false);
const response = await fetch(url, {
method: 'POST',
headers: {
[Authorization]: getAuthorization(),
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
signal: controller?.signal || sseRef.current?.signal,
});
const res = response.clone().json();
const reader = response?.body
?.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.getReader();
while (true) {
const x = await reader?.read();
if (x) {
const { done, value } = x;
if (done) {
console.info('done');
resetAnswerList();
break;
}
try {
const val = JSON.parse(value?.data || '');
console.info('data:', val);
setAnswerList((list) => {
const nextList = [...list];
nextList.push(val);
return nextList;
});
} catch (e) {
console.warn(e);
}
}
}
console.info('done?');
setDone(true);
resetAnswerList();
return { data: await res, response };
} catch (e) {
setDone(true);
resetAnswerList();
console.warn(e);
}
},
[initializeSseRef, url, resetAnswerList],
);
const stopOutputMessage = useCallback(() => {
sseRef.current?.abort();
}, []);
return {
send,
answerList,
done,
setDone,
resetAnswerList,
stopOutputMessage,
};
};