Feat: Receive reply messages of different event types from the agent #3221 (#8100)

### What problem does this PR solve?
Feat: Receive reply messages of different event types from the agent
#3221

### Type of change


- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
balibabu
2025-06-06 16:30:18 +08:00
committed by GitHub
parent 0e03542db5
commit 1885a4a4b8
5 changed files with 404 additions and 11 deletions

View File

@ -0,0 +1,138 @@
import { Authorization } from '@/constants/authorization';
import api from '@/utils/api';
import { getAuthorization } from '@/utils/authorization-util';
import { EventSourceParserStream } from 'eventsource-parser/stream';
import { useCallback, useRef, useState } from 'react';
export enum MessageEventType {
WorkflowStarted = 'workflow_started',
NodeStarted = 'node_started',
NodeFinished = 'node_finished',
Message = 'message',
MessageEnd = 'message_end',
WorkflowFinished = 'workflow_finished',
}
export interface IAnswerEvent<T> {
event: MessageEventType;
message_id: string;
created_at: number;
task_id: string;
data: T;
}
export interface INodeData {
inputs: Record<string, any>;
outputs: Record<string, any>;
component_id: string;
error: null | string;
elapsed_time: number;
created_at: number;
}
export interface IMessageData {
content: string;
}
export type INodeEvent = IAnswerEvent<INodeData>;
export type IMessageEvent = IAnswerEvent<IMessageData>;
export type IEventList = Array<INodeEvent | IMessageEvent>;
export const useSendMessageBySSE = (url: string = api.completeConversation) => {
const [answerList, setAnswerList] = useState<IEventList>([]);
const [done, setDone] = useState(true);
const timer = useRef<any>();
const sseRef = useRef<AbortController>();
const initializeSseRef = useCallback(() => {
sseRef.current = new AbortController();
}, []);
const resetAnswerList = useCallback(() => {
if (timer.current) {
clearTimeout(timer.current);
}
timer.current = setTimeout(() => {
setAnswerList([]);
clearTimeout(timer.current);
}, 1000);
}, []);
const send = useCallback(
async (
body: any,
controller?: AbortController,
): Promise<{ response: Response; data: ResponseType } | undefined> => {
initializeSseRef();
try {
setDone(false);
const response = await fetch(url, {
method: 'POST',
headers: {
[Authorization]: getAuthorization(),
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
signal: controller?.signal || sseRef.current?.signal,
});
const res = response.clone().json();
const reader = response?.body
?.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.getReader();
while (true) {
const x = await reader?.read();
if (x) {
const { done, value } = x;
if (done) {
console.info('done');
resetAnswerList();
break;
}
try {
const val = JSON.parse(value?.data || '');
console.info('data:', val);
setAnswerList((list) => {
const nextList = [...list];
nextList.push(val);
return nextList;
});
} catch (e) {
console.warn(e);
}
}
}
console.info('done?');
setDone(true);
resetAnswerList();
return { data: await res, response };
} catch (e) {
setDone(true);
resetAnswerList();
console.warn(e);
}
},
[initializeSseRef, url, resetAnswerList],
);
const stopOutputMessage = useCallback(() => {
sseRef.current?.abort();
}, []);
return {
send,
answerList,
done,
setDone,
resetAnswerList,
stopOutputMessage,
};
};