Mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-12-24 07:26:47 +08:00
Fix: fix task cancel (#12093)
### What problem does this PR solve?

Fix: fix task cancel

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@ -1024,33 +1024,65 @@ async def do_handle_task(task):
|
||||
|
||||
chunk_count = len(set([chunk["id"] for chunk in chunks]))
|
||||
start_ts = timer()
|
||||
e = await insert_es(task_id, task_tenant_id, task_dataset_id, chunks, progress_callback)
|
||||
if not e:
|
||||
return
|
||||
|
||||
logging.info("Indexing doc({}), page({}-{}), chunks({}), elapsed: {:.2f}".format(task_document_name, task_from_page,
|
||||
task_to_page, len(chunks),
|
||||
timer() - start_ts))
|
||||
async def _maybe_insert_es(_chunks):
|
||||
if has_canceled(task_id):
|
||||
return True
|
||||
e = await insert_es(task_id, task_tenant_id, task_dataset_id, _chunks, progress_callback)
|
||||
return bool(e)
|
||||
|
||||
try:
|
||||
if not await _maybe_insert_es(chunks):
|
||||
return
|
||||
|
||||
DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)
|
||||
logging.info(
|
||||
"Indexing doc({}), page({}-{}), chunks({}), elapsed: {:.2f}".format(
|
||||
task_document_name, task_from_page, task_to_page, len(chunks), timer() - start_ts
|
||||
)
|
||||
)
|
||||
|
||||
time_cost = timer() - start_ts
|
||||
progress_callback(msg="Indexing done ({:.2f}s).".format(time_cost))
|
||||
if toc_thread:
|
||||
d = toc_thread.result()
|
||||
if d:
|
||||
e = await insert_es(task_id, task_tenant_id, task_dataset_id, [d], progress_callback)
|
||||
if not e:
|
||||
return
|
||||
DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, 0, 1, 0)
|
||||
DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)
|
||||
|
||||
task_time_cost = timer() - task_start_ts
|
||||
progress_callback(prog=1.0, msg="Task done ({:.2f}s)".format(task_time_cost))
|
||||
logging.info(
|
||||
"Chunk doc({}), page({}-{}), chunks({}), token({}), elapsed:{:.2f}".format(task_document_name, task_from_page,
|
||||
task_to_page, len(chunks),
|
||||
token_count, task_time_cost))
|
||||
progress_callback(msg="Indexing done ({:.2f}s).".format(timer() - start_ts))
|
||||
|
||||
if toc_thread:
|
||||
d = toc_thread.result()
|
||||
if d:
|
||||
if not await _maybe_insert_es([d]):
|
||||
return
|
||||
DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, 0, 1, 0)
|
||||
|
||||
if has_canceled(task_id):
|
||||
progress_callback(-1, msg="Task has been canceled.")
|
||||
return
|
||||
|
||||
task_time_cost = timer() - task_start_ts
|
||||
progress_callback(prog=1.0, msg="Task done ({:.2f}s)".format(task_time_cost))
|
||||
logging.info(
|
||||
"Chunk doc({}), page({}-{}), chunks({}), token({}), elapsed:{:.2f}".format(
|
||||
task_document_name, task_from_page, task_to_page, len(chunks), token_count, task_time_cost
|
||||
)
|
||||
)
|
||||
|
||||
finally:
|
||||
if has_canceled(task_id):
|
||||
try:
|
||||
exists = await asyncio.to_thread(
|
||||
settings.docStoreConn.indexExist,
|
||||
search.index_name(task_tenant_id),
|
||||
task_dataset_id,
|
||||
)
|
||||
if exists:
|
||||
await asyncio.to_thread(
|
||||
settings.docStoreConn.delete,
|
||||
{"doc_id": task_doc_id},
|
||||
search.index_name(task_tenant_id),
|
||||
task_dataset_id,
|
||||
)
|
||||
except Exception:
|
||||
logging.exception(
|
||||
f"Remove doc({task_doc_id}) from docStore failed when task({task_id}) canceled."
|
||||
)
|
||||
|
||||
async def handle_task():
|
||||
|
||||
|
||||
Reference in New Issue
Block a user