From d5a44e913d68e78d0334390f3a2bd9ea73d8a4a4 Mon Sep 17 00:00:00 2001
From: Magicbook1108
Date: Tue, 23 Dec 2025 09:38:25 +0800
Subject: [PATCH] Fix: fix task cancel (#12093)

### What problem does this PR solve?

Task cancellation was not honored during the indexing phase of
`do_handle_task`: a task canceled while its chunks were being written kept
inserting into the doc store and left the already-inserted chunks behind.
This patch checks `has_canceled(task_id)` before each insert, reports
cancellation through the progress callback, and wraps the indexing flow in
`try`/`finally` so that chunks already written for a canceled task are
removed from the doc store. A short self-contained sketch of the pattern
follows the diff.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
---
 rag/svr/task_executor.py | 76 ++++++++++++++++++++++++++++------------
 1 file changed, 54 insertions(+), 22 deletions(-)

diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py
index c7b821fbe..b36b6dd53 100644
--- a/rag/svr/task_executor.py
+++ b/rag/svr/task_executor.py
@@ -1024,33 +1024,65 @@ async def do_handle_task(task):
     chunk_count = len(set([chunk["id"] for chunk in chunks]))
     start_ts = timer()
-    e = await insert_es(task_id, task_tenant_id, task_dataset_id, chunks, progress_callback)
-    if not e:
-        return
-    logging.info("Indexing doc({}), page({}-{}), chunks({}), elapsed: {:.2f}".format(task_document_name, task_from_page,
-                                                                                     task_to_page, len(chunks),
-                                                                                     timer() - start_ts))
+    async def _maybe_insert_es(_chunks):
+        if has_canceled(task_id):
+            return True
+        e = await insert_es(task_id, task_tenant_id, task_dataset_id, _chunks, progress_callback)
+        return bool(e)
+
+    try:
+        if not await _maybe_insert_es(chunks):
+            return
 
-    DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)
+        logging.info(
+            "Indexing doc({}), page({}-{}), chunks({}), elapsed: {:.2f}".format(
+                task_document_name, task_from_page, task_to_page, len(chunks), timer() - start_ts
+            )
+        )
 
-    time_cost = timer() - start_ts
-    progress_callback(msg="Indexing done ({:.2f}s).".format(time_cost))
-    if toc_thread:
-        d = toc_thread.result()
-        if d:
-            e = await insert_es(task_id, task_tenant_id, task_dataset_id, [d], progress_callback)
-            if not e:
-                return
-            DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, 0, 1, 0)
+        DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)
 
-    task_time_cost = timer() - task_start_ts
-    progress_callback(prog=1.0, msg="Task done ({:.2f}s)".format(task_time_cost))
-    logging.info(
-        "Chunk doc({}), page({}-{}), chunks({}), token({}), elapsed:{:.2f}".format(task_document_name, task_from_page,
-                                                                                   task_to_page, len(chunks),
-                                                                                   token_count, task_time_cost))
+        progress_callback(msg="Indexing done ({:.2f}s).".format(timer() - start_ts))
+        if toc_thread:
+            d = toc_thread.result()
+            if d:
+                if not await _maybe_insert_es([d]):
+                    return
+                DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, 0, 1, 0)
+
+        if has_canceled(task_id):
+            progress_callback(-1, msg="Task has been canceled.")
+            return
+
+        task_time_cost = timer() - task_start_ts
+        progress_callback(prog=1.0, msg="Task done ({:.2f}s)".format(task_time_cost))
+        logging.info(
+            "Chunk doc({}), page({}-{}), chunks({}), token({}), elapsed:{:.2f}".format(
+                task_document_name, task_from_page, task_to_page, len(chunks), token_count, task_time_cost
+            )
+        )
+
+    finally:
+        if has_canceled(task_id):
+            try:
+                exists = await asyncio.to_thread(
+                    settings.docStoreConn.indexExist,
+                    search.index_name(task_tenant_id),
+                    task_dataset_id,
+                )
+                if exists:
+                    await asyncio.to_thread(
+                        settings.docStoreConn.delete,
+                        {"doc_id": task_doc_id},
+                        search.index_name(task_tenant_id),
+                        task_dataset_id,
+                    )
+            except Exception:
+                logging.exception(
+                    f"Remove doc({task_doc_id}) from docStore failed when task({task_id}) canceled."
+                )
 
 
 async def handle_task():
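
Note: below is a minimal, self-contained sketch of the cancel-and-clean-up
pattern the hunk introduces, for illustration only. `has_canceled`,
`delete_doc`, `CANCELED_TASKS`, and `DOC_STORE` are toy stand-ins, not the
executor's real `rag/svr` APIs; only the `asyncio.to_thread` usage matches
what the patch actually calls.

```python
import asyncio

# Toy stand-ins; illustrative only, not the real RAGFlow collaborators.
CANCELED_TASKS = {"task-1"}
DOC_STORE: dict[str, list[str]] = {}


def has_canceled(task_id: str) -> bool:
    # The real executor consults shared task state; a set suffices here.
    return task_id in CANCELED_TASKS


def delete_doc(doc_id: str) -> None:
    # Blocking doc-store call, hence asyncio.to_thread in the async path.
    DOC_STORE.pop(doc_id, None)


async def index_with_cleanup(task_id: str, doc_id: str) -> None:
    try:
        if has_canceled(task_id):
            return  # skip the write once the task is canceled
        DOC_STORE.setdefault(doc_id, []).append("chunk")
    finally:
        # Mirrors the patch's `finally`: if the task was canceled at any
        # point, remove whatever this task already wrote to the store.
        if has_canceled(task_id):
            await asyncio.to_thread(delete_doc, doc_id)


async def main() -> None:
    await index_with_cleanup("task-1", "doc-1")  # canceled: nothing kept
    await index_with_cleanup("task-2", "doc-2")  # normal indexing
    print(DOC_STORE)  # {'doc-2': ['chunk']}


asyncio.run(main())
```

The design choice mirrored here is that the cleanup lives in `finally`, so
chunks are purged whether cancellation is observed before, between, or after
the inserts, and regardless of whether the indexing path raised.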