feat: Support for conversational streaming (#809)

### What problem does this PR solve?

feat: Support for conversational streaming
#709

### Type of change


- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
balibabu
2024-05-16 20:15:02 +08:00
committed by GitHub
parent 95f809187e
commit c6c9dbde64
21 changed files with 424 additions and 255 deletions

View File

@ -154,6 +154,9 @@ export const useRemoveConversation = () => {
return removeConversation;
};
/*
@deprecated
*/
export const useCompleteConversation = () => {
const dispatch = useDispatch();
@ -283,20 +286,4 @@ export const useFetchSharedConversation = () => {
return fetchSharedConversation;
};
export const useCompleteSharedConversation = () => {
const dispatch = useDispatch();
const completeSharedConversation = useCallback(
(payload: any) => {
return dispatch<any>({
type: 'chatModel/completeExternalConversation',
payload: payload,
});
},
[dispatch],
);
return completeSharedConversation;
};
//#endregion

View File

@ -1,13 +1,14 @@
import { Authorization } from '@/constants/authorization';
import { LanguageTranslationMap } from '@/constants/common';
import { Pagination } from '@/interfaces/common';
import { IAnswer } from '@/interfaces/database/chat';
import { IKnowledgeFile } from '@/interfaces/database/knowledge';
import { IChangeParserConfigRequestBody } from '@/interfaces/request/document';
import api from '@/utils/api';
import authorizationUtil from '@/utils/authorizationUtil';
import { getSearchValue } from '@/utils/commonUtil';
import { getAuthorization } from '@/utils/authorizationUtil';
import { PaginationProps } from 'antd';
import axios from 'axios';
import { EventSourceParserStream } from 'eventsource-parser/stream';
import { useCallback, useEffect, useMemo, useState } from 'react';
import { useTranslation } from 'react-i18next';
import { useDispatch } from 'umi';
@ -138,62 +139,60 @@ export const useFetchAppConf = () => {
return appConf;
};
export const useConnectWithSse = (url: string) => {
const [content, setContent] = useState<string>('');
export const useSendMessageWithSse = (
url: string = api.completeConversation,
) => {
const [answer, setAnswer] = useState<IAnswer>({} as IAnswer);
const [done, setDone] = useState(true);
const connect = useCallback(() => {
const source = new EventSource(
url || '/sse/createSseEmitter?clientId=123456',
);
source.onopen = function () {
console.log('Connection to the server was opened.');
};
source.onmessage = function (event: any) {
setContent(event.data);
};
source.onerror = function (error) {
console.error('Error occurred:', error);
};
}, [url]);
return { connect, content };
};
export const useConnectWithSseNext = () => {
const [content, setContent] = useState<string>('');
const sharedId = getSearchValue('shared_id');
const authorization = sharedId
? 'Bearer ' + sharedId
: authorizationUtil.getAuthorization();
const send = useCallback(
async (body: any) => {
const response = await fetch(api.completeConversation, {
method: 'POST',
headers: {
[Authorization]: authorization,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
const reader = response?.body
?.pipeThrough(new TextDecoderStream())
.getReader();
try {
setDone(false);
const response = await fetch(url, {
method: 'POST',
headers: {
[Authorization]: getAuthorization(),
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
// const reader = response.body.getReader();
const reader = response?.body
?.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.getReader();
while (true) {
const { value, done } = await reader?.read();
console.log('Received', value);
setContent(value);
if (done) break;
while (true) {
const x = await reader?.read();
if (x) {
const { done, value } = x;
try {
const val = JSON.parse(value?.data || '');
const d = val?.data;
if (typeof d !== 'boolean') {
console.info('data:', d);
setAnswer(d);
}
} catch (e) {
console.warn(e);
}
if (done) {
console.info('done');
break;
}
}
}
console.info('done?');
setDone(true);
return response;
} catch (e) {
setDone(true);
console.warn(e);
}
return response;
},
[authorization],
[url],
);
return { send, content };
return { send, answer, done };
};