Fix: pipeline debug... (#10206)

### What problem does this PR solve?

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Kevin Hu
2025-09-24 11:12:08 +08:00
committed by GitHub
parent 8f465525f7
commit 5715ca6b74
12 changed files with 71 additions and 162 deletions

View File

@ -173,6 +173,6 @@ class HierarchicalMerger(ProcessBase):
]
async with trio.open_nursery() as nursery:
for d in cks:
nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), "_image_temps", get_uuid())
nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), get_uuid())
self.callback(1, "Done.")

View File

@ -59,11 +59,8 @@ class ParserParam(ProcessParamBase):
"image": [
"text"
],
"email": [
"text",
"json"
],
"text": [
"email": ["text", "json"],
"text&markdown": [
"text",
"json"
],
@ -102,7 +99,6 @@ class ParserParam(ProcessParamBase):
"output_format": "json",
},
"slides": {
"parse_method": "presentation",
"suffix": [
"pptx",
],
@ -122,12 +118,6 @@ class ParserParam(ProcessParamBase):
"fields": ["from", "to", "cc", "bcc", "date", "subject", "body", "attachments", "metadata"],
"output_format": "json",
},
"text": {
"suffix": [
"txt"
],
"output_format": "json",
},
"audio": {
"suffix":[
"da",
@ -168,10 +158,10 @@ class ParserParam(ProcessParamBase):
spreadsheet_output_format = spreadsheet_config.get("output_format", "")
self.check_valid_value(spreadsheet_output_format, "Spreadsheet output format abnormal.", self.allowed_output_format["spreadsheet"])
doc_config = self.setups.get("doc", "")
doc_config = self.setups.get("word", "")
if doc_config:
doc_output_format = doc_config.get("output_format", "")
self.check_valid_value(doc_output_format, "Word processer document output format abnormal.", self.allowed_output_format["doc"])
self.check_valid_value(doc_output_format, "Word processer document output format abnormal.", self.allowed_output_format["word"])
slides_config = self.setups.get("slides", "")
if slides_config:
@ -181,17 +171,13 @@ class ParserParam(ProcessParamBase):
image_config = self.setups.get("image", "")
if image_config:
image_parse_method = image_config.get("parse_method", "")
self.check_valid_value(image_parse_method.lower(), "Parse method abnormal.", ["ocr", "vlm"])
if image_parse_method not in ["ocr"]:
self.check_empty(image_config.get("llm_id"), "VLM")
self.check_empty(image_config.get("lang", ""), "Language")
image_language = image_config.get("lang", "")
self.check_empty(image_language, "Language")
text_config = self.setups.get("text", "")
text_config = self.setups.get("text&markdown", "")
if text_config:
text_output_format = text_config.get("output_format", "")
self.check_valid_value(text_output_format, "Text output format abnormal.", self.allowed_output_format["text"])
self.check_valid_value(text_output_format, "Text output format abnormal.", self.allowed_output_format["text&markdown"])
audio_config = self.setups.get("audio", "")
if audio_config:
@ -216,9 +202,9 @@ class Parser(ProcessBase):
conf = self._param.setups["pdf"]
self.set_output("output_format", conf["output_format"])
if conf.get("parse_method") == "deepdoc":
if conf.get("parse_method").lower() == "deepdoc":
bboxes = RAGFlowPdfParser().parse_into_bboxes(blob, callback=self.callback)
elif conf.get("parse_method") == "plain_text":
elif conf.get("parse_method").lower() == "plain_text":
lines, _ = PlainParser()(blob)
bboxes = [{"text": t} for t, _ in lines]
else:
@ -299,7 +285,7 @@ class Parser(ProcessBase):
from rag.nlp import concat_img
self.callback(random.randint(1, 5) / 100.0, "Start to work on a markdown.")
conf = self._param.setups["markdown"]
conf = self._param.setups["text&markdown"]
self.set_output("output_format", conf["output_format"])
markdown_parser = naive_markdown_parser()
@ -326,25 +312,22 @@ class Parser(ProcessBase):
self.set_output("text", "\n".join([section_text for section_text, _ in sections]))
def _image(self, from_upstream: ParserFromUpstream):
def _image(self, name, blob):
from deepdoc.vision import OCR
self.callback(random.randint(1, 5) / 100.0, "Start to work on an image.")
blob = from_upstream.blob
conf = self._param.setups["image"]
self.set_output("output_format", conf["output_format"])
img = Image.open(io.BytesIO(blob)).convert("RGB")
lang = conf["lang"]
if conf["parse_method"] == "ocr":
# use ocr, recognize chars only
ocr = OCR()
bxs = ocr(np.array(img)) # return boxes and recognize result
txt = "\n".join([t[0] for _, t in bxs if t[0]])
else:
lang = conf["lang"]
# use VLM to describe the picture
cv_model = LLMBundle(self._canvas.get_tenant_id(), LLMType.IMAGE2TEXT, llm_name=conf["llm_id"], lang=lang)
img_binary = io.BytesIO()
@ -519,13 +502,18 @@ class Parser(ProcessBase):
else:
blob = FileService.get_blob(from_upstream.file["created_by"], from_upstream.file["id"])
done = False
for p_type, conf in self._param.setups.items():
if from_upstream.name.split(".")[-1].lower() not in conf.get("suffix", []):
continue
await trio.to_thread.run_sync(function_map[p_type], name, blob)
done = True
break
if not done:
raise Exception("No suitable for file extension: `.%s`" % from_upstream.name.split(".")[-1].lower())
outs = self.output()
async with trio.open_nursery() as nursery:
for d in outs.get("json", []):
nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), "_image_temps", get_uuid())
nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), get_uuid())

View File

@ -19,29 +19,38 @@ import logging
import random
import time
from timeit import default_timer as timer
import trio
from agent.canvas import Graph
from api.db import PipelineTaskType
from api.db.services.document_service import DocumentService
from api.db.services.task_service import has_canceled
from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
from rag.utils.redis_conn import REDIS_CONN
class Pipeline(Graph):
def __init__(self, dsl: str, tenant_id=None, doc_id=None, task_id=None, flow_id=None):
def __init__(self, dsl: str|dict, tenant_id=None, doc_id=None, task_id=None, flow_id=None):
if isinstance(dsl, dict):
dsl = json.dumps(dsl, ensure_ascii=False)
super().__init__(dsl, tenant_id, task_id)
if self._doc_id == "x":
self._doc_id = None
self._doc_id = doc_id
self._flow_id = flow_id
self._kb_id = None
if doc_id:
if self._doc_id:
self._kb_id = DocumentService.get_knowledgebase_id(doc_id)
assert self._kb_id, f"Can't find KB of this document: {doc_id}"
if not self._kb_id:
self._doc_id = None
def callback(self, component_name: str, progress: float | int | None = None, message: str = "") -> None:
from rag.svr.task_executor import TaskCanceledException
log_key = f"{self._flow_id}-{self.task_id}-logs"
timestamp = timer()
if has_canceled(self.task_id):
progress = -1
message += "[CANCEL]"
try:
bin = REDIS_CONN.get(log_key)
obj = json.loads(bin.encode("utf-8"))
@ -91,6 +100,9 @@ class Pipeline(Graph):
except Exception as e:
logging.exception(e)
if has_canceled(self.task_id):
raise TaskCanceledException(message)
def fetch_logs(self):
log_key = f"{self._flow_id}-{self.task_id}-logs"
try:

View File

@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import random
from functools import partial
@ -78,7 +77,7 @@ class Splitter(ProcessBase):
deli,
self._param.overlapped_percent,
)
self.set_output("chunks", [{"text": c} for c in cks])
self.set_output("chunks", [{"text": c.strip()} for c in cks if c.strip()])
self.callback(1, "Done.")
return
@ -106,7 +105,6 @@ class Splitter(ProcessBase):
]
async with trio.open_nursery() as nursery:
for d in cks:
nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), "_image_temps", get_uuid())
print("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n", json.dumps(cks, ensure_ascii=False, indent=2))
nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), get_uuid())
self.set_output("chunks", cks)
self.callback(1, "Done.")

View File

@ -114,9 +114,8 @@ class Tokenizer(ProcessBase):
if from_upstream.chunks:
chunks = from_upstream.chunks
for i, ck in enumerate(chunks):
if ck.get("docnm_kwd"): # from presentation method
ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", ck["docnm_kwd"]))
ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", from_upstream.name))
ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
if ck.get("questions"):
ck["question_tks"] = rag_tokenizer.tokenize("\n".join(ck["questions"]))
if ck.get("keywords"):
@ -125,6 +124,7 @@ class Tokenizer(ProcessBase):
ck["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(ck["content_ltks"])
if i % 100 == 99:
self.callback(i * 1.0 / len(chunks) / parts)
elif from_upstream.output_format in ["markdown", "text", "html"]:
if from_upstream.output_format == "markdown":
payload = from_upstream.markdown_result
@ -138,18 +138,16 @@ class Tokenizer(ProcessBase):
ck = {"text": payload}
if "full_text" in self._param.search_method:
if ck.get("docnm_kwd"): # from presentation method
ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", ck["docnm_kwd"]))
ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
ck["content_ltks"] = rag_tokenizer.tokenize(kwargs.get(kwargs["output_format"], ""))
ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", from_upstream.name))
ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
ck["content_ltks"] = rag_tokenizer.tokenize(payload)
ck["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(ck["content_ltks"])
chunks = [ck]
else:
chunks = from_upstream.json_result
for i, ck in enumerate(chunks):
if ck.get("docnm_kwd"): # from presentation method
ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", ck["docnm_kwd"]))
ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", from_upstream.name))
ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
ck["content_ltks"] = rag_tokenizer.tokenize(ck["text"])
ck["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(ck["content_ltks"])
if i % 100 == 99: