#
#  Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""Pipeline component that turns an uploaded file into text/JSON/markdown/HTML.

``ParserParam`` holds the per-file-family configuration (accepted suffixes,
parse method, output format) and ``Parser`` dispatches a file to the matching
family handler based on its extension.
"""
import io
import json
import logging
import os
import random
import re
from functools import partial

import trio
import numpy as np
from PIL import Image

from api.db import LLMType
from api.db.services.file2document_service import File2DocumentService
from api.db.services.file_service import FileService
from api.db.services.llm_service import LLMBundle
from common.misc_utils import get_uuid
from common.base64_image import image2id
from deepdoc.parser import ExcelParser
from deepdoc.parser.mineru_parser import MinerUParser
from deepdoc.parser.pdf_parser import PlainParser, RAGFlowPdfParser, VisionParser
from deepdoc.parser.tcadp_parser import TCADPParser
from rag.app.naive import Docx
from rag.flow.base import ProcessBase, ProcessParamBase
from rag.flow.parser.schema import ParserFromUpstream
from rag.llm.cv_model import Base as VLM
from rag.utils.storage_factory import STORAGE_IMPL

# TCADP position tag format: @@{page_number}\t{x0}\t{x1}\t{top}\t{bottom}##
_TCADP_POSITION_RE = re.compile(r"@@([0-9-]+)\t([0-9.]+)\t([0-9.]+)\t([0-9.]+)\t([0-9.]+)##")

# Encodings tried, in order, when the declared charset cannot decode a payload.
_FALLBACK_ENCODINGS = ("utf-8", "gb2312", "gbk", "gb18030", "latin1")

# PDF parse methods handled by built-in parsers; any other value is passed to
# LLMBundle as the name of an IMAGE2TEXT model.
_BUILTIN_PDF_METHODS = ["deepdoc", "plain_text", "mineru", "tcadp parser"]

# Header names that are reported as top-level fields rather than metadata.
_BASIC_EMAIL_HEADERS = ["from", "to", "cc", "bcc", "date", "subject"]


def _bytes_to_text(payload, charset=None):
    """Best-effort conversion of an email payload to ``str``.

    Tries ``charset`` (UTF-8 when not given), then each entry of
    ``_FALLBACK_ENCODINGS``, and finally UTF-8 with undecodable bytes dropped.
    ``None`` becomes ``""``; a non-bytes payload is returned via ``str()``.
    """
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if not isinstance(payload, (bytes, bytearray)):
        return str(payload)
    try:
        return payload.decode(charset or "utf-8")
    except (UnicodeDecodeError, LookupError):
        # Declared charset is wrong or unknown to Python; fall through.
        pass
    for enc in _FALLBACK_ENCODINGS:
        try:
            return payload.decode(enc)
        except UnicodeDecodeError:
            continue
    return payload.decode("utf-8", errors="ignore")


def _collect_eml_bodies(part, body_text, body_html):
    """Append every text/plain and text/html body under ``part`` to the lists.

    Multipart containers are walked recursively. The payload and charset are
    read from ``part`` itself: a multipart root has no payload of its own, so
    reading them from the root message would fail.
    """
    content_type = part.get_content_type()
    if content_type in ("text/plain", "text/html"):
        payload = part.get_payload(decode=True)
        if payload is None:
            return
        charset = part.get_content_charset() or "utf-8"
        target = body_text if content_type == "text/plain" else body_html
        target.append(_bytes_to_text(payload, charset))
    elif "multipart" in content_type:
        if part.is_multipart():
            for sub_part in part.iter_parts():
                _collect_eml_bodies(sub_part, body_text, body_html)


class ParserParam(ProcessParamBase):
    """Configuration of the ``Parser`` component.

    ``allowed_output_format`` lists the output formats accepted per file
    family; ``setups`` holds the default per-family settings, including the
    file suffixes that route a file to that family.
    """

    def __init__(self):
        super().__init__()
        # NOTE(review): the default "audio"/"video" setups below use
        # output_format "text", which is not listed here; check() does not
        # validate those two families, so this is not enforced - confirm intent.
        self.allowed_output_format = {
            "pdf": [
                "json",
                "markdown",
            ],
            "spreadsheet": [
                "json",
                "markdown",
                "html",
            ],
            "word": [
                "json",
                "markdown",
            ],
            "slides": [
                "json",
            ],
            "image": [
                "text"
            ],
            "email": ["text", "json"],
            "text&markdown": [
                "text",
                "json"
            ],
            "audio": [
                "json"
            ],
            "video": [],
        }

        self.setups = {
            "pdf": {
                "parse_method": "deepdoc",  # deepdoc/plain_text/tcadp_parser/vlm
                "lang": "Chinese",
                "suffix": [
                    "pdf",
                ],
                "output_format": "json",
            },
            "spreadsheet": {
                "output_format": "html",
                "suffix": [
                    "xls",
                    "xlsx",
                    "csv",
                ],
            },
            "word": {
                "suffix": [
                    "doc",
                    "docx",
                ],
                "output_format": "json",
            },
            "text&markdown": {
                "suffix": ["md", "markdown", "mdx", "txt"],
                "output_format": "json",
            },
            "slides": {
                "suffix": [
                    "pptx",
                ],
                "output_format": "json",
            },
            "image": {
                "parse_method": "ocr",
                "llm_id": "",
                "lang": "Chinese",
                "system_prompt": "",
                "suffix": ["jpg", "jpeg", "png", "gif"],
                "output_format": "text",
            },
            "email": {
                "suffix": [
                    "eml", "msg"
                ],
                "fields": ["from", "to", "cc", "bcc", "date", "subject", "body", "attachments", "metadata"],
                "output_format": "json",
            },
            "audio": {
                "suffix": [
                    "da", "wave", "wav", "mp3", "aac", "flac", "ogg", "aiff", "au", "midi", "wma", "realaudio", "vqf", "oggvorbis", "ape"
                ],
                "output_format": "text",
            },
            "video": {
                "suffix": [
                    "mp4", "avi", "mkv"
                ],
                "output_format": "text",
            },
        }

    def check(self):
        """Validate every configured file family.

        Families missing from ``setups`` (or empty) are skipped. Validation is
        delegated to the inherited ``check_empty`` / ``check_valid_value``
        helpers, in the order pdf, spreadsheet, word, slides, image,
        text&markdown, audio, video, email.
        """
        pdf_config = self.setups.get("pdf", {})
        if pdf_config:
            pdf_parse_method = pdf_config.get("parse_method", "")
            self.check_empty(pdf_parse_method, "Parse method abnormal.")

            # A non-built-in method names a vision model, which needs a language.
            if pdf_parse_method.lower() not in _BUILTIN_PDF_METHODS:
                self.check_empty(pdf_config.get("lang", ""), "PDF VLM language")

            pdf_output_format = pdf_config.get("output_format", "")
            self.check_valid_value(pdf_output_format, "PDF output format abnormal.", self.allowed_output_format["pdf"])

        spreadsheet_config = self.setups.get("spreadsheet", {})
        if spreadsheet_config:
            spreadsheet_output_format = spreadsheet_config.get("output_format", "")
            self.check_valid_value(spreadsheet_output_format, "Spreadsheet output format abnormal.", self.allowed_output_format["spreadsheet"])

        doc_config = self.setups.get("word", {})
        if doc_config:
            doc_output_format = doc_config.get("output_format", "")
            self.check_valid_value(doc_output_format, "Word processer document output format abnormal.", self.allowed_output_format["word"])

        slides_config = self.setups.get("slides", {})
        if slides_config:
            slides_output_format = slides_config.get("output_format", "")
            self.check_valid_value(slides_output_format, "Slides output format abnormal.", self.allowed_output_format["slides"])

        image_config = self.setups.get("image", {})
        if image_config:
            image_parse_method = image_config.get("parse_method", "")
            if image_parse_method not in ["ocr"]:
                self.check_empty(image_config.get("lang", ""), "Image VLM language")

        text_config = self.setups.get("text&markdown", {})
        if text_config:
            text_output_format = text_config.get("output_format", "")
            self.check_valid_value(text_output_format, "Text output format abnormal.", self.allowed_output_format["text&markdown"])

        audio_config = self.setups.get("audio", {})
        if audio_config:
            self.check_empty(audio_config.get("llm_id"), "Audio VLM")

        video_config = self.setups.get("video", {})
        if video_config:
            self.check_empty(video_config.get("llm_id"), "Video VLM")

        email_config = self.setups.get("email", {})
        if email_config:
            email_output_format = email_config.get("output_format", "")
            self.check_valid_value(email_output_format, "Email output format abnormal.", self.allowed_output_format["email"])

    def get_input_form(self) -> dict[str, dict]:
        """Return the input form description; this component declares none."""
        return {}


class Parser(ProcessBase):
    """Dispatch a file to a family-specific parser chosen by file extension.

    Each ``_<family>`` handler takes the file name and its raw bytes, reports
    progress through ``self.callback`` and publishes its result through
    ``self.set_output`` under the key of the produced format.
    """

    component_name = "Parser"

    def _pdf(self, name, blob):
        """Parse a PDF with the configured method and emit JSON or markdown.

        ``parse_method`` selects deepdoc, plain text, MinerU or TCADP; any
        other value is used as the name of an IMAGE2TEXT model.

        Raises:
            RuntimeError: if MinerU is selected but its installation check fails.
        """
        self.callback(random.randint(1, 5) / 100.0, "Start to work on a PDF.")
        conf = self._param.setups["pdf"]
        self.set_output("output_format", conf["output_format"])

        parse_method = conf.get("parse_method").lower()
        if parse_method == "deepdoc":
            bboxes = RAGFlowPdfParser().parse_into_bboxes(blob, callback=self.callback)
        elif parse_method == "plain_text":
            lines, _ = PlainParser()(blob)
            bboxes = [{"text": t} for t, _ in lines]
        elif parse_method == "mineru":
            mineru_executable = os.environ.get("MINERU_EXECUTABLE", "mineru")
            mineru_api = os.environ.get("MINERU_APISERVER", "http://host.docker.internal:9987")
            pdf_parser = MinerUParser(mineru_path=mineru_executable, mineru_api=mineru_api)
            if not pdf_parser.check_installation():
                raise RuntimeError("MinerU not found. Please install it via: pip install -U 'mineru[core]'.")

            lines, _ = pdf_parser.parse_pdf(
                filepath=name,
                binary=blob,
                callback=self.callback,
                output_dir=os.environ.get("MINERU_OUTPUT_DIR", ""),
                delete_output=bool(int(os.environ.get("MINERU_DELETE_OUTPUT", 1))),
            )
            bboxes = [
                {
                    "image": pdf_parser.crop(poss, 1),
                    "positions": [[pos[0][-1], *pos[1:]] for pos in pdf_parser.extract_positions(poss)],
                    "text": t,
                }
                for t, poss in lines
            ]
        elif parse_method == "tcadp parser":
            # ADP is a document parsing tool using Tencent Cloud API
            tcadp_parser = TCADPParser()
            sections, _ = tcadp_parser.parse_pdf(
                filepath=name,
                binary=blob,
                callback=self.callback,
                file_type="PDF",
                file_start_page=1,
                file_end_page=1000
            )
            bboxes = []
            for section, position_tag in sections:
                match = _TCADP_POSITION_RE.match(position_tag) if position_tag else None
                if not match:
                    # No (parsable) position info: keep the text without a position.
                    bboxes.append({"text": section})
                    continue
                pn, x0, x1, top, bott = match.groups()
                bboxes.append({
                    "page_number": int(pn.split('-')[0]),  # Take the first page number
                    "x0": float(x0),
                    "x1": float(x1),
                    "top": float(top),
                    "bottom": float(bott),
                    "text": section
                })
        else:
            # The configured value (original casing) is the vision model name.
            vision_model = LLMBundle(self._canvas._tenant_id, LLMType.IMAGE2TEXT, llm_name=conf.get("parse_method"), lang=self._param.setups["pdf"].get("lang"))
            lines, _ = VisionParser(vision_model=vision_model)(blob, callback=self.callback)
            bboxes = []
            for t, poss in lines:
                for pn, x0, x1, top, bott in RAGFlowPdfParser.extract_positions(poss):
                    bboxes.append({"page_number": int(pn[0]), "x0": float(x0), "x1": float(x1), "top": float(top), "bottom": float(bott), "text": t})

        if conf.get("output_format") == "json":
            self.set_output("json", bboxes)
        if conf.get("output_format") == "markdown":
            parts = []
            for b in bboxes:
                layout_type = b.get("layout_type", "")
                if layout_type == "title":
                    parts.append("\n## ")
                if layout_type == "figure":
                    # Figures are embedded as images; their text is not emitted.
                    parts.append("\n![Image]({})".format(VLM.image2base64(b["image"])))
                    continue
                parts.append(b.get("text", "") + "\n")
            self.set_output("markdown", "".join(parts))

    def _spreadsheet(self, name, blob):
        """Parse a spreadsheet and emit HTML, JSON rows or markdown."""
        self.callback(random.randint(1, 5) / 100.0, "Start to work on a Spreadsheet.")
        conf = self._param.setups["spreadsheet"]
        self.set_output("output_format", conf["output_format"])

        spreadsheet_parser = ExcelParser()
        output_format = conf.get("output_format")
        if output_format == "html":
            htmls = spreadsheet_parser.html(blob, 1000000000)
            # Only the first element of the returned list is published.
            self.set_output("html", htmls[0])
        elif output_format == "json":
            self.set_output("json", [{"text": txt} for txt in spreadsheet_parser(blob) if txt])
        elif output_format == "markdown":
            self.set_output("markdown", spreadsheet_parser.markdown(blob))

    def _word(self, name, blob):
        """Parse a Word document and emit JSON sections or markdown."""
        self.callback(random.randint(1, 5) / 100.0, "Start to work on a Word Processor Document")
        conf = self._param.setups["word"]
        self.set_output("output_format", conf["output_format"])

        docx_parser = Docx()
        output_format = conf.get("output_format")
        if output_format == "json":
            sections, tbls = docx_parser(name, binary=blob)
            sections = [{"text": section[0], "image": section[1]} for section in sections if section]
            # Tables are appended after the body sections, without an image.
            sections.extend([{"text": tb, "image": None} for ((_, tb), _) in tbls])
            self.set_output("json", sections)
        elif output_format == "markdown":
            markdown_text = docx_parser.to_markdown(name, binary=blob)
            self.set_output("markdown", markdown_text)

    def _slides(self, name, blob):
        """Parse a slide deck and emit one JSON entry per non-blank text."""
        from deepdoc.parser.ppt_parser import RAGFlowPptParser

        self.callback(random.randint(1, 5) / 100.0, "Start to work on a PowerPoint Document")
        conf = self._param.setups["slides"]
        self.set_output("output_format", conf["output_format"])

        ppt_parser = RAGFlowPptParser()
        txts = ppt_parser(blob, 0, 100000, None)
        sections = [{"text": section} for section in txts if section.strip()]

        # json
        assert conf.get("output_format") == "json", "have to be json for ppt"
        if conf.get("output_format") == "json":
            self.set_output("json", sections)

    def _markdown(self, name, blob):
        """Parse a text/markdown file and emit JSON sections or plain text.

        In JSON mode, images found in a section are attached under ``image``;
        several images are merged into one with ``concat_img``.
        """
        from functools import reduce

        from rag.app.naive import Markdown as naive_markdown_parser
        from rag.nlp import concat_img

        self.callback(random.randint(1, 5) / 100.0, "Start to work on a markdown.")
        conf = self._param.setups["text&markdown"]
        self.set_output("output_format", conf["output_format"])

        markdown_parser = naive_markdown_parser()
        sections, _ = markdown_parser(name, blob, separate_tables=False)

        if conf.get("output_format") == "json":
            json_results = []
            for section_text, _ in sections:
                json_result = {
                    "text": section_text,
                }

                images = markdown_parser.get_pictures(section_text) if section_text else None
                if images:
                    # If multiple images found, combine them using concat_img
                    combined_image = reduce(concat_img, images) if len(images) > 1 else images[0]
                    json_result["image"] = combined_image

                json_results.append(json_result)

            self.set_output("json", json_results)
        else:
            self.set_output("text", "\n".join([section_text for section_text, _ in sections]))

    def _image(self, name, blob):
        """Extract text from an image via OCR or describe it with a vision model."""
        from deepdoc.vision import OCR

        self.callback(random.randint(1, 5) / 100.0, "Start to work on an image.")
        conf = self._param.setups["image"]
        self.set_output("output_format", conf["output_format"])

        img = Image.open(io.BytesIO(blob)).convert("RGB")

        if conf["parse_method"] == "ocr":
            # use ocr, recognize chars only
            ocr = OCR()
            bxs = ocr(np.array(img))  # return boxes and recognize result
            txt = "\n".join([t[0] for _, t in bxs if t[0]])
        else:
            lang = conf["lang"]
            # use VLM to describe the picture
            cv_model = LLMBundle(self._canvas.get_tenant_id(), LLMType.IMAGE2TEXT, llm_name=conf["parse_method"], lang=lang)
            img_binary = io.BytesIO()
            img.save(img_binary, format="JPEG")
            img_binary.seek(0)

            system_prompt = conf.get("system_prompt")
            if system_prompt:
                txt = cv_model.describe_with_prompt(img_binary.read(), system_prompt)
            else:
                txt = cv_model.describe(img_binary.read())

        self.set_output("text", txt)

    def _audio(self, name, blob):
        """Transcribe an audio file with the tenant's speech-to-text model."""
        import tempfile

        self.callback(random.randint(1, 5) / 100.0, "Start to work on an audio.")

        conf = self._param.setups["audio"]
        self.set_output("output_format", conf["output_format"])

        _, ext = os.path.splitext(name)
        # The model is given a file path, so the bytes are spooled to a temp
        # file that keeps the original extension; it is removed on exit.
        with tempfile.NamedTemporaryFile(suffix=ext) as tmpf:
            tmpf.write(blob)
            tmpf.flush()
            tmp_path = os.path.abspath(tmpf.name)

            # NOTE(review): check() requires an audio "llm_id" but it is not
            # passed here, so no llm_name is given to LLMBundle - confirm
            # whether the configured id should be forwarded.
            seq2txt_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.SPEECH2TEXT)
            txt = seq2txt_mdl.transcription(tmp_path)

            self.set_output("text", txt)

    def _video(self, name, blob):
        """Send the video bytes to the configured vision model and emit its text."""
        self.callback(random.randint(1, 5) / 100.0, "Start to work on an video.")

        conf = self._param.setups["video"]
        self.set_output("output_format", conf["output_format"])

        cv_mdl = LLMBundle(self._canvas.get_tenant_id(), LLMType.IMAGE2TEXT, llm_name=conf["llm_id"])
        txt = cv_mdl.chat(system="", history=[], gen_conf={}, video_bytes=blob, filename=name)

        self.set_output("text", txt)

    def _email(self, name, blob):
        """Parse an ``.eml`` or ``.msg`` email and emit JSON or flattened text.

        Only the fields listed in the ``fields`` setting are extracted. A name
        ending in ``.eml`` (any letter case) is parsed with the stdlib
        ``email`` package; everything else is handed to ``extract_msg``.
        """
        self.callback(random.randint(1, 5) / 100.0, "Start to work on an email.")

        email_content = {}
        conf = self._param.setups["email"]
        self.set_output("output_format", conf["output_format"])
        target_fields = conf["fields"]

        _, ext = os.path.splitext(name)
        # Routing upstream is case-insensitive, so the comparison here must be too.
        if ext.lower() == ".eml":
            email_content = self._parse_eml(blob, target_fields)
        else:
            email_content = self._parse_msg(blob, target_fields)

        if conf["output_format"] == "json":
            self.set_output("json", [email_content])
        else:
            pieces = []
            for k, v in email_content.items():
                if isinstance(v, str):
                    # basic info
                    pieces.append(f'{k}:{v}' + "\n")
                elif isinstance(v, dict):
                    # metadata
                    pieces.append(f'{k}:{json.dumps(v)}' + "\n")
                elif isinstance(v, list):
                    # attachments or others
                    for fb in v:
                        if isinstance(fb, dict):
                            # attachments
                            pieces.append(f'{fb["filename"]}:{fb["payload"]}' + "\n")
                        else:
                            # str, usually plain text
                            pieces.append(fb)
            self.set_output("text", "".join(pieces))

    def _parse_eml(self, blob, target_fields):
        """Extract the requested fields from the bytes of an ``.eml`` message."""
        from email import policy
        from email.parser import BytesParser

        msg = BytesParser(policy=policy.default).parse(io.BytesIO(blob))
        email_content = {"metadata": {}}

        # handle header info
        for header, value in msg.items():
            key = header.lower()
            # get fields like from, to, cc, bcc, date, subject
            if key in target_fields:
                email_content[key] = value
            # get metadata
            elif key not in _BASIC_EMAIL_HEADERS:
                email_content["metadata"][key] = value

        # get body
        if "body" in target_fields:
            body_text, body_html = [], []
            _collect_eml_bodies(msg, body_text, body_html)
            email_content["text"] = "\n".join(body_text)
            email_content["text_html"] = "\n".join(body_html)

        # get attachment
        if "attachments" in target_fields:
            attachments = []
            for part in msg.iter_attachments():
                content_disposition = part.get("Content-Disposition")
                if not content_disposition:
                    continue
                dispositions = content_disposition.strip().split(";")
                if dispositions[0].lower() != "attachment":
                    continue
                # Attachments often declare no charset, so decode best-effort
                # instead of calling .decode(None).
                payload = _bytes_to_text(part.get_payload(decode=True), part.get_content_charset())
                attachments.append({
                    "filename": part.get_filename(),
                    "payload": payload,
                })
            email_content["attachments"] = attachments

        return email_content

    def _parse_msg(self, blob, target_fields):
        """Extract the requested fields from the bytes of an Outlook ``.msg`` file."""
        import extract_msg

        logging.info("handle a msg file.")
        msg = extract_msg.Message(blob)
        email_content = {}

        # handle header info
        basic_content = {
            "from": msg.sender,
            "to": msg.to,
            "cc": msg.cc,
            "bcc": msg.bcc,
            "date": msg.date,
            "subject": msg.subject,
        }
        email_content.update({k: v for k, v in basic_content.items() if k in target_fields})

        # get metadata
        email_content['metadata'] = {
            'message_id': msg.messageId,
            'in_reply_to': msg.inReplyTo,
        }

        # get body
        if "body" in target_fields:
            email_content["text"] = msg.body[0] if isinstance(msg.body, list) and msg.body else msg.body
            # Fall back to the HTML body when there is no plain-text body.
            if not email_content["text"] and msg.htmlBody:
                email_content["text"] = msg.htmlBody[0] if isinstance(msg.htmlBody, list) and msg.htmlBody else msg.htmlBody

        # get attachments
        if "attachments" in target_fields:
            email_content["attachments"] = [
                {
                    "filename": t.name,
                    # Best-effort decode so non-UTF-8 data does not abort the parse.
                    "payload": _bytes_to_text(t.data),
                }
                for t in msg.attachments
            ]

        return email_content

    async def _invoke(self, **kwargs):
        """Load the file, run the handler matching its extension, store images.

        The upstream payload is validated with ``ParserFromUpstream``; on
        failure the ``_ERROR`` output is set and the method returns. The
        handler runs in a worker thread. Afterwards ``image2id`` is started
        for every entry of the ``json`` output.

        Raises:
            Exception: if no configured family lists the file's extension.
        """
        function_map = {
            "pdf": self._pdf,
            "text&markdown": self._markdown,
            "spreadsheet": self._spreadsheet,
            "slides": self._slides,
            "word": self._word,
            "image": self._image,
            "audio": self._audio,
            "video": self._video,
            "email": self._email,
        }
        try:
            from_upstream = ParserFromUpstream.model_validate(kwargs)
        except Exception as e:
            self.set_output("_ERROR", f"Input error: {str(e)}")
            return

        name = from_upstream.name
        if self._canvas._doc_id:
            b, n = File2DocumentService.get_storage_address(doc_id=self._canvas._doc_id)
            blob = STORAGE_IMPL.get(b, n)
        else:
            blob = FileService.get_blob(from_upstream.file["created_by"], from_upstream.file["id"])

        file_ext = from_upstream.name.split(".")[-1].lower()
        done = False
        # The first family (in setups order) listing the extension wins.
        for p_type, conf in self._param.setups.items():
            if file_ext not in conf.get("suffix", []):
                continue
            await trio.to_thread.run_sync(function_map[p_type], name, blob)
            done = True
            break

        if not done:
            raise Exception("No suitable for file extension: `.%s`" % file_ext)

        outs = self.output()
        async with trio.open_nursery() as nursery:
            # "or []" also covers a "json" key that is present but None.
            for d in outs.get("json") or []:
                nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put, tenant_id=self._canvas._tenant_id), get_uuid())