From 301ed76aa40c36f301060c77d84a04dd468b67f0 Mon Sep 17 00:00:00 2001 From: Magicbook1108 Date: Fri, 6 Feb 2026 14:48:24 +0800 Subject: [PATCH] Fix: task cancel (#13034) ### What problem does this PR solve? Fix: task cancel #11745 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- api/apps/document_app.py | 4 +++- rag/svr/task_executor.py | 18 +++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/api/apps/document_app.py b/api/apps/document_app.py index 776db58c7..c5b1aea33 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -617,7 +617,9 @@ async def run(): return get_data_error_result(message="Document not found!") if str(req["run"]) == TaskStatus.CANCEL.value: - if str(doc.run) == TaskStatus.RUNNING.value: + tasks = list(TaskService.query(doc_id=id)) + has_unfinished_task = any((task.progress or 0) < 1 for task in tasks) + if str(doc.run) in [TaskStatus.RUNNING.value, TaskStatus.CANCEL.value] or has_unfinished_task: cancel_all_task_of(id) else: return get_data_error_result(message="Cannot cancel a task that is not in RUNNING status") diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index ca3e390ce..7af52adf8 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -165,6 +165,8 @@ def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing... if cancel: raise TaskCanceledException(msg) logging.info(f"set_progress({task_id}), progress: {prog}, progress_msg: {msg}") + except TaskCanceledException: + raise except DoesNotExist: logging.warning(f"set_progress({task_id}) got exception DoesNotExist") except Exception as e: @@ -693,6 +695,8 @@ async def run_dataflow(task: dict): for i, ck in enumerate(chunks): v = vects[i].tolist() ck["q_%d_vec" % len(v)] = v + except TaskCanceledException: + raise except Exception as e: set_progress(task_id, prog=-1, msg=f"[ERROR]: {e}") PipelineOperationLogService.create(document_id=doc_id, pipeline_id=dataflow_id, @@ -1113,6 +1117,8 @@ async def do_handle_task(task): start_ts = timer() try: token_count, vector_size = await embedding(chunks, embedding_model, task_parser_config, progress_callback) + except TaskCanceledException: + raise except Exception as e: error_message = "Generate embedding error:{}".format(str(e)) progress_callback(-1, error_message) @@ -1130,13 +1136,17 @@ async def do_handle_task(task): async def _maybe_insert_chunks(_chunks): if has_canceled(task_id): - return True + progress_callback(-1, msg="Task has been canceled.") + return False insert_result = await insert_chunks(task_id, task_tenant_id, task_dataset_id, _chunks, progress_callback) return bool(insert_result) try: if not await _maybe_insert_chunks(chunks): return + if has_canceled(task_id): + progress_callback(-1, msg="Task has been canceled.") + return logging.info( "Indexing doc({}), page({}-{}), chunks({}), elapsed: {:.2f}".format( @@ -1205,6 +1215,12 @@ async def handle_task(): DONE_TASKS += 1 CURRENT_TASKS.pop(task_id, None) logging.info(f"handle_task done for task {json.dumps(task)}") + except TaskCanceledException as e: + DONE_TASKS += 1 + CURRENT_TASKS.pop(task_id, None) + logging.info( + f"handle_task canceled for task {task_id}: {getattr(e, 'msg', str(e))}" + ) except Exception as e: FAILED_TASKS += 1 CURRENT_TASKS.pop(task_id, None)