From 9a486e0f512dc781a6b5e4c10be4d8fa1c7f035c Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Mon, 3 Nov 2025 19:26:09 +0800 Subject: [PATCH] Move some funcs from api to rag module (#10972) ### What problem does this PR solve? As title ### Type of change - [x] Refactoring Signed-off-by: Jin Hai --- api/utils/file_utils.py | 251 -------------------------------------- rag/app/naive.py | 2 +- rag/utils/file_utils.py | 263 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 264 insertions(+), 252 deletions(-) create mode 100644 rag/utils/file_utils.py diff --git a/api/utils/file_utils.py b/api/utils/file_utils.py index 6aa8e7f37..5f0fa70f4 100644 --- a/api/utils/file_utils.py +++ b/api/utils/file_utils.py @@ -17,26 +17,14 @@ # Standard library imports import base64 -import hashlib -import io import re import shutil import subprocess import sys import tempfile import threading -import zipfile -import requests -import PyPDF2 -from docx import Document from io import BytesIO -from requests.exceptions import Timeout, RequestException -# Typing -from typing import List, Union, Tuple, Optional, Dict - -# Third-party imports -import olefile import pdfplumber from PIL import Image @@ -176,242 +164,3 @@ def read_potential_broken_pdf(blob): return repaired return blob - - - -def _is_zip(h: bytes) -> bool: - return h.startswith(b"PK\x03\x04") or h.startswith(b"PK\x05\x06") or h.startswith(b"PK\x07\x08") - -def _is_pdf(h: bytes) -> bool: - return h.startswith(b"%PDF-") - -def _is_ole(h: bytes) -> bool: - return h.startswith(b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1") - -def _sha10(b: bytes) -> str: - return hashlib.sha256(b).hexdigest()[:10] - -def _guess_ext(b: bytes) -> str: - h = b[:8] - if _is_zip(h): - try: - with zipfile.ZipFile(io.BytesIO(b), "r") as z: - names = [n.lower() for n in z.namelist()] - if any(n.startswith("word/") for n in names): - return ".docx" - if any(n.startswith("ppt/") for n in names): - return ".pptx" - if any(n.startswith("xl/") for n in names): - return ".xlsx" - except Exception: - pass - return ".zip" - if _is_pdf(h): - return ".pdf" - if _is_ole(h): - return ".doc" - return ".bin" - -# Try to extract the real embedded payload from OLE's Ole10Native -def _extract_ole10native_payload(data: bytes) -> bytes: - try: - pos = 0 - if len(data) < 4: - return data - _ = int.from_bytes(data[pos:pos+4], "little") - pos += 4 - # filename/src/tmp (NUL-terminated ANSI) - for _ in range(3): - z = data.index(b"\x00", pos) - pos = z + 1 - # skip unknown 4 bytes - pos += 4 - if pos + 4 > len(data): - return data - size = int.from_bytes(data[pos:pos+4], "little") - pos += 4 - if pos + size <= len(data): - return data[pos:pos+size] - except Exception: - pass - return data - -def extract_embed_file(target: Union[bytes, bytearray]) -> List[Tuple[str, bytes]]: - """ - Only extract the 'first layer' of embedding, returning raw (filename, bytes). - """ - top = bytes(target) - head = top[:8] - out: List[Tuple[str, bytes]] = [] - seen = set() - - def push(b: bytes, name_hint: str = ""): - h10 = _sha10(b) - if h10 in seen: - return - seen.add(h10) - ext = _guess_ext(b) - # If name_hint has an extension use its basename; else fallback to guessed ext - if "." in name_hint: - fname = name_hint.split("/")[-1] - else: - fname = f"{h10}{ext}" - out.append((fname, b)) - - # OOXML/ZIP container (docx/xlsx/pptx) - if _is_zip(head): - try: - with zipfile.ZipFile(io.BytesIO(top), "r") as z: - embed_dirs = ( - "word/embeddings/", "word/objects/", "word/activex/", - "xl/embeddings/", "ppt/embeddings/" - ) - for name in z.namelist(): - low = name.lower() - if any(low.startswith(d) for d in embed_dirs): - try: - b = z.read(name) - push(b, name) - except Exception: - pass - except Exception: - pass - return out - - # OLE container (doc/ppt/xls) - if _is_ole(head): - try: - with olefile.OleFileIO(io.BytesIO(top)) as ole: - for entry in ole.listdir(): - p = "/".join(entry) - try: - data = ole.openstream(entry).read() - except Exception: - continue - if not data: - continue - if "Ole10Native" in p or "ole10native" in p.lower(): - data = _extract_ole10native_payload(data) - push(data, p) - except Exception: - pass - return out - - return out - - -def extract_links_from_docx(docx_bytes: bytes): - """ - Extract all hyperlinks from a Word (.docx) document binary stream. - - Args: - docx_bytes (bytes): Raw bytes of a .docx file. - - Returns: - set[str]: A set of unique hyperlink URLs. - """ - links = set() - with BytesIO(docx_bytes) as bio: - document = Document(bio) - - # Each relationship may represent a hyperlink, image, footer, etc. - for rel in document.part.rels.values(): - if rel.reltype == ( - "http://schemas.openxmlformats.org/officeDocument/2006/relationships/hyperlink" - ): - links.add(rel.target_ref) - - return links - - -def extract_links_from_pdf(pdf_bytes: bytes): - """ - Extract all clickable hyperlinks from a PDF binary stream. - - Args: - pdf_bytes (bytes): Raw bytes of a PDF file. - - Returns: - set[str]: A set of unique hyperlink URLs (unordered). - """ - links = set() - with BytesIO(pdf_bytes) as bio: - pdf = PyPDF2.PdfReader(bio) - - for page in pdf.pages: - annots = page.get("/Annots") - if not annots or isinstance(annots, PyPDF2.generic.IndirectObject): - continue - for annot in annots: - obj = annot.get_object() - a = obj.get("/A") - if a and a.get("/URI"): - links.add(a["/URI"]) - - return links - - -_GLOBAL_SESSION: Optional[requests.Session] = None -def _get_session(headers: Optional[Dict[str, str]] = None) -> requests.Session: - """Get or create a global reusable session.""" - global _GLOBAL_SESSION - if _GLOBAL_SESSION is None: - _GLOBAL_SESSION = requests.Session() - _GLOBAL_SESSION.headers.update({ - "User-Agent": ( - "Mozilla/5.0 (X11; Linux x86_64) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/121.0 Safari/537.36" - ) - }) - if headers: - _GLOBAL_SESSION.headers.update(headers) - return _GLOBAL_SESSION - - -def extract_html( - url: str, - timeout: float = 60.0, - headers: Optional[Dict[str, str]] = None, - max_retries: int = 2, -) -> Tuple[Optional[bytes], Dict[str, str]]: - """ - Extract the full HTML page as raw bytes from a given URL. - Automatically reuses a persistent HTTP session and applies robust timeout & retry logic. - - Args: - url (str): Target webpage URL. - timeout (float): Request timeout in seconds (applies to connect + read). - headers (dict, optional): Extra HTTP headers. - max_retries (int): Number of retries on timeout or transient errors. - - Returns: - tuple(bytes|None, dict): - - html_bytes: Raw HTML content (or None if failed) - - metadata: HTTP info (status_code, content_type, final_url, error if any) - """ - sess = _get_session(headers=headers) - metadata = {"final_url": url, "status_code": "", "content_type": "", "error": ""} - - for attempt in range(1, max_retries + 1): - try: - resp = sess.get(url, timeout=timeout) - resp.raise_for_status() - - html_bytes = resp.content - metadata.update({ - "final_url": resp.url, - "status_code": str(resp.status_code), - "content_type": resp.headers.get("Content-Type", ""), - }) - return html_bytes, metadata - - except Timeout: - metadata["error"] = f"Timeout after {timeout}s (attempt {attempt}/{max_retries})" - if attempt >= max_retries: - continue - except RequestException as e: - metadata["error"] = f"Request failed: {e}" - continue - - return None, metadata \ No newline at end of file diff --git a/rag/app/naive.py b/rag/app/naive.py index a4bdbdae3..0c05f9dc0 100644 --- a/rag/app/naive.py +++ b/rag/app/naive.py @@ -30,7 +30,7 @@ from tika import parser from api.db import LLMType from api.db.services.llm_service import LLMBundle -from api.utils.file_utils import extract_embed_file, extract_links_from_pdf, extract_links_from_docx, extract_html +from rag.utils.file_utils import extract_embed_file, extract_links_from_pdf, extract_links_from_docx, extract_html from deepdoc.parser import DocxParser, ExcelParser, HtmlParser, JsonParser, MarkdownElementExtractor, MarkdownParser, PdfParser, TxtParser from deepdoc.parser.figure_parser import VisionFigureParser,vision_figure_parser_docx_wrapper,vision_figure_parser_pdf_wrapper from deepdoc.parser.pdf_parser import PlainParser, VisionParser diff --git a/rag/utils/file_utils.py b/rag/utils/file_utils.py new file mode 100644 index 000000000..18cdc35e1 --- /dev/null +++ b/rag/utils/file_utils.py @@ -0,0 +1,263 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import io +import hashlib +import zipfile +import requests +from requests.exceptions import Timeout, RequestException +from io import BytesIO +from typing import List, Union, Tuple, Optional, Dict +import PyPDF2 +from docx import Document +import olefile + +def _is_zip(h: bytes) -> bool: + return h.startswith(b"PK\x03\x04") or h.startswith(b"PK\x05\x06") or h.startswith(b"PK\x07\x08") + +def _is_pdf(h: bytes) -> bool: + return h.startswith(b"%PDF-") + +def _is_ole(h: bytes) -> bool: + return h.startswith(b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1") + +def _sha10(b: bytes) -> str: + return hashlib.sha256(b).hexdigest()[:10] + +def _guess_ext(b: bytes) -> str: + h = b[:8] + if _is_zip(h): + try: + with zipfile.ZipFile(io.BytesIO(b), "r") as z: + names = [n.lower() for n in z.namelist()] + if any(n.startswith("word/") for n in names): + return ".docx" + if any(n.startswith("ppt/") for n in names): + return ".pptx" + if any(n.startswith("xl/") for n in names): + return ".xlsx" + except Exception: + pass + return ".zip" + if _is_pdf(h): + return ".pdf" + if _is_ole(h): + return ".doc" + return ".bin" + +# Try to extract the real embedded payload from OLE's Ole10Native +def _extract_ole10native_payload(data: bytes) -> bytes: + try: + pos = 0 + if len(data) < 4: + return data + _ = int.from_bytes(data[pos:pos+4], "little") + pos += 4 + # filename/src/tmp (NUL-terminated ANSI) + for _ in range(3): + z = data.index(b"\x00", pos) + pos = z + 1 + # skip unknown 4 bytes + pos += 4 + if pos + 4 > len(data): + return data + size = int.from_bytes(data[pos:pos+4], "little") + pos += 4 + if pos + size <= len(data): + return data[pos:pos+size] + except Exception: + pass + return data + +def extract_embed_file(target: Union[bytes, bytearray]) -> List[Tuple[str, bytes]]: + """ + Only extract the 'first layer' of embedding, returning raw (filename, bytes). + """ + top = bytes(target) + head = top[:8] + out: List[Tuple[str, bytes]] = [] + seen = set() + + def push(b: bytes, name_hint: str = ""): + h10 = _sha10(b) + if h10 in seen: + return + seen.add(h10) + ext = _guess_ext(b) + # If name_hint has an extension use its basename; else fallback to guessed ext + if "." in name_hint: + fname = name_hint.split("/")[-1] + else: + fname = f"{h10}{ext}" + out.append((fname, b)) + + # OOXML/ZIP container (docx/xlsx/pptx) + if _is_zip(head): + try: + with zipfile.ZipFile(io.BytesIO(top), "r") as z: + embed_dirs = ( + "word/embeddings/", "word/objects/", "word/activex/", + "xl/embeddings/", "ppt/embeddings/" + ) + for name in z.namelist(): + low = name.lower() + if any(low.startswith(d) for d in embed_dirs): + try: + b = z.read(name) + push(b, name) + except Exception: + pass + except Exception: + pass + return out + + # OLE container (doc/ppt/xls) + if _is_ole(head): + try: + with olefile.OleFileIO(io.BytesIO(top)) as ole: + for entry in ole.listdir(): + p = "/".join(entry) + try: + data = ole.openstream(entry).read() + except Exception: + continue + if not data: + continue + if "Ole10Native" in p or "ole10native" in p.lower(): + data = _extract_ole10native_payload(data) + push(data, p) + except Exception: + pass + return out + + return out + + +def extract_links_from_docx(docx_bytes: bytes): + """ + Extract all hyperlinks from a Word (.docx) document binary stream. + + Args: + docx_bytes (bytes): Raw bytes of a .docx file. + + Returns: + set[str]: A set of unique hyperlink URLs. + """ + links = set() + with BytesIO(docx_bytes) as bio: + document = Document(bio) + + # Each relationship may represent a hyperlink, image, footer, etc. + for rel in document.part.rels.values(): + if rel.reltype == ( + "http://schemas.openxmlformats.org/officeDocument/2006/relationships/hyperlink" + ): + links.add(rel.target_ref) + + return links + + +def extract_links_from_pdf(pdf_bytes: bytes): + """ + Extract all clickable hyperlinks from a PDF binary stream. + + Args: + pdf_bytes (bytes): Raw bytes of a PDF file. + + Returns: + set[str]: A set of unique hyperlink URLs (unordered). + """ + links = set() + with BytesIO(pdf_bytes) as bio: + pdf = PyPDF2.PdfReader(bio) + + for page in pdf.pages: + annots = page.get("/Annots") + if not annots or isinstance(annots, PyPDF2.generic.IndirectObject): + continue + for annot in annots: + obj = annot.get_object() + a = obj.get("/A") + if a and a.get("/URI"): + links.add(a["/URI"]) + + return links + + +_GLOBAL_SESSION: Optional[requests.Session] = None +def _get_session(headers: Optional[Dict[str, str]] = None) -> requests.Session: + """Get or create a global reusable session.""" + global _GLOBAL_SESSION + if _GLOBAL_SESSION is None: + _GLOBAL_SESSION = requests.Session() + _GLOBAL_SESSION.headers.update({ + "User-Agent": ( + "Mozilla/5.0 (X11; Linux x86_64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/121.0 Safari/537.36" + ) + }) + if headers: + _GLOBAL_SESSION.headers.update(headers) + return _GLOBAL_SESSION + + +def extract_html( + url: str, + timeout: float = 60.0, + headers: Optional[Dict[str, str]] = None, + max_retries: int = 2, +) -> Tuple[Optional[bytes], Dict[str, str]]: + """ + Extract the full HTML page as raw bytes from a given URL. + Automatically reuses a persistent HTTP session and applies robust timeout & retry logic. + + Args: + url (str): Target webpage URL. + timeout (float): Request timeout in seconds (applies to connect + read). + headers (dict, optional): Extra HTTP headers. + max_retries (int): Number of retries on timeout or transient errors. + + Returns: + tuple(bytes|None, dict): + - html_bytes: Raw HTML content (or None if failed) + - metadata: HTTP info (status_code, content_type, final_url, error if any) + """ + session = _get_session(headers=headers) + metadata = {"final_url": url, "status_code": "", "content_type": "", "error": ""} + + for attempt in range(1, max_retries + 1): + try: + resp = session.get(url, timeout=timeout) + resp.raise_for_status() + + html_bytes = resp.content + metadata.update({ + "final_url": resp.url, + "status_code": str(resp.status_code), + "content_type": resp.headers.get("Content-Type", ""), + }) + return html_bytes, metadata + + except Timeout: + metadata["error"] = f"Timeout after {timeout}s (attempt {attempt}/{max_retries})" + if attempt >= max_retries: + continue + except RequestException as e: + metadata["error"] = f"Request failed: {e}" + continue + + return None, metadata \ No newline at end of file