diff --git a/deepdoc/parser/mineru_parser.py b/deepdoc/parser/mineru_parser.py new file mode 100644 index 000000000..61d385d3f --- /dev/null +++ b/deepdoc/parser/mineru_parser.py @@ -0,0 +1,344 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import json +import logging +import platform +import re +import subprocess +import sys +import tempfile +import threading +import time +from io import BytesIO +from os import PathLike +from pathlib import Path +from queue import Empty, Queue +from typing import Any, Callable, Optional + +import numpy as np +import pdfplumber +from PIL import Image +from strenum import StrEnum + +from deepdoc.parser.pdf_parser import RAGFlowPdfParser + +LOCK_KEY_pdfplumber = "global_shared_lock_pdfplumber" +if LOCK_KEY_pdfplumber not in sys.modules: + sys.modules[LOCK_KEY_pdfplumber] = threading.Lock() + + +class MinerUContentType(StrEnum): + IMAGE = "image" + TABLE = "table" + TEXT = "text" + EQUATION = "equation" + + +class MinerUParser(RAGFlowPdfParser): + def __init__(self, mineru_path: str = "mineru"): + self.mineru_path = Path(mineru_path) + self.logger = logging.getLogger(self.__class__.__name__) + + def check_installation(self) -> bool: + subprocess_kwargs = { + "capture_output": True, + "text": True, + "check": True, + "encoding": "utf-8", + "errors": "ignore", + } + + if platform.system() == "Windows": + subprocess_kwargs["creationflags"] = getattr(subprocess, "CREATE_NO_WINDOW", 0) + + try: + result = subprocess.run([str(self.mineru_path), "--version"], **subprocess_kwargs) + version_info = result.stdout.strip() + if version_info: + logging.info(f"[MinerU] Detected version: {version_info}") + else: + logging.info("[MinerU] Detected MinerU, but version info is empty.") + return True + except subprocess.CalledProcessError as e: + logging.warning(f"[MinerU] Execution failed (exit code {e.returncode}).") + except FileNotFoundError: + logging.warning("[MinerU] MinerU not found. Please install it via: pip install -U 'mineru[core]'") + except Exception as e: + logging.error(f"[MinerU] Unexpected error during installation check: {e}") + return False + + def _run_mineru(self, input_path: Path, output_dir: Path, method: str = "auto", lang: Optional[str] = None): + cmd = [str(self.mineru_path), "-p", str(input_path), "-o", str(output_dir), "-m", method] + if lang: + cmd.extend(["-l", lang]) + + self.logger.info(f"[MinerU] Running command: {' '.join(cmd)}") + + subprocess_kwargs = { + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + "text": True, + "encoding": "utf-8", + "errors": "ignore", + "bufsize": 1, + } + + if platform.system() == "Windows": + subprocess_kwargs["creationflags"] = getattr(subprocess, "CREATE_NO_WINDOW", 0) + + process = subprocess.Popen(cmd, **subprocess_kwargs) + stdout_queue, stderr_queue = Queue(), Queue() + + def enqueue_output(pipe, queue, prefix): + for line in iter(pipe.readline, ""): + if line.strip(): + queue.put((prefix, line.strip())) + pipe.close() + + threading.Thread(target=enqueue_output, args=(process.stdout, stdout_queue, "STDOUT"), daemon=True).start() + threading.Thread(target=enqueue_output, args=(process.stderr, stderr_queue, "STDERR"), daemon=True).start() + + while process.poll() is None: + for q in (stdout_queue, stderr_queue): + try: + while True: + prefix, line = q.get_nowait() + if prefix == "STDOUT": + self.logger.info(f"[MinerU] {line}") + else: + self.logger.warning(f"[MinerU] {line}") + except Empty: + pass + time.sleep(0.1) + + return_code = process.wait() + if return_code != 0: + raise RuntimeError(f"[MinerU] Process failed with exit code {return_code}") + self.logger.info("[MinerU] Command completed successfully.") + + def __images__(self, fnm, zoomin: int = 1, page_from=0, page_to=600, callback=None): + self.page_from = page_from + self.page_to = page_to + try: + with pdfplumber.open(fnm) if isinstance(fnm, (str, PathLike)) else pdfplumber.open(BytesIO(fnm)) as pdf: + self.pdf = pdf + self.page_images = [p.to_image(resolution=72 * zoomin, antialias=True).original for _, p in enumerate(self.pdf.pages[page_from:page_to])] + except Exception as e: + self.page_images = None + self.total_page = 0 + logging.exception(e) + + def _line_tag(self, bx): + pn = [bx["page_idx"] + 1] + positions = bx["bbox"] + x0, top, x1, bott = positions + + if hasattr(self, "page_images") and self.page_images and len(self.page_images) > bx["page_idx"]: + page_width, page_height = self.page_images[bx["page_idx"]].size + x0 = (x0 / 1000.0) * page_width + x1 = (x1 / 1000.0) * page_width + top = (top / 1000.0) * page_height + bott = (bott / 1000.0) * page_height + + return "@@{}\t{:.1f}\t{:.1f}\t{:.1f}\t{:.1f}##".format("-".join([str(p) for p in pn]), x0, x1, top, bott) + + def crop(self, text, ZM=1, need_position=False): + imgs = [] + poss = self.extract_positions(text) + if not poss: + if need_position: + return None, None + return + + max_width = max(np.max([right - left for (_, left, right, _, _) in poss]), 6) + GAP = 6 + pos = poss[0] + poss.insert(0, ([pos[0][0]], pos[1], pos[2], max(0, pos[3] - 120), max(pos[3] - GAP, 0))) + pos = poss[-1] + poss.append(([pos[0][-1]], pos[1], pos[2], min(self.page_images[pos[0][-1]].size[1], pos[4] + GAP), min(self.page_images[pos[0][-1]].size[1], pos[4] + 120))) + + positions = [] + for ii, (pns, left, right, top, bottom) in enumerate(poss): + right = left + max_width + + if bottom <= top: + bottom = top + 2 + + for pn in pns[1:]: + bottom += self.page_images[pn - 1].size[1] + + img0 = self.page_images[pns[0]] + x0, y0, x1, y1 = int(left), int(top), int(right), int(min(bottom, img0.size[1])) + crop0 = img0.crop((x0, y0, x1, y1)) + imgs.append(crop0) + if 0 < ii < len(poss) - 1: + positions.append((pns[0] + self.page_from, x0, x1, y0, y1)) + + bottom -= img0.size[1] + for pn in pns[1:]: + page = self.page_images[pn] + x0, y0, x1, y1 = int(left), 0, int(right), int(min(bottom, page.size[1])) + cimgp = page.crop((x0, y0, x1, y1)) + imgs.append(cimgp) + if 0 < ii < len(poss) - 1: + positions.append((pn + self.page_from, x0, x1, y0, y1)) + bottom -= page.size[1] + + if not imgs: + if need_position: + return None, None + return + + height = 0 + for img in imgs: + height += img.size[1] + GAP + height = int(height) + width = int(np.max([i.size[0] for i in imgs])) + pic = Image.new("RGB", (width, height), (245, 245, 245)) + height = 0 + for ii, img in enumerate(imgs): + if ii == 0 or ii + 1 == len(imgs): + img = img.convert("RGBA") + overlay = Image.new("RGBA", img.size, (0, 0, 0, 0)) + overlay.putalpha(128) + img = Image.alpha_composite(img, overlay).convert("RGB") + pic.paste(img, (0, int(height))) + height += img.size[1] + GAP + + if need_position: + return pic, positions + return pic + + @staticmethod + def extract_positions(txt: str): + poss = [] + for tag in re.findall(r"@@[0-9-]+\t[0-9.\t]+##", txt): + pn, left, right, top, bottom = tag.strip("#").strip("@").split("\t") + left, right, top, bottom = float(left), float(right), float(top), float(bottom) + poss.append(([int(p) - 1 for p in pn.split("-")], left, right, top, bottom)) + return poss + + def _read_output(self, output_dir: Path, file_stem: str, method: str = "auto") -> list[dict[str, Any]]: + subdir = output_dir / file_stem / method + json_file = subdir / f"{file_stem}_content_list.json" + + if not json_file.exists(): + raise FileNotFoundError(f"[MinerU] Missing output file: {json_file}") + + with open(json_file, "r", encoding="utf-8") as f: + data = json.load(f) + + for item in data: + for key in ("img_path", "table_img_path", "equation_img_path"): + if key in item and item[key]: + item[key] = str((subdir / item[key]).resolve()) + return data + + def _transfer_to_sections(self, outputs: list[dict[str, Any]]): + sections = [] + for output in outputs: + match output["type"]: + case MinerUContentType.TEXT: + section = output["text"] + case MinerUContentType.TABLE: + section = output["table_body"] + "\n".join(output["table_caption"]) + "\n".join(output["table_footnote"]) + case MinerUContentType.IMAGE: + section = "".join(output["image_caption"]) + "\n" + "".join(output["image_footnote"]) + case MinerUContentType.EQUATION: + section = output["text"] + + if section: + sections.append((section, self._line_tag(output))) + return sections + + def _transfer_to_tables(self, outputs: list[dict[str, Any]]): + return [] + + def parse_pdf( + self, + filepath: str | PathLike[str], + binary: BytesIO | bytes, + callback: Optional[Callable] = None, + *, + output_dir: Optional[str] = None, + lang: Optional[str] = None, + method: str = "auto", + delete_output: bool = True, + ) -> tuple: + import shutil + + temp_pdf = None + created_tmp_dir = False + + if binary: + temp_dir = Path(tempfile.mkdtemp(prefix="mineru_bin_pdf_")) + temp_pdf = temp_dir / Path(filepath).name + with open(temp_pdf, "wb") as f: + f.write(binary) + pdf = temp_pdf + self.logger.info(f"[MinerU] Received binary PDF -> {temp_pdf}") + if callback: + callback(0.15, f"[MinerU] Received binary PDF -> {temp_pdf}") + else: + pdf = Path(filepath) + if not pdf.exists(): + if callback: + callback(-1, f"[MinerU] PDF not found: {pdf}") + raise FileNotFoundError(f"[MinerU] PDF not found: {pdf}") + + if output_dir: + out_dir = Path(output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + else: + out_dir = Path(tempfile.mkdtemp(prefix="mineru_pdf_")) + created_tmp_dir = True + + self.logger.info(f"[MinerU] Output directory: {out_dir}") + if callback: + callback(0.15, f"[MinerU] Output directory: {out_dir}") + + self.__images__(pdf, zoomin=1) + + try: + self._run_mineru(pdf, out_dir, method=method, lang=lang) + outputs = self._read_output(out_dir, pdf.stem, method=method) + self.logger.info(f"[MinerU] Parsed {len(outputs)} blocks from PDF.") + if callback: + callback(0.75, f"[MinerU] Parsed {len(outputs)} blocks from PDF.") + return self._transfer_to_sections(outputs), self._transfer_to_tables(outputs) + finally: + if temp_pdf and temp_pdf.exists(): + try: + temp_pdf.unlink() + temp_pdf.parent.rmdir() + except Exception: + pass + if delete_output and created_tmp_dir and out_dir.exists(): + try: + shutil.rmtree(out_dir) + except Exception: + pass + + +if __name__ == "__main__": + parser = MinerUParser("mineru") + print("MinerU available:", parser.check_installation()) + + filepath = "" + with open(filepath, "rb") as file: + outputs = parser.parse_pdf(filepath=filepath, binary=file.read()) + for output in outputs: + print(output) diff --git a/rag/app/naive.py b/rag/app/naive.py index 370b503ad..e32b8be18 100644 --- a/rag/app/naive.py +++ b/rag/app/naive.py @@ -15,6 +15,7 @@ # import logging +import os import re from functools import reduce from io import BytesIO @@ -32,6 +33,7 @@ from api.db import LLMType from api.db.services.llm_service import LLMBundle from deepdoc.parser import DocxParser, ExcelParser, HtmlParser, JsonParser, MarkdownElementExtractor, MarkdownParser, PdfParser, TxtParser from deepdoc.parser.figure_parser import VisionFigureParser, vision_figure_parser_figure_data_wrapper +from deepdoc.parser.mineru_parser import MinerUParser from deepdoc.parser.pdf_parser import PlainParser, VisionParser from rag.nlp import concat_img, find_codec, naive_merge, naive_merge_with_images, naive_merge_docx, rag_tokenizer, tokenize_chunks, tokenize_chunks_with_images, tokenize_table @@ -517,7 +519,22 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, res = tokenize_table(tables, doc, is_english) callback(0.8, "Finish parsing.") + elif layout_recognizer == "MinerU": + mineru_executable = os.environ.get("MINERU_EXECUTABLE", "mineru") + pdf_parser = MinerUParser(mineru_path=mineru_executable) + if not pdf_parser.check_installation(): + callback(-1, "MinerU not found.") + return res + sections, tables = pdf_parser.parse_pdf( + filepath=filename, + binary=binary, + callback=callback, + output_dir=os.environ.get("MINERU_OUTPUT_DIR", ""), + delete_output=bool(int(os.environ.get("MINERU_DELETE_OUTPUT", 1))), + ) + parser_config["chunk_token_num"] = 0 + callback(0.8, "Finish parsing.") else: if layout_recognizer == "Plain Text": pdf_parser = PlainParser()