From 0d9c1f1c3c861ecfb9adcef3de94c266dc9cf9d0 Mon Sep 17 00:00:00 2001
From: Yongteng Lei
Date: Wed, 10 Sep 2025 13:02:53 +0800
Subject: [PATCH] Feat: dataflow supports Spreadsheet and Word processor
 document (#9996)

### What problem does this PR solve?

Dataflow now supports Spreadsheet and Word processor documents.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---
 deepdoc/parser/excel_parser.py                   |  24 +++--
 rag/flow/chunker/chunker.py                      |   7 +-
 rag/flow/chunker/schema.py                       |   2 +-
 rag/flow/parser/parser.py                        | 101 ++++++++++++++----
 rag/flow/tests/dsl_examples/general_pdf_all.json |  13 ++-
 rag/flow/tokenizer/schema.py                     |   2 +-
 rag/flow/tokenizer/tokenizer.py                  |   6 +-
 rag/llm/embedding_model.py                       |   6 +-
 rag/nlp/__init__.py                              |   8 +-
 9 files changed, 126 insertions(+), 43 deletions(-)

diff --git a/deepdoc/parser/excel_parser.py b/deepdoc/parser/excel_parser.py
index 29bf4e2e6..315df7df3 100644
--- a/deepdoc/parser/excel_parser.py
+++ b/deepdoc/parser/excel_parser.py
@@ -22,10 +22,10 @@ from openpyxl import Workbook, load_workbook
 from rag.nlp import find_codec
 
 # copied from `/openpyxl/cell/cell.py`
-ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')
+ILLEGAL_CHARACTERS_RE = re.compile(r"[\000-\010]|[\013-\014]|[\016-\037]")
+
 
 class RAGFlowExcelParser:
-
     @staticmethod
     def _load_excel_to_workbook(file_like_object):
         if isinstance(file_like_object, bytes):
@@ -36,7 +36,7 @@ class RAGFlowExcelParser:
         file_head = file_like_object.read(4)
         file_like_object.seek(0)
 
-        if not (file_head.startswith(b'PK\x03\x04') or file_head.startswith(b'\xD0\xCF\x11\xE0')):
+        if not (file_head.startswith(b"PK\x03\x04") or file_head.startswith(b"\xd0\xcf\x11\xe0")):
             logging.info("Not an Excel file, converting CSV to Excel Workbook")
 
             try:
@@ -48,7 +48,7 @@ class RAGFlowExcelParser:
                 raise Exception(f"Failed to parse CSV and convert to Excel Workbook: {e_csv}")
 
         try:
-            return load_workbook(file_like_object,data_only= True)
+            return load_workbook(file_like_object, data_only=True)
         except Exception as e:
             logging.info(f"openpyxl load error: {e}, try pandas instead")
             try:
@@ -59,7 +59,7 @@ class RAGFlowExcelParser:
                 except Exception as ex:
                     logging.info(f"pandas with default engine load error: {ex}, try calamine instead")
                     file_like_object.seek(0)
-                    df = pd.read_excel(file_like_object, engine='calamine')
+                    df = pd.read_excel(file_like_object, engine="calamine")
                     return RAGFlowExcelParser._dataframe_to_workbook(df)
             except Exception as e_pandas:
                 raise Exception(f"pandas.read_excel error: {e_pandas}, original openpyxl error: {e}")
@@ -116,9 +116,7 @@ class RAGFlowExcelParser:
             tb = ""
             tb += f"<table><caption>{sheetname}</caption>"
             tb += tb_rows_0
-            for r in list(
-                rows[1 + chunk_i * chunk_rows: min(1 + (chunk_i + 1) * chunk_rows, len(rows))]
-            ):
+            for r in list(rows[1 + chunk_i * chunk_rows : min(1 + (chunk_i + 1) * chunk_rows, len(rows))]):
                 tb += "<tr>"
                 for i, c in enumerate(r):
                     if c.value is None:
@@ -133,8 +131,16 @@ class RAGFlowExcelParser:
     def markdown(self, fnm):
         import pandas as pd
+
         file_like_object = BytesIO(fnm) if not isinstance(fnm, str) else fnm
-        df = pd.read_excel(file_like_object)
+        try:
+            file_like_object.seek(0)
+            df = pd.read_excel(file_like_object)
+        except Exception as e:
+            logging.warning(f"Parse spreadsheet error: {e}, trying to interpret as CSV file")
+            file_like_object.seek(0)
+            df = pd.read_csv(file_like_object)
+
         df = df.replace(r"^\s*$", "", regex=True)
         return df.to_markdown(index=False)
 
     def __call__(self, fnm):
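**Reviewer note.** `markdown()` now falls back to CSV parsing when `pd.read_excel` cannot read the payload. A minimal usage sketch of that fallback, not part of the patch; it assumes `pandas` (plus `tabulate` for `to_markdown`) is installed and uses made-up data:

```python
from deepdoc.parser.excel_parser import RAGFlowExcelParser

parser = RAGFlowExcelParser()

# CSV bytes are not a valid Excel stream: pd.read_excel() raises, and
# markdown() now retries with pd.read_csv() instead of failing outright.
csv_blob = b"name,score\nalice,90\nbob,85\n"
print(parser.markdown(csv_blob))  # -> a Markdown table with two data rows
```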
diff --git a/rag/flow/chunker/chunker.py b/rag/flow/chunker/chunker.py
index f853fc9e7..a8281c306 100644
--- a/rag/flow/chunker/chunker.py
+++ b/rag/flow/chunker/chunker.py
@@ -73,11 +73,13 @@ class Chunker(ProcessBase):
     def _general(self, from_upstream: ChunkerFromUpstream):
         self.callback(random.randint(1, 5) / 100.0, "Start to chunk via `General`.")
 
-        if from_upstream.output_format in ["markdown", "text"]:
+        if from_upstream.output_format in ["markdown", "text", "html"]:
             if from_upstream.output_format == "markdown":
                 payload = from_upstream.markdown_result
-            else:  # == "text"
+            elif from_upstream.output_format == "text":
                 payload = from_upstream.text_result
+            else:  # == "html"
+                payload = from_upstream.html_result
 
             if not payload:
                 payload = ""
@@ -90,6 +92,7 @@ class Chunker(ProcessBase):
             )
             return [{"text": c} for c in cks]
 
+        # json
         sections, section_images = [], []
         for o in from_upstream.json_result or []:
             sections.append((o.get("text", ""), o.get("position_tag", "")))
diff --git a/rag/flow/chunker/schema.py b/rag/flow/chunker/schema.py
index 0f0e3042c..bfeff447d 100644
--- a/rag/flow/chunker/schema.py
+++ b/rag/flow/chunker/schema.py
@@ -29,7 +29,7 @@ class ChunkerFromUpstream(BaseModel):
     json_result: list[dict[str, Any]] | None = Field(default=None, alias="json")
     markdown_result: str | None = Field(default=None, alias="markdown")
     text_result: str | None = Field(default=None, alias="text")
-    html_result: str | None = Field(default=None, alias="html")
+    html_result: list[str] | None = Field(default=None, alias="html")
 
     model_config = ConfigDict(populate_by_name=True, extra="forbid")
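**Reviewer note.** With `html_result` retyped to `list[str]`, the HTML payload is a list of per-chunk `<table>` fragments (matching what `ExcelParser.html()` produces) rather than one string. A standalone sketch of the branch added to `_general()`, with `from_upstream` stubbed so it runs outside the dataflow:

```python
from types import SimpleNamespace

# Stub with the same field names as ChunkerFromUpstream (illustrative values).
from_upstream = SimpleNamespace(
    output_format="html",
    markdown_result=None,
    text_result=None,
    html_result=["<table>...</table>", "<table>...</table>"],  # now list[str]
)

if from_upstream.output_format == "markdown":
    payload = from_upstream.markdown_result
elif from_upstream.output_format == "text":
    payload = from_upstream.text_result
else:  # == "html"
    payload = from_upstream.html_result

assert isinstance(payload, list) and len(payload) == 2
```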
diff --git a/rag/flow/parser/parser.py b/rag/flow/parser/parser.py
index fd65665fa..f70c4d958 100644
--- a/rag/flow/parser/parser.py
+++ b/rag/flow/parser/parser.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 import random
 
 import trio
@@ -29,8 +30,18 @@ class ParserParam(ProcessParamBase):
     def __init__(self):
         super().__init__()
         self.allowed_output_format = {
-            "pdf": ["json", "markdown"],
-            "excel": ["json", "markdown", "html"],
+            "pdf": [
+                "json",
+                "markdown",
+            ],
+            "spreadsheet": [
+                "json",
+                "markdown",
+                "html",
+            ],
+            "word": [
+                "json",
+            ],
             "ppt": [],
             "image": [],
             "email": [],
@@ -44,12 +55,25 @@
                 "parse_method": "deepdoc",  # deepdoc/plain_text/vlm
                 "vlm_name": "",
                 "lang": "Chinese",
-                "suffix": ["pdf"],
+                "suffix": [
+                    "pdf",
+                ],
                 "output_format": "json",
             },
-            "excel": {
+            "spreadsheet": {
                 "output_format": "html",
-                "suffix": ["xls", "xlsx", "csv"],
+                "suffix": [
+                    "xls",
+                    "xlsx",
+                    "csv",
+                ],
+            },
+            "word": {
+                "suffix": [
+                    "doc",
+                    "docx",
+                ],
+                "output_format": "json",
             },
             "ppt": {},
             "image": {
@@ -76,10 +100,15 @@ class ParserParam(ProcessParamBase):
             pdf_output_format = pdf_config.get("output_format", "")
             self.check_valid_value(pdf_output_format, "PDF output format abnormal.", self.allowed_output_format["pdf"])
 
-        excel_config = self.setups.get("excel", "")
-        if excel_config:
-            excel_output_format = excel_config.get("output_format", "")
-            self.check_valid_value(excel_output_format, "Excel output format abnormal.", self.allowed_output_format["excel"])
+        spreadsheet_config = self.setups.get("spreadsheet", "")
+        if spreadsheet_config:
+            spreadsheet_output_format = spreadsheet_config.get("output_format", "")
+            self.check_valid_value(spreadsheet_output_format, "Spreadsheet output format abnormal.", self.allowed_output_format["spreadsheet"])
+
+        word_config = self.setups.get("word", "")
+        if word_config:
+            word_output_format = word_config.get("output_format", "")
+            self.check_valid_value(word_output_format, "Word processor document output format abnormal.", self.allowed_output_format["word"])
 
         image_config = self.setups.get("image", "")
         if image_config:
@@ -93,10 +122,13 @@ class ParserParam(ProcessParamBase):
 class Parser(ProcessBase):
     component_name = "Parser"
 
-    def _pdf(self, blob):
+    def _pdf(self, from_upstream: ParserFromUpstream):
         self.callback(random.randint(1, 5) / 100.0, "Start to work on a PDF.")
+
+        blob = from_upstream.blob
         conf = self._param.setups["pdf"]
         self.set_output("output_format", conf["output_format"])
+
         if conf.get("parse_method") == "deepdoc":
             bboxes = RAGFlowPdfParser().parse_into_bboxes(blob, callback=self.callback)
         elif conf.get("parse_method") == "plain_text":
@@ -110,6 +142,7 @@ class Parser(ProcessBase):
         for t, poss in lines:
             pn, x0, x1, top, bott = poss.split(" ")
             bboxes.append({"page_number": int(pn), "x0": float(x0), "x1": float(x1), "top": float(top), "bottom": float(bott), "text": t})
+
         if conf.get("output_format") == "json":
             self.set_output("json", bboxes)
         if conf.get("output_format") == "markdown":
@@ -123,23 +156,53 @@ class Parser(ProcessBase):
                 mkdn += b.get("text", "") + "\n"
             self.set_output("markdown", mkdn)
 
-    def _excel(self, blob):
-        self.callback(random.randint(1, 5) / 100.0, "Start to work on an Excel.")
-        conf = self._param.setups["excel"]
+    def _spreadsheet(self, from_upstream: ParserFromUpstream):
+        self.callback(random.randint(1, 5) / 100.0, "Start to work on a Spreadsheet.")
+
+        blob = from_upstream.blob
+        conf = self._param.setups["spreadsheet"]
         self.set_output("output_format", conf["output_format"])
-        excel_parser = ExcelParser()
+
+        logging.debug(f"spreadsheet {conf=}")
+        spreadsheet_parser = ExcelParser()
         if conf.get("output_format") == "html":
-            html = excel_parser.html(blob, 1000000000)
+            html = spreadsheet_parser.html(blob, 1000000000)
             self.set_output("html", html)
         elif conf.get("output_format") == "json":
-            self.set_output("json", [{"text": txt} for txt in excel_parser(blob) if txt])
+            self.set_output("json", [{"text": txt} for txt in spreadsheet_parser(blob) if txt])
         elif conf.get("output_format") == "markdown":
-            self.set_output("markdown", excel_parser.markdown(blob))
+            self.set_output("markdown", spreadsheet_parser.markdown(blob))
+
+    def _word(self, from_upstream: ParserFromUpstream):
+        from tika import parser as word_parser
+
+        self.callback(random.randint(1, 5) / 100.0, "Start to work on a Word processor document.")
+
+        blob = from_upstream.blob
+        name = from_upstream.name
+        conf = self._param.setups["word"]
+        self.set_output("output_format", conf["output_format"])
+
+        logging.debug(f"word {conf=}")
+        doc_parsed = word_parser.from_buffer(blob)
+
+        sections = []
+        if doc_parsed.get("content"):
+            sections = doc_parsed["content"].split("\n")
+            sections = [{"text": section} for section in sections if section]
+        else:
+            logging.warning(f"tika.parser got empty content from {name}.")
+
+        # json is currently the only output format for Word processor documents
+        assert conf.get("output_format") == "json", "Word processor documents only support JSON output."
+        if conf.get("output_format") == "json":
+            self.set_output("json", sections)
 
     async def _invoke(self, **kwargs):
         function_map = {
             "pdf": self._pdf,
-            "excel": self._excel,
+            "spreadsheet": self._spreadsheet,
+            "word": self._word,
         }
         try:
             from_upstream = ParserFromUpstream.model_validate(kwargs)
@@ -150,5 +213,5 @@ class Parser(ProcessBase):
         for p_type, conf in self._param.setups.items():
             if from_upstream.name.split(".")[-1].lower() not in conf.get("suffix", []):
                 continue
-            await trio.to_thread.run_sync(function_map[p_type], from_upstream.blob)
+            await trio.to_thread.run_sync(function_map[p_type], from_upstream)
             break
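**Reviewer note.** `_word()` delegates to Apache Tika via the `tika-python` package; `tika.parser.from_buffer()` returns a dict whose `"content"` key holds the extracted plain text (or `None`). A self-contained sketch of the same extraction, factored out of the component for clarity (the helper name is ours, not the patch's):

```python
import logging

from tika import parser as word_parser  # requires the `tika` package and a JVM


def word_to_sections(blob: bytes, name: str) -> list[dict]:
    """Split a .doc/.docx payload into {"text": ...} sections, as _word() does."""
    doc_parsed = word_parser.from_buffer(blob)
    if not doc_parsed.get("content"):
        logging.warning(f"tika.parser got empty content from {name}.")
        return []
    return [{"text": s} for s in doc_parsed["content"].split("\n") if s]
```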
diff --git a/rag/flow/tests/dsl_examples/general_pdf_all.json b/rag/flow/tests/dsl_examples/general_pdf_all.json
index 7142e5547..df713bb6d 100644
--- a/rag/flow/tests/dsl_examples/general_pdf_all.json
+++ b/rag/flow/tests/dsl_examples/general_pdf_all.json
@@ -23,13 +23,20 @@
                     ],
                     "output_format": "json"
                 },
-                "excel": {
-                    "output_format": "html",
+                "spreadsheet": {
                     "suffix": [
                         "xls",
                         "xlsx",
                         "csv"
-                    ]
+                    ],
+                    "output_format": "html"
+                },
+                "word": {
+                    "suffix": [
+                        "doc",
+                        "docx"
+                    ],
+                    "output_format": "json"
                 }
             }
         }
diff --git a/rag/flow/tokenizer/schema.py b/rag/flow/tokenizer/schema.py
index 508fa002c..d58725171 100644
--- a/rag/flow/tokenizer/schema.py
+++ b/rag/flow/tokenizer/schema.py
@@ -31,7 +31,7 @@ class TokenizerFromUpstream(BaseModel):
     json_result: list[dict[str, Any]] | None = Field(default=None, alias="json")
     markdown_result: str | None = Field(default=None, alias="markdown")
     text_result: str | None = Field(default=None, alias="text")
-    html_result: str | None = Field(default=None, alias="html")
+    html_result: list[str] | None = Field(default=None, alias="html")
 
     model_config = ConfigDict(populate_by_name=True, extra="forbid")
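**Reviewer note.** The DSL config drives the suffix-based dispatch in `Parser._invoke()`: the first setup whose `suffix` list contains the file extension wins. A sketch of that lookup with the two setups added above and an illustrative filename:

```python
setups = {
    "spreadsheet": {"suffix": ["xls", "xlsx", "csv"], "output_format": "html"},
    "word": {"suffix": ["doc", "docx"], "output_format": "json"},
}

name = "report.docx"  # illustrative
for p_type, conf in setups.items():
    # Same test as _invoke(), with the condition inverted for readability.
    if name.split(".")[-1].lower() in conf.get("suffix", []):
        print(p_type)  # -> "word"
        break
```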
diff --git a/rag/flow/tokenizer/tokenizer.py b/rag/flow/tokenizer/tokenizer.py
index 5ad209776..5b43a9d82 100644
--- a/rag/flow/tokenizer/tokenizer.py
+++ b/rag/flow/tokenizer/tokenizer.py
@@ -117,11 +117,13 @@ class Tokenizer(ProcessBase):
                 ck["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(ck["content_ltks"])
                 if i % 100 == 99:
                     self.callback(i * 1.0 / len(chunks) / parts)
-        elif from_upstream.output_format in ["markdown", "text"]:
+        elif from_upstream.output_format in ["markdown", "text", "html"]:
             if from_upstream.output_format == "markdown":
                 payload = from_upstream.markdown_result
-            else:  # == "text"
+            elif from_upstream.output_format == "text":
                 payload = from_upstream.text_result
+            else:  # == "html"
+                payload = from_upstream.html_result
 
             if not payload:
                 return ""
diff --git a/rag/llm/embedding_model.py b/rag/llm/embedding_model.py
index 4a9f375a6..d39e0f0cc 100644
--- a/rag/llm/embedding_model.py
+++ b/rag/llm/embedding_model.py
@@ -751,6 +751,8 @@ class SILICONFLOWEmbed(Base):
         token_count = 0
         for i in range(0, len(texts), batch_size):
             texts_batch = texts[i : i + batch_size]
+            texts_batch = [" " if not text.strip() else text for text in texts_batch]
+
             payload = {
                 "model": self.model_name,
                 "input": texts_batch,
@@ -935,7 +937,7 @@ class GiteeEmbed(SILICONFLOWEmbed):
         if not base_url:
             base_url = "https://ai.gitee.com/v1/embeddings"
         super().__init__(key, model_name, base_url)
-
+
 
 class DeepInfraEmbed(OpenAIEmbed):
     _FACTORY_NAME = "DeepInfra"
@@ -951,4 +953,4 @@ class Ai302Embed(Base):
     def __init__(self, key, model_name, base_url="https://api.302.ai/v1/embeddings"):
         if not base_url:
             base_url = "https://api.302.ai/v1/embeddings"
-        super().__init__(key, model_name, base_url)
\ No newline at end of file
+        super().__init__(key, model_name, base_url)
diff --git a/rag/nlp/__init__.py b/rag/nlp/__init__.py
index fc548ee61..2424ba033 100644
--- a/rag/nlp/__init__.py
+++ b/rag/nlp/__init__.py
@@ -518,7 +518,7 @@ def hierarchical_merge(bull, sections, depth):
     return res
 
 
-def naive_merge(sections, chunk_token_num=128, delimiter="\n。;!?", overlapped_percent=0):
+def naive_merge(sections: str | list, chunk_token_num=128, delimiter="\n。;!?", overlapped_percent=0):
     from deepdoc.parser.pdf_parser import RAGFlowPdfParser
     if not sections:
         return []
@@ -534,7 +534,7 @@ def naive_merge(sections, chunk_token_num=128, delimiter="\n。;!?", overlapped_percent=0):
         pos = ""
         if tnum < 8:
             pos = ""
-        # Ensure that the length of the merged chunk does not exceed chunk_token_num 
+        # Ensure that the length of the merged chunk does not exceed chunk_token_num
         if cks[-1] == "" or tk_nums[-1] > chunk_token_num * (100 - overlapped_percent)/100.:
             if cks:
                 overlapped = RAGFlowPdfParser.remove_tag(cks[-1])
@@ -638,10 +638,10 @@ def concat_img(img1, img2):
         return img2
     if not img1 and not img2:
         return None
-
+
     if img1 is img2:
         return img1
-
+
     if isinstance(img1, Image.Image) and isinstance(img2, Image.Image):
         pixel_data1 = img1.tobytes()
         pixel_data2 = img2.tobytes()
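**Reviewer note.** The `SILICONFLOWEmbed` guard maps blank strings to a single space before each batch is sent; presumably the embedding endpoint rejects empty inputs (an assumption — the patch does not say). The transformation in isolation:

```python
texts_batch = ["hello", "", "   ", "world"]
# Blank or whitespace-only entries become a single space; real text passes through.
texts_batch = [" " if not text.strip() else text for text in texts_batch]
assert texts_batch == ["hello", " ", " ", "world"]
```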
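**Reviewer note.** On the `naive_merge()` threshold visible in the hunk above, worked through once: with `chunk_token_num=128` and `overlapped_percent=20`, a chunk counts as full past `128 * (100 - 20) / 100 = 102.4` tokens, leaving headroom for the overlapped tail carried into the next chunk:

```python
chunk_token_num, overlapped_percent = 128, 20
threshold = chunk_token_num * (100 - overlapped_percent) / 100.0
assert threshold == 102.4
```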