support workflow events

This commit is contained in:
JzoNg
2024-04-23 17:09:12 +08:00
parent 2bd93dcbaa
commit c73753138d
119 changed files with 40066 additions and 24 deletions

View File

@ -22,6 +22,77 @@ const baseOptions = {
redirect: 'follow',
}
export type WorkflowStartedResponse = {
task_id: string
workflow_run_id: string
event: string
data: {
id: string
workflow_id: string
sequence_number: number
created_at: number
}
}
export type WorkflowFinishedResponse = {
task_id: string
workflow_run_id: string
event: string
data: {
id: string
workflow_id: string
status: string
outputs: any
error: string
elapsed_time: number
total_tokens: number
total_steps: number
created_at: number
finished_at: number
}
}
export type NodeStartedResponse = {
task_id: string
workflow_run_id: string
event: string
data: {
id: string
node_id: string
node_type: string
index: number
predecessor_node_id?: string
inputs: any
created_at: number
extras?: any
}
}
export type NodeFinishedResponse = {
task_id: string
workflow_run_id: string
event: string
data: {
id: string
node_id: string
node_type: string
index: number
predecessor_node_id?: string
inputs: any
process_data: any
outputs: any
status: string
error: string
elapsed_time: number
execution_metadata: {
total_tokens: number
total_price: number
currency: string
}
created_at: number
}
}
export type IOnDataMoreInfo = {
conversationId?: string
taskId?: string
@ -38,6 +109,10 @@ export type IOnMessageReplace = (messageReplace: MessageReplace) => void
export type IOnAnnotationReply = (messageReplace: AnnotationReply) => void
export type IOnCompleted = (hasError?: boolean) => void
export type IOnError = (msg: string, code?: string) => void
export type IOnWorkflowStarted = (workflowStarted: WorkflowStartedResponse) => void
export type IOnWorkflowFinished = (workflowFinished: WorkflowFinishedResponse) => void
export type IOnNodeStarted = (nodeStarted: NodeStartedResponse) => void
export type IOnNodeFinished = (nodeFinished: NodeFinishedResponse) => void
type IOtherOptions = {
isPublicAPI?: boolean
@ -52,6 +127,10 @@ type IOtherOptions = {
onError?: IOnError
onCompleted?: IOnCompleted // for stream
getAbortController?: (abortController: AbortController) => void
onWorkflowStarted?: IOnWorkflowStarted
onWorkflowFinished?: IOnWorkflowFinished
onNodeStarted?: IOnNodeStarted
onNodeFinished?: IOnNodeFinished
}
function unicodeToChar(text: string) {
@ -60,7 +139,19 @@ function unicodeToChar(text: string) {
})
}
const handleStream = (response: Response, onData: IOnData, onCompleted?: IOnCompleted, onThought?: IOnThought, onMessageEnd?: IOnMessageEnd, onMessageReplace?: IOnMessageReplace, onFile?: IOnFile) => {
const handleStream = (
response: Response,
onData: IOnData,
onCompleted?: IOnCompleted,
onThought?: IOnThought,
onMessageEnd?: IOnMessageEnd,
onMessageReplace?: IOnMessageReplace,
onFile?: IOnFile,
onWorkflowStarted?: IOnWorkflowStarted,
onWorkflowFinished?: IOnWorkflowFinished,
onNodeStarted?: IOnNodeStarted,
onNodeFinished?: IOnNodeFinished,
) => {
if (!response.ok)
throw new Error('Network response was not ok')
@ -124,6 +215,18 @@ const handleStream = (response: Response, onData: IOnData, onCompleted?: IOnComp
else if (bufferObj.event === 'message_replace') {
onMessageReplace?.(bufferObj as MessageReplace)
}
else if (bufferObj.event === 'workflow_started') {
onWorkflowStarted?.(bufferObj as WorkflowStartedResponse)
}
else if (bufferObj.event === 'workflow_finished') {
onWorkflowFinished?.(bufferObj as WorkflowFinishedResponse)
}
else if (bufferObj.event === 'node_started') {
onNodeStarted?.(bufferObj as NodeStartedResponse)
}
else if (bufferObj.event === 'node_finished') {
onNodeFinished?.(bufferObj as NodeFinishedResponse)
}
}
})
buffer = lines[lines.length - 1]
@ -258,7 +361,23 @@ export const upload = (fetchOptions: any): Promise<any> => {
})
}
export const ssePost = (url: string, fetchOptions: any, { onData, onCompleted, onThought, onFile, onMessageEnd, onMessageReplace, onError }: IOtherOptions) => {
export const ssePost = (
url: string,
fetchOptions: any,
{
onData,
onCompleted,
onThought,
onFile,
onMessageEnd,
onMessageReplace,
onWorkflowStarted,
onWorkflowFinished,
onNodeStarted,
onNodeFinished,
onError,
}: IOtherOptions,
) => {
const options = Object.assign({}, baseOptions, {
method: 'POST',
}, fetchOptions)
@ -290,7 +409,7 @@ export const ssePost = (url: string, fetchOptions: any, { onData, onCompleted, o
onData?.(str, isFirstMessage, moreInfo)
}, () => {
onCompleted?.()
}, onThought, onMessageEnd, onMessageReplace, onFile)
}, onThought, onMessageEnd, onMessageReplace, onFile, onWorkflowStarted, onWorkflowFinished, onNodeStarted, onNodeFinished)
}).catch((e) => {
Toast.notify({ type: 'error', message: e })
onError?.(e)