Feat: add foundational support for RAPTOR dataset pipeline logs (#10277)

### What problem does this PR solve?

Add foundational support for RAPTOR dataset pipeline logs. Dataset-level RAPTOR tasks now reuse the GraphRAG fake-document-id flow: `collect()` dispatches them by the real `doc_ids` they carry, a new `run_raptor_for_kb()` runs RAPTOR over every document in the dataset, and `handle_task()` fans the resulting pipeline operation log out to those document ids.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
Author: Yongteng Lei
Date: 2025-09-25 16:46:24 +08:00
Committed by: GitHub
Parent: a1147ce609
Commit: c1151519a0

4 changed files with 161 additions and 19 deletions


@@ -224,7 +224,7 @@ async def collect():
     canceled = False
     if msg.get("doc_id", "") == GRAPH_RAPTOR_FAKE_DOC_ID:
         task = msg
-        if task["task_type"] == "graphrag" and msg.get("doc_ids", []):
+        if task["task_type"] in ["graphrag", "raptor"] and msg.get("doc_ids", []):
             print(f"hack {msg['doc_ids']=}=", flush=True)
             task = TaskService.get_task(msg["id"], msg["doc_ids"])
             task["doc_ids"] = msg["doc_ids"]
@@ -636,6 +636,52 @@ async def run_raptor(row, chat_mdl, embd_mdl, vector_size, callback=None):
     return res, tk_count


+@timeout(3600)
+async def run_raptor_for_kb(row, chat_mdl, embd_mdl, vector_size, callback=None, doc_ids=[]):
+    fake_doc_id = GRAPH_RAPTOR_FAKE_DOC_ID
+    chunks = []
+    vctr_nm = "q_%d_vec" % vector_size
+    for doc_id in doc_ids:
+        for d in settings.retrievaler.chunk_list(doc_id, row["tenant_id"], [str(row["kb_id"])],
+                                                 fields=["content_with_weight", vctr_nm],
+                                                 sort_by_position=True):
+            chunks.append((d["content_with_weight"], np.array(d[vctr_nm])))
+
+    raptor = Raptor(
+        row["parser_config"]["raptor"].get("max_cluster", 64),
+        chat_mdl,
+        embd_mdl,
+        row["parser_config"]["raptor"]["prompt"],
+        row["parser_config"]["raptor"]["max_token"],
+        row["parser_config"]["raptor"]["threshold"]
+    )
+    original_length = len(chunks)
+    chunks = await raptor(chunks, row["parser_config"]["raptor"]["random_seed"], callback)
+
+    doc = {
+        "doc_id": fake_doc_id,
+        "kb_id": [str(row["kb_id"])],
+        "docnm_kwd": row["name"],
+        "title_tks": rag_tokenizer.tokenize(row["name"])
+    }
+    if row["pagerank"]:
+        doc[PAGERANK_FLD] = int(row["pagerank"])
+    res = []
+    tk_count = 0
+    for content, vctr in chunks[original_length:]:
+        d = copy.deepcopy(doc)
+        d["id"] = xxhash.xxh64((content + str(fake_doc_id)).encode("utf-8")).hexdigest()
+        d["create_time"] = str(datetime.now()).replace("T", " ")[:19]
+        d["create_timestamp_flt"] = datetime.now().timestamp()
+        d[vctr_nm] = vctr.tolist()
+        d["content_with_weight"] = content
+        d["content_ltks"] = rag_tokenizer.tokenize(content)
+        d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])
+        res.append(d)
+        tk_count += num_tokens_from_string(content)
+    return res, tk_count
+
+
 async def delete_image(kb_id, chunk_id):
     try:
         async with minio_limiter:
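
The new `run_raptor_for_kb()` pulls every document's chunks from the store in positional order, runs RAPTOR clustering over the whole dataset, and indexes only the newly generated summary chunks (`chunks[original_length:]`, since `raptor()` appends its summaries to the input list) under the fake document id. Each summary gets a deterministic id by hashing its content together with the fake doc id, so re-running RAPTOR over identical content produces the same ids instead of accumulating duplicates. A minimal, runnable sketch of that id scheme (the helper name is mine; the hashing matches the diff):

```python
import xxhash  # same library the diff uses

def summary_chunk_id(content: str, fake_doc_id: str) -> str:
    """Deterministic chunk id: identical summaries map to the same id."""
    return xxhash.xxh64((content + str(fake_doc_id)).encode("utf-8")).hexdigest()

# Re-running over the same content yields the same id, so the index entry
# is overwritten in place rather than duplicated.
assert summary_chunk_id("same text", "fake-doc") == summary_chunk_id("same text", "fake-doc")
```

Note the `doc_ids=[]` default: with no document ids the function simply returns no chunks, and the mutable default is harmless here because the list is only iterated, never mutated.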
@@ -731,7 +777,15 @@ async def do_handle_task(task):
         chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)
         # run RAPTOR
         async with kg_limiter:
-            chunks, token_count = await run_raptor(task, chat_model, embedding_model, vector_size, progress_callback)
+            # chunks, token_count = await run_raptor(task, chat_model, embedding_model, vector_size, progress_callback)
+            chunks, token_count = await run_raptor_for_kb(
+                row=task,
+                chat_mdl=chat_model,
+                embd_mdl=embedding_model,
+                vector_size=vector_size,
+                callback=progress_callback,
+                doc_ids=task.get("doc_ids", []),
+            )
     # Either using graphrag or Standard chunking methods
     elif task_type == "graphrag":
         if not task_parser_config.get("graphrag", {}).get("use_graphrag", False):
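
Here the raptor branch of `do_handle_task()` delegates to `run_raptor_for_kb()`, forwarding the document ids carried on the task (defaulting to an empty list); the old per-document call is left commented out. For reference, these are the fields the new function reads off the task row it is given — a hedged sketch with illustrative values; only the key names come from the diff above:

```python
# Hypothetical task row for run_raptor_for_kb(); values are made up.
task = {
    "tenant_id": "tenant-1",
    "kb_id": "kb-1",
    "name": "my-dataset",
    "pagerank": 0,
    "doc_ids": ["doc-1", "doc-2"],
    "parser_config": {
        "raptor": {
            "max_cluster": 64,   # optional; defaults to 64 via .get()
            "prompt": "Summarize the following passages:",
            "max_token": 512,
            "threshold": 0.1,
            "random_seed": 0,
        }
    },
}
```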
@@ -834,7 +888,7 @@ async def handle_task():
         logging.exception(f"handle_task got exception for task {json.dumps(task)}")
     finally:
         task_document_ids = []
-        if task_type in ["graphrag"]:
+        if task_type in ["graphrag", "raptor"]:
             task_document_ids = task["doc_ids"]
         if task["doc_id"] != CANVAS_DEBUG_DOC_ID:
             PipelineOperationLogService.record_pipeline_operation(document_id=task["doc_id"], pipeline_id=task.get("dataflow_id", "") or "", task_type=pipeline_task_type, fake_document_ids=task_document_ids)
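
Finally, `handle_task()` passes the real document ids to the pipeline operation log for dataset-level `graphrag` and, now, `raptor` tasks: the fake id stays in `document_id` while the real ids travel as `fake_document_ids`, presumably so the single dataset-level run can be attributed to each covered document. A sketch of that decision (the task dict is illustrative; only the field names and the branch come from the hunk):

```python
GRAPH_RAPTOR_FAKE_DOC_ID = "FAKE_DOC_ID"  # placeholder; the real constant lives in the repo

task = {
    "task_type": "raptor",
    "doc_id": GRAPH_RAPTOR_FAKE_DOC_ID,  # dataset-level marker
    "doc_ids": ["doc-1", "doc-2"],       # real documents behind the fake id
}

task_document_ids = []
if task["task_type"] in ["graphrag", "raptor"]:
    task_document_ids = task["doc_ids"]
# record_pipeline_operation(...) then receives these as fake_document_ids.
```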