File: Now parsing support all types of embedded documents, solved #10059 (#10635)

### What problem does this PR solve?

File: Now parsing support all types of embedded documents, solved #10059
Fix: Incomplete words in chat #10530
### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Billy Bao
2025-10-17 18:46:47 +08:00
committed by GitHub
parent f50b2461cb
commit 8ee0b6ea54
6 changed files with 486 additions and 7 deletions

View File

@ -13,7 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Standard library imports
import base64
import hashlib
import io
import json
import os
import re
@ -22,13 +27,21 @@ import subprocess
import sys
import tempfile
import threading
import zipfile
from io import BytesIO
# Typing
from typing import List, Union, Tuple
# Third-party imports
import olefile
import fitz
import pdfplumber
from cachetools import LRUCache, cached
from PIL import Image
from ruamel.yaml import YAML
# Local imports
from api.constants import IMG_BASE64_PREFIX
from api.db import FileType
@ -284,3 +297,139 @@ def read_potential_broken_pdf(blob):
return repaired
return blob
def _is_zip(h: bytes) -> bool:
return h.startswith(b"PK\x03\x04") or h.startswith(b"PK\x05\x06") or h.startswith(b"PK\x07\x08")
def _is_pdf(h: bytes) -> bool:
return h.startswith(b"%PDF-")
def _is_ole(h: bytes) -> bool:
return h.startswith(b"\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1")
def _sha10(b: bytes) -> str:
return hashlib.sha256(b).hexdigest()[:10]
def _guess_ext(b: bytes) -> str:
h = b[:8]
if _is_zip(h):
try:
with zipfile.ZipFile(io.BytesIO(b), "r") as z:
names = [n.lower() for n in z.namelist()]
if any(n.startswith("word/") for n in names):
return ".docx"
if any(n.startswith("ppt/") for n in names):
return ".pptx"
if any(n.startswith("xl/") for n in names):
return ".xlsx"
except Exception:
pass
return ".zip"
if _is_pdf(h):
return ".pdf"
if _is_ole(h):
return ".doc"
return ".bin"
# Try to extract the real embedded payload from OLE's Ole10Native
def _extract_ole10native_payload(data: bytes) -> bytes:
try:
pos = 0
if len(data) < 4:
return data
_ = int.from_bytes(data[pos:pos+4], "little")
pos += 4
for _ in range(3): # filename/src/tmp (NUL-terminated ANSI)
z = data.index(b"\x00", pos)
pos = z + 1
pos += 4
if pos + 4 > len(data):
return data
size = int.from_bytes(data[pos:pos+4], "little")
pos += 4
if pos + size <= len(data):
return data[pos:pos+size]
except Exception:
pass
return data
def extract_embed_file(target: Union[bytes, bytearray]) -> List[Tuple[str, bytes]]:
"""
Only extract the "first layer" of embedding, returning raw (filename, bytes).
These bytes can be directly used for io.BytesIO and then passed to zipfile/fitz/olefile and other libraries for further parsing.
"""
top = bytes(target)
head = top[:8]
out: List[Tuple[str, bytes]] = []
seen = set()
def push(b: bytes, name_hint: str = ""):
h10 = _sha10(b)
if h10 in seen:
return
seen.add(h10)
ext = _guess_ext(b)
# If name_hint does not have a clear extension, use the extension guessed from the content
if "." in name_hint:
fname = name_hint.split("/")[-1]
else:
fname = f"{h10}{ext}"
out.append((fname, b))
# OOXML/ZIP container (docx/xlsx/pptx)
if _is_zip(head):
try:
with zipfile.ZipFile(io.BytesIO(top), "r") as z:
embed_dirs = ("word/embeddings/", "word/objects/", "word/activex/",
"xl/embeddings/", "ppt/embeddings/")
for name in z.namelist():
low = name.lower()
if any(low.startswith(d) for d in embed_dirs):
try:
b = z.read(name)
push(b, name)
except Exception:
pass
except Exception:
pass
return out
# PDF attachments
if _is_pdf(head):
try:
doc = fitz.open(stream=top, filetype="pdf")
count = getattr(doc, "embfile_count", 0)
for i in range(count):
try:
data = doc.embfile_get(i)
if data:
push(bytes(data), f"EmbeddedFiles[{i}]")
except Exception:
pass
doc.close()
except Exception:
pass
return out
# Legacy OLE (.doc/.xls/.ppt)
if _is_ole(head):
try:
with olefile.OleFileIO(io.BytesIO(top)) as ole:
for entry in ole.listdir():
p = "/".join(entry)
try:
data = ole.openstream(entry).read()
except Exception:
continue
if not data:
continue
if "Ole10Native" in p or "ole10native" in p.lower():
data = _extract_ole10native_payload(data)
push(data, p)
except Exception:
pass
return out
return out