From 1885a4a4b8553202dd2a1f1ae9d7dff0412b3819 Mon Sep 17 00:00:00 2001 From: balibabu Date: Fri, 6 Jun 2025 16:30:18 +0800 Subject: [PATCH] Feat: Receive reply messages of different event types from the agent #3221 (#8100) ### What problem does this PR solve? Feat: Receive reply messages of different event types from the agent #3221 ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- web/src/hooks/use-send-message.ts | 138 ++++++++++++++++++++ web/src/pages/agent/chat/box.tsx | 91 +++++++++++++ web/src/pages/agent/chat/chat-sheet.tsx | 17 +-- web/src/pages/agent/chat/hooks.ts | 167 ++++++++++++++++++++++++ web/src/pages/agent/form-sheet/next.tsx | 2 +- 5 files changed, 404 insertions(+), 11 deletions(-) create mode 100644 web/src/hooks/use-send-message.ts create mode 100644 web/src/pages/agent/chat/box.tsx create mode 100644 web/src/pages/agent/chat/hooks.ts diff --git a/web/src/hooks/use-send-message.ts b/web/src/hooks/use-send-message.ts new file mode 100644 index 000000000..f86e81e41 --- /dev/null +++ b/web/src/hooks/use-send-message.ts @@ -0,0 +1,138 @@ +import { Authorization } from '@/constants/authorization'; +import api from '@/utils/api'; +import { getAuthorization } from '@/utils/authorization-util'; +import { EventSourceParserStream } from 'eventsource-parser/stream'; +import { useCallback, useRef, useState } from 'react'; + +export enum MessageEventType { + WorkflowStarted = 'workflow_started', + NodeStarted = 'node_started', + NodeFinished = 'node_finished', + Message = 'message', + MessageEnd = 'message_end', + WorkflowFinished = 'workflow_finished', +} + +export interface IAnswerEvent { + event: MessageEventType; + message_id: string; + created_at: number; + task_id: string; + data: T; +} + +export interface INodeData { + inputs: Record; + outputs: Record; + component_id: string; + error: null | string; + elapsed_time: number; + created_at: number; +} + +export interface IMessageData { + content: string; +} + +export type INodeEvent = IAnswerEvent; + +export type IMessageEvent = IAnswerEvent; + +export type IEventList = Array; + +export const useSendMessageBySSE = (url: string = api.completeConversation) => { + const [answerList, setAnswerList] = useState([]); + const [done, setDone] = useState(true); + const timer = useRef(); + const sseRef = useRef(); + + const initializeSseRef = useCallback(() => { + sseRef.current = new AbortController(); + }, []); + + const resetAnswerList = useCallback(() => { + if (timer.current) { + clearTimeout(timer.current); + } + timer.current = setTimeout(() => { + setAnswerList([]); + clearTimeout(timer.current); + }, 1000); + }, []); + + const send = useCallback( + async ( + body: any, + controller?: AbortController, + ): Promise<{ response: Response; data: ResponseType } | undefined> => { + initializeSseRef(); + try { + setDone(false); + const response = await fetch(url, { + method: 'POST', + headers: { + [Authorization]: getAuthorization(), + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + signal: controller?.signal || sseRef.current?.signal, + }); + + const res = response.clone().json(); + + const reader = response?.body + ?.pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()) + .getReader(); + + while (true) { + const x = await reader?.read(); + if (x) { + const { done, value } = x; + if (done) { + console.info('done'); + resetAnswerList(); + break; + } + try { + const val = JSON.parse(value?.data || ''); + + console.info('data:', val); + + setAnswerList((list) => { + const nextList = [...list]; + nextList.push(val); + return nextList; + }); + } catch (e) { + console.warn(e); + } + } + } + console.info('done?'); + setDone(true); + resetAnswerList(); + return { data: await res, response }; + } catch (e) { + setDone(true); + resetAnswerList(); + + console.warn(e); + } + }, + [initializeSseRef, url, resetAnswerList], + ); + + const stopOutputMessage = useCallback(() => { + sseRef.current?.abort(); + }, []); + + return { + send, + answerList, + done, + setDone, + resetAnswerList, + stopOutputMessage, + }; +}; diff --git a/web/src/pages/agent/chat/box.tsx b/web/src/pages/agent/chat/box.tsx new file mode 100644 index 000000000..c05ae267e --- /dev/null +++ b/web/src/pages/agent/chat/box.tsx @@ -0,0 +1,91 @@ +import MessageItem from '@/components/message-item'; +import { MessageType } from '@/constants/chat'; +import { useGetFileIcon } from '@/pages/chat/hooks'; +import { buildMessageItemReference } from '@/pages/chat/utils'; +import { Spin } from 'antd'; + +import { useSendNextMessage } from './hooks'; + +import MessageInput from '@/components/message-input'; +import PdfDrawer from '@/components/pdf-drawer'; +import { useClickDrawer } from '@/components/pdf-drawer/hooks'; +import { useFetchFlow } from '@/hooks/flow-hooks'; +import { useFetchUserInfo } from '@/hooks/user-setting-hooks'; +import { buildMessageUuidWithRole } from '@/utils/chat'; + +const AgentChatBox = () => { + const { + sendLoading, + handleInputChange, + handlePressEnter, + value, + loading, + ref, + derivedMessages, + reference, + stopOutputMessage, + } = useSendNextMessage(); + + const { visible, hideModal, documentId, selectedChunk, clickDocumentButton } = + useClickDrawer(); + useGetFileIcon(); + const { data: userInfo } = useFetchUserInfo(); + const { data: canvasInfo } = useFetchFlow(); + + return ( + <> +
+
+
+ + {derivedMessages?.map((message, i) => { + return ( + + ); + })} + +
+
+
+ +
+ + + ); +}; + +export default AgentChatBox; diff --git a/web/src/pages/agent/chat/chat-sheet.tsx b/web/src/pages/agent/chat/chat-sheet.tsx index 8356f3f57..703af40be 100644 --- a/web/src/pages/agent/chat/chat-sheet.tsx +++ b/web/src/pages/agent/chat/chat-sheet.tsx @@ -1,25 +1,22 @@ import { Sheet, SheetContent, - SheetDescription, SheetHeader, SheetTitle, - SheetTrigger, } from '@/components/ui/sheet'; import { IModalProps } from '@/interfaces/common'; +import { cn } from '@/lib/utils'; +import AgentChatBox from './box'; -export function ChatSheet({ visible }: IModalProps) { +export function ChatSheet({ visible, hideModal }: IModalProps) { return ( - - Open - + + + Are you absolutely sure? - - This action cannot be undone. This will permanently delete your - account and remove your data from our servers. - + ); diff --git a/web/src/pages/agent/chat/hooks.ts b/web/src/pages/agent/chat/hooks.ts new file mode 100644 index 000000000..911af3923 --- /dev/null +++ b/web/src/pages/agent/chat/hooks.ts @@ -0,0 +1,167 @@ +import { MessageType } from '@/constants/chat'; +import { useFetchFlow } from '@/hooks/flow-hooks'; +import { + useHandleMessageInputChange, + useSelectDerivedMessages, +} from '@/hooks/logic-hooks'; +import { + IEventList, + IMessageEvent, + MessageEventType, + useSendMessageBySSE, +} from '@/hooks/use-send-message'; +import { Message } from '@/interfaces/database/chat'; +import i18n from '@/locales/config'; +import api from '@/utils/api'; +import { message } from 'antd'; +import trim from 'lodash/trim'; +import { useCallback, useEffect } from 'react'; +import { useParams } from 'umi'; +import { v4 as uuid } from 'uuid'; +import { receiveMessageError } from '../utils'; + +const antMessage = message; + +export const useSelectNextMessages = () => { + const { data: flowDetail, loading } = useFetchFlow(); + const reference = flowDetail.dsl.reference; + const { + derivedMessages, + ref, + addNewestQuestion, + addNewestAnswer, + removeLatestMessage, + removeMessageById, + removeMessagesAfterCurrentMessage, + } = useSelectDerivedMessages(); + + return { + reference, + loading, + derivedMessages, + ref, + addNewestQuestion, + addNewestAnswer, + removeLatestMessage, + removeMessageById, + removeMessagesAfterCurrentMessage, + }; +}; + +function findMessageFromList(eventList: IEventList) { + const event = eventList.find((x) => x.event === MessageEventType.Message) as + | IMessageEvent + | undefined; + + return event?.data?.content; +} + +export const useSendNextMessage = () => { + const { + reference, + loading, + derivedMessages, + ref, + addNewestQuestion, + addNewestAnswer, + removeLatestMessage, + removeMessageById, + } = useSelectNextMessages(); + const { id: agentId } = useParams(); + const { handleInputChange, value, setValue } = useHandleMessageInputChange(); + const { refetch } = useFetchFlow(); + + const { send, answerList, done, stopOutputMessage } = useSendMessageBySSE( + api.runCanvas, + ); + + const sendMessage = useCallback( + async ({ message }: { message: Message; messages?: Message[] }) => { + const params: Record = { + id: agentId, + }; + params.running_hint_text = i18n.t('flow.runningHintText', { + defaultValue: 'is running...🕞', + }); + if (message.content) { + params.query = message.content; + // params.message_id = message.id; + params.inputs = {}; // begin operator inputs + } + const res = await send(params); + + if (receiveMessageError(res)) { + antMessage.error(res?.data?.message); + + // cancel loading + setValue(message.content); + removeLatestMessage(); + } else { + refetch(); // pull the message list after sending the message successfully + } + }, + [agentId, send, setValue, removeLatestMessage, refetch], + ); + + const handleSendMessage = useCallback( + async (message: Message) => { + sendMessage({ message }); + }, + [sendMessage], + ); + + useEffect(() => { + const message = findMessageFromList(answerList); + if (message) { + addNewestAnswer({ + answer: message, + reference: { + chunks: [], + doc_aggs: [], + total: 0, + }, + }); + } + }, [answerList, addNewestAnswer]); + + const handlePressEnter = useCallback(() => { + if (trim(value) === '') return; + const id = uuid(); + if (done) { + setValue(''); + handleSendMessage({ id, content: value.trim(), role: MessageType.User }); + } + addNewestQuestion({ + content: value, + id, + role: MessageType.User, + }); + }, [addNewestQuestion, handleSendMessage, done, setValue, value]); + + const fetchPrologue = useCallback(async () => { + // fetch prologue + const sendRet = await send({ id: agentId }); + if (receiveMessageError(sendRet)) { + message.error(sendRet?.data?.message); + } else { + refetch(); + } + }, [agentId, refetch, send]); + + useEffect(() => { + fetchPrologue(); + }, [fetchPrologue]); + + return { + handlePressEnter, + handleInputChange, + value, + sendLoading: !done, + reference, + loading, + derivedMessages, + ref, + removeMessageById, + stopOutputMessage, + }; +}; diff --git a/web/src/pages/agent/form-sheet/next.tsx b/web/src/pages/agent/form-sheet/next.tsx index ac1082615..e80bedb2f 100644 --- a/web/src/pages/agent/form-sheet/next.tsx +++ b/web/src/pages/agent/form-sheet/next.tsx @@ -108,9 +108,9 @@ const FormSheet = ({ return ( - +