Feat: dataflow supports Spreadsheet and Word processor document (#9996)

### What problem does this PR solve?

Dataflow now supports spreadsheet and word processor documents.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
Author: Yongteng Lei
Date: 2025-09-10 13:02:53 +08:00 (committed via GitHub)
Parent: e650f0d368 · Commit: 0d9c1f1c3c
9 changed files with 126 additions and 43 deletions

```diff
@@ -22,10 +22,10 @@ from openpyxl import Workbook, load_workbook
 from rag.nlp import find_codec
 
 # copied from `/openpyxl/cell/cell.py`
-ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')
+ILLEGAL_CHARACTERS_RE = re.compile(r"[\000-\010]|[\013-\014]|[\016-\037]")
 
 
 class RAGFlowExcelParser:
     @staticmethod
     def _load_excel_to_workbook(file_like_object):
         if isinstance(file_like_object, bytes):
@@ -36,7 +36,7 @@ class RAGFlowExcelParser:
         file_head = file_like_object.read(4)
         file_like_object.seek(0)
 
-        if not (file_head.startswith(b'PK\x03\x04') or file_head.startswith(b'\xD0\xCF\x11\xE0')):
+        if not (file_head.startswith(b"PK\x03\x04") or file_head.startswith(b"\xd0\xcf\x11\xe0")):
             logging.info("Not an Excel file, converting CSV to Excel Workbook")
 
             try:
@@ -48,7 +48,7 @@ class RAGFlowExcelParser:
                 raise Exception(f"Failed to parse CSV and convert to Excel Workbook: {e_csv}")
 
         try:
-            return load_workbook(file_like_object,data_only= True)
+            return load_workbook(file_like_object, data_only=True)
         except Exception as e:
             logging.info(f"openpyxl load error: {e}, try pandas instead")
             try:
@@ -59,7 +59,7 @@ class RAGFlowExcelParser:
                 except Exception as ex:
                     logging.info(f"pandas with default engine load error: {ex}, try calamine instead")
                     file_like_object.seek(0)
-                    df = pd.read_excel(file_like_object, engine='calamine')
+                    df = pd.read_excel(file_like_object, engine="calamine")
                     return RAGFlowExcelParser._dataframe_to_workbook(df)
             except Exception as e_pandas:
                 raise Exception(f"pandas.read_excel error: {e_pandas}, original openpyxl error: {e}")
@@ -116,9 +116,7 @@ class RAGFlowExcelParser:
             tb = ""
             tb += f"<table><caption>{sheetname}</caption>"
             tb += tb_rows_0
-            for r in list(
-                rows[1 + chunk_i * chunk_rows: min(1 + (chunk_i + 1) * chunk_rows, len(rows))]
-            ):
+            for r in list(rows[1 + chunk_i * chunk_rows : min(1 + (chunk_i + 1) * chunk_rows, len(rows))]):
                 tb += "<tr>"
                 for i, c in enumerate(r):
                     if c.value is None:
@@ -133,8 +131,16 @@ class RAGFlowExcelParser:
     def markdown(self, fnm):
         import pandas as pd
 
         file_like_object = BytesIO(fnm) if not isinstance(fnm, str) else fnm
-        df = pd.read_excel(file_like_object)
+        try:
+            file_like_object.seek(0)
+            df = pd.read_excel(file_like_object)
+        except Exception as e:
+            logging.warning(f"Parse spreadsheet error: {e}, trying to interpret as CSV file")
+            file_like_object.seek(0)
+            df = pd.read_csv(file_like_object)
+        df = df.replace(r"^\s*$", "", regex=True)
         return df.to_markdown(index=False)
 
     def __call__(self, fnm):
```
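The loader decides how to treat incoming bytes by sniffing the first four bytes rather than trusting the file extension: `PK\x03\x04` marks a ZIP container (`.xlsx`), `\xD0\xCF\x11\xE0` an OLE2 compound file (legacy `.xls`), and anything else falls through to the CSV-to-workbook path. A minimal standalone sketch of that heuristic (names are illustrative, not from the patch):

```python
from io import BytesIO

ZIP_MAGIC = b"PK\x03\x04"         # .xlsx and other ZIP-based formats
OLE2_MAGIC = b"\xd0\xcf\x11\xe0"  # legacy .xls (OLE2 compound file)

def looks_like_excel(blob: bytes) -> bool:
    """Sniff the first four bytes, as _load_excel_to_workbook does."""
    head = BytesIO(blob).read(4)
    return head.startswith(ZIP_MAGIC) or head.startswith(OLE2_MAGIC)

print(looks_like_excel(b"PK\x03\x04...payload..."))  # True  -> openpyxl path
print(looks_like_excel(b"a,b\n1,2\n"))               # False -> CSV fallback path
```

The new `markdown()` fallback follows the same philosophy: try `pd.read_excel` first, reinterpret the bytes as CSV on failure, then blank out whitespace-only cells before calling `to_markdown`.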

```diff
@@ -73,11 +73,13 @@ class Chunker(ProcessBase):
     def _general(self, from_upstream: ChunkerFromUpstream):
         self.callback(random.randint(1, 5) / 100.0, "Start to chunk via `General`.")
 
-        if from_upstream.output_format in ["markdown", "text"]:
+        if from_upstream.output_format in ["markdown", "text", "html"]:
             if from_upstream.output_format == "markdown":
                 payload = from_upstream.markdown_result
-            else:  # == "text"
+            elif from_upstream.output_format == "text":
                 payload = from_upstream.text_result
+            else:  # == "html"
+                payload = from_upstream.html_result
             if not payload:
                 payload = ""
@@ -90,6 +92,7 @@ class Chunker(ProcessBase):
             )
             return [{"text": c} for c in cks]
 
+        # json
         sections, section_images = [], []
         for o in from_upstream.json_result or []:
             sections.append((o.get("text", ""), o.get("position_tag", "")))
```
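With `html` added, the non-JSON branch now covers three formats, and the `html` payload is a list of table fragments rather than a single string (see the schema change below), which is presumably why `naive_merge` in `rag/nlp` now accepts `str | list`. A compressed sketch of the branch, with `up` standing in for the validated upstream message:

```python
def pick_payload(up):
    # Mirrors the branch above; only the html case is new in this PR.
    if up.output_format == "markdown":
        return up.markdown_result  # str | None
    elif up.output_format == "text":
        return up.text_result      # str | None
    else:  # "html"
        return up.html_result      # list[str] | None, one entry per table chunk
```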

```diff
@@ -29,7 +29,7 @@ class ChunkerFromUpstream(BaseModel):
     json_result: list[dict[str, Any]] | None = Field(default=None, alias="json")
     markdown_result: str | None = Field(default=None, alias="markdown")
     text_result: str | None = Field(default=None, alias="text")
-    html_result: str | None = Field(default=None, alias="html")
+    html_result: list[str] | None = Field(default=None, alias="html")
 
     model_config = ConfigDict(populate_by_name=True, extra="forbid")
```
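`populate_by_name=True` plus the `alias` fields lets upstream components pass the short keys (`json`, `markdown`, `text`, `html`) while the model keeps collision-free attribute names. A self-contained sketch of the widened `html` field, assuming pydantic v2 (the trimmed-down model here is for illustration only):

```python
from pydantic import BaseModel, ConfigDict, Field

class UpstreamMsg(BaseModel):
    # Stand-in for ChunkerFromUpstream, reduced to two fields.
    markdown_result: str | None = Field(default=None, alias="markdown")
    html_result: list[str] | None = Field(default=None, alias="html")

    model_config = ConfigDict(populate_by_name=True, extra="forbid")

# Upstream sends the short alias key; html is now a list of table fragments.
msg = UpstreamMsg.model_validate({"html": ["<table>…</table>", "<table>…</table>"]})
print(msg.html_result)  # ['<table>…</table>', '<table>…</table>']
```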

```diff
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 import random
 
 import trio
@@ -29,8 +30,18 @@ class ParserParam(ProcessParamBase):
     def __init__(self):
         super().__init__()
         self.allowed_output_format = {
-            "pdf": ["json", "markdown"],
-            "excel": ["json", "markdown", "html"],
+            "pdf": [
+                "json",
+                "markdown",
+            ],
+            "spreadsheet": [
+                "json",
+                "markdown",
+                "html",
+            ],
+            "word": [
+                "json",
+            ],
             "ppt": [],
             "image": [],
             "email": [],
@@ -44,12 +55,25 @@ class ParserParam(ProcessParamBase):
                 "parse_method": "deepdoc",  # deepdoc/plain_text/vlm
                 "vlm_name": "",
                 "lang": "Chinese",
-                "suffix": ["pdf"],
+                "suffix": [
+                    "pdf",
+                ],
                 "output_format": "json",
             },
-            "excel": {
+            "spreadsheet": {
                 "output_format": "html",
-                "suffix": ["xls", "xlsx", "csv"],
+                "suffix": [
+                    "xls",
+                    "xlsx",
+                    "csv",
+                ],
+            },
+            "word": {
+                "suffix": [
+                    "doc",
+                    "docx",
+                ],
+                "output_format": "json",
             },
             "ppt": {},
             "image": {
@@ -76,10 +100,15 @@ class ParserParam(ProcessParamBase):
             pdf_output_format = pdf_config.get("output_format", "")
             self.check_valid_value(pdf_output_format, "PDF output format abnormal.", self.allowed_output_format["pdf"])
 
-        excel_config = self.setups.get("excel", "")
-        if excel_config:
-            excel_output_format = excel_config.get("output_format", "")
-            self.check_valid_value(excel_output_format, "Excel output format abnormal.", self.allowed_output_format["excel"])
+        spreadsheet_config = self.setups.get("spreadsheet", "")
+        if spreadsheet_config:
+            spreadsheet_output_format = spreadsheet_config.get("output_format", "")
+            self.check_valid_value(spreadsheet_output_format, "Spreadsheet output format abnormal.", self.allowed_output_format["spreadsheet"])
+
+        doc_config = self.setups.get("doc", "")
+        if doc_config:
+            doc_output_format = doc_config.get("output_format", "")
+            self.check_valid_value(doc_output_format, "Word processer document output format abnormal.", self.allowed_output_format["doc"])
 
         image_config = self.setups.get("image", "")
         if image_config:
@@ -93,10 +122,13 @@ class ParserParam(ProcessParamBase):
 class Parser(ProcessBase):
     component_name = "Parser"
 
-    def _pdf(self, blob):
+    def _pdf(self, from_upstream: ParserFromUpstream):
         self.callback(random.randint(1, 5) / 100.0, "Start to work on a PDF.")
 
+        blob = from_upstream.blob
         conf = self._param.setups["pdf"]
         self.set_output("output_format", conf["output_format"])
 
         if conf.get("parse_method") == "deepdoc":
             bboxes = RAGFlowPdfParser().parse_into_bboxes(blob, callback=self.callback)
         elif conf.get("parse_method") == "plain_text":
@@ -110,6 +142,7 @@ class Parser(ProcessBase):
         for t, poss in lines:
             pn, x0, x1, top, bott = poss.split(" ")
             bboxes.append({"page_number": int(pn), "x0": float(x0), "x1": float(x1), "top": float(top), "bottom": float(bott), "text": t})
+
         if conf.get("output_format") == "json":
             self.set_output("json", bboxes)
         if conf.get("output_format") == "markdown":
@@ -123,23 +156,53 @@ class Parser(ProcessBase):
                 mkdn += b.get("text", "") + "\n"
             self.set_output("markdown", mkdn)
 
-    def _excel(self, blob):
-        self.callback(random.randint(1, 5) / 100.0, "Start to work on a Excel.")
-        conf = self._param.setups["excel"]
+    def _spreadsheet(self, from_upstream: ParserFromUpstream):
+        self.callback(random.randint(1, 5) / 100.0, "Start to work on a Spreadsheet.")
+
+        blob = from_upstream.blob
+        conf = self._param.setups["spreadsheet"]
         self.set_output("output_format", conf["output_format"])
-        excel_parser = ExcelParser()
+
+        print("spreadsheet {conf=}", flush=True)
+        spreadsheet_parser = ExcelParser()
         if conf.get("output_format") == "html":
-            html = excel_parser.html(blob, 1000000000)
+            html = spreadsheet_parser.html(blob, 1000000000)
             self.set_output("html", html)
         elif conf.get("output_format") == "json":
-            self.set_output("json", [{"text": txt} for txt in excel_parser(blob) if txt])
+            self.set_output("json", [{"text": txt} for txt in spreadsheet_parser(blob) if txt])
         elif conf.get("output_format") == "markdown":
-            self.set_output("markdown", excel_parser.markdown(blob))
+            self.set_output("markdown", spreadsheet_parser.markdown(blob))
+
+    def _word(self, from_upstream: ParserFromUpstream):
+        from tika import parser as word_parser
+
+        self.callback(random.randint(1, 5) / 100.0, "Start to work on a Word Processor Document")
+
+        blob = from_upstream.blob
+        name = from_upstream.name
+        conf = self._param.setups["word"]
+        self.set_output("output_format", conf["output_format"])
+
+        print("word {conf=}", flush=True)
+        doc_parsed = word_parser.from_buffer(blob)
+
+        sections = []
+        if doc_parsed.get("content"):
+            sections = doc_parsed["content"].split("\n")
+            sections = [{"text": section} for section in sections if section]
+        else:
+            logging.warning(f"tika.parser got empty content from {name}.")
+
+        # json
+        assert conf.get("output_format") == "json", "have to be json for doc"
+        if conf.get("output_format") == "json":
+            self.set_output("json", sections)
 
     async def _invoke(self, **kwargs):
         function_map = {
             "pdf": self._pdf,
-            "excel": self._excel,
+            "spreadsheet": self._spreadsheet,
+            "word": self._word,
         }
         try:
             from_upstream = ParserFromUpstream.model_validate(kwargs)
@@ -150,5 +213,5 @@ class Parser(ProcessBase):
         for p_type, conf in self._param.setups.items():
             if from_upstream.name.split(".")[-1].lower() not in conf.get("suffix", []):
                 continue
-            await trio.to_thread.run_sync(function_map[p_type], from_upstream.blob)
+            await trio.to_thread.run_sync(function_map[p_type], from_upstream)
             break
```
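`_word` delegates text extraction to Apache Tika via the `tika` Python bindings; `parser.from_buffer` returns a dict whose `content` key holds the extracted plain text (and can be `None` for empty documents, hence the guard). One thing worth noting in the validation hunk: `check_config` reads `self.setups.get("doc", ...)` and `self.allowed_output_format["doc"]`, while the defaults register the key as `"word"`, so that branch appears never to fire as written. A minimal standalone sketch of the extraction step, assuming a reachable Tika server (tika-python starts one on demand):

```python
from tika import parser

def word_to_sections(blob: bytes) -> list[dict]:
    # Same shape as the "json" output of Parser._word:
    # one dict per non-empty line of extracted text.
    parsed = parser.from_buffer(blob)
    content = parsed.get("content") or ""
    return [{"text": line} for line in content.split("\n") if line]

with open("sample.docx", "rb") as f:  # hypothetical input file
    print(word_to_sections(f.read())[:3])
```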

```diff
@@ -23,13 +23,20 @@
             ],
             "output_format": "json"
         },
-        "excel": {
-            "output_format": "html",
+        "spreadsheet": {
             "suffix": [
                 "xls",
                 "xlsx",
                 "csv"
-            ]
+            ],
+            "output_format": "html"
+        },
+        "word": {
+            "suffix": [
+                "doc",
+                "docx"
+            ],
+            "output_format": "json"
         }
     }
 }
```
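This default config feeds the suffix-based dispatch in `Parser._invoke`: the first setup whose `suffix` list contains the file's extension wins, and its `output_format` decides which downstream field gets populated. A hypothetical illustration of that routing rule:

```python
# Illustrative subset of the setups above.
setups = {
    "spreadsheet": {"suffix": ["xls", "xlsx", "csv"], "output_format": "html"},
    "word": {"suffix": ["doc", "docx"], "output_format": "json"},
}

def route(filename: str) -> str | None:
    # First setup whose suffix list matches the extension wins, as in _invoke.
    ext = filename.split(".")[-1].lower()
    for p_type, conf in setups.items():
        if ext in conf.get("suffix", []):
            return p_type
    return None

print(route("report.docx"))  # word
print(route("table.csv"))    # spreadsheet
```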

```diff
@@ -31,7 +31,7 @@ class TokenizerFromUpstream(BaseModel):
     json_result: list[dict[str, Any]] | None = Field(default=None, alias="json")
     markdown_result: str | None = Field(default=None, alias="markdown")
     text_result: str | None = Field(default=None, alias="text")
-    html_result: str | None = Field(default=None, alias="html")
+    html_result: list[str] | None = Field(default=None, alias="html")
 
     model_config = ConfigDict(populate_by_name=True, extra="forbid")
```

```diff
@@ -117,11 +117,13 @@ class Tokenizer(ProcessBase):
                 ck["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(ck["content_ltks"])
                 if i % 100 == 99:
                     self.callback(i * 1.0 / len(chunks) / parts)
-        elif from_upstream.output_format in ["markdown", "text"]:
+        elif from_upstream.output_format in ["markdown", "text", "html"]:
             if from_upstream.output_format == "markdown":
                 payload = from_upstream.markdown_result
-            else:  # == "text"
+            elif from_upstream.output_format == "text":
                 payload = from_upstream.text_result
+            else:  # == "html"
+                payload = from_upstream.html_result
 
             if not payload:
                 return ""
```

```diff
@@ -751,6 +751,8 @@ class SILICONFLOWEmbed(Base):
         token_count = 0
         for i in range(0, len(texts), batch_size):
             texts_batch = texts[i : i + batch_size]
+            texts_batch = [" " if not text.strip() else text for text in texts_batch]
+
             payload = {
                 "model": self.model_name,
                 "input": texts_batch,
@@ -935,7 +937,7 @@ class GiteeEmbed(SILICONFLOWEmbed):
         if not base_url:
             base_url = "https://ai.gitee.com/v1/embeddings"
         super().__init__(key, model_name, base_url)
 
 
 class DeepInfraEmbed(OpenAIEmbed):
     _FACTORY_NAME = "DeepInfra"
@@ -951,4 +953,4 @@ class Ai302Embed(Base):
     def __init__(self, key, model_name, base_url="https://api.302.ai/v1/embeddings"):
         if not base_url:
             base_url = "https://api.302.ai/v1/embeddings"
         super().__init__(key, model_name, base_url)
```
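The one functional change here replaces empty or whitespace-only batch entries with a single space, presumably because the embedding endpoint rejects empty input; the substitution keeps the returned vectors index-aligned with the original texts. The guard in isolation:

```python
def sanitize_batch(texts: list[str]) -> list[str]:
    # Whitespace-only entries become a single space so the API still returns
    # one embedding per input and indexes stay aligned with the originals.
    return [" " if not text.strip() else text for text in texts]

print(sanitize_batch(["hello", "", "   "]))  # ['hello', ' ', ' ']
```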

```diff
@@ -518,7 +518,7 @@ def hierarchical_merge(bull, sections, depth):
     return res
 
-def naive_merge(sections, chunk_token_num=128, delimiter="\n。;!?", overlapped_percent=0):
+def naive_merge(sections: str | list, chunk_token_num=128, delimiter="\n。;!?", overlapped_percent=0):
     from deepdoc.parser.pdf_parser import RAGFlowPdfParser
 
     if not sections:
         return []
@@ -534,7 +534,7 @@ def naive_merge(sections, chunk_token_num=128, delimiter="\n。;!?", overlapped_percent=0):
         pos = ""
         if tnum < 8:
             pos = ""
+        # Ensure that the length of the merged chunk does not exceed chunk_token_num
         if cks[-1] == "" or tk_nums[-1] > chunk_token_num * (100 - overlapped_percent)/100.:
             if cks:
                 overlapped = RAGFlowPdfParser.remove_tag(cks[-1])
@@ -638,10 +638,10 @@ def concat_img(img1, img2):
         return img2
     if not img1 and not img2:
         return None
+    if img1 is img2:
+        return img1
 
     if isinstance(img1, Image.Image) and isinstance(img2, Image.Image):
         pixel_data1 = img1.tobytes()
         pixel_data2 = img2.tobytes()
```
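The `sections: str | list` annotation documents that `naive_merge` now also receives the list-of-strings payload produced by the html path, not just a single string. The chunk-rollover condition is worth unpacking: a new chunk opens once the current one exceeds `chunk_token_num * (100 - overlapped_percent) / 100` tokens, with the tail of the previous chunk carried over as overlap. A quick worked example (the 20% overlap value is hypothetical; the default above is 0):

```python
chunk_token_num = 128
overlapped_percent = 20  # hypothetical; default in the signature is 0
threshold = chunk_token_num * (100 - overlapped_percent) / 100.0
print(threshold)  # 102.4 -> chunks roll over past ~102 tokens, leaving room for overlap
```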