mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-12-08 20:42:30 +08:00
Feat: add initial support for Mindmap (#10310)
### What problem does this PR solve? Add initial support for Mindmap. ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>
This commit is contained in:
@ -522,12 +522,13 @@ def run_graphrag():
|
|||||||
return get_error_data_result(message="Invalid Knowledgebase ID")
|
return get_error_data_result(message="Invalid Knowledgebase ID")
|
||||||
|
|
||||||
task_id = kb.graphrag_task_id
|
task_id = kb.graphrag_task_id
|
||||||
ok, task = TaskService.get_by_id(task_id)
|
if task_id:
|
||||||
if not ok:
|
ok, task = TaskService.get_by_id(task_id)
|
||||||
logging.warning(f"A valid GraphRAG task id is expected for kb {kb_id}")
|
if not ok:
|
||||||
|
logging.warning(f"A valid GraphRAG task id is expected for kb {kb_id}")
|
||||||
|
|
||||||
if task and task.progress not in [-1, 1]:
|
if task and task.progress not in [-1, 1]:
|
||||||
return get_error_data_result(message=f"Task {task_id} in progress with status {task.progress}. A Graph Task is already running.")
|
return get_error_data_result(message=f"Task {task_id} in progress with status {task.progress}. A Graph Task is already running.")
|
||||||
|
|
||||||
documents, _ = DocumentService.get_by_kb_id(
|
documents, _ = DocumentService.get_by_kb_id(
|
||||||
kb_id=kb_id,
|
kb_id=kb_id,
|
||||||
@ -567,7 +568,7 @@ def trace_graphrag():
|
|||||||
|
|
||||||
task_id = kb.graphrag_task_id
|
task_id = kb.graphrag_task_id
|
||||||
if not task_id:
|
if not task_id:
|
||||||
return get_error_data_result(message="GraphRAG Task ID Not Found")
|
return get_json_result(data={})
|
||||||
|
|
||||||
ok, task = TaskService.get_by_id(task_id)
|
ok, task = TaskService.get_by_id(task_id)
|
||||||
if not ok:
|
if not ok:
|
||||||
@ -590,12 +591,13 @@ def run_raptor():
|
|||||||
return get_error_data_result(message="Invalid Knowledgebase ID")
|
return get_error_data_result(message="Invalid Knowledgebase ID")
|
||||||
|
|
||||||
task_id = kb.raptor_task_id
|
task_id = kb.raptor_task_id
|
||||||
ok, task = TaskService.get_by_id(task_id)
|
if task_id:
|
||||||
if not ok:
|
ok, task = TaskService.get_by_id(task_id)
|
||||||
logging.warning(f"A valid RAPTOR task id is expected for kb {kb_id}")
|
if not ok:
|
||||||
|
logging.warning(f"A valid RAPTOR task id is expected for kb {kb_id}")
|
||||||
|
|
||||||
if task and task.progress not in [-1, 1]:
|
if task and task.progress not in [-1, 1]:
|
||||||
return get_error_data_result(message=f"Task {task_id} in progress with status {task.progress}. A RAPTOR Task is already running.")
|
return get_error_data_result(message=f"Task {task_id} in progress with status {task.progress}. A RAPTOR Task is already running.")
|
||||||
|
|
||||||
documents, _ = DocumentService.get_by_kb_id(
|
documents, _ = DocumentService.get_by_kb_id(
|
||||||
kb_id=kb_id,
|
kb_id=kb_id,
|
||||||
@ -635,10 +637,79 @@ def trace_raptor():
|
|||||||
|
|
||||||
task_id = kb.raptor_task_id
|
task_id = kb.raptor_task_id
|
||||||
if not task_id:
|
if not task_id:
|
||||||
return get_error_data_result(message="RAPTOR Task ID Not Found")
|
return get_json_result(data={})
|
||||||
|
|
||||||
ok, task = TaskService.get_by_id(task_id)
|
ok, task = TaskService.get_by_id(task_id)
|
||||||
if not ok:
|
if not ok:
|
||||||
return get_error_data_result(message="RAPTOR Task Not Found or Error Occurred")
|
return get_error_data_result(message="RAPTOR Task Not Found or Error Occurred")
|
||||||
|
|
||||||
return get_json_result(data=task.to_dict())
|
return get_json_result(data=task.to_dict())
|
||||||
|
|
||||||
|
|
||||||
|
@manager.route("/run_mindmap", methods=["POST"]) # noqa: F821
|
||||||
|
@login_required
|
||||||
|
def run_mindmap():
|
||||||
|
req = request.json
|
||||||
|
|
||||||
|
kb_id = req.get("kb_id", "")
|
||||||
|
if not kb_id:
|
||||||
|
return get_error_data_result(message='Lack of "KB ID"')
|
||||||
|
|
||||||
|
ok, kb = KnowledgebaseService.get_by_id(kb_id)
|
||||||
|
if not ok:
|
||||||
|
return get_error_data_result(message="Invalid Knowledgebase ID")
|
||||||
|
|
||||||
|
task_id = kb.mindmap_task_id
|
||||||
|
if task_id:
|
||||||
|
ok, task = TaskService.get_by_id(task_id)
|
||||||
|
if not ok:
|
||||||
|
logging.warning(f"A valid Mindmap task id is expected for kb {kb_id}")
|
||||||
|
|
||||||
|
if task and task.progress not in [-1, 1]:
|
||||||
|
return get_error_data_result(message=f"Task {task_id} in progress with status {task.progress}. A Mindmap Task is already running.")
|
||||||
|
|
||||||
|
documents, _ = DocumentService.get_by_kb_id(
|
||||||
|
kb_id=kb_id,
|
||||||
|
page_number=0,
|
||||||
|
items_per_page=0,
|
||||||
|
orderby="create_time",
|
||||||
|
desc=False,
|
||||||
|
keywords="",
|
||||||
|
run_status=[],
|
||||||
|
types=[],
|
||||||
|
suffix=[],
|
||||||
|
)
|
||||||
|
if not documents:
|
||||||
|
return get_error_data_result(message=f"No documents in Knowledgebase {kb_id}")
|
||||||
|
|
||||||
|
sample_document = documents[0]
|
||||||
|
document_ids = [document["id"] for document in documents]
|
||||||
|
|
||||||
|
task_id = queue_raptor_o_graphrag_tasks(doc=sample_document, ty="mindmap", priority=0, fake_doc_id=GRAPH_RAPTOR_FAKE_DOC_ID, doc_ids=list(document_ids))
|
||||||
|
|
||||||
|
if not KnowledgebaseService.update_by_id(kb.id, {"mindmap_task_id": task_id}):
|
||||||
|
logging.warning(f"Cannot save mindmap_task_id for kb {kb_id}")
|
||||||
|
|
||||||
|
return get_json_result(data={"mindmap_task_id": task_id})
|
||||||
|
|
||||||
|
|
||||||
|
@manager.route("/trace_mindmap", methods=["GET"]) # noqa: F821
|
||||||
|
@login_required
|
||||||
|
def trace_mindmap():
|
||||||
|
kb_id = request.args.get("kb_id", "")
|
||||||
|
if not kb_id:
|
||||||
|
return get_error_data_result(message='Lack of "KB ID"')
|
||||||
|
|
||||||
|
ok, kb = KnowledgebaseService.get_by_id(kb_id)
|
||||||
|
if not ok:
|
||||||
|
return get_error_data_result(message="Invalid Knowledgebase ID")
|
||||||
|
|
||||||
|
task_id = kb.mindmap_task_id
|
||||||
|
if not task_id:
|
||||||
|
return get_json_result(data={})
|
||||||
|
|
||||||
|
ok, task = TaskService.get_by_id(task_id)
|
||||||
|
if not ok:
|
||||||
|
return get_error_data_result(message="Mindmap Task Not Found or Error Occurred")
|
||||||
|
|
||||||
|
return get_json_result(data=task.to_dict())
|
||||||
|
|||||||
@ -127,9 +127,10 @@ class PipelineTaskType(StrEnum):
|
|||||||
DOWNLOAD = "Download"
|
DOWNLOAD = "Download"
|
||||||
RAPTOR = "RAPTOR"
|
RAPTOR = "RAPTOR"
|
||||||
GRAPH_RAG = "GraphRAG"
|
GRAPH_RAG = "GraphRAG"
|
||||||
|
MINDMAP = "Mindmap"
|
||||||
|
|
||||||
|
|
||||||
VALID_PIPELINE_TASK_TYPES = {PipelineTaskType.PARSE, PipelineTaskType.DOWNLOAD, PipelineTaskType.RAPTOR, PipelineTaskType.GRAPH_RAG}
|
VALID_PIPELINE_TASK_TYPES = {PipelineTaskType.PARSE, PipelineTaskType.DOWNLOAD, PipelineTaskType.RAPTOR, PipelineTaskType.GRAPH_RAG, PipelineTaskType.MINDMAP}
|
||||||
|
|
||||||
|
|
||||||
KNOWLEDGEBASE_FOLDER_NAME=".knowledgebase"
|
KNOWLEDGEBASE_FOLDER_NAME=".knowledgebase"
|
||||||
|
|||||||
@ -651,7 +651,11 @@ class Knowledgebase(DataBaseModel):
|
|||||||
pagerank = IntegerField(default=0, index=False)
|
pagerank = IntegerField(default=0, index=False)
|
||||||
|
|
||||||
graphrag_task_id = CharField(max_length=32, null=True, help_text="Graph RAG task ID", index=True)
|
graphrag_task_id = CharField(max_length=32, null=True, help_text="Graph RAG task ID", index=True)
|
||||||
|
graphrag_task_finish_at = DateTimeField(null=True)
|
||||||
raptor_task_id = CharField(max_length=32, null=True, help_text="RAPTOR task ID", index=True)
|
raptor_task_id = CharField(max_length=32, null=True, help_text="RAPTOR task ID", index=True)
|
||||||
|
raptor_task_finish_at = DateTimeField(null=True)
|
||||||
|
mindmap_task_id = CharField(max_length=32, null=True, help_text="Mindmap task ID", index=True)
|
||||||
|
mindmap_task_finish_at = DateTimeField(null=True)
|
||||||
|
|
||||||
status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)
|
status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)
|
||||||
|
|
||||||
@ -1084,4 +1088,20 @@ def migrate_db():
|
|||||||
migrate(migrator.add_column("knowledgebase", "raptor_task_id", CharField(max_length=32, null=True, help_text="RAPTOR task ID", index=True)))
|
migrate(migrator.add_column("knowledgebase", "raptor_task_id", CharField(max_length=32, null=True, help_text="RAPTOR task ID", index=True)))
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
try:
|
||||||
|
migrate(migrator.add_column("knowledgebase", "graphrag_task_finish_at", DateTimeField(null=True)))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
migrate(migrator.add_column("knowledgebase", "raptor_task_finish_at", CharField(null=True)))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
migrate(migrator.add_column("knowledgebase", "mindmap_task_id", CharField(max_length=32, null=True, help_text="Mindmap task ID", index=True)))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
migrate(migrator.add_column("knowledgebase", "mindmap_task_finish_at", CharField(null=True)))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
logging.disable(logging.NOTSET)
|
logging.disable(logging.NOTSET)
|
||||||
|
|||||||
@ -636,8 +636,6 @@ class DocumentService(CommonService):
|
|||||||
prg = 0
|
prg = 0
|
||||||
finished = True
|
finished = True
|
||||||
bad = 0
|
bad = 0
|
||||||
has_raptor = False
|
|
||||||
has_graphrag = False
|
|
||||||
e, doc = DocumentService.get_by_id(d["id"])
|
e, doc = DocumentService.get_by_id(d["id"])
|
||||||
status = doc.run # TaskStatus.RUNNING.value
|
status = doc.run # TaskStatus.RUNNING.value
|
||||||
priority = 0
|
priority = 0
|
||||||
@ -649,25 +647,14 @@ class DocumentService(CommonService):
|
|||||||
prg += t.progress if t.progress >= 0 else 0
|
prg += t.progress if t.progress >= 0 else 0
|
||||||
if t.progress_msg.strip():
|
if t.progress_msg.strip():
|
||||||
msg.append(t.progress_msg)
|
msg.append(t.progress_msg)
|
||||||
if t.task_type == "raptor":
|
|
||||||
has_raptor = True
|
|
||||||
elif t.task_type == "graphrag":
|
|
||||||
has_graphrag = True
|
|
||||||
priority = max(priority, t.priority)
|
priority = max(priority, t.priority)
|
||||||
prg /= len(tsks)
|
prg /= len(tsks)
|
||||||
if finished and bad:
|
if finished and bad:
|
||||||
prg = -1
|
prg = -1
|
||||||
status = TaskStatus.FAIL.value
|
status = TaskStatus.FAIL.value
|
||||||
elif finished:
|
elif finished:
|
||||||
if (d["parser_config"].get("raptor") or {}).get("use_raptor") and not has_raptor:
|
prg = 1
|
||||||
queue_raptor_o_graphrag_tasks(d, "raptor", priority)
|
status = TaskStatus.DONE.value
|
||||||
prg = 0.98 * len(tsks) / (len(tsks) + 1)
|
|
||||||
elif (d["parser_config"].get("graphrag") or {}).get("use_graphrag") and not has_graphrag:
|
|
||||||
queue_raptor_o_graphrag_tasks(d, "graphrag", priority)
|
|
||||||
prg = 0.98 * len(tsks) / (len(tsks) + 1)
|
|
||||||
else:
|
|
||||||
prg = 1
|
|
||||||
status = TaskStatus.DONE.value
|
|
||||||
|
|
||||||
msg = "\n".join(sorted(msg))
|
msg = "\n".join(sorted(msg))
|
||||||
info = {
|
info = {
|
||||||
@ -679,7 +666,7 @@ class DocumentService(CommonService):
|
|||||||
info["progress"] = prg
|
info["progress"] = prg
|
||||||
if msg:
|
if msg:
|
||||||
info["progress_msg"] = msg
|
info["progress_msg"] = msg
|
||||||
if msg.endswith("created task graphrag") or msg.endswith("created task raptor"):
|
if msg.endswith("created task graphrag") or msg.endswith("created task raptor") or msg.endswith("created task mindmap"):
|
||||||
info["progress_msg"] += "\n%d tasks are ahead in the queue..."%get_queue_length(priority)
|
info["progress_msg"] += "\n%d tasks are ahead in the queue..."%get_queue_length(priority)
|
||||||
else:
|
else:
|
||||||
info["progress_msg"] = "%d tasks are ahead in the queue..."%get_queue_length(priority)
|
info["progress_msg"] = "%d tasks are ahead in the queue..."%get_queue_length(priority)
|
||||||
@ -770,7 +757,8 @@ def queue_raptor_o_graphrag_tasks(doc, ty, priority, fake_doc_id="", doc_ids=[])
|
|||||||
"from_page": 100000000,
|
"from_page": 100000000,
|
||||||
"to_page": 100000000,
|
"to_page": 100000000,
|
||||||
"task_type": ty,
|
"task_type": ty,
|
||||||
"progress_msg": datetime.now().strftime("%H:%M:%S") + " created task " + ty
|
"progress_msg": datetime.now().strftime("%H:%M:%S") + " created task " + ty,
|
||||||
|
"begin_at": datetime.now(),
|
||||||
}
|
}
|
||||||
|
|
||||||
task = new_task()
|
task = new_task()
|
||||||
@ -780,7 +768,7 @@ def queue_raptor_o_graphrag_tasks(doc, ty, priority, fake_doc_id="", doc_ids=[])
|
|||||||
task["digest"] = hasher.hexdigest()
|
task["digest"] = hasher.hexdigest()
|
||||||
bulk_insert_into_db(Task, [task], True)
|
bulk_insert_into_db(Task, [task], True)
|
||||||
|
|
||||||
if ty in ["graphrag", "raptor"]:
|
if ty in ["graphrag", "raptor", "mindmap"]:
|
||||||
task["doc_ids"] = doc_ids
|
task["doc_ids"] = doc_ids
|
||||||
DocumentService.begin2parse(doc["id"])
|
DocumentService.begin2parse(doc["id"])
|
||||||
assert REDIS_CONN.queue_product(get_svr_queue_name(priority), message=task), "Can't access Redis. Please check the Redis' status."
|
assert REDIS_CONN.queue_product(get_svr_queue_name(priority), message=task), "Can't access Redis. Please check the Redis' status."
|
||||||
|
|||||||
@ -230,6 +230,12 @@ class KnowledgebaseService(CommonService):
|
|||||||
UserCanvas.avatar.alias("pipeline_avatar"),
|
UserCanvas.avatar.alias("pipeline_avatar"),
|
||||||
cls.model.parser_config,
|
cls.model.parser_config,
|
||||||
cls.model.pagerank,
|
cls.model.pagerank,
|
||||||
|
cls.model.graphrag_task_id,
|
||||||
|
cls.model.graphrag_task_finish_at,
|
||||||
|
cls.model.raptor_task_id,
|
||||||
|
cls.model.raptor_task_finish_at,
|
||||||
|
cls.model.mindmap_task_id,
|
||||||
|
cls.model.mindmap_task_finish_at,
|
||||||
cls.model.create_time,
|
cls.model.create_time,
|
||||||
cls.model.update_time
|
cls.model.update_time
|
||||||
]
|
]
|
||||||
|
|||||||
@ -15,12 +15,12 @@
|
|||||||
#
|
#
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from peewee import fn
|
from peewee import fn
|
||||||
|
|
||||||
from api.db import VALID_PIPELINE_TASK_TYPES
|
from api.db import VALID_PIPELINE_TASK_TYPES, PipelineTaskType
|
||||||
from api.db.db_models import DB, PipelineOperationLog, Document
|
from api.db.db_models import DB, Document, PipelineOperationLog
|
||||||
from api.db.services.canvas_service import UserCanvasService
|
from api.db.services.canvas_service import UserCanvasService
|
||||||
from api.db.services.common_service import CommonService
|
from api.db.services.common_service import CommonService
|
||||||
from api.db.services.document_service import DocumentService
|
from api.db.services.document_service import DocumentService
|
||||||
@ -120,6 +120,24 @@ class PipelineOperationLogService(CommonService):
|
|||||||
if task_type not in VALID_PIPELINE_TASK_TYPES:
|
if task_type not in VALID_PIPELINE_TASK_TYPES:
|
||||||
raise ValueError(f"Invalid task type: {task_type}")
|
raise ValueError(f"Invalid task type: {task_type}")
|
||||||
|
|
||||||
|
if task_type in [PipelineTaskType.GRAPH_RAG, PipelineTaskType.RAPTOR, PipelineTaskType.MINDMAP]:
|
||||||
|
finish_at = document.process_begin_at + timedelta(seconds=document.process_duration)
|
||||||
|
if task_type == PipelineTaskType.GRAPH_RAG:
|
||||||
|
KnowledgebaseService.update_by_id(
|
||||||
|
document.kb_id,
|
||||||
|
{"graphrag_task_finish_at": finish_at},
|
||||||
|
)
|
||||||
|
elif task_type == PipelineTaskType.RAPTOR:
|
||||||
|
KnowledgebaseService.update_by_id(
|
||||||
|
document.kb_id,
|
||||||
|
{"raptor_task_finish_at": finish_at},
|
||||||
|
)
|
||||||
|
elif task_type == PipelineTaskType.MINDMAP:
|
||||||
|
KnowledgebaseService.update_by_id(
|
||||||
|
document.kb_id,
|
||||||
|
{"mindmap_task_finish_at": finish_at},
|
||||||
|
)
|
||||||
|
|
||||||
log = dict(
|
log = dict(
|
||||||
id=get_uuid(),
|
id=get_uuid(),
|
||||||
document_id=document_id, # GRAPH_RAPTOR_FAKE_DOC_ID or real document_id
|
document_id=document_id, # GRAPH_RAPTOR_FAKE_DOC_ID or real document_id
|
||||||
@ -189,16 +207,17 @@ class PipelineOperationLogService(CommonService):
|
|||||||
@classmethod
|
@classmethod
|
||||||
@DB.connection_context()
|
@DB.connection_context()
|
||||||
def get_documents_info(cls, id):
|
def get_documents_info(cls, id):
|
||||||
fields = [
|
fields = [Document.id, Document.name, Document.progress]
|
||||||
Document.id,
|
return (
|
||||||
Document.name,
|
cls.model.select(*fields)
|
||||||
Document.progress
|
.join(Document, on=(cls.model.document_id == Document.id))
|
||||||
]
|
.where(
|
||||||
return cls.model.select(*fields).join(Document, on=(cls.model.document_id == Document.id)).where(
|
cls.model.id == id,
|
||||||
cls.model.id == id,
|
Document.progress > 0,
|
||||||
Document.progress > 0,
|
Document.progress < 1,
|
||||||
Document.progress < 1
|
)
|
||||||
).dicts()
|
.dicts()
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@DB.connection_context()
|
@DB.connection_context()
|
||||||
@ -223,4 +242,3 @@ class PipelineOperationLogService(CommonService):
|
|||||||
logs = logs.paginate(page_number, items_per_page)
|
logs = logs.paginate(page_number, items_per_page)
|
||||||
|
|
||||||
return list(logs.dicts()), count
|
return list(logs.dicts()), count
|
||||||
|
|
||||||
|
|||||||
@ -298,21 +298,23 @@ class TaskService(CommonService):
|
|||||||
((prog == -1) | (prog > cls.model.progress))
|
((prog == -1) | (prog > cls.model.progress))
|
||||||
)
|
)
|
||||||
).execute()
|
).execute()
|
||||||
return
|
else:
|
||||||
|
with DB.lock("update_progress", -1):
|
||||||
|
if info["progress_msg"]:
|
||||||
|
progress_msg = trim_header_by_lines(task.progress_msg + "\n" + info["progress_msg"], 3000)
|
||||||
|
cls.model.update(progress_msg=progress_msg).where(cls.model.id == id).execute()
|
||||||
|
if "progress" in info:
|
||||||
|
prog = info["progress"]
|
||||||
|
cls.model.update(progress=prog).where(
|
||||||
|
(cls.model.id == id) &
|
||||||
|
(
|
||||||
|
(cls.model.progress != -1) &
|
||||||
|
((prog == -1) | (prog > cls.model.progress))
|
||||||
|
)
|
||||||
|
).execute()
|
||||||
|
|
||||||
with DB.lock("update_progress", -1):
|
process_duration = (datetime.now() - task.begin_at).total_seconds()
|
||||||
if info["progress_msg"]:
|
cls.model.update(process_duration=process_duration).where(cls.model.id == id).execute()
|
||||||
progress_msg = trim_header_by_lines(task.progress_msg + "\n" + info["progress_msg"], 3000)
|
|
||||||
cls.model.update(progress_msg=progress_msg).where(cls.model.id == id).execute()
|
|
||||||
if "progress" in info:
|
|
||||||
prog = info["progress"]
|
|
||||||
cls.model.update(progress=prog).where(
|
|
||||||
(cls.model.id == id) &
|
|
||||||
(
|
|
||||||
(cls.model.progress != -1) &
|
|
||||||
((prog == -1) | (prog > cls.model.progress))
|
|
||||||
)
|
|
||||||
).execute()
|
|
||||||
|
|
||||||
|
|
||||||
def queue_tasks(doc: dict, bucket: str, name: str, priority: int):
|
def queue_tasks(doc: dict, bucket: str, name: str, priority: int):
|
||||||
@ -336,7 +338,14 @@ def queue_tasks(doc: dict, bucket: str, name: str, priority: int):
|
|||||||
- Previous task chunks may be reused if available
|
- Previous task chunks may be reused if available
|
||||||
"""
|
"""
|
||||||
def new_task():
|
def new_task():
|
||||||
return {"id": get_uuid(), "doc_id": doc["id"], "progress": 0.0, "from_page": 0, "to_page": 100000000}
|
return {
|
||||||
|
"id": get_uuid(),
|
||||||
|
"doc_id": doc["id"],
|
||||||
|
"progress": 0.0,
|
||||||
|
"from_page": 0,
|
||||||
|
"to_page": 100000000,
|
||||||
|
"begin_at": datetime.now(),
|
||||||
|
}
|
||||||
|
|
||||||
parse_task_array = []
|
parse_task_array = []
|
||||||
|
|
||||||
@ -487,6 +496,7 @@ def queue_dataflow(tenant_id:str, flow_id:str, task_id:str, doc_id:str=CANVAS_DE
|
|||||||
to_page=100000000,
|
to_page=100000000,
|
||||||
task_type="dataflow" if not rerun else "dataflow_rerun",
|
task_type="dataflow" if not rerun else "dataflow_rerun",
|
||||||
priority=priority,
|
priority=priority,
|
||||||
|
begin_at=datetime.now(),
|
||||||
)
|
)
|
||||||
if doc_id not in [CANVAS_DEBUG_DOC_ID, GRAPH_RAPTOR_FAKE_DOC_ID]:
|
if doc_id not in [CANVAS_DEBUG_DOC_ID, GRAPH_RAPTOR_FAKE_DOC_ID]:
|
||||||
TaskService.model.delete().where(TaskService.model.doc_id == doc_id).execute()
|
TaskService.model.delete().where(TaskService.model.doc_id == doc_id).execute()
|
||||||
|
|||||||
@ -93,6 +93,7 @@ TASK_TYPE_TO_PIPELINE_TASK_TYPE = {
|
|||||||
"dataflow" : PipelineTaskType.PARSE,
|
"dataflow" : PipelineTaskType.PARSE,
|
||||||
"raptor": PipelineTaskType.RAPTOR,
|
"raptor": PipelineTaskType.RAPTOR,
|
||||||
"graphrag": PipelineTaskType.GRAPH_RAG,
|
"graphrag": PipelineTaskType.GRAPH_RAG,
|
||||||
|
"mindmap": PipelineTaskType.MINDMAP,
|
||||||
}
|
}
|
||||||
|
|
||||||
UNACKED_ITERATOR = None
|
UNACKED_ITERATOR = None
|
||||||
@ -227,7 +228,7 @@ async def collect():
|
|||||||
canceled = False
|
canceled = False
|
||||||
if msg.get("doc_id", "") in [GRAPH_RAPTOR_FAKE_DOC_ID, CANVAS_DEBUG_DOC_ID]:
|
if msg.get("doc_id", "") in [GRAPH_RAPTOR_FAKE_DOC_ID, CANVAS_DEBUG_DOC_ID]:
|
||||||
task = msg
|
task = msg
|
||||||
if task["task_type"] in ["graphrag", "raptor"] and msg.get("doc_ids", []):
|
if task["task_type"] in ["graphrag", "raptor", "mindmap"] and msg.get("doc_ids", []):
|
||||||
task = TaskService.get_task(msg["id"], msg["doc_ids"])
|
task = TaskService.get_task(msg["id"], msg["doc_ids"])
|
||||||
task["doc_ids"] = msg["doc_ids"]
|
task["doc_ids"] = msg["doc_ids"]
|
||||||
else:
|
else:
|
||||||
@ -822,6 +823,10 @@ async def do_handle_task(task):
|
|||||||
logging.info(f"GraphRAG task result for task {task}:\n{result}")
|
logging.info(f"GraphRAG task result for task {task}:\n{result}")
|
||||||
progress_callback(prog=1.0, msg="Knowledge Graph done ({:.2f}s)".format(timer() - start_ts))
|
progress_callback(prog=1.0, msg="Knowledge Graph done ({:.2f}s)".format(timer() - start_ts))
|
||||||
return
|
return
|
||||||
|
elif task_type == "mindmap":
|
||||||
|
progress_callback(1, "place holder")
|
||||||
|
pass
|
||||||
|
return
|
||||||
else:
|
else:
|
||||||
# Standard chunking methods
|
# Standard chunking methods
|
||||||
start_ts = timer()
|
start_ts = timer()
|
||||||
@ -898,7 +903,7 @@ async def handle_task():
|
|||||||
logging.exception(f"handle_task got exception for task {json.dumps(task)}")
|
logging.exception(f"handle_task got exception for task {json.dumps(task)}")
|
||||||
finally:
|
finally:
|
||||||
task_document_ids = []
|
task_document_ids = []
|
||||||
if task_type in ["graphrag", "raptor"]:
|
if task_type in ["graphrag", "raptor", "mindmap"]:
|
||||||
task_document_ids = task["doc_ids"]
|
task_document_ids = task["doc_ids"]
|
||||||
if not task.get("dataflow_id", ""):
|
if not task.get("dataflow_id", ""):
|
||||||
PipelineOperationLogService.record_pipeline_operation(document_id=task["doc_id"], pipeline_id="", task_type=pipeline_task_type, fake_document_ids=task_document_ids)
|
PipelineOperationLogService.record_pipeline_operation(document_id=task["doc_id"], pipeline_id="", task_type=pipeline_task_type, fake_document_ids=task_document_ids)
|
||||||
|
|||||||
Reference in New Issue
Block a user