Refa: only support MinerU-API now (#11977)

### What problem does this PR solve?

Only support MinerU-API now, still need to complete frontend for
pipeline to allow the configuration of MinerU options.

### Type of change

- [x] Refactoring
This commit is contained in:
Yongteng Lei
2025-12-17 12:58:48 +08:00
committed by GitHub
parent 5e05f43c3d
commit 03f9be7cbb
19 changed files with 273 additions and 624 deletions

View File

@ -16,19 +16,15 @@
import json
import logging
import os
import platform
import re
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from dataclasses import dataclass
from io import BytesIO
from os import PathLike
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Callable, Optional
import numpy as np
@ -137,10 +133,8 @@ class MinerUParseOptions:
class MinerUParser(RAGFlowPdfParser):
def __init__(self, mineru_path: str = "mineru", mineru_api: str = "", mineru_server_url: str = ""):
self.mineru_path = Path(mineru_path)
self.mineru_api = mineru_api.rstrip("/")
self.mineru_server_url = mineru_server_url.rstrip("/")
self.using_api = False
self.outlines = []
self.logger = logging.getLogger(self.__class__.__name__)
@ -189,105 +183,59 @@ class MinerUParser(RAGFlowPdfParser):
def check_installation(self, backend: str = "pipeline", server_url: Optional[str] = None) -> tuple[bool, str]:
reason = ""
valid_backends = ["pipeline", "vlm-http-client", "vlm-transformers", "vlm-vllm-engine", "vlm-mlx-engine"]
valid_backends = ["pipeline", "vlm-http-client", "vlm-transformers", "vlm-vllm-engine", "vlm-mlx-engine", "vlm-vllm-async-engine", "vlm-lmdeploy-engine"]
if backend not in valid_backends:
reason = "[MinerU] Invalid backend '{backend}'. Valid backends are: {valid_backends}"
reason = f"[MinerU] Invalid backend '{backend}'. Valid backends are: {valid_backends}"
self.logger.warning(reason)
return False, reason
subprocess_kwargs = {
"capture_output": True,
"text": True,
"check": True,
"encoding": "utf-8",
"errors": "ignore",
}
if platform.system() == "Windows":
subprocess_kwargs["creationflags"] = getattr(subprocess, "CREATE_NO_WINDOW", 0)
if server_url is None:
server_url = self.mineru_server_url
if backend == "vlm-http-client" and server_url:
try:
server_accessible = self._is_http_endpoint_valid(server_url + "/openapi.json")
self.logger.info(f"[MinerU] vlm-http-client server check: {server_accessible}")
if server_accessible:
self.using_api = False # We are using http client, not API
return True, reason
else:
reason = f"[MinerU] vlm-http-client server not accessible: {server_url}"
self.logger.warning(f"[MinerU] vlm-http-client server not accessible: {server_url}")
return False, reason
except Exception as e:
self.logger.warning(f"[MinerU] vlm-http-client server check failed: {e}")
try:
response = requests.get(server_url, timeout=5)
self.logger.info(
f"[MinerU] vlm-http-client server connection check: success with status {response.status_code}")
self.using_api = False
return True, reason
except Exception as e:
reason = f"[MinerU] vlm-http-client server connection check failed: {server_url}: {e}"
self.logger.warning(f"[MinerU] vlm-http-client server connection check failed: {server_url}: {e}")
return False, reason
if not self.mineru_api:
reason = "[MinerU] MINERU_APISERVER not configured."
self.logger.warning(reason)
return False, reason
api_openapi = f"{self.mineru_api}/openapi.json"
try:
result = subprocess.run([str(self.mineru_path), "--version"], **subprocess_kwargs)
version_info = result.stdout.strip()
if version_info:
self.logger.info(f"[MinerU] Detected version: {version_info}")
else:
self.logger.info("[MinerU] Detected MinerU, but version info is empty.")
return True, reason
except subprocess.CalledProcessError as e:
self.logger.warning(f"[MinerU] Execution failed (exit code {e.returncode}).")
except FileNotFoundError:
self.logger.warning("[MinerU] MinerU not found. Please install it via: pip install -U 'mineru[core]'")
except Exception as e:
self.logger.error(f"[MinerU] Unexpected error during installation check: {e}")
# If executable check fails, try API check
try:
if self.mineru_api:
# check openapi.json
openapi_exists = self._is_http_endpoint_valid(self.mineru_api + "/openapi.json")
if not openapi_exists:
reason = "[MinerU] Failed to detect vaild MinerU API server"
return openapi_exists, reason
self.logger.info(f"[MinerU] Detected {self.mineru_api}/openapi.json: {openapi_exists}")
self.using_api = openapi_exists
return openapi_exists, reason
else:
reason = "[MinerU] api not exists. Setting MINERU_SERVER_URL if your backend is vlm-http-client."
self.logger.info(reason)
api_ok = self._is_http_endpoint_valid(api_openapi)
self.logger.info(f"[MinerU] API openapi.json reachable={api_ok} url={api_openapi}")
if not api_ok:
reason = f"[MinerU] MinerU API not accessible: {api_openapi}"
return False, reason
except Exception as e:
reason = f"[MinerU] Unexpected error during api check: {e}"
self.logger.error(f"[MinerU] Unexpected error during api check: {e}")
return False, reason
except Exception as exc:
reason = f"[MinerU] MinerU API check failed: {exc}"
self.logger.warning(reason)
return False, reason
if backend == "vlm-http-client":
resolved_server = server_url or self.mineru_server_url
if not resolved_server:
reason = "[MinerU] MINERU_SERVER_URL required for vlm-http-client backend."
self.logger.warning(reason)
return False, reason
try:
server_ok = self._is_http_endpoint_valid(resolved_server)
self.logger.info(f"[MinerU] vlm-http-client server check reachable={server_ok} url={resolved_server}")
except Exception as exc:
self.logger.warning(f"[MinerU] vlm-http-client server probe failed: {resolved_server}: {exc}")
return True, reason
def _run_mineru(
self, input_path: Path, output_dir: Path, options: MinerUParseOptions, callback: Optional[Callable] = None
):
if self.using_api:
self._run_mineru_api(input_path, output_dir, options, callback)
else:
self._run_mineru_executable(input_path, output_dir, options, callback)
def _run_mineru_api(self, input_path: Path, output_dir: Path, options: MinerUParseOptions,
callback: Optional[Callable] = None):
output_zip_path = os.path.join(str(output_dir), "output.zip")
self, input_path: Path, output_dir: Path, options: MinerUParseOptions, callback: Optional[Callable] = None
) -> Path:
return self._run_mineru_api(input_path, output_dir, options, callback)
def _run_mineru_api(
self, input_path: Path, output_dir: Path, options: MinerUParseOptions, callback: Optional[Callable] = None
) -> Path:
pdf_file_path = str(input_path)
if not os.path.exists(pdf_file_path):
raise RuntimeError(f"[MinerU] PDF file not exists: {pdf_file_path}")
pdf_file_name = Path(pdf_file_path).stem.strip()
output_path = os.path.join(str(output_dir), pdf_file_name, options.method)
os.makedirs(output_path, exist_ok=True)
output_path = tempfile.mkdtemp(prefix=f"{pdf_file_name}_{options.method}_", dir=str(output_dir))
output_zip_path = os.path.join(str(output_dir), f"{Path(output_path).name}.zip")
files = {"files": (pdf_file_name + ".pdf", open(pdf_file_path, "rb"), "application/pdf")}
@ -309,9 +257,19 @@ class MinerUParser(RAGFlowPdfParser):
"end_page_id": 99999,
}
if options.server_url:
data["server_url"] = options.server_url
elif self.mineru_server_url:
data["server_url"] = self.mineru_server_url
print("--------------------------------", flush=True)
print(f"{data=}", flush=True)
print(f"{options=}", flush=True)
print("--------------------------------", flush=True)
headers = {"Accept": "application/json"}
try:
self.logger.info(f"[MinerU] invoke api: {self.mineru_api}/file_parse")
self.logger.info(f"[MinerU] invoke api: {self.mineru_api}/file_parse backend={options.backend} server_url={data.get('server_url')}")
if callback:
callback(0.20, f"[MinerU] invoke api: {self.mineru_api}/file_parse")
response = requests.post(url=f"{self.mineru_api}/file_parse", files=files, data=data, headers=headers,
@ -333,65 +291,11 @@ class MinerUParser(RAGFlowPdfParser):
if callback:
callback(0.40, f"[MinerU] Unzip to {output_path}...")
else:
self.logger.warning("[MinerU] not zip returned from api%s " % response.headers.get("Content-Type"))
self.logger.warning(f"[MinerU] not zip returned from api: {response.headers.get('Content-Type')}")
except Exception as e:
raise RuntimeError(f"[MinerU] api failed with exception {e}")
self.logger.info("[MinerU] Api completed successfully.")
def _run_mineru_executable(
self, input_path: Path, output_dir: Path, options: MinerUParseOptions, callback: Optional[Callable] = None
):
cmd = [str(self.mineru_path), "-p", str(input_path), "-o", str(output_dir), "-m", options.method]
if options.backend:
cmd.extend(["-b", options.backend])
if options.lang:
cmd.extend(["-l", options.lang])
if options.server_url and options.backend == "vlm-http-client":
cmd.extend(["-u", options.server_url])
self.logger.info(f"[MinerU] Running command: {' '.join(cmd)}")
subprocess_kwargs = {
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
"text": True,
"encoding": "utf-8",
"errors": "ignore",
"bufsize": 1,
}
if platform.system() == "Windows":
subprocess_kwargs["creationflags"] = getattr(subprocess, "CREATE_NO_WINDOW", 0)
process = subprocess.Popen(cmd, **subprocess_kwargs)
stdout_queue, stderr_queue = Queue(), Queue()
def enqueue_output(pipe, queue, prefix):
for line in iter(pipe.readline, ""):
if line.strip():
queue.put((prefix, line.strip()))
pipe.close()
threading.Thread(target=enqueue_output, args=(process.stdout, stdout_queue, "STDOUT"), daemon=True).start()
threading.Thread(target=enqueue_output, args=(process.stderr, stderr_queue, "STDERR"), daemon=True).start()
while process.poll() is None:
for q in (stdout_queue, stderr_queue):
try:
while True:
prefix, line = q.get_nowait()
if prefix == "STDOUT":
self.logger.info(f"[MinerU] {line}")
else:
self.logger.warning(f"[MinerU] {line}")
except Empty:
pass
time.sleep(0.1)
return_code = process.wait()
if return_code != 0:
raise RuntimeError(f"[MinerU] Process failed with exit code {return_code}")
self.logger.info("[MinerU] Command completed successfully.")
return Path(output_path)
def __images__(self, fnm, zoomin: int = 1, page_from=0, page_to=600, callback=None):
self.page_from = page_from
@ -554,25 +458,6 @@ class MinerUParser(RAGFlowPdfParser):
def _read_output(self, output_dir: Path, file_stem: str, method: str = "auto", backend: str = "pipeline") -> list[
dict[str, Any]]:
candidates = []
seen = set()
def add_candidate_path(p: Path):
if p not in seen:
seen.add(p)
candidates.append(p)
if backend.startswith("vlm-"):
add_candidate_path(output_dir / file_stem / "vlm")
if method:
add_candidate_path(output_dir / file_stem / method)
add_candidate_path(output_dir / file_stem / "auto")
else:
if method:
add_candidate_path(output_dir / file_stem / method)
add_candidate_path(output_dir / file_stem / "vlm")
add_candidate_path(output_dir / file_stem / "auto")
json_file = None
subdir = None
attempted = []
@ -588,33 +473,28 @@ class MinerUParser(RAGFlowPdfParser):
safe_stem = _sanitize_filename(file_stem)
allowed_names = {f"{file_stem}_content_list.json", f"{safe_stem}_content_list.json"}
self.logger.info(f"[MinerU] Expected output files: {', '.join(sorted(allowed_names))}")
self.logger.info(f"[MinerU] Searching output candidates: {', '.join(str(c) for c in candidates)}")
self.logger.info(f"[MinerU] Searching output in: {output_dir}")
for sub in candidates:
jf = sub / f"{file_stem}_content_list.json"
self.logger.info(f"[MinerU] Trying original path: {jf}")
attempted.append(jf)
if jf.exists():
subdir = sub
json_file = jf
break
# MinerU API sanitizes non-ASCII filenames inside the ZIP root and file names.
alt = sub / f"{safe_stem}_content_list.json"
jf = output_dir / f"{file_stem}_content_list.json"
self.logger.info(f"[MinerU] Trying original path: {jf}")
attempted.append(jf)
if jf.exists():
subdir = output_dir
json_file = jf
else:
alt = output_dir / f"{safe_stem}_content_list.json"
self.logger.info(f"[MinerU] Trying sanitized filename: {alt}")
attempted.append(alt)
if alt.exists():
subdir = sub
subdir = output_dir
json_file = alt
break
nested_alt = sub / safe_stem / f"{safe_stem}_content_list.json"
self.logger.info(f"[MinerU] Trying sanitized nested path: {nested_alt}")
attempted.append(nested_alt)
if nested_alt.exists():
subdir = nested_alt.parent
json_file = nested_alt
break
else:
nested_alt = output_dir / safe_stem / f"{safe_stem}_content_list.json"
self.logger.info(f"[MinerU] Trying sanitized nested path: {nested_alt}")
attempted.append(nested_alt)
if nested_alt.exists():
subdir = nested_alt.parent
json_file = nested_alt
if not json_file:
raise FileNotFoundError(f"[MinerU] Missing output file, tried: {', '.join(str(p) for p in attempted)}")
@ -680,12 +560,12 @@ class MinerUParser(RAGFlowPdfParser):
temp_pdf = None
created_tmp_dir = False
# Assuming the dict is defined as shown
lang = kwargs.get('lang', 'English')
mineru_lang_code = LANGUAGE_TO_MINERU_MAP.get(lang, 'ch') # Returns 'ch' if lang not found
mineru_method_raw_str = kwargs.get('parser_config', {}).get('mineru_parse_method', 'auto')
enable_formula = kwargs.get('parser_config', {}).get('mineru_formula_enable', True)
enable_table = kwargs.get('parser_config', {}).get('mineru_enable', True)
parser_cfg = kwargs.get('parser_config', {})
lang = parser_cfg.get('mineru_lang') or kwargs.get('lang', 'English')
mineru_lang_code = LANGUAGE_TO_MINERU_MAP.get(lang, 'ch') # Defaults to Chinese if not matched
mineru_method_raw_str = parser_cfg.get('mineru_parse_method', 'auto')
enable_formula = parser_cfg.get('mineru_formula_enable', True)
enable_table = parser_cfg.get('mineru_table_enable', True)
# remove spaces, or mineru crash, and _read_output fail too
file_path = Path(filepath)
@ -718,7 +598,7 @@ class MinerUParser(RAGFlowPdfParser):
out_dir = Path(tempfile.mkdtemp(prefix="mineru_pdf_"))
created_tmp_dir = True
self.logger.info(f"[MinerU] Output directory: {out_dir}")
self.logger.info(f"[MinerU] Output directory: {out_dir} backend={backend} api={self.mineru_api} server_url={server_url or self.mineru_server_url}")
if callback:
callback(0.15, f"[MinerU] Output directory: {out_dir}")
@ -735,8 +615,8 @@ class MinerUParser(RAGFlowPdfParser):
formula_enable=enable_formula,
table_enable=enable_table,
)
self._run_mineru(pdf, out_dir, options, callback=callback)
outputs = self._read_output(out_dir, pdf.stem, method=mineru_method_raw_str, backend=backend)
final_out_dir = self._run_mineru(pdf, out_dir, options, callback=callback)
outputs = self._read_output(final_out_dir, pdf.stem, method=mineru_method_raw_str, backend=backend)
self.logger.info(f"[MinerU] Parsed {len(outputs)} blocks from PDF.")
if callback:
callback(0.75, f"[MinerU] Parsed {len(outputs)} blocks from PDF.")