# # Copyright 2024 The InfiniFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Standard library imports import base64 import re import shutil import subprocess import sys import tempfile import threading from io import BytesIO import pdfplumber from PIL import Image # Local imports from api.constants import IMG_BASE64_PREFIX from api.db import FileType LOCK_KEY_pdfplumber = "global_shared_lock_pdfplumber" if LOCK_KEY_pdfplumber not in sys.modules: sys.modules[LOCK_KEY_pdfplumber] = threading.Lock() def filename_type(filename): filename = filename.lower() if re.match(r".*\.pdf$", filename): return FileType.PDF.value if re.match(r".*\.(msg|eml|doc|docx|ppt|pptx|yml|xml|htm|json|jsonl|ldjson|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md|py|js|java|c|cpp|h|php|go|ts|sh|cs|kt|html|sql)$", filename): return FileType.DOC.value if re.match(r".*\.(wav|flac|ape|alac|wavpack|wv|mp3|aac|ogg|vorbis|opus)$", filename): return FileType.AURAL.value if re.match(r".*\.(jpg|jpeg|png|tif|gif|pcx|tga|exif|fpx|svg|psd|cdr|pcd|dxf|ufo|eps|ai|raw|WMF|webp|avif|apng|icon|ico|mpg|mpeg|avi|rm|rmvb|mov|wmv|asf|dat|asx|wvx|mpe|mpa|mp4|avi|mkv)$", filename): return FileType.VISUAL.value return FileType.OTHER.value def thumbnail_img(filename, blob): """ MySQL LongText max length is 65535 """ filename = filename.lower() if re.match(r".*\.pdf$", filename): with sys.modules[LOCK_KEY_pdfplumber]: pdf = pdfplumber.open(BytesIO(blob)) buffered = BytesIO() resolution = 32 img = None for _ in range(10): # https://github.com/jsvine/pdfplumber?tab=readme-ov-file#creating-a-pageimage-with-to_image pdf.pages[0].to_image(resolution=resolution).annotated.save(buffered, format="png") img = buffered.getvalue() if len(img) >= 64000 and resolution >= 2: resolution = resolution / 2 buffered = BytesIO() else: break pdf.close() return img elif re.match(r".*\.(jpg|jpeg|png|tif|gif|icon|ico|webp)$", filename): image = Image.open(BytesIO(blob)) image.thumbnail((30, 30)) buffered = BytesIO() image.save(buffered, format="png") return buffered.getvalue() elif re.match(r".*\.(ppt|pptx)$", filename): import aspose.pydrawing as drawing import aspose.slides as slides try: with slides.Presentation(BytesIO(blob)) as presentation: buffered = BytesIO() scale = 0.03 img = None for _ in range(10): # https://reference.aspose.com/slides/python-net/aspose.slides/slide/get_thumbnail/#float-float presentation.slides[0].get_thumbnail(scale, scale).save(buffered, drawing.imaging.ImageFormat.png) img = buffered.getvalue() if len(img) >= 64000: scale = scale / 2.0 buffered = BytesIO() else: break return img except Exception: pass return None def thumbnail(filename, blob): img = thumbnail_img(filename, blob) if img is not None: return IMG_BASE64_PREFIX + base64.b64encode(img).decode("utf-8") else: return "" def repair_pdf_with_ghostscript(input_bytes): if shutil.which("gs") is None: return input_bytes with tempfile.NamedTemporaryFile(suffix=".pdf") as temp_in, tempfile.NamedTemporaryFile(suffix=".pdf") as temp_out: temp_in.write(input_bytes) temp_in.flush() cmd = [ "gs", "-o", temp_out.name, "-sDEVICE=pdfwrite", "-dPDFSETTINGS=/prepress", temp_in.name, ] try: proc = subprocess.run(cmd, capture_output=True, text=True) if proc.returncode != 0: return input_bytes except Exception: return input_bytes temp_out.seek(0) repaired_bytes = temp_out.read() return repaired_bytes def read_potential_broken_pdf(blob): def try_open(blob): try: with pdfplumber.open(BytesIO(blob)) as pdf: if pdf.pages: return True except Exception: return False return False if try_open(blob): return blob repaired = repair_pdf_with_ghostscript(blob) if try_open(repaired): return repaired return blob def sanitize_path(raw_path: str | None) -> str: """Normalize and sanitize a user-provided path segment. - Converts backslashes to forward slashes - Strips leading/trailing slashes - Removes '.' and '..' segments - Restricts characters to A-Za-z0-9, underscore, dash, and '/' """ if not raw_path: return "" backslash_re = re.compile(r"[\\]+") unsafe_re = re.compile(r"[^A-Za-z0-9_\-/]") normalized = backslash_re.sub("/", raw_path) normalized = normalized.strip("/") parts = [seg for seg in normalized.split("/") if seg and seg not in (".", "..")] sanitized = "/".join(parts) sanitized = unsafe_re.sub("", sanitized) return sanitized