Feat: add support for multi-column PDF parsing (#10475)

### What problem does this PR solve?

Add support for multi-column PDF parsing. #9878, #9919.

Two-column sample:
<img width="1885" height="1020" alt="image"
src="https://github.com/user-attachments/assets/0270c028-2db8-4ca6-a4b7-cd5830882d28"
/>

Three-column sample: 
<img width="1881" height="992" alt="image"
src="https://github.com/user-attachments/assets/9ee88844-d5b1-4927-9e4e-3bd810d6e03a"
/>

Single-column sample:
<img width="1883" height="1042" alt="image"
src="https://github.com/user-attachments/assets/e93d3d18-43c3-4067-b5fa-e454ed0ab093"
/>



### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
Yongteng Lei authored 2025-10-11 18:46:09 +08:00, committed by GitHub
parent c21cea2038 · commit 5200711441
3 changed files with 196 additions and 85 deletions

View File

@@ -17,7 +17,6 @@
 import re
-import mistune
 from markdown import markdown

@@ -117,8 +116,6 @@ class MarkdownElementExtractor:
     def __init__(self, markdown_content):
         self.markdown_content = markdown_content
         self.lines = markdown_content.split("\n")
-        self.ast_parser = mistune.create_markdown(renderer="ast")
-        self.ast_nodes = self.ast_parser(markdown_content)

     def extract_elements(self):
         """Extract individual elements (headers, code blocks, lists, etc.)"""

View File

@@ -15,11 +15,13 @@
 #
 import logging
+import math
 import os
 import random
 import re
 import sys
 import threading
+from collections import Counter, defaultdict
 from copy import deepcopy
 from io import BytesIO
 from timeit import default_timer as timer
@@ -349,9 +351,78 @@ class RAGFlowPdfParser:
             self.boxes[i]["top"] += self.page_cum_height[self.boxes[i]["page_number"] - 1]
             self.boxes[i]["bottom"] += self.page_cum_height[self.boxes[i]["page_number"] - 1]

-    def _text_merge(self):
+    def _assign_column(self, boxes, zoomin=3):
+        if not boxes:
+            return boxes
+        if all("col_id" in b for b in boxes):
+            return boxes
+
+        by_page = defaultdict(list)
+        for b in boxes:
+            by_page[b["page_number"]].append(b)
+
+        page_info = {}  # pg -> dict(page_w, left_edge, cand_cols)
+        counter = Counter()
+        for pg, bxs in by_page.items():
+            if not bxs:
+                page_info[pg] = {"page_w": 1.0, "left_edge": 0.0, "cand": 1}
+                counter[1] += 1
+                continue
+
+            if hasattr(self, "page_images") and self.page_images and len(self.page_images) >= pg:
+                page_w = self.page_images[pg - 1].size[0] / max(1, zoomin)
+                left_edge = 0.0
+            else:
+                xs0 = [box["x0"] for box in bxs]
+                xs1 = [box["x1"] for box in bxs]
+                left_edge = float(min(xs0))
+                page_w = max(1.0, float(max(xs1) - left_edge))
+
+            widths = [max(1.0, (box["x1"] - box["x0"])) for box in bxs]
+            median_w = float(np.median(widths)) if widths else 1.0
+            raw_cols = int(page_w / max(1.0, median_w))
+            # cand = raw_cols if (raw_cols >= 2 and median_w < page_w / raw_cols * 0.8) else 1
+            cand = raw_cols
+
+            page_info[pg] = {"page_w": page_w, "left_edge": left_edge, "cand": cand}
+            counter[cand] += 1
+            logging.info(f"[Page {pg}] median_w={median_w:.2f}, page_w={page_w:.2f}, raw_cols={raw_cols}, cand={cand}")
+
+        global_cols = counter.most_common(1)[0][0]
+        logging.info(f"Global column_num decided by majority: {global_cols}")
+
+        for pg, bxs in by_page.items():
+            if not bxs:
+                continue
+            page_w = page_info[pg]["page_w"]
+            left_edge = page_info[pg]["left_edge"]
+
+            if global_cols == 1:
+                for box in bxs:
+                    box["col_id"] = 0
+                continue
+
+            for box in bxs:
+                w = box["x1"] - box["x0"]
+                if w >= 0.8 * page_w:
+                    box["col_id"] = 0
+                    continue
+                cx = 0.5 * (box["x0"] + box["x1"])
+                norm_cx = (cx - left_edge) / page_w
+                norm_cx = max(0.0, min(norm_cx, 0.999999))
+                box["col_id"] = int(min(global_cols - 1, norm_cx * global_cols))
+
+        return boxes
+
+    def _text_merge(self, zoomin=3):
         # merge adjusted boxes
-        bxs = self.boxes
+        bxs = self._assign_column(self.boxes, zoomin)

         def end_with(b, txt):
             txt = txt.strip()
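
The per-page candidate count above comes from `page_w / median box width`, and the final column count is a majority vote across pages, so a cover page or a references page does not flip the layout decision. A self-contained sketch of that vote (the `estimate_columns` helper below is illustrative only; the real method derives page width from `self.page_images` and box widths from `self.boxes`):

```python
# Sketch of the per-page column estimate plus the cross-page majority vote,
# mirroring _assign_column above (toy inputs, hypothetical helper).
from collections import Counter

import numpy as np


def estimate_columns(pages):
    # pages: list of (page_width, [box widths on that page]) tuples
    counter = Counter()
    for page_w, widths in pages:
        median_w = float(np.median(widths)) if widths else 1.0
        counter[int(page_w / max(1.0, median_w))] += 1   # candidate column count
    return counter.most_common(1)[0][0]                  # majority vote across pages


# Two body pages with half-width boxes outvote a full-width title page -> 2 columns.
print(estimate_columns([(600, [280, 290, 300]), (600, [285, 295]), (600, [590])]))  # 2
```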
@@ -367,9 +438,15 @@ class RAGFlowPdfParser:
         while i < len(bxs) - 1:
             b = bxs[i]
             b_ = bxs[i + 1]
+            if b["page_number"] != b_["page_number"] or b.get("col_id") != b_.get("col_id"):
+                i += 1
+                continue
+
             if b.get("layoutno", "0") != b_.get("layoutno", "1") or b.get("layout_type", "") in ["table", "figure", "equation"]:
                 i += 1
                 continue

             if abs(self._y_dis(b, b_)) < self.mean_height[bxs[i]["page_number"] - 1] / 3:
                 # merge
                 bxs[i]["x1"] = b_["x1"]
@@ -379,50 +456,49 @@ class RAGFlowPdfParser:
                 bxs.pop(i + 1)
                 continue
             i += 1
-            continue
-
-            dis_thr = 1
-            dis = b["x1"] - b_["x0"]
-            if b.get("layout_type", "") != "text" or b_.get("layout_type", "") != "text":
-                if end_with(b, "") or start_with(b_, ""):
-                    dis_thr = -8
-                else:
-                    i += 1
-                    continue
-
-            if abs(self._y_dis(b, b_)) < self.mean_height[bxs[i]["page_number"] - 1] / 5 and dis >= dis_thr and b["x1"] < b_["x1"]:
-                # merge
-                bxs[i]["x1"] = b_["x1"]
-                bxs[i]["top"] = (b["top"] + b_["top"]) / 2
-                bxs[i]["bottom"] = (b["bottom"] + b_["bottom"]) / 2
-                bxs[i]["text"] += b_["text"]
-                bxs.pop(i + 1)
-                continue
-            i += 1
         self.boxes = bxs

     def _naive_vertical_merge(self, zoomin=3):
-        import math
-        bxs = Recognizer.sort_Y_firstly(self.boxes, np.median(self.mean_height) / 3)
-        column_width = np.median([b["x1"] - b["x0"] for b in self.boxes])
-        if not column_width or math.isnan(column_width):
-            column_width = self.mean_width[0]
-        self.column_num = int(self.page_images[0].size[0] / zoomin / column_width)
-        if column_width < self.page_images[0].size[0] / zoomin / self.column_num:
-            logging.info("Multi-column................... {} {}".format(column_width, self.page_images[0].size[0] / zoomin / self.column_num))
-            self.boxes = self.sort_X_by_page(self.boxes, column_width / self.column_num)
+        bxs = self._assign_column(self.boxes, zoomin)
+
+        grouped = defaultdict(list)
+        for b in bxs:
+            grouped[(b["page_number"], b.get("col_id", 0))].append(b)
+
+        merged_boxes = []
+        for (pg, col), bxs in grouped.items():
+            bxs = sorted(bxs, key=lambda x: (x["top"], x["x0"]))
+            if not bxs:
+                continue
+            mh = self.mean_height[pg - 1] if self.mean_height else np.median([b["bottom"] - b["top"] for b in bxs]) or 10

             i = 0
             while i + 1 < len(bxs):
                 b = bxs[i]
                 b_ = bxs[i + 1]
                 if b["page_number"] < b_["page_number"] and re.match(r"[0-9 •一—-]+$", b["text"]):
                     bxs.pop(i)
                     continue
                 if not b["text"].strip():
                     bxs.pop(i)
                     continue
+                if not b["text"].strip() or b.get("layoutno") != b_.get("layoutno"):
+                    i += 1
+                    continue
+
+                if b_["top"] - b["bottom"] > mh * 1.5:
+                    i += 1
+                    continue
+
+                overlap = max(0, min(b["x1"], b_["x1"]) - max(b["x0"], b_["x0"]))
+                if overlap / max(1, min(b["x1"] - b["x0"], b_["x1"] - b_["x0"])) < 0.3:
+                    i += 1
+                    continue
+
                 concatting_feats = [
                     b["text"].strip()[-1] in ",;:'\",、‘“;:-",
                     len(b["text"].strip()) > 1 and b["text"].strip()[-2] in ",;:'\",‘“、;:",
@@ -449,13 +525,39 @@ class RAGFlowPdfParser:
                     )
                     i += 1
                     continue
+                # merge up and down
+                b["text"] = (b["text"].rstrip() + " " + b_["text"].lstrip()).strip()
                 b["bottom"] = b_["bottom"]
-                b["text"] += b_["text"]
                 b["x0"] = min(b["x0"], b_["x0"])
                 b["x1"] = max(b["x1"], b_["x1"])
                 bxs.pop(i + 1)
-        self.boxes = bxs
+
+            merged_boxes.extend(bxs)
+
+        self.boxes = sorted(merged_boxes, key=lambda x: (x["page_number"], x.get("col_id", 0), x["top"]))
+
+    def _final_reading_order_merge(self, zoomin=3):
+        if not self.boxes:
+            return
+
+        self.boxes = self._assign_column(self.boxes, zoomin=zoomin)
+
+        pages = defaultdict(lambda: defaultdict(list))
+        for b in self.boxes:
+            pg = b["page_number"]
+            col = b.get("col_id", 0)
+            pages[pg][col].append(b)
+
+        for pg in pages:
+            for col in pages[pg]:
+                pages[pg][col].sort(key=lambda x: (x["top"], x["x0"]))
+
+        new_boxes = []
+        for pg in sorted(pages.keys()):
+            for col in sorted(pages[pg].keys()):
+                new_boxes.extend(pages[pg][col])
+
+        self.boxes = new_boxes

     def _concat_downward(self, concat_between_pages=True):
         self.boxes = Recognizer.sort_Y_firstly(self.boxes, 0)
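
`_final_reading_order_merge` is the last pass and only reorders: on each page, every box of column 0 is emitted before column 1 and so on, each column sorted top-to-bottom. A toy illustration of the effect, with boxes stripped down to the fields the sort actually uses:

```python
# Toy illustration of the final reading-order pass: group by page and column,
# sort each column top-to-bottom, then emit page -> column -> top.
from collections import defaultdict

boxes = [
    {"page_number": 1, "col_id": 1, "top": 80,  "x0": 320, "text": "R1"},
    {"page_number": 1, "col_id": 0, "top": 200, "x0": 40,  "text": "L2"},
    {"page_number": 1, "col_id": 0, "top": 90,  "x0": 40,  "text": "L1"},
    {"page_number": 2, "col_id": 0, "top": 60,  "x0": 40,  "text": "P2"},
]

pages = defaultdict(lambda: defaultdict(list))
for b in boxes:
    pages[b["page_number"]][b.get("col_id", 0)].append(b)

ordered = []
for pg in sorted(pages):
    for col in sorted(pages[pg]):
        ordered.extend(sorted(pages[pg][col], key=lambda x: (x["top"], x["x0"])))

print([b["text"] for b in ordered])  # ['L1', 'L2', 'R1', 'P2']
```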
@@ -1074,7 +1176,6 @@ class RAGFlowPdfParser:
         def insert_table_figures(tbls_or_figs, layout_type):
             def min_rectangle_distance(rect1, rect2):
-                import math
                 pn1, left1, right1, top1, bottom1 = rect1
                 pn2, left2, right2, top2, bottom2 = rect2
                 if right1 >= left2 and right2 >= left1 and bottom1 >= top2 and bottom2 >= top1:
@@ -1095,7 +1196,9 @@ class RAGFlowPdfParser:
             for (img, txt), poss in tbls_or_figs:
                 bboxes = [(i, (b["page_number"], b["x0"], b["x1"], b["top"], b["bottom"])) for i, b in enumerate(self.boxes)]
-                dists = [(min_rectangle_distance((pn, left, right, top+self.page_cum_height[pn], bott+self.page_cum_height[pn]), rect),i) for i, rect in bboxes for pn, left, right, top, bott in poss]
+                dists = [
+                    (min_rectangle_distance((pn, left, right, top + self.page_cum_height[pn], bott + self.page_cum_height[pn]), rect), i) for i, rect in bboxes for pn, left, right, top, bott in poss
+                ]
                 min_i = np.argmin(dists, axis=0)[0]
                 min_i, rect = bboxes[dists[min_i][-1]]
                 if isinstance(txt, list):
@@ -1103,10 +1206,20 @@ class RAGFlowPdfParser:
                     pn, left, right, top, bott = poss[0]
                     if self.boxes[min_i]["bottom"] < top + self.page_cum_height[pn]:
                         min_i += 1
-                self.boxes.insert(min_i, {
-                    "page_number": pn+1, "x0": left, "x1": right, "top": top+self.page_cum_height[pn], "bottom": bott+self.page_cum_height[pn], "layout_type": layout_type, "text": txt, "image": img,
-                    "positions": [[pn+1, int(left), int(right), int(top), int(bott)]]
-                })
+                self.boxes.insert(
+                    min_i,
+                    {
+                        "page_number": pn + 1,
+                        "x0": left,
+                        "x1": right,
+                        "top": top + self.page_cum_height[pn],
+                        "bottom": bott + self.page_cum_height[pn],
+                        "layout_type": layout_type,
+                        "text": txt,
+                        "image": img,
+                        "positions": [[pn + 1, int(left), int(right), int(top), int(bott)]],
+                    },
+                )

         for b in self.boxes:
             b["position_tag"] = self._line_tag(b, zoomin)

View File

@@ -328,7 +328,7 @@ class Pdf(PdfParser):
         callback(0.65, "Table analysis ({:.2f}s)".format(timer() - start))

         start = timer()
-        self._text_merge()
+        self._text_merge(zoomin=zoomin)
         callback(0.67, "Text merged ({:.2f}s)".format(timer() - start))

         if separate_tables_figures:
@@ -340,6 +340,7 @@ class Pdf(PdfParser):
             tbls = self._extract_table_figure(True, zoomin, True, True)
             self._naive_vertical_merge()
             self._concat_downward()
+            self._final_reading_order_merge()
             # self._filter_forpages()
         logging.info("layouts cost: {}s".format(timer() - first_start))
         return [(b["text"], self._line_tag(b, zoomin)) for b in self.boxes], tbls