mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-02-06 18:45:08 +08:00
Fix: task cancel (#13034)
### What problem does this PR solve? Fix: task cancel #11745 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@ -617,7 +617,9 @@ async def run():
|
|||||||
return get_data_error_result(message="Document not found!")
|
return get_data_error_result(message="Document not found!")
|
||||||
|
|
||||||
if str(req["run"]) == TaskStatus.CANCEL.value:
|
if str(req["run"]) == TaskStatus.CANCEL.value:
|
||||||
if str(doc.run) == TaskStatus.RUNNING.value:
|
tasks = list(TaskService.query(doc_id=id))
|
||||||
|
has_unfinished_task = any((task.progress or 0) < 1 for task in tasks)
|
||||||
|
if str(doc.run) in [TaskStatus.RUNNING.value, TaskStatus.CANCEL.value] or has_unfinished_task:
|
||||||
cancel_all_task_of(id)
|
cancel_all_task_of(id)
|
||||||
else:
|
else:
|
||||||
return get_data_error_result(message="Cannot cancel a task that is not in RUNNING status")
|
return get_data_error_result(message="Cannot cancel a task that is not in RUNNING status")
|
||||||
|
|||||||
@ -165,6 +165,8 @@ def set_progress(task_id, from_page=0, to_page=-1, prog=None, msg="Processing...
|
|||||||
if cancel:
|
if cancel:
|
||||||
raise TaskCanceledException(msg)
|
raise TaskCanceledException(msg)
|
||||||
logging.info(f"set_progress({task_id}), progress: {prog}, progress_msg: {msg}")
|
logging.info(f"set_progress({task_id}), progress: {prog}, progress_msg: {msg}")
|
||||||
|
except TaskCanceledException:
|
||||||
|
raise
|
||||||
except DoesNotExist:
|
except DoesNotExist:
|
||||||
logging.warning(f"set_progress({task_id}) got exception DoesNotExist")
|
logging.warning(f"set_progress({task_id}) got exception DoesNotExist")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -693,6 +695,8 @@ async def run_dataflow(task: dict):
|
|||||||
for i, ck in enumerate(chunks):
|
for i, ck in enumerate(chunks):
|
||||||
v = vects[i].tolist()
|
v = vects[i].tolist()
|
||||||
ck["q_%d_vec" % len(v)] = v
|
ck["q_%d_vec" % len(v)] = v
|
||||||
|
except TaskCanceledException:
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
set_progress(task_id, prog=-1, msg=f"[ERROR]: {e}")
|
set_progress(task_id, prog=-1, msg=f"[ERROR]: {e}")
|
||||||
PipelineOperationLogService.create(document_id=doc_id, pipeline_id=dataflow_id,
|
PipelineOperationLogService.create(document_id=doc_id, pipeline_id=dataflow_id,
|
||||||
@ -1113,6 +1117,8 @@ async def do_handle_task(task):
|
|||||||
start_ts = timer()
|
start_ts = timer()
|
||||||
try:
|
try:
|
||||||
token_count, vector_size = await embedding(chunks, embedding_model, task_parser_config, progress_callback)
|
token_count, vector_size = await embedding(chunks, embedding_model, task_parser_config, progress_callback)
|
||||||
|
except TaskCanceledException:
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
error_message = "Generate embedding error:{}".format(str(e))
|
error_message = "Generate embedding error:{}".format(str(e))
|
||||||
progress_callback(-1, error_message)
|
progress_callback(-1, error_message)
|
||||||
@ -1130,13 +1136,17 @@ async def do_handle_task(task):
|
|||||||
|
|
||||||
async def _maybe_insert_chunks(_chunks):
|
async def _maybe_insert_chunks(_chunks):
|
||||||
if has_canceled(task_id):
|
if has_canceled(task_id):
|
||||||
return True
|
progress_callback(-1, msg="Task has been canceled.")
|
||||||
|
return False
|
||||||
insert_result = await insert_chunks(task_id, task_tenant_id, task_dataset_id, _chunks, progress_callback)
|
insert_result = await insert_chunks(task_id, task_tenant_id, task_dataset_id, _chunks, progress_callback)
|
||||||
return bool(insert_result)
|
return bool(insert_result)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if not await _maybe_insert_chunks(chunks):
|
if not await _maybe_insert_chunks(chunks):
|
||||||
return
|
return
|
||||||
|
if has_canceled(task_id):
|
||||||
|
progress_callback(-1, msg="Task has been canceled.")
|
||||||
|
return
|
||||||
|
|
||||||
logging.info(
|
logging.info(
|
||||||
"Indexing doc({}), page({}-{}), chunks({}), elapsed: {:.2f}".format(
|
"Indexing doc({}), page({}-{}), chunks({}), elapsed: {:.2f}".format(
|
||||||
@ -1205,6 +1215,12 @@ async def handle_task():
|
|||||||
DONE_TASKS += 1
|
DONE_TASKS += 1
|
||||||
CURRENT_TASKS.pop(task_id, None)
|
CURRENT_TASKS.pop(task_id, None)
|
||||||
logging.info(f"handle_task done for task {json.dumps(task)}")
|
logging.info(f"handle_task done for task {json.dumps(task)}")
|
||||||
|
except TaskCanceledException as e:
|
||||||
|
DONE_TASKS += 1
|
||||||
|
CURRENT_TASKS.pop(task_id, None)
|
||||||
|
logging.info(
|
||||||
|
f"handle_task canceled for task {task_id}: {getattr(e, 'msg', str(e))}"
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
FAILED_TASKS += 1
|
FAILED_TASKS += 1
|
||||||
CURRENT_TASKS.pop(task_id, None)
|
CURRENT_TASKS.pop(task_id, None)
|
||||||
|
|||||||
Reference in New Issue
Block a user