Fix: debug PDF positions.. (#10365)

### What problem does this PR solve?

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
Author: Kevin Hu
Date: 2025-09-30 09:24:44 +08:00
Committed by: GitHub
Commit: 9989e06abb (parent: c49e81882c)
9 changed files with 52 additions and 51 deletions


```diff
@@ -440,9 +440,7 @@ def list_canvas():
         desc = False
     else:
         desc = True
-    owner_ids = request.args.get("owner_ids", [])
-    if owner_ids and isinstance(owner_ids, str):
-        owner_ids = [owner_ids]
+    owner_ids = [id for id in request.args.get("owner_ids", "").strip().split(",") if id]
     if not owner_ids:
         tenants = TenantService.get_joined_tenants_by_user_id(current_user.id)
         tenants = [m["tenant_id"] for m in tenants]
```
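The rewritten `owner_ids` handling now parses a comma-separated query parameter in one pass and drops empty fragments, where the old code only coped with a single string value. A minimal sketch of that parsing outside Flask (the `parse_owner_ids` helper and the sample values are illustrative, not part of the PR):

```python
def parse_owner_ids(raw: str) -> list[str]:
    # Mirror the comprehension used in list_canvas(): split on commas
    # and keep only non-empty fragments.
    return [id for id in raw.strip().split(",") if id]

assert parse_owner_ids("") == []                   # missing parameter -> no owner filter
assert parse_owner_ids("u1,u2") == ["u1", "u2"]
assert parse_owner_ids("u1,,u2,") == ["u1", "u2"]  # empty fragments are dropped
```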


```diff
@@ -28,7 +28,7 @@ from api.db.services.task_service import TaskService, GRAPH_RAPTOR_FAKE_DOC_ID
 from api.db.services.user_service import TenantService, UserTenantService
 from api.utils.api_utils import get_error_data_result, server_error_response, get_data_error_result, validate_request, not_allowed_parameters
 from api.utils import get_uuid
-from api.db import PipelineTaskType, StatusEnum, FileSource, VALID_FILE_TYPES
+from api.db import PipelineTaskType, StatusEnum, FileSource, VALID_FILE_TYPES, VALID_TASK_STATUS
 from api.db.services.knowledgebase_service import KnowledgebaseService
 from api.db.db_models import File
 from api.utils.api_utils import get_json_result
@@ -453,7 +453,7 @@ def list_pipeline_logs():
     operation_status = req.get("operation_status", [])
     if operation_status:
-        invalid_status = {s for s in operation_status if s not in ["success", "failed", "running", "pending"]}
+        invalid_status = {s for s in operation_status if s not in VALID_TASK_STATUS}
         if invalid_status:
             return get_data_error_result(message=f"Invalid filter operation_status status conditions: {', '.join(invalid_status)}")
@@ -495,7 +495,7 @@ def list_pipeline_dataset_logs():
     operation_status = req.get("operation_status", [])
     if operation_status:
-        invalid_status = {s for s in operation_status if s not in ["success", "failed", "running", "pending"]}
+        invalid_status = {s for s in operation_status if s not in VALID_TASK_STATUS}
         if invalid_status:
             return get_data_error_result(message=f"Invalid filter operation_status status conditions: {', '.join(invalid_status)}")
```
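Both log-listing endpoints now validate `operation_status` against the shared `VALID_TASK_STATUS` constant from `api.db` instead of a locally hardcoded list. A minimal sketch of the validation pattern, assuming the constant covers the same four statuses the old code enumerated:

```python
# Assumed stand-in for api.db.VALID_TASK_STATUS; the old inline list
# contained exactly these values.
VALID_TASK_STATUS = {"success", "failed", "running", "pending"}

def invalid_statuses(operation_status: list[str]) -> set[str]:
    # Same set comprehension as the endpoints: anything not recognised is invalid.
    return {s for s in operation_status if s not in VALID_TASK_STATUS}

assert invalid_statuses(["running", "done"]) == {"done"}
assert not invalid_statuses(["success", "failed"])
```

Centralising the list means a new task status only has to be added in one place.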


```diff
@@ -176,7 +176,7 @@ class PipelineOperationLogService(CommonService):
         with DB.atomic():
             obj = cls.save(**log)
-            limit = int(os.getenv("PIPELINE_OPERATION_LOG_LIMIT", 1))
+            limit = int(os.getenv("PIPELINE_OPERATION_LOG_LIMIT", 1000))
             total = cls.model.select().where(cls.model.kb_id == document.kb_id).count()
             if total > limit:
```
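The retention cap for pipeline operation logs now defaults to 1000 rows per knowledge base instead of 1, while remaining overridable through the `PIPELINE_OPERATION_LOG_LIMIT` environment variable. A quick sketch of how that default interacts with `os.getenv`:

```python
import os

# With the variable unset, the second argument is returned as-is,
# so int() just passes the integer default through.
os.environ.pop("PIPELINE_OPERATION_LOG_LIMIT", None)
assert int(os.getenv("PIPELINE_OPERATION_LOG_LIMIT", 1000)) == 1000

# When the variable is set, os.getenv returns a string that int() parses.
os.environ["PIPELINE_OPERATION_LOG_LIMIT"] = "200"
assert int(os.getenv("PIPELINE_OPERATION_LOG_LIMIT", 1000)) == 200
```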


```diff
@@ -1104,12 +1104,14 @@ class RAGFlowPdfParser:
             if self.boxes[min_i]["bottom"] < top+self.page_cum_height[pn]:
                 min_i += 1
             self.boxes.insert(min_i, {
-                "page_number": pn+1, "x0": left, "x1": right, "top": top+self.page_cum_height[pn], "bottom": bott+self.page_cum_height[pn], "layout_type": layout_type, "text": txt, "image": img
+                "page_number": pn+1, "x0": left, "x1": right, "top": top+self.page_cum_height[pn], "bottom": bott+self.page_cum_height[pn], "layout_type": layout_type, "text": txt, "image": img,
+                "positions": [[pn+1, int(left), int(right), int(top), int(bott)]]
             })
         for b in self.boxes:
             b["position_tag"] = self._line_tag(b, zoomin)
             b["image"] = self.crop(b["position_tag"], zoomin)
+            b["positions"] = [[pos[0][-1]+1, *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(b["position_tag"])]
         insert_table_figures(tbls, "table")
         insert_table_figures(figs, "figure")
```
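Both the inserted table/figure boxes and the regular text boxes now carry a `positions` list of `[page_number, x0, x1, top, bottom]` entries, with page numbers shifted to 1-based (`pos[0][-1]+1`), which appears to be the position fix the commit title refers to. A minimal sketch of that reshaping with a mocked `extract_positions` result (the tuple shape and sample numbers are assumptions, not the parser's documented output):

```python
# Assumed shape of one extract_positions() entry:
# ([page_index, ...], x0, x1, top, bottom), with 0-based page indices.
mocked = [
    ([0], 72.0, 300.0, 100.0, 118.0),
    ([1], 72.0, 280.0, 140.0, 158.0),
]

# Same reshaping as the parser: last page index made 1-based,
# the four coordinates kept as-is.
positions = [[pos[0][-1] + 1, *pos[1:]] for pos in mocked]
assert positions == [[1, 72.0, 300.0, 100.0, 118.0],
                     [2, 72.0, 280.0, 140.0, 158.0]]
```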


```diff
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import random
+from copy import deepcopy
 from agent.component.llm import LLMParam, LLM
 from rag.flow.base import ProcessBase, ProcessParamBase
@@ -40,7 +41,7 @@ class Extractor(ProcessBase, LLM):
         for k, v in inputs.items():
             args[k] = v["value"]
             if isinstance(args[k], list):
-                chunks = args[k]
+                chunks = deepcopy(args[k])
                 chunks_key = k
         if chunks:
```
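Copying the input list with `deepcopy` keeps later mutation of `chunks` from leaking back into the upstream component's output. A small sketch of the aliasing problem it avoids (the data shapes here are illustrative only):

```python
from copy import deepcopy

inputs = {"chunks": {"value": [{"text": "original"}]}}
args = {k: v["value"] for k, v in inputs.items()}

isolated = deepcopy(args["chunks"])   # independent copy of the list and its dicts
isolated[0]["text"] = "rewritten"
assert inputs["chunks"]["value"][0]["text"] == "original"   # upstream untouched

aliased = args["chunks"]              # plain assignment shares the same objects
aliased[0]["text"] = "rewritten"
assert inputs["chunks"]["value"][0]["text"] == "rewritten"  # mutation leaks back
```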


```diff
@@ -14,7 +14,6 @@
 # limitations under the License.
 import io
 import json
-import logging
 import os
 import random
 from functools import partial
@@ -31,6 +30,7 @@ from api.utils import get_uuid
 from api.utils.base64_image import image2id
 from deepdoc.parser import ExcelParser
 from deepdoc.parser.pdf_parser import PlainParser, RAGFlowPdfParser, VisionParser
+from rag.app.naive import Docx
 from rag.flow.base import ProcessBase, ProcessParamBase
 from rag.flow.parser.schema import ParserFromUpstream
 from rag.llm.cv_model import Base as VLM
@@ -243,19 +243,13 @@ class Parser(ProcessBase):
         self.set_output("markdown", spreadsheet_parser.markdown(blob))

     def _word(self, name, blob):
-        from tika import parser as word_parser
         self.callback(random.randint(1, 5) / 100.0, "Start to work on a Word Processor Document")
         conf = self._param.setups["word"]
         self.set_output("output_format", conf["output_format"])
-        doc_parsed = word_parser.from_buffer(blob)
-        sections = []
-        if doc_parsed.get("content"):
-            sections = doc_parsed["content"].split("\n")
-            sections = [{"text": section} for section in sections if section]
-        else:
-            logging.warning(f"tika.parser got empty content from {name}.")
+        docx_parser = Docx()
+        sections, tbls = docx_parser(name, binary=blob)
+        sections = [{"text": section[0], "image": section[1]} for section in sections if section]
+        sections.extend([{"text": tb, "image": None} for ((_,tb), _) in tbls])
         # json
         assert conf.get("output_format") == "json", "have to be json for doc"
         if conf.get("output_format") == "json":
```
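Judging by the unpacking above, the `Docx` parser from `rag.app.naive` yields paragraph sections as `(text, image)` pairs and tables as nested tuples whose second inner element is the table text. A small sketch of the reshaping with mocked parser output (the mocked structures are assumptions inferred from that unpacking, not the parser's documented contract):

```python
# Mocked to match the destructuring in _word():
# sections -> (text, image) pairs; tbls -> ((_, table_text), _)
sections = [("First paragraph", None), ("Second paragraph", b"png-bytes")]
tbls = [((None, "<table><tr><td>cell</td></tr></table>"), None)]

chunks = [{"text": section[0], "image": section[1]} for section in sections if section]
chunks.extend([{"text": tb, "image": None} for ((_, tb), _) in tbls])

assert chunks[0] == {"text": "First paragraph", "image": None}
assert chunks[-1]["image"] is None
```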


```diff
@@ -100,7 +100,7 @@ class Splitter(ProcessBase):
             {
                 "text": RAGFlowPdfParser.remove_tag(c),
                 "image": img,
-                "positions": [[pos[0][-1], *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)],
+                "positions": [[pos[0][-1]+1, *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)],
             }
             for c, img in zip(chunks, images) if c.strip()
         ]
```


```diff
@@ -756,7 +756,7 @@ class SILICONFLOWEmbed(Base):
             texts_batch = texts[i : i + batch_size]
             if self.model_name in ["BAAI/bge-large-zh-v1.5", "BAAI/bge-large-en-v1.5"]:
                 # limit 512, 340 is almost safe
-                texts_batch = [" " if not text.strip() else truncate(text, 340) for text in texts_batch]
+                texts_batch = [" " if not text.strip() else truncate(text, 256) for text in texts_batch]
             else:
                 texts_batch = [" " if not text.strip() else text for text in texts_batch]
```


```diff
@@ -503,9 +503,9 @@ async def run_dataflow(task: dict):
     embedding_token_consumption = chunks.get("embedding_token_consumption", 0)
     if chunks.get("chunks"):
-        chunks = chunks["chunks"]
+        chunks = copy.deepcopy(chunks["chunks"])
     elif chunks.get("json"):
-        chunks = chunks["json"]
+        chunks = copy.deepcopy(chunks["json"])
     elif chunks.get("markdown"):
         chunks = [{"text": [chunks["markdown"]]}]
     elif chunks.get("text"):
@@ -515,34 +515,40 @@ async def run_dataflow(task: dict):
     keys = [k for o in chunks for k in list(o.keys())]
     if not any([re.match(r"q_[0-9]+_vec", k) for k in keys]):
-        set_progress(task_id, prog=0.82, msg="\n-------------------------------------\nStart to embedding...")
-        e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
-        embedding_id = kb.embd_id
-        embedding_model = LLMBundle(task["tenant_id"], LLMType.EMBEDDING, llm_name=embedding_id)
-        @timeout(60)
-        def batch_encode(txts):
-            nonlocal embedding_model
-            return embedding_model.encode([truncate(c, embedding_model.max_length - 10) for c in txts])
-        vects = np.array([])
-        texts = [o.get("questions", o.get("summary", o["text"])) for o in chunks]
-        delta = 0.20/(len(texts)//EMBEDDING_BATCH_SIZE+1)
-        prog = 0.8
-        for i in range(0, len(texts), EMBEDDING_BATCH_SIZE):
-            async with embed_limiter:
-                vts, c = await trio.to_thread.run_sync(lambda: batch_encode(texts[i : i + EMBEDDING_BATCH_SIZE]))
-            if len(vects) == 0:
-                vects = vts
-            else:
-                vects = np.concatenate((vects, vts), axis=0)
-            embedding_token_consumption += c
-            prog += delta
-            if i % (len(texts)//EMBEDDING_BATCH_SIZE/100+1) == 1:
-                set_progress(task_id, prog=prog, msg=f"{i+1} / {len(texts)//EMBEDDING_BATCH_SIZE}")
-        assert len(vects) == len(chunks)
-        for i, ck in enumerate(chunks):
-            v = vects[i].tolist()
-            ck["q_%d_vec" % len(v)] = v
+        try:
+            set_progress(task_id, prog=0.82, msg="\n-------------------------------------\nStart to embedding...")
+            e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
+            embedding_id = kb.embd_id
+            embedding_model = LLMBundle(task["tenant_id"], LLMType.EMBEDDING, llm_name=embedding_id)
+            @timeout(60)
+            def batch_encode(txts):
+                nonlocal embedding_model
+                return embedding_model.encode([truncate(c, embedding_model.max_length - 10) for c in txts])
+            vects = np.array([])
+            texts = [o.get("questions", o.get("summary", o["text"])) for o in chunks]
+            delta = 0.20/(len(texts)//EMBEDDING_BATCH_SIZE+1)
+            prog = 0.8
+            for i in range(0, len(texts), EMBEDDING_BATCH_SIZE):
+                async with embed_limiter:
+                    vts, c = await trio.to_thread.run_sync(lambda: batch_encode(texts[i : i + EMBEDDING_BATCH_SIZE]))
+                if len(vects) == 0:
+                    vects = vts
+                else:
+                    vects = np.concatenate((vects, vts), axis=0)
+                embedding_token_consumption += c
+                prog += delta
+                if i % (len(texts)//EMBEDDING_BATCH_SIZE/100+1) == 1:
+                    set_progress(task_id, prog=prog, msg=f"{i+1} / {len(texts)//EMBEDDING_BATCH_SIZE}")
+            assert len(vects) == len(chunks)
+            for i, ck in enumerate(chunks):
+                v = vects[i].tolist()
+                ck["q_%d_vec" % len(v)] = v
+        except Exception as e:
+            set_progress(task_id, prog=-1, msg=f"[ERROR]: {e}")
+            PipelineOperationLogService.create(document_id=doc_id, pipeline_id=dataflow_id, task_type=PipelineTaskType.PARSE, dsl=str(pipeline))
+            return
     metadata = {}
     def dict_update(meta):
```