Fix: some cases where a Task returns without setting progress (#8469)

### What problem does this PR solve?
https://github.com/infiniflow/ragflow/issues/8466
I went through the code; the current logic is as follows:
when `do_handle_task` raises an exception, `handle_task` sets the progress. In some cases, however, `do_handle_task` simply returns without setting the right progress. When that happens, the Redis stream message is acked even though the task still appears to be running.
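
As a hedged sketch of that control flow (hypothetical signatures, not ragflow's exact code), the failure mode looks like this:

```python
# Sketch of the failure mode described above (names are illustrative).
# handle_task reports failure progress only when do_handle_task raises;
# a bare early return inside do_handle_task sets no progress at all,
# yet the Redis stream message is acked afterwards either way.
async def handle_task(task, progress_callback):
    try:
        await do_handle_task(task, progress_callback)
    except Exception as e:
        progress_callback(-1, msg=str(e))  # exceptions are covered
    # the stream message is acked here whether or not progress was set

async def do_handle_task(task, progress_callback):
    if not task.get("parser_config"):
        return  # BUG before this PR: no progress_callback(-1, ...) first
    progress_callback(1.0, msg="done")
```

The fix below makes every such early return report a terminal progress value first.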

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>
Author: Stephen Hu
Date: 2025-06-25 09:58:55 +08:00 (committed via GitHub)
Commit: 8d9d2cc0a9 (parent: af6850c8d8)
2 changed files with 8 additions and 8 deletions

**File 1** (`queue_tasks`): rename `chunk_ids`/`task` to `pre_chunk_ids`/`pre_task` when iterating previous tasks, so they no longer reuse the names bound by the parse-task loop above.

```diff
@@ -387,12 +387,12 @@ def queue_tasks(doc: dict, bucket: str, name: str, priority: int):
         for task in parse_task_array:
             ck_num += reuse_prev_task_chunks(task, prev_tasks, chunking_config)
         TaskService.filter_delete([Task.doc_id == doc["id"]])
-        chunk_ids = []
-        for task in prev_tasks:
-            if task["chunk_ids"]:
-                chunk_ids.extend(task["chunk_ids"].split())
-        if chunk_ids:
-            settings.docStoreConn.delete({"id": chunk_ids}, search.index_name(chunking_config["tenant_id"]),
+        pre_chunk_ids = []
+        for pre_task in prev_tasks:
+            if pre_task["chunk_ids"]:
+                pre_chunk_ids.extend(pre_task["chunk_ids"].split())
+        if pre_chunk_ids:
+            settings.docStoreConn.delete({"id": pre_chunk_ids}, search.index_name(chunking_config["tenant_id"]),
                                      chunking_config["kb_id"])
     DocumentService.update_by_id(doc["id"], {"chunk_num": ck_num})
```
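
Presumably the rename avoids confusion from Python's leaky loop scoping: a `for` loop variable persists after the loop, so reusing `task` for the previous-task loop rebinds the name left over from the parse-task loop. A tiny illustration:

```python
# Loop variables persist after the loop, so reusing the name rebinds it.
for task in ["parse-1", "parse-2"]:
    pass
for task in ["prev-1"]:  # clobbers the previous binding
    pass
print(task)  # prev-1
```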

**File 2** (`do_handle_task`): the first hunk reports failure progress before the early return taken when graphrag is not enabled.

```diff
@@ -543,6 +543,7 @@ async def do_handle_task(task):
     # Either using graphrag or Standard chunking methods
     elif task.get("task_type", "") == "graphrag":
         if not task_parser_config.get("graphrag", {}).get("use_graphrag", False):
+            progress_callback(prog=-1.0, msg="Internal configuration error.")
             return
         graphrag_conf = task["kb_parser_config"].get("graphrag", {})
         start_ts = timer()
```
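
For context, the convention visible in these calls is that `prog=1.0` marks completion and `prog=-1` marks terminal failure. A minimal sketch of a callback with that contract, using a hypothetical persistence helper rather than ragflow's actual one:

```python
from typing import Callable, Optional

def make_progress_callback(task_id: str,
                           persist: Callable[[str, dict], None]) -> Callable:
    # prog == 1.0 -> finished; prog == -1 -> failed; 0 < prog < 1 -> running.
    # `persist` is a hypothetical stand-in for the task-store update.
    def progress_callback(prog: Optional[float] = None, msg: str = ""):
        fields = {}
        if prog is not None:
            fields["progress"] = prog
        if msg:
            fields["progress_msg"] = msg
        persist(task_id, fields)
    return progress_callback
```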
The second hunk removes the silent early return after `build_chunks`:

```diff
@@ -558,8 +559,6 @@
         start_ts = timer()
         chunks = await build_chunks(task, progress_callback)
         logging.info("Build document {}: {:.2f}s".format(task_document_name, timer() - start_ts))
-        if chunks is None:
-            return
         if not chunks:
             progress_callback(1., msg=f"No chunk built from {task_document_name}")
             return
```
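
With the separate `chunks is None` branch gone, the remaining `if not chunks:` guard covers both `None` and an empty list, so either case now reports completion (`No chunk built ...`) instead of returning silently:

```python
# `not chunks` is truthy for both None and [], so the single remaining
# guard also catches the path that previously returned without progress.
for chunks in (None, []):
    assert not chunks
```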
The third hunk reports failure before the early return taken when the chunk-id update finds the task unknown:

```diff
@@ -614,6 +613,7 @@
             async with trio.open_nursery() as nursery:
                 for chunk_id in chunk_ids:
                     nursery.start_soon(delete_image, task_dataset_id, chunk_id)
+            progress_callback(-1, msg=f"Chunk updates failed since task {task['id']} is unknown.")
             return
     logging.info("Indexing doc({}), page({}-{}), chunks({}), elapsed: {:.2f}".format(task_document_name, task_from_page,
```