# # Copyright 2025 The InfiniFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import json import logging import platform import re import subprocess import sys import tempfile import threading import time from io import BytesIO from os import PathLike from pathlib import Path from queue import Empty, Queue from typing import Any, Callable, Optional import numpy as np import pdfplumber from PIL import Image from strenum import StrEnum from deepdoc.parser.pdf_parser import RAGFlowPdfParser LOCK_KEY_pdfplumber = "global_shared_lock_pdfplumber" if LOCK_KEY_pdfplumber not in sys.modules: sys.modules[LOCK_KEY_pdfplumber] = threading.Lock() class MinerUContentType(StrEnum): IMAGE = "image" TABLE = "table" TEXT = "text" EQUATION = "equation" class MinerUParser(RAGFlowPdfParser): def __init__(self, mineru_path: str = "mineru"): self.mineru_path = Path(mineru_path) self.logger = logging.getLogger(self.__class__.__name__) def check_installation(self) -> bool: subprocess_kwargs = { "capture_output": True, "text": True, "check": True, "encoding": "utf-8", "errors": "ignore", } if platform.system() == "Windows": subprocess_kwargs["creationflags"] = getattr(subprocess, "CREATE_NO_WINDOW", 0) try: result = subprocess.run([str(self.mineru_path), "--version"], **subprocess_kwargs) version_info = result.stdout.strip() if version_info: logging.info(f"[MinerU] Detected version: {version_info}") else: logging.info("[MinerU] Detected MinerU, but version info is empty.") return True except subprocess.CalledProcessError as e: logging.warning(f"[MinerU] Execution failed (exit code {e.returncode}).") except FileNotFoundError: logging.warning("[MinerU] MinerU not found. Please install it via: pip install -U 'mineru[core]'") except Exception as e: logging.error(f"[MinerU] Unexpected error during installation check: {e}") return False def _run_mineru(self, input_path: Path, output_dir: Path, method: str = "auto", lang: Optional[str] = None): cmd = [str(self.mineru_path), "-p", str(input_path), "-o", str(output_dir), "-m", method] if lang: cmd.extend(["-l", lang]) self.logger.info(f"[MinerU] Running command: {' '.join(cmd)}") subprocess_kwargs = { "stdout": subprocess.PIPE, "stderr": subprocess.PIPE, "text": True, "encoding": "utf-8", "errors": "ignore", "bufsize": 1, } if platform.system() == "Windows": subprocess_kwargs["creationflags"] = getattr(subprocess, "CREATE_NO_WINDOW", 0) process = subprocess.Popen(cmd, **subprocess_kwargs) stdout_queue, stderr_queue = Queue(), Queue() def enqueue_output(pipe, queue, prefix): for line in iter(pipe.readline, ""): if line.strip(): queue.put((prefix, line.strip())) pipe.close() threading.Thread(target=enqueue_output, args=(process.stdout, stdout_queue, "STDOUT"), daemon=True).start() threading.Thread(target=enqueue_output, args=(process.stderr, stderr_queue, "STDERR"), daemon=True).start() while process.poll() is None: for q in (stdout_queue, stderr_queue): try: while True: prefix, line = q.get_nowait() if prefix == "STDOUT": self.logger.info(f"[MinerU] {line}") else: self.logger.warning(f"[MinerU] {line}") except Empty: pass time.sleep(0.1) return_code = process.wait() if return_code != 0: raise RuntimeError(f"[MinerU] Process failed with exit code {return_code}") self.logger.info("[MinerU] Command completed successfully.") def __images__(self, fnm, zoomin: int = 1, page_from=0, page_to=600, callback=None): self.page_from = page_from self.page_to = page_to try: with pdfplumber.open(fnm) if isinstance(fnm, (str, PathLike)) else pdfplumber.open(BytesIO(fnm)) as pdf: self.pdf = pdf self.page_images = [p.to_image(resolution=72 * zoomin, antialias=True).original for _, p in enumerate(self.pdf.pages[page_from:page_to])] except Exception as e: self.page_images = None self.total_page = 0 logging.exception(e) def _line_tag(self, bx): pn = [bx["page_idx"] + 1] positions = bx["bbox"] x0, top, x1, bott = positions if hasattr(self, "page_images") and self.page_images and len(self.page_images) > bx["page_idx"]: page_width, page_height = self.page_images[bx["page_idx"]].size x0 = (x0 / 1000.0) * page_width x1 = (x1 / 1000.0) * page_width top = (top / 1000.0) * page_height bott = (bott / 1000.0) * page_height return "@@{}\t{:.1f}\t{:.1f}\t{:.1f}\t{:.1f}##".format("-".join([str(p) for p in pn]), x0, x1, top, bott) def crop(self, text, ZM=1, need_position=False): imgs = [] poss = self.extract_positions(text) if not poss: if need_position: return None, None return max_width = max(np.max([right - left for (_, left, right, _, _) in poss]), 6) GAP = 6 pos = poss[0] poss.insert(0, ([pos[0][0]], pos[1], pos[2], max(0, pos[3] - 120), max(pos[3] - GAP, 0))) pos = poss[-1] poss.append(([pos[0][-1]], pos[1], pos[2], min(self.page_images[pos[0][-1]].size[1], pos[4] + GAP), min(self.page_images[pos[0][-1]].size[1], pos[4] + 120))) positions = [] for ii, (pns, left, right, top, bottom) in enumerate(poss): right = left + max_width if bottom <= top: bottom = top + 2 for pn in pns[1:]: bottom += self.page_images[pn - 1].size[1] img0 = self.page_images[pns[0]] x0, y0, x1, y1 = int(left), int(top), int(right), int(min(bottom, img0.size[1])) crop0 = img0.crop((x0, y0, x1, y1)) imgs.append(crop0) if 0 < ii < len(poss) - 1: positions.append((pns[0] + self.page_from, x0, x1, y0, y1)) bottom -= img0.size[1] for pn in pns[1:]: page = self.page_images[pn] x0, y0, x1, y1 = int(left), 0, int(right), int(min(bottom, page.size[1])) cimgp = page.crop((x0, y0, x1, y1)) imgs.append(cimgp) if 0 < ii < len(poss) - 1: positions.append((pn + self.page_from, x0, x1, y0, y1)) bottom -= page.size[1] if not imgs: if need_position: return None, None return height = 0 for img in imgs: height += img.size[1] + GAP height = int(height) width = int(np.max([i.size[0] for i in imgs])) pic = Image.new("RGB", (width, height), (245, 245, 245)) height = 0 for ii, img in enumerate(imgs): if ii == 0 or ii + 1 == len(imgs): img = img.convert("RGBA") overlay = Image.new("RGBA", img.size, (0, 0, 0, 0)) overlay.putalpha(128) img = Image.alpha_composite(img, overlay).convert("RGB") pic.paste(img, (0, int(height))) height += img.size[1] + GAP if need_position: return pic, positions return pic @staticmethod def extract_positions(txt: str): poss = [] for tag in re.findall(r"@@[0-9-]+\t[0-9.\t]+##", txt): pn, left, right, top, bottom = tag.strip("#").strip("@").split("\t") left, right, top, bottom = float(left), float(right), float(top), float(bottom) poss.append(([int(p) - 1 for p in pn.split("-")], left, right, top, bottom)) return poss def _read_output(self, output_dir: Path, file_stem: str, method: str = "auto") -> list[dict[str, Any]]: subdir = output_dir / file_stem / method json_file = subdir / f"{file_stem}_content_list.json" if not json_file.exists(): raise FileNotFoundError(f"[MinerU] Missing output file: {json_file}") with open(json_file, "r", encoding="utf-8") as f: data = json.load(f) for item in data: for key in ("img_path", "table_img_path", "equation_img_path"): if key in item and item[key]: item[key] = str((subdir / item[key]).resolve()) return data def _transfer_to_sections(self, outputs: list[dict[str, Any]]): sections = [] for output in outputs: match output["type"]: case MinerUContentType.TEXT: section = output["text"] case MinerUContentType.TABLE: section = output["table_body"] + "\n".join(output["table_caption"]) + "\n".join(output["table_footnote"]) case MinerUContentType.IMAGE: section = "".join(output["image_caption"]) + "\n" + "".join(output["image_footnote"]) case MinerUContentType.EQUATION: section = output["text"] if section: sections.append((section, self._line_tag(output))) return sections def _transfer_to_tables(self, outputs: list[dict[str, Any]]): return [] def parse_pdf( self, filepath: str | PathLike[str], binary: BytesIO | bytes, callback: Optional[Callable] = None, *, output_dir: Optional[str] = None, lang: Optional[str] = None, method: str = "auto", delete_output: bool = True, ) -> tuple: import shutil temp_pdf = None created_tmp_dir = False if binary: temp_dir = Path(tempfile.mkdtemp(prefix="mineru_bin_pdf_")) temp_pdf = temp_dir / Path(filepath).name with open(temp_pdf, "wb") as f: f.write(binary) pdf = temp_pdf self.logger.info(f"[MinerU] Received binary PDF -> {temp_pdf}") if callback: callback(0.15, f"[MinerU] Received binary PDF -> {temp_pdf}") else: pdf = Path(filepath) if not pdf.exists(): if callback: callback(-1, f"[MinerU] PDF not found: {pdf}") raise FileNotFoundError(f"[MinerU] PDF not found: {pdf}") if output_dir: out_dir = Path(output_dir) out_dir.mkdir(parents=True, exist_ok=True) else: out_dir = Path(tempfile.mkdtemp(prefix="mineru_pdf_")) created_tmp_dir = True self.logger.info(f"[MinerU] Output directory: {out_dir}") if callback: callback(0.15, f"[MinerU] Output directory: {out_dir}") self.__images__(pdf, zoomin=1) try: self._run_mineru(pdf, out_dir, method=method, lang=lang) outputs = self._read_output(out_dir, pdf.stem, method=method) self.logger.info(f"[MinerU] Parsed {len(outputs)} blocks from PDF.") if callback: callback(0.75, f"[MinerU] Parsed {len(outputs)} blocks from PDF.") return self._transfer_to_sections(outputs), self._transfer_to_tables(outputs) finally: if temp_pdf and temp_pdf.exists(): try: temp_pdf.unlink() temp_pdf.parent.rmdir() except Exception: pass if delete_output and created_tmp_dir and out_dir.exists(): try: shutil.rmtree(out_dir) except Exception: pass if __name__ == "__main__": parser = MinerUParser("mineru") print("MinerU available:", parser.check_installation()) filepath = "" with open(filepath, "rb") as file: outputs = parser.parse_pdf(filepath=filepath, binary=file.read()) for output in outputs: print(output)