Mirror of https://github.com/infiniflow/ragflow.git (synced 2025-12-08 20:42:30 +08:00)
Feat: Let number of task ahead be visible. (#8259)
### What problem does this PR solve?

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
@@ -67,12 +67,13 @@ def upload():
         raise LookupError("Can't find this knowledgebase!")
 
     err, files = FileService.upload_document(kb, file_objs, current_user.id)
+    if err:
+        return get_json_result(data=files, message="\n".join(err), code=settings.RetCode.SERVER_ERROR)
+
     if not files:
         return get_json_result(data=files, message="There seems to be an issue with your file format. Please verify it is correct and not corrupted.", code=settings.RetCode.DATA_ERROR)
     files = [f[0] for f in files] # remove the blob
 
-    if err:
-        return get_json_result(data=files, message="\n".join(err), code=settings.RetCode.SERVER_ERROR)
     return get_json_result(data=files)
 
 
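In `upload()`, the error branch for `FileService.upload_document` now runs immediately after the call, before the empty-result check and before the blob is stripped from each returned entry. The snippet below is a hypothetical client-side sketch for exercising this endpoint; the base URL, route prefix, and authenticated session are assumptions and are not part of this commit.

```python
# Hypothetical client-side sketch (not part of this commit) for exercising upload().
# The base URL, route, and session handling are assumptions; adjust per deployment.
import requests

BASE_URL = "http://localhost:9380"               # assumed RAGFlow web API address
UPLOAD_ROUTE = BASE_URL + "/v1/document/upload"  # assumed mount point of upload()

def upload_to_kb(session: requests.Session, kb_id: str, path: str) -> dict:
    """POST one file to a knowledgebase and return the JSON envelope."""
    with open(path, "rb") as fh:
        resp = session.post(UPLOAD_ROUTE, data={"kb_id": kb_id}, files={"file": fh})
    payload = resp.json()
    # A non-zero code with a newline-joined message corresponds to the SERVER_ERROR
    # branch above; the DATA_ERROR branch reports an unreadable or corrupted file.
    return payload

# Example (assumes a prior login so the session carries auth):
# session = requests.Session()
# result = upload_to_kb(session, kb_id="<kb-id>", path="manual.pdf")
```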
@@ -35,7 +35,7 @@ from api.db.services.common_service import CommonService
 from api.db.services.knowledgebase_service import KnowledgebaseService
 from api.utils import current_timestamp, get_format_time, get_uuid
 from rag.nlp import rag_tokenizer, search
-from rag.settings import get_svr_queue_name
+from rag.settings import get_svr_queue_name, SVR_CONSUMER_GROUP_NAME
 from rag.utils.redis_conn import REDIS_CONN
 from rag.utils.storage_factory import STORAGE_IMPL
 from rag.utils.doc_store_conn import OrderByExpr
@@ -484,7 +484,8 @@ class DocumentService(CommonService):
                     if t.progress == -1:
                         bad += 1
                     prg += t.progress if t.progress >= 0 else 0
-                    msg.append(t.progress_msg)
+                    if t.progress_msg.strip():
+                        msg.append(t.progress_msg)
                     if t.task_type == "raptor":
                         has_raptor = True
                     elif t.task_type == "graphrag":
@@ -514,6 +515,8 @@ class DocumentService(CommonService):
                 info["progress"] = prg
                 if msg:
                     info["progress_msg"] = msg
+                else:
+                    info["progress_msg"] = "%d tasks are ahead in the queue..."%get_queue_length(priority)
                 cls.update_by_id(d["id"], info)
             except Exception as e:
                 if str(e).find("'0'") < 0:
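The two DocumentService hunks above work together. Previously `msg.append(t.progress_msg)` ran unconditionally, so for any document with at least one task `msg` was never empty and an `else` branch on `if msg:` could never fire. With blank messages filtered out, a document whose tasks have not reported anything yet falls through to the new fallback and shows how many entries are still ahead of it, using `get_queue_length()` (added further down in this commit). A condensed, self-contained sketch of that interaction follows; the `Task` stand-in and the stubbed queue length are illustrations, not project code.

```python
# Condensed sketch of the new progress-message logic. Task is a stand-in namedtuple
# and fake_queue_length() replaces get_queue_length(), which needs a live Redis.
from collections import namedtuple

Task = namedtuple("Task", "progress progress_msg")

def fake_queue_length():
    return 3  # pretend three entries are still waiting in the task stream

def progress_message(tasks):
    msg = []
    for t in tasks:
        if t.progress_msg.strip():  # new guard: ignore blank per-task messages
            msg.append(t.progress_msg)
    if msg:
        return msg
    # fallback added by this commit: report the queue position instead of nothing
    return "%d tasks are ahead in the queue..." % fake_queue_length()

print(progress_message([Task(0.0, ""), Task(0.0, "  ")]))   # 3 tasks are ahead in the queue...
print(progress_message([Task(0.5, "Page 1/4 parsed.")]))    # ['Page 1/4 parsed.']
```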
@@ -562,6 +565,11 @@ def queue_raptor_o_graphrag_tasks(doc, ty, priority):
     assert REDIS_CONN.queue_product(get_svr_queue_name(priority), message=task), "Can't access Redis. Please check the Redis' status."
 
 
+def get_queue_length(priority):
+    group_info = REDIS_CONN.queue_info(get_svr_queue_name(priority), SVR_CONSUMER_GROUP_NAME)
+    return int(group_info.get("lag", 0))
+
+
 def doc_upload_and_parse(conversation_id, file_objs, user_id):
     from api.db.services.api_service import API4ConversationService
     from api.db.services.conversation_service import ConversationService
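`get_queue_length()` assumes `REDIS_CONN.queue_info` returns a mapping that includes the consumer group's `lag`, i.e. the number of stream entries not yet delivered to `SVR_CONSUMER_GROUP_NAME`. The sketch below shows one way such a helper could be built on redis-py's `XINFO GROUPS`, purely to illustrate where `lag` comes from; it is not RAGFlow's actual `queue_info` implementation, and the `lag` field itself requires Redis 7.0 or newer.

```python
# Hypothetical queue_info helper built on XINFO GROUPS, for illustration only.
# Assumes a redis-py client created with decode_responses=True and Redis >= 7.0,
# where each group entry carries 'name', 'consumers', 'pending' and 'lag'.
import redis

def queue_info(client: redis.Redis, stream: str, group_name: str) -> dict:
    try:
        for group in client.xinfo_groups(stream):
            if group.get("name") == group_name:
                return group
    except redis.exceptions.ResponseError:
        pass  # the stream (or the group) has not been created yet
    return {}

# Usage mirroring get_queue_length(); the stream and group names are placeholders:
# info = queue_info(redis.Redis(decode_responses=True), "svr_queue_0", "svr_task_consumers")
# print(int(info.get("lag", 0)))
```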