From f7074037efe7bee90de7e5a933499a484c92ea99 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Fri, 13 Jun 2025 17:32:40 +0800 Subject: [PATCH] Feat: Let number of task ahead be visible. (#8259) ### What problem does this PR solve? ![image](https://github.com/user-attachments/assets/d4ef0526-343a-426f-a85a-b05eb8b559a1) ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- api/apps/document_app.py | 5 +++-- api/db/services/document_service.py | 12 ++++++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/api/apps/document_app.py b/api/apps/document_app.py index 43c7f813b..d6be6d91b 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -67,12 +67,13 @@ def upload(): raise LookupError("Can't find this knowledgebase!") err, files = FileService.upload_document(kb, file_objs, current_user.id) + if err: + return get_json_result(data=files, message="\n".join(err), code=settings.RetCode.SERVER_ERROR) + if not files: return get_json_result(data=files, message="There seems to be an issue with your file format. Please verify it is correct and not corrupted.", code=settings.RetCode.DATA_ERROR) files = [f[0] for f in files] # remove the blob - if err: - return get_json_result(data=files, message="\n".join(err), code=settings.RetCode.SERVER_ERROR) return get_json_result(data=files) diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 3bc736e66..8b7bc6660 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -35,7 +35,7 @@ from api.db.services.common_service import CommonService from api.db.services.knowledgebase_service import KnowledgebaseService from api.utils import current_timestamp, get_format_time, get_uuid from rag.nlp import rag_tokenizer, search -from rag.settings import get_svr_queue_name +from rag.settings import get_svr_queue_name, SVR_CONSUMER_GROUP_NAME from rag.utils.redis_conn import REDIS_CONN from rag.utils.storage_factory import STORAGE_IMPL from rag.utils.doc_store_conn import OrderByExpr @@ -484,7 +484,8 @@ class DocumentService(CommonService): if t.progress == -1: bad += 1 prg += t.progress if t.progress >= 0 else 0 - msg.append(t.progress_msg) + if t.progress_msg.strip(): + msg.append(t.progress_msg) if t.task_type == "raptor": has_raptor = True elif t.task_type == "graphrag": @@ -514,6 +515,8 @@ class DocumentService(CommonService): info["progress"] = prg if msg: info["progress_msg"] = msg + else: + info["progress_msg"] = "%d tasks are ahead in the queue..."%get_queue_length(priority) cls.update_by_id(d["id"], info) except Exception as e: if str(e).find("'0'") < 0: @@ -562,6 +565,11 @@ def queue_raptor_o_graphrag_tasks(doc, ty, priority): assert REDIS_CONN.queue_product(get_svr_queue_name(priority), message=task), "Can't access Redis. Please check the Redis' status." +def get_queue_length(priority): + group_info = REDIS_CONN.queue_info(get_svr_queue_name(priority), SVR_CONSUMER_GROUP_NAME) + return int(group_info.get("lag", 0)) + + def doc_upload_and_parse(conversation_id, file_objs, user_id): from api.db.services.api_service import API4ConversationService from api.db.services.conversation_service import ConversationService