From 71f69cdb75d4095e6e88682ddb0d6a6e39ff350f Mon Sep 17 00:00:00 2001
From: Kevin Hu
Date: Mon, 29 Sep 2025 09:29:33 +0800
Subject: [PATCH] Fix: debug hierarchical merging... (#10337)

### What problem does this PR solve?

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
---
 api/apps/canvas_app.py                        | 31 +++++++++++++---
 api/apps/kb_app.py                            | 35 +++++++++++++++++--
 api/db/services/canvas_service.py             | 20 ++++++-----
 api/db/services/document_service.py           |  9 +++--
 .../pipeline_operation_log_service.py         |  8 ++---
 api/db/services/task_service.py               |  3 +-
 .../hierarchical_merger.py                    | 29 +++++++++------
 rag/flow/parser/parser.py                     |  4 +--
 rag/flow/pipeline.py                          |  5 ++-
 rag/flow/splitter/splitter.py                 |  2 +-
 rag/flow/tokenizer/schema.py                  |  6 ++--
 rag/flow/tokenizer/tokenizer.py               |  2 +-
 rag/svr/task_executor.py                      |  5 +--
 13 files changed, 113 insertions(+), 46 deletions(-)

diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py
index 622d28f4a..f0dac16f4 100644
--- a/api/apps/canvas_app.py
+++ b/api/apps/canvas_app.py
@@ -19,17 +19,19 @@ import re
 import sys
 from functools import partial
 
+import flask
 import trio
 from flask import request, Response
 from flask_login import login_required, current_user
 
 from agent.component import LLM
+from api import settings
 from api.db import CanvasCategory, FileType
 from api.db.services.canvas_service import CanvasTemplateService, UserCanvasService, API4ConversationService
 from api.db.services.document_service import DocumentService
 from api.db.services.file_service import FileService
 from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
-from api.db.services.task_service import queue_dataflow, CANVAS_DEBUG_DOC_ID
+from api.db.services.task_service import queue_dataflow, CANVAS_DEBUG_DOC_ID, TaskService
 from api.db.services.user_service import TenantService
 from api.db.services.user_canvas_version import UserCanvasVersionService
 from api.settings import RetCode
@@ -37,11 +39,12 @@ from api.utils import get_uuid
 from api.utils.api_utils import get_json_result, server_error_response, validate_request, get_data_error_result
 from agent.canvas import Canvas
 from peewee import MySQLDatabase, PostgresqlDatabase
-from api.db.db_models import APIToken
+from api.db.db_models import APIToken, Task
 import time
 
 from api.utils.file_utils import filename_type, read_potential_broken_pdf
 from rag.flow.pipeline import Pipeline
+from rag.nlp import search
 from rag.utils.redis_conn import REDIS_CONN
 
 
@@ -189,6 +192,15 @@ def rerun():
     if 0 < doc["progress"] < 1:
         return get_data_error_result(message=f"`{doc['name']}` is processing...")
 
+    if settings.docStoreConn.indexExist(search.index_name(current_user.id), doc["kb_id"]):
+        settings.docStoreConn.delete({"doc_id": doc["id"]}, search.index_name(current_user.id), doc["kb_id"])
+    doc["progress_msg"] = ""
+    doc["chunk_num"] = 0
+    doc["token_num"] = 0
+    DocumentService.clear_chunk_num_when_rerun(doc["id"])
+    DocumentService.update_by_id(id, doc)
+    TaskService.filter_delete([Task.doc_id == id])
+
     dsl = req["dsl"]
     dsl["path"] = [req["component_id"]]
     PipelineOperationLogService.update_by_id(req["id"], {"dsl": dsl})
@@ -420,8 +432,8 @@ def getversion( version_id):
 @login_required
 def list_canvas():
     keywords = request.args.get("keywords", "")
-    page_number = int(request.args.get("page", 1))
-    items_per_page = int(request.args.get("page_size", 150))
+    page_number = int(request.args.get("page", 0))
+    items_per_page = int(request.args.get("page_size", 0))
     orderby = request.args.get("orderby", "create_time")
     canvas_category = request.args.get("canvas_category")
     if request.args.get("desc", "true").lower() == "false":
@@ -429,9 +441,12 @@ def list_canvas():
     else:
         desc = True
     owner_ids = request.args.get("owner_ids", [])
+    if owner_ids and isinstance(owner_ids, str):
+        owner_ids = [owner_ids]
     if not owner_ids:
         tenants = TenantService.get_joined_tenants_by_user_id(current_user.id)
         tenants = [m["tenant_id"] for m in tenants]
+        tenants.append(current_user.id)
         canvas, total = UserCanvasService.get_by_tenant_ids(
             tenants, current_user.id, page_number,
             items_per_page, orderby, desc, keywords, canvas_category)
@@ -525,3 +540,11 @@ def prompts():
         #"context_ranking": RANK_MEMORY,
         "citation_guidelines": CITATION_PROMPT_TEMPLATE
     })
+
+
+@manager.route('/download', methods=['GET'])  # noqa: F821
+def download():
+    id = request.args.get("id")
+    created_by = request.args.get("created_by")
+    blob = FileService.get_blob(created_by, id)
+    return flask.make_response(blob)
\ No newline at end of file
diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py
index 92e66c7dd..e14dd5264 100644
--- a/api/apps/kb_app.py
+++ b/api/apps/kb_app.py
@@ -68,7 +68,34 @@ def create():
         e, t = TenantService.get_by_id(current_user.id)
         if not e:
             return get_data_error_result(message="Tenant not found.")
-        #req["embd_id"] = t.embd_id
+        req["parser_config"] = {
+            "layout_recognize": "DeepDOC",
+            "chunk_token_num": 512,
+            "delimiter": "\n",
+            "auto_keywords": 0,
+            "auto_questions": 0,
+            "html4excel": False,
+            "topn_tags": 3,
+            "raptor": {
+                "use_raptor": True,
+                "prompt": "Please summarize the following paragraphs. Be careful with the numbers, do not make things up. Paragraphs as following:\n {cluster_content}\nThe above is the content you need to summarize.",
+                "max_token": 256,
+                "threshold": 0.1,
+                "max_cluster": 64,
+                "random_seed": 0
+            },
+            "graphrag": {
+                "use_graphrag": True,
+                "entity_types": [
+                    "organization",
+                    "person",
+                    "geo",
+                    "event",
+                    "category"
+                ],
+                "method": "light"
+            }
+        }
         if not KnowledgebaseService.save(**req):
             return get_data_error_result()
         return get_json_result(data={"kb_id": req["id"]})
@@ -729,19 +756,21 @@ def delete_kb_task():
     if not pipeline_task_type or pipeline_task_type not in [PipelineTaskType.GRAPH_RAG, PipelineTaskType.RAPTOR, PipelineTaskType.MINDMAP]:
         return get_error_data_result(message="Invalid task type")
 
-    kb_task_id = ""
     match pipeline_task_type:
         case PipelineTaskType.GRAPH_RAG:
             settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), kb_id)
             kb_task_id = "graphrag_task_id"
+            kb_task_finish_at = "graphrag_task_finish_at"
         case PipelineTaskType.RAPTOR:
             kb_task_id = "raptor_task_id"
+            kb_task_finish_at = "raptor_task_finish_at"
         case PipelineTaskType.MINDMAP:
             kb_task_id = "mindmap_task_id"
+            kb_task_finish_at = "mindmap_task_finish_at"
         case _:
             return get_error_data_result(message="Internal Error: Invalid task type")
 
-    ok = KnowledgebaseService.update_by_id(kb_id, {kb_task_id: ""})
+    ok = KnowledgebaseService.update_by_id(kb_id, {kb_task_id: "", kb_task_finish_at: None})
     if not ok:
         return server_error_response(f"Internal error: cannot delete task {pipeline_task_type}")
 
diff --git a/api/db/services/canvas_service.py b/api/db/services/canvas_service.py
index 4766ca821..bc07aec1c 100644
--- a/api/db/services/canvas_service.py
+++ b/api/db/services/canvas_service.py
@@ -18,7 +18,7 @@ import logging
 import time
 from uuid import uuid4
 from agent.canvas import Canvas
-from api.db import CanvasCategory, TenantPermission
+from api.db import CanvasCategory
 from api.db.db_models import DB, CanvasTemplate, User, UserCanvas, API4Conversation
 from api.db.services.api_service import API4ConversationService
 from api.db.services.common_service import CommonService
@@ -104,6 +104,7 @@ class UserCanvasService(CommonService):
             cls.model.dsl,
             cls.model.description,
             cls.model.permission,
+            cls.model.user_id.alias("tenant_id"),
             User.nickname,
             User.avatar.alias('tenant_avatar'),
             cls.model.update_time,
@@ -111,16 +112,15 @@
         ]
         if keywords:
             agents = cls.model.select(*fields).join(User, on=(cls.model.user_id == User.id)).where(
-                ((cls.model.user_id.in_(joined_tenant_ids) & (cls.model.permission ==
-                                                              TenantPermission.TEAM.value)) | (
-                         cls.model.user_id == user_id)),
-                (fn.LOWER(cls.model.title).contains(keywords.lower()))
+                cls.model.user_id.in_(joined_tenant_ids),
+                fn.LOWER(cls.model.title).contains(keywords.lower())
+                #(((cls.model.user_id.in_(joined_tenant_ids)) & (cls.model.permission == TenantPermission.TEAM.value)) | (cls.model.user_id == user_id)),
+                #(fn.LOWER(cls.model.title).contains(keywords.lower()))
             )
         else:
             agents = cls.model.select(*fields).join(User, on=(cls.model.user_id == User.id)).where(
-                ((cls.model.user_id.in_(joined_tenant_ids) & (cls.model.permission ==
-                                                              TenantPermission.TEAM.value)) | (
-                         cls.model.user_id == user_id))
+                cls.model.user_id.in_(joined_tenant_ids)
+                #(((cls.model.user_id.in_(joined_tenant_ids)) & (cls.model.permission == TenantPermission.TEAM.value)) | (cls.model.user_id == user_id))
             )
         if canvas_category:
             agents = agents.where(cls.model.canvas_category == canvas_category)
@@ -128,8 +128,10 @@ class UserCanvasService(CommonService):
             agents = agents.order_by(cls.model.getter_by(orderby).desc())
         else:
             agents = agents.order_by(cls.model.getter_by(orderby).asc())
+
         count = agents.count()
-        agents = agents.paginate(page_number, items_per_page)
+        if page_number and items_per_page:
+            agents = agents.paginate(page_number, items_per_page)
         return list(agents.dicts()), count
 
     @classmethod
diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py
index cdbe48c39..c091f57f3 100644
--- a/api/db/services/document_service.py
+++ b/api/db/services/document_service.py
@@ -29,7 +29,8 @@ from peewee import fn, Case, JOIN
 from api import settings
 from api.constants import IMG_BASE64_PREFIX, FILE_NAME_LEN_LIMIT
 from api.db import FileType, LLMType, ParserType, StatusEnum, TaskStatus, UserTenantRole, CanvasCategory
-from api.db.db_models import DB, Document, Knowledgebase, Task, Tenant, UserTenant, File2Document, File, UserCanvas
+from api.db.db_models import DB, Document, Knowledgebase, Task, Tenant, UserTenant, File2Document, File, UserCanvas, \
+    User
 from api.db.db_utils import bulk_insert_into_db
 from api.db.services.common_service import CommonService
 from api.db.services.knowledgebase_service import KnowledgebaseService
@@ -121,19 +122,21 @@ class DocumentService(CommonService):
                           orderby, desc, keywords, run_status, types, suffix):
         fields = cls.get_cls_model_fields()
         if keywords:
-            docs = cls.model.select(*[*fields, UserCanvas.title.alias("pipeline_name")])\
+            docs = cls.model.select(*[*fields, UserCanvas.title.alias("pipeline_name"), User.nickname])\
                 .join(File2Document, on=(File2Document.document_id == cls.model.id))\
                 .join(File, on=(File.id == File2Document.file_id))\
                 .join(UserCanvas, on=(cls.model.pipeline_id == UserCanvas.id), join_type=JOIN.LEFT_OUTER)\
+                .join(User, on=(cls.model.created_by == User.id), join_type=JOIN.LEFT_OUTER)\
                 .where(
                     (cls.model.kb_id == kb_id),
                     (fn.LOWER(cls.model.name).contains(keywords.lower()))
                 )
         else:
-            docs = cls.model.select(*[*fields, UserCanvas.title.alias("pipeline_name")])\
+            docs = cls.model.select(*[*fields, UserCanvas.title.alias("pipeline_name"), User.nickname])\
                 .join(File2Document, on=(File2Document.document_id == cls.model.id))\
                 .join(UserCanvas, on=(cls.model.pipeline_id == UserCanvas.id), join_type=JOIN.LEFT_OUTER)\
                 .join(File, on=(File.id == File2Document.file_id))\
+                .join(User, on=(cls.model.created_by == User.id), join_type=JOIN.LEFT_OUTER)\
                 .where(cls.model.kb_id == kb_id)
 
         if run_status:
diff --git a/api/db/services/pipeline_operation_log_service.py b/api/db/services/pipeline_operation_log_service.py
index 74bb83554..2860c50b1 100644
--- a/api/db/services/pipeline_operation_log_service.py
+++ b/api/db/services/pipeline_operation_log_service.py
@@ -123,7 +123,7 @@ class PipelineOperationLogService(CommonService):
             raise RuntimeError(f"Cannot find knowledge base {document.kb_id} for referred_document {referred_document_id}")
         tenant_id = kb_info.tenant_id
 
-        title = document.name
+        title = document.parser_id
         avatar = document.thumbnail
 
         if task_type not in VALID_PIPELINE_TASK_TYPES:
@@ -228,14 +228,12 @@
     @classmethod
     @DB.connection_context()
     def get_documents_info(cls, id):
-        fields = [Document.id, Document.name, Document.progress]
+        fields = [Document.id, Document.name, Document.progress, Document.kb_id]
         return (
             cls.model.select(*fields)
             .join(Document, on=(cls.model.document_id == Document.id))
             .where(
-                cls.model.id == id,
-                Document.progress > 0,
-                Document.progress < 1,
+                cls.model.id == id
             )
             .dicts()
         )
diff --git a/api/db/services/task_service.py b/api/db/services/task_service.py
index 077970fa7..4b8eddc82 100644
--- a/api/db/services/task_service.py
+++ b/api/db/services/task_service.py
@@ -358,7 +358,7 @@ def queue_tasks(doc: dict, bucket: str, name: str, priority: int):
         page_size = doc["parser_config"].get("task_page_size") or 12
     if doc["parser_id"] == "paper":
         page_size = doc["parser_config"].get("task_page_size") or 22
-    if doc["parser_id"] in ["one", "knowledge_graph"] or do_layout != "DeepDOC":
+    if doc["parser_id"] in ["one", "knowledge_graph"] or do_layout != "DeepDOC" or doc["parser_config"].get("toc", True):
         page_size = 10 ** 9
     page_ranges = doc["parser_config"].get("pages") or [(1, 10 ** 5)]
     for s, e in page_ranges:
@@ -505,7 +505,6 @@ def queue_dataflow(tenant_id:str, flow_id:str, task_id:str, doc_id:str=CANVAS_DE
 
     task["kb_id"] = DocumentService.get_knowledgebase_id(doc_id)
     task["tenant_id"] = tenant_id
-    task["task_type"] = "dataflow"
    task["dataflow_id"] = flow_id
    task["file"] = file
 
diff --git a/rag/flow/hierarchical_merger/hierarchical_merger.py b/rag/flow/hierarchical_merger/hierarchical_merger.py
index cb4b28957..dda2bcfa7 100644
--- a/rag/flow/hierarchical_merger/hierarchical_merger.py
+++ b/rag/flow/hierarchical_merger/hierarchical_merger.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import json
+
 import random
 import re
 from copy import deepcopy
@@ -68,9 +68,10 @@ class HierarchicalMerger(ProcessBase):
             lines = [ln for ln in payload.split("\n") if ln]
         else:
-            lines = [o.get("text", "") for o in from_upstream.json_result]
+            arr = from_upstream.chunks if from_upstream.output_format == "chunks" else from_upstream.json_result
+            lines = [o.get("text", "") for o in arr]
             sections, section_images = [], []
-            for o in from_upstream.json_result or []:
+            for o in arr or []:
                 sections.append((o.get("text", ""), o.get("position_tag", "")))
                 section_images.append(o.get("img_id"))
 
 
@@ -128,21 +129,26 @@ class HierarchicalMerger(ProcessBase):
         all_pathes = []
 
         def dfs(n, path, depth):
             nonlocal all_pathes
-            if depth < self._param.hierarchy:
-                path = deepcopy(path)
+            if not n["children"] and path:
+                all_pathes.append(path)
             for nn in n["children"]:
-                path.extend([nn["index"], *nn["texts"]])
-                dfs(nn, path, depth+1)
+                if depth < self._param.hierarchy:
+                    _path = deepcopy(path)
+                else:
+                    _path = path
+                _path.extend([nn["index"], *nn["texts"]])
+                dfs(nn, _path, depth+1)
 
-            if depth == self._param.hierarchy:
-                all_pathes.append(path)
+                if depth == self._param.hierarchy:
+                    all_pathes.append(_path)
 
         for i in range(len(lines)):
             print(i, lines[i])
         dfs(root, [], 0)
-        print("sSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS", json.dumps(root, ensure_ascii=False, indent=2))
+        if root["texts"]:
+            all_pathes.insert(0, root["texts"])
 
         if from_upstream.output_format in ["markdown", "text", "html"]:
             cks = []
             for path in all_pathes:
@@ -161,7 +167,7 @@ class HierarchicalMerger(ProcessBase):
                 for i in path:
                     txt += lines[i] + "\n"
                     concat_img(img, id2image(section_images[i], partial(STORAGE_IMPL.get)))
-                cks.append(cks)
+                cks.append(txt)
                 images.append(img)
 
             cks = [
@@ -175,5 +181,6 @@ class HierarchicalMerger(ProcessBase):
         async with trio.open_nursery() as nursery:
             for d in cks:
                 nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), get_uuid())
 
+        self.set_output("chunks", cks)
         self.callback(1, "Done.")
diff --git a/rag/flow/parser/parser.py b/rag/flow/parser/parser.py
index 30bd4e329..e0d96d5ef 100644
--- a/rag/flow/parser/parser.py
+++ b/rag/flow/parser/parser.py
@@ -235,8 +235,8 @@ class Parser(ProcessBase):
         self.set_output("output_format", conf["output_format"])
         spreadsheet_parser = ExcelParser()
         if conf.get("output_format") == "html":
-            html = spreadsheet_parser.html(blob, 1000000000)
-            self.set_output("html", html)
+            htmls = spreadsheet_parser.html(blob, 1000000000)
+            self.set_output("html", htmls[0])
         elif conf.get("output_format") == "json":
             self.set_output("json", [{"text": txt} for txt in spreadsheet_parser(blob) if txt])
         elif conf.get("output_format") == "markdown":
diff --git a/rag/flow/pipeline.py b/rag/flow/pipeline.py
index 557b708b7..a1db65bbc 100644
--- a/rag/flow/pipeline.py
+++ b/rag/flow/pipeline.py
@@ -75,7 +75,6 @@ class Pipeline(Graph):
                     "trace": [{"progress": progress, "message": message, "datetime": datetime.datetime.now().strftime("%H:%M:%S"), "timestamp": timestamp, "elapsed_time": 0}],
                 }
             ]
-            REDIS_CONN.set_obj(log_key, obj, 60 * 30)
            if component_name != "END" and self._doc_id and self.task_id:
                percentage = 1.0 / len(self.components.items())
                finished = 0.0
@@ -94,6 +93,10 @@ class Pipeline(Graph):
                     t = obj[-1]["trace"][-1]
                     msg += "%s: %s\n" % (t["datetime"], t["message"])
                 TaskService.update_progress(self.task_id, {"progress": finished, "progress_msg": msg})
+            elif component_name == "END" and not self._doc_id:
+                obj[-1]["trace"][-1]["dsl"] = json.loads(str(self))
+            REDIS_CONN.set_obj(log_key, obj, 60 * 30)
+
         except Exception as e:
             logging.exception(e)
 
diff --git a/rag/flow/splitter/splitter.py b/rag/flow/splitter/splitter.py
index b18148f2f..c3aa9126c 100644
--- a/rag/flow/splitter/splitter.py
+++ b/rag/flow/splitter/splitter.py
@@ -102,7 +102,7 @@ class Splitter(ProcessBase):
                 "image": img,
                 "positions": [[pos[0][-1], *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)],
             }
-            for c, img in zip(chunks, images)
+            for c, img in zip(chunks, images) if c.strip()
         ]
         async with trio.open_nursery() as nursery:
             for d in cks:
diff --git a/rag/flow/tokenizer/schema.py b/rag/flow/tokenizer/schema.py
index a2bb68a7d..e74a5825f 100644
--- a/rag/flow/tokenizer/schema.py
+++ b/rag/flow/tokenizer/schema.py
@@ -40,12 +40,14 @@ class TokenizerFromUpstream(BaseModel):
         if self.chunks:
             return self
 
-        if self.output_format in {"markdown", "text"}:
+        if self.output_format in {"markdown", "text", "html"}:
             if self.output_format == "markdown" and not self.markdown_result:
                 raise ValueError("output_format=markdown requires a markdown payload (field: 'markdown' or 'markdown_result').")
             if self.output_format == "text" and not self.text_result:
                 raise ValueError("output_format=text requires a text payload (field: 'text' or 'text_result').")
+            if self.output_format == "html" and not self.html_result:
+                raise ValueError("output_format=text requires a html payload (field: 'html' or 'html_result').")
         else:
-            if not self.json_result:
+            if not self.json_result and not self.chunks:
                 raise ValueError("When no chunks are provided and output_format is not markdown/text, a JSON list payload is required (field: 'json' or 'json_result').")
         return self
diff --git a/rag/flow/tokenizer/tokenizer.py b/rag/flow/tokenizer/tokenizer.py
index 1d5b22b65..425e20397 100644
--- a/rag/flow/tokenizer/tokenizer.py
+++ b/rag/flow/tokenizer/tokenizer.py
@@ -137,7 +137,7 @@ class Tokenizer(ProcessBase):
                 payload = from_upstream.markdown_result
             elif from_upstream.output_format == "text":
                 payload = from_upstream.text_result
-            else:  # == "html"
+            else:
                 payload = from_upstream.html_result
 
             if not payload:
diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py
index f9b6f33a3..ac3a709c1 100644
--- a/rag/svr/task_executor.py
+++ b/rag/svr/task_executor.py
@@ -245,7 +245,7 @@ async def collect():
 
     task_type = msg.get("task_type", "")
     task["task_type"] = task_type
-    if task_type == "dataflow":
+    if task_type[:8] == "dataflow":
         task["tenant_id"] = msg["tenant_id"]
         task["dataflow_id"] = msg["dataflow_id"]
         task["kb_id"] = msg.get("kb_id", "")
@@ -491,6 +491,7 @@ async def run_dataflow(task: dict):
         e, pipeline_log = PipelineOperationLogService.get_by_id(dataflow_id)
         assert e, "Pipeline log not found."
         dsl = pipeline_log.dsl
+        dataflow_id = pipeline_log.pipeline_id
     pipeline = Pipeline(dsl, tenant_id=task["tenant_id"], doc_id=doc_id, task_id=task_id, flow_id=dataflow_id)
     chunks = await pipeline.run(file=task["file"]) if task.get("file") else await pipeline.run()
     if doc_id == CANVAS_DEBUG_DOC_ID:
@@ -652,7 +653,7 @@ async def run_raptor_for_kb(row, kb_parser_config, chat_mdl, embd_mdl, vector_si
         raptor_config["threshold"],
     )
     original_length = len(chunks)
-    chunks = await raptor(chunks, row["parser_config"]["raptor"]["random_seed"], callback)
+    chunks = await raptor(chunks, row["kb_parser_config"]["raptor"]["random_seed"], callback)
     doc = {
         "doc_id": fake_doc_id,
         "kb_id": [str(row["kb_id"])],