From 9b8971a9de11361e488253da75eea707e4598b4c Mon Sep 17 00:00:00 2001 From: buua436 Date: Mon, 8 Dec 2025 09:42:20 +0800 Subject: [PATCH] Fix:toc in pipeline (#11785) ### What problem does this PR solve? change: Fix toc in pipeline ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- rag/flow/extractor/extractor.py | 19 +++++++++++-------- rag/flow/splitter/splitter.py | 2 +- rag/svr/task_executor.py | 3 ++- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/rag/flow/extractor/extractor.py b/rag/flow/extractor/extractor.py index 45698b204..1b97fd1ee 100644 --- a/rag/flow/extractor/extractor.py +++ b/rag/flow/extractor/extractor.py @@ -15,9 +15,8 @@ import json import logging import random -from copy import deepcopy, copy +from copy import deepcopy -import trio import xxhash from agent.component.llm import LLMParam, LLM @@ -38,13 +37,13 @@ class ExtractorParam(ProcessParamBase, LLMParam): class Extractor(ProcessBase, LLM): component_name = "Extractor" - def _build_TOC(self, docs): - self.callback(message="Start to generate table of content ...") + async def _build_TOC(self, docs): + self.callback(0.2,message="Start to generate table of content ...") docs = sorted(docs, key=lambda d:( d.get("page_num_int", 0)[0] if isinstance(d.get("page_num_int", 0), list) else d.get("page_num_int", 0), d.get("top_int", 0)[0] if isinstance(d.get("top_int", 0), list) else d.get("top_int", 0) )) - toc: list[dict] = trio.run(run_toc_from_text, [d["text"] for d in docs], self.chat_mdl) + toc = await run_toc_from_text([d["text"] for d in docs], self.chat_mdl) logging.info("------------ T O C -------------\n"+json.dumps(toc, ensure_ascii=False, indent=' ')) ii = 0 while ii < len(toc): @@ -61,7 +60,8 @@ class Extractor(ProcessBase, LLM): ii += 1 if toc: - d = copy.deepcopy(docs[-1]) + d = deepcopy(docs[-1]) + d["doc_id"] = self._canvas._doc_id d["content_with_weight"] = json.dumps(toc, ensure_ascii=False) d["toc_kwd"] = "toc" d["available_int"] = 0 @@ -85,11 +85,14 @@ class Extractor(ProcessBase, LLM): if chunks: if self._param.field_name == "toc": - toc = self._build_TOC(chunks) + for ck in chunks: + ck["doc_id"] = self._canvas._doc_id + ck["id"] = xxhash.xxh64((ck["text"] + str(ck["doc_id"])).encode("utf-8")).hexdigest() + toc =await self._build_TOC(chunks) chunks.append(toc) self.set_output("chunks", chunks) return - + prog = 0 for i, ck in enumerate(chunks): args[chunks_key] = ck["text"] diff --git a/rag/flow/splitter/splitter.py b/rag/flow/splitter/splitter.py index c790790cb..1ef06839d 100644 --- a/rag/flow/splitter/splitter.py +++ b/rag/flow/splitter/splitter.py @@ -125,7 +125,7 @@ class Splitter(ProcessBase): { "text": RAGFlowPdfParser.remove_tag(c), "image": img, - "positions": [[pos[0][-1]+1, *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)] + "positions": [[pos[0][-1], *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)] } for c, img in zip(chunks, images) if c.strip() ] diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 8cf1bf290..b08aa7524 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -592,7 +592,8 @@ async def run_dataflow(task: dict): ck["docnm_kwd"] = task["name"] ck["create_time"] = str(datetime.now()).replace("T", " ")[:19] ck["create_timestamp_flt"] = datetime.now().timestamp() - ck["id"] = xxhash.xxh64((ck["text"] + str(ck["doc_id"])).encode("utf-8")).hexdigest() + if not ck.get("id"): + ck["id"] = xxhash.xxh64((ck["text"] + str(ck["doc_id"])).encode("utf-8")).hexdigest() if "questions" in ck: if "question_tks" not in ck: ck["question_kwd"] = ck["questions"].split("\n")