From 65571e52544b22b7f24e942972323a1ca0766c5a Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 11 Sep 2025 19:03:51 +0800 Subject: [PATCH] Feat: dataflow supports text (#10058) ### What problem does this PR solve? dataflow supports text. ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- rag/flow/parser/parser.py | 43 ++++++++++++++++--- .../tests/dsl_examples/general_pdf_all.json | 5 ++- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/rag/flow/parser/parser.py b/rag/flow/parser/parser.py index 26f5021f1..0770f5945 100644 --- a/rag/flow/parser/parser.py +++ b/rag/flow/parser/parser.py @@ -45,7 +45,10 @@ class ParserParam(ProcessParamBase): "ppt": [], "image": [], "email": [], - "text": [], + "text": [ + "text", + "json" + ], "audio": [], "video": [], } @@ -84,7 +87,12 @@ class ParserParam(ProcessParamBase): "parse_method": "ocr", }, "email": {}, - "text": {}, + "text": { + "suffix": [ + "txt" + ], + "output_format": "json", + }, "audio": {}, "video": {}, } @@ -119,6 +127,11 @@ class ParserParam(ProcessParamBase): image_parse_method = image_config.get("parse_method", "") self.check_valid_value(image_parse_method.lower(), "Parse method abnormal.", ["ocr"]) + text_config = self.setups.get("text", "") + if text_config: + text_output_format = text_config.get("output_format", "") + self.check_valid_value(text_output_format, "Text output format abnormal.", self.allowed_output_format["text"]) + def get_input_form(self) -> dict[str, dict]: return {} @@ -208,15 +221,13 @@ class Parser(ProcessBase): from rag.app.naive import Markdown as naive_markdown_parser from rag.nlp import concat_img - self.callback(random.randint(1, 5) / 100.0, "Start to work on a Word Processor Document") + self.callback(random.randint(1, 5) / 100.0, "Start to work on a markdown.") blob = from_upstream.blob name = from_upstream.name conf = self._param.setups["markdown"] self.set_output("output_format", conf["output_format"]) - print("markdown {conf=}", flush=True) - markdown_parser = naive_markdown_parser() sections, tables = markdown_parser(name, blob, separate_tables=False) @@ -240,13 +251,33 @@ class Parser(ProcessBase): self.set_output("json", json_results) + def _text(self, from_upstream: ParserFromUpstream): + from deepdoc.parser.utils import get_text + + self.callback(random.randint(1, 5) / 100.0, "Start to work on a text.") + + blob = from_upstream.blob + name = from_upstream.name + conf = self._param.setups["text"] + self.set_output("output_format", conf["output_format"]) + + # parse binary to text + text_content = get_text(name, binary=blob) + + if conf.get("output_format") == "json": + result = [{"text": text_content}] + self.set_output("json", result) + else: + result = text_content + self.set_output("text", result) async def _invoke(self, **kwargs): function_map = { "pdf": self._pdf, "markdown": self._markdown, "spreadsheet": self._spreadsheet, - "word": self._word + "word": self._word, + "text": self._text, } try: from_upstream = ParserFromUpstream.model_validate(kwargs) diff --git a/rag/flow/tests/dsl_examples/general_pdf_all.json b/rag/flow/tests/dsl_examples/general_pdf_all.json index 42eae3f61..6a13f116a 100644 --- a/rag/flow/tests/dsl_examples/general_pdf_all.json +++ b/rag/flow/tests/dsl_examples/general_pdf_all.json @@ -44,9 +44,12 @@ "markdown" ], "output_format": "json" + }, + "text": { + "suffix": ["txt"], + "output_format": "json" } } - } } }, "downstream": ["Chunker:0"],