mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-01-27 21:56:35 +08:00
Fix: Cancel tasks before document or datasets deletion to prevent queue blocking (#12799)
### What problem does this PR solve?
When deleting the knowledge base, the records in the Document and
Knowledgebase tables are immediately deleted
But there are still a large number of pending task messages in the Redis
queue (asynchronous queue) if you did not click on stopping tasks before
deleting knowledge base.
TaskService.get_task() uses a JOIN query to associate three tables (Task
← Document ← Knowledgebase)
Since Document/Knowledgebase have been deleted, the JOIN returns an
empty result, even though the Task records still exist
task-executor considers the task does not exist ("collect task xxx is
unknown"), can only skip and warn
log:2026-01-23 16:43:21,716 WARNING 1190179 collect task
110fbf70f5bd11f0945a23b0930487df is unknown
2026-01-23 16:43:21,818 WARNING 1190179 collect task
11146bc4f5bd11f0945a23b0930487df is unknown
2026-01-23 16:43:21,918 WARNING 1190179 collect task
111c3336f5bd11f0945a23b0930487df is unknown
2026-01-23 16:43:22,021 WARNING 1190179 collect task
112471b8f5bd11f0945a23b0930487df is unknown
2026-01-23 16:43:26,719 WARNING 1190179 collect task
112e855ef5bd11f0945a23b0930487df is unknown
2026-01-23 16:43:26,734 WARNING 1190179 collect task
1134380af5bd11f0945a23b0930487df is unknown
2026-01-23 16:43:26,834 WARNING 1190179 collect task
1138cb2cf5bd11f0945a23b0930487df is unknown
As a consequence, a large number of such tasks occupy the queue
processing capacity, causing new tasks to queue and wait
<img width="1910" height="947"
alt="9a00f2e0-9112-4dbb-b357-7f66b8eb5acf"
src="https://github.com/user-attachments/assets/0e1227c2-a2df-4ef3-ba8f-e04c3f6ef0e1"
/>
Solution
Add logic to stop all ongoing tasks before deleting the knowledge base
and Tasks
### Type of change
- Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
@ -338,10 +338,17 @@ class DocumentService(CommonService):
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def remove_document(cls, doc, tenant_id):
|
||||
from api.db.services.task_service import TaskService
|
||||
from api.db.services.task_service import TaskService, cancel_all_task_of
|
||||
cls.clear_chunk_num(doc.id)
|
||||
|
||||
# Delete tasks first
|
||||
# Cancel all running tasks first Using preset function in task_service.py --- set cancel flag in Redis
|
||||
try:
|
||||
cancel_all_task_of(doc.id)
|
||||
logging.info(f"Cancelled all tasks for document {doc.id}")
|
||||
except Exception as e:
|
||||
logging.warning(f"Failed to cancel tasks for document {doc.id}: {e}")
|
||||
|
||||
# Delete tasks from database
|
||||
try:
|
||||
TaskService.filter_delete([Task.doc_id == doc.id])
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user