Fix: debug hierarchical merging... (#10337)

### What problem does this PR solve?


### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
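
Illustrative only (not part of the diff): a minimal sketch of the corrected path-collection traversal that the hierarchical merger hunk below introduces. The node layout (`index`, `texts`, `children`) follows the changed lines; the `HIERARCHY` constant and the `collect_paths` wrapper are stand-ins assumed here for `self._param.hierarchy` and the surrounding class.

```python
from copy import deepcopy

HIERARCHY = 2  # stand-in for self._param.hierarchy


def collect_paths(root):
    # Collect one list of heading/body entries per branch, mirroring the fixed dfs().
    all_paths = []

    def dfs(n, path, depth):
        # Leaves reached before the target depth still contribute their partial path.
        if not n["children"] and path:
            all_paths.append(path)
        for child in n["children"]:
            # Copy only while still above the target depth so siblings don't share one list.
            _path = deepcopy(path) if depth < HIERARCHY else path
            _path.extend([child["index"], *child["texts"]])
            dfs(child, _path, depth + 1)
            if depth == HIERARCHY:
                all_paths.append(_path)

    dfs(root, [], 0)
    if root["texts"]:  # top-level text outside any heading comes first
        all_paths.insert(0, root["texts"])
    return all_paths
```

With the previous traversal the same list was extended across siblings, so their texts piled up into a single path; the per-child copy above is what the fix appears to avoid.
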
Kevin Hu
2025-09-29 09:29:33 +08:00
committed by GitHub
parent 664bc0b961
commit 71f69cdb75
13 changed files with 113 additions and 46 deletions

View File

@@ -19,17 +19,19 @@ import re
 import sys
 from functools import partial
+import flask
 import trio
 from flask import request, Response
 from flask_login import login_required, current_user
 from agent.component import LLM
+from api import settings
 from api.db import CanvasCategory, FileType
 from api.db.services.canvas_service import CanvasTemplateService, UserCanvasService, API4ConversationService
 from api.db.services.document_service import DocumentService
 from api.db.services.file_service import FileService
 from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
-from api.db.services.task_service import queue_dataflow, CANVAS_DEBUG_DOC_ID
+from api.db.services.task_service import queue_dataflow, CANVAS_DEBUG_DOC_ID, TaskService
 from api.db.services.user_service import TenantService
 from api.db.services.user_canvas_version import UserCanvasVersionService
 from api.settings import RetCode
@@ -37,11 +39,12 @@ from api.utils import get_uuid
 from api.utils.api_utils import get_json_result, server_error_response, validate_request, get_data_error_result
 from agent.canvas import Canvas
 from peewee import MySQLDatabase, PostgresqlDatabase
-from api.db.db_models import APIToken
+from api.db.db_models import APIToken, Task
 import time
 from api.utils.file_utils import filename_type, read_potential_broken_pdf
 from rag.flow.pipeline import Pipeline
+from rag.nlp import search
 from rag.utils.redis_conn import REDIS_CONN
@@ -189,6 +192,15 @@ def rerun():
     if 0 < doc["progress"] < 1:
         return get_data_error_result(message=f"`{doc['name']}` is processing...")
+    if settings.docStoreConn.indexExist(search.index_name(current_user.id), doc["kb_id"]):
+        settings.docStoreConn.delete({"doc_id": doc["id"]}, search.index_name(current_user.id), doc["kb_id"])
+    doc["progress_msg"] = ""
+    doc["chunk_num"] = 0
+    doc["token_num"] = 0
+    DocumentService.clear_chunk_num_when_rerun(doc["id"])
+    DocumentService.update_by_id(id, doc)
+    TaskService.filter_delete([Task.doc_id == id])
     dsl = req["dsl"]
     dsl["path"] = [req["component_id"]]
     PipelineOperationLogService.update_by_id(req["id"], {"dsl": dsl})
@@ -420,8 +432,8 @@ def getversion( version_id):
 @login_required
 def list_canvas():
     keywords = request.args.get("keywords", "")
-    page_number = int(request.args.get("page", 1))
-    items_per_page = int(request.args.get("page_size", 150))
+    page_number = int(request.args.get("page", 0))
+    items_per_page = int(request.args.get("page_size", 0))
     orderby = request.args.get("orderby", "create_time")
     canvas_category = request.args.get("canvas_category")
     if request.args.get("desc", "true").lower() == "false":
@@ -429,9 +441,12 @@ def list_canvas():
     else:
         desc = True
     owner_ids = request.args.get("owner_ids", [])
+    if owner_ids and isinstance(owner_ids, str):
+        owner_ids = [owner_ids]
     if not owner_ids:
         tenants = TenantService.get_joined_tenants_by_user_id(current_user.id)
         tenants = [m["tenant_id"] for m in tenants]
+        tenants.append(current_user.id)
         canvas, total = UserCanvasService.get_by_tenant_ids(
             tenants, current_user.id, page_number,
             items_per_page, orderby, desc, keywords, canvas_category)
@@ -525,3 +540,11 @@ def prompts():
         #"context_ranking": RANK_MEMORY,
         "citation_guidelines": CITATION_PROMPT_TEMPLATE
     })
+
+
+@manager.route('/download', methods=['GET']) # noqa: F821
+def download():
+    id = request.args.get("id")
+    created_by = request.args.get("created_by")
+    blob = FileService.get_blob(created_by, id)
+    return flask.make_response(blob)

View File

@@ -68,7 +68,34 @@ def create():
     e, t = TenantService.get_by_id(current_user.id)
     if not e:
         return get_data_error_result(message="Tenant not found.")
-    #req["embd_id"] = t.embd_id
+    req["parser_config"] = {
+        "layout_recognize": "DeepDOC",
+        "chunk_token_num": 512,
+        "delimiter": "\n",
+        "auto_keywords": 0,
+        "auto_questions": 0,
+        "html4excel": False,
+        "topn_tags": 3,
+        "raptor": {
+            "use_raptor": True,
+            "prompt": "Please summarize the following paragraphs. Be careful with the numbers, do not make things up. Paragraphs as following:\n {cluster_content}\nThe above is the content you need to summarize.",
+            "max_token": 256,
+            "threshold": 0.1,
+            "max_cluster": 64,
+            "random_seed": 0
+        },
+        "graphrag": {
+            "use_graphrag": True,
+            "entity_types": [
+                "organization",
+                "person",
+                "geo",
+                "event",
+                "category"
+            ],
+            "method": "light"
+        }
+    }
     if not KnowledgebaseService.save(**req):
         return get_data_error_result()
     return get_json_result(data={"kb_id": req["id"]})
@@ -729,19 +756,21 @@ def delete_kb_task():
     if not pipeline_task_type or pipeline_task_type not in [PipelineTaskType.GRAPH_RAG, PipelineTaskType.RAPTOR, PipelineTaskType.MINDMAP]:
         return get_error_data_result(message="Invalid task type")
-    kb_task_id = ""
     match pipeline_task_type:
         case PipelineTaskType.GRAPH_RAG:
             settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), kb_id)
             kb_task_id = "graphrag_task_id"
+            kb_task_finish_at = "graphrag_task_finish_at"
         case PipelineTaskType.RAPTOR:
             kb_task_id = "raptor_task_id"
+            kb_task_finish_at = "raptor_task_finish_at"
         case PipelineTaskType.MINDMAP:
             kb_task_id = "mindmap_task_id"
+            kb_task_finish_at = "mindmap_task_finish_at"
         case _:
             return get_error_data_result(message="Internal Error: Invalid task type")
-    ok = KnowledgebaseService.update_by_id(kb_id, {kb_task_id: ""})
+    ok = KnowledgebaseService.update_by_id(kb_id, {kb_task_id: "", kb_task_finish_at: None})
     if not ok:
         return server_error_response(f"Internal error: cannot delete task {pipeline_task_type}")

View File

@@ -18,7 +18,7 @@ import logging
 import time
 from uuid import uuid4
 from agent.canvas import Canvas
-from api.db import CanvasCategory, TenantPermission
+from api.db import CanvasCategory
 from api.db.db_models import DB, CanvasTemplate, User, UserCanvas, API4Conversation
 from api.db.services.api_service import API4ConversationService
 from api.db.services.common_service import CommonService
@@ -104,6 +104,7 @@ class UserCanvasService(CommonService):
             cls.model.dsl,
             cls.model.description,
             cls.model.permission,
+            cls.model.user_id.alias("tenant_id"),
             User.nickname,
             User.avatar.alias('tenant_avatar'),
             cls.model.update_time,
@@ -111,16 +112,15 @@ class UserCanvasService(CommonService):
         ]
         if keywords:
             agents = cls.model.select(*fields).join(User, on=(cls.model.user_id == User.id)).where(
-                ((cls.model.user_id.in_(joined_tenant_ids) & (cls.model.permission ==
-                                                              TenantPermission.TEAM.value)) | (
-                        cls.model.user_id == user_id)),
-                (fn.LOWER(cls.model.title).contains(keywords.lower()))
+                cls.model.user_id.in_(joined_tenant_ids),
+                fn.LOWER(cls.model.title).contains(keywords.lower())
+                #(((cls.model.user_id.in_(joined_tenant_ids)) & (cls.model.permission == TenantPermission.TEAM.value)) | (cls.model.user_id == user_id)),
+                #(fn.LOWER(cls.model.title).contains(keywords.lower()))
             )
         else:
             agents = cls.model.select(*fields).join(User, on=(cls.model.user_id == User.id)).where(
-                ((cls.model.user_id.in_(joined_tenant_ids) & (cls.model.permission ==
-                                                              TenantPermission.TEAM.value)) | (
-                        cls.model.user_id == user_id))
+                cls.model.user_id.in_(joined_tenant_ids)
+                #(((cls.model.user_id.in_(joined_tenant_ids)) & (cls.model.permission == TenantPermission.TEAM.value)) | (cls.model.user_id == user_id))
             )
         if canvas_category:
             agents = agents.where(cls.model.canvas_category == canvas_category)
@@ -128,8 +128,10 @@ class UserCanvasService(CommonService):
             agents = agents.order_by(cls.model.getter_by(orderby).desc())
         else:
             agents = agents.order_by(cls.model.getter_by(orderby).asc())
         count = agents.count()
-        agents = agents.paginate(page_number, items_per_page)
+        if page_number and items_per_page:
+            agents = agents.paginate(page_number, items_per_page)
         return list(agents.dicts()), count

     @classmethod

View File

@@ -29,7 +29,8 @@ from peewee import fn, Case, JOIN
 from api import settings
 from api.constants import IMG_BASE64_PREFIX, FILE_NAME_LEN_LIMIT
 from api.db import FileType, LLMType, ParserType, StatusEnum, TaskStatus, UserTenantRole, CanvasCategory
-from api.db.db_models import DB, Document, Knowledgebase, Task, Tenant, UserTenant, File2Document, File, UserCanvas
+from api.db.db_models import DB, Document, Knowledgebase, Task, Tenant, UserTenant, File2Document, File, UserCanvas, \
+    User
 from api.db.db_utils import bulk_insert_into_db
 from api.db.services.common_service import CommonService
 from api.db.services.knowledgebase_service import KnowledgebaseService
@@ -121,19 +122,21 @@ class DocumentService(CommonService):
                       orderby, desc, keywords, run_status, types, suffix):
         fields = cls.get_cls_model_fields()
         if keywords:
-            docs = cls.model.select(*[*fields, UserCanvas.title.alias("pipeline_name")])\
+            docs = cls.model.select(*[*fields, UserCanvas.title.alias("pipeline_name"), User.nickname])\
                 .join(File2Document, on=(File2Document.document_id == cls.model.id))\
                 .join(File, on=(File.id == File2Document.file_id))\
                 .join(UserCanvas, on=(cls.model.pipeline_id == UserCanvas.id), join_type=JOIN.LEFT_OUTER)\
+                .join(User, on=(cls.model.created_by == User.id), join_type=JOIN.LEFT_OUTER)\
                 .where(
                     (cls.model.kb_id == kb_id),
                     (fn.LOWER(cls.model.name).contains(keywords.lower()))
                 )
         else:
-            docs = cls.model.select(*[*fields, UserCanvas.title.alias("pipeline_name")])\
+            docs = cls.model.select(*[*fields, UserCanvas.title.alias("pipeline_name"), User.nickname])\
                 .join(File2Document, on=(File2Document.document_id == cls.model.id))\
                 .join(UserCanvas, on=(cls.model.pipeline_id == UserCanvas.id), join_type=JOIN.LEFT_OUTER)\
                 .join(File, on=(File.id == File2Document.file_id))\
+                .join(User, on=(cls.model.created_by == User.id), join_type=JOIN.LEFT_OUTER)\
                 .where(cls.model.kb_id == kb_id)
         if run_status:

View File

@@ -123,7 +123,7 @@ class PipelineOperationLogService(CommonService):
             raise RuntimeError(f"Cannot find knowledge base {document.kb_id} for referred_document {referred_document_id}")
         tenant_id = kb_info.tenant_id
-        title = document.name
+        title = document.parser_id
         avatar = document.thumbnail

         if task_type not in VALID_PIPELINE_TASK_TYPES:
@@ -228,14 +228,12 @@ class PipelineOperationLogService(CommonService):
     @classmethod
     @DB.connection_context()
     def get_documents_info(cls, id):
-        fields = [Document.id, Document.name, Document.progress]
+        fields = [Document.id, Document.name, Document.progress, Document.kb_id]
         return (
             cls.model.select(*fields)
             .join(Document, on=(cls.model.document_id == Document.id))
             .where(
-                cls.model.id == id,
-                Document.progress > 0,
-                Document.progress < 1,
+                cls.model.id == id
             )
             .dicts()
         )

View File

@@ -358,7 +358,7 @@ def queue_tasks(doc: dict, bucket: str, name: str, priority: int):
         page_size = doc["parser_config"].get("task_page_size") or 12
         if doc["parser_id"] == "paper":
             page_size = doc["parser_config"].get("task_page_size") or 22
-        if doc["parser_id"] in ["one", "knowledge_graph"] or do_layout != "DeepDOC":
+        if doc["parser_id"] in ["one", "knowledge_graph"] or do_layout != "DeepDOC" or doc["parser_config"].get("toc", True):
             page_size = 10 ** 9
         page_ranges = doc["parser_config"].get("pages") or [(1, 10 ** 5)]
         for s, e in page_ranges:
@@ -505,7 +505,6 @@ def queue_dataflow(tenant_id:str, flow_id:str, task_id:str, doc_id:str=CANVAS_DE
     task["kb_id"] = DocumentService.get_knowledgebase_id(doc_id)
     task["tenant_id"] = tenant_id
-    task["task_type"] = "dataflow"
     task["dataflow_id"] = flow_id
     task["file"] = file

View File

@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import json
 import random
 import re
 from copy import deepcopy
@@ -68,9 +68,10 @@ class HierarchicalMerger(ProcessBase):
             lines = [ln for ln in payload.split("\n") if ln]
         else:
-            lines = [o.get("text", "") for o in from_upstream.json_result]
+            arr = from_upstream.chunks if from_upstream.output_format == "chunks" else from_upstream.json_result
+            lines = [o.get("text", "") for o in arr]

         sections, section_images = [], []
-        for o in from_upstream.json_result or []:
+        for o in arr or []:
             sections.append((o.get("text", ""), o.get("position_tag", "")))
             section_images.append(o.get("img_id"))
@@ -128,21 +129,26 @@ class HierarchicalMerger(ProcessBase):
         all_pathes = []

         def dfs(n, path, depth):
             nonlocal all_pathes
-            if depth < self._param.hierarchy:
-                path = deepcopy(path)
+            if not n["children"] and path:
+                all_pathes.append(path)
             for nn in n["children"]:
-                path.extend([nn["index"], *nn["texts"]])
-                dfs(nn, path, depth+1)
+                if depth < self._param.hierarchy:
+                    _path = deepcopy(path)
+                else:
+                    _path = path
+                _path.extend([nn["index"], *nn["texts"]])
+                dfs(nn, _path, depth+1)
             if depth == self._param.hierarchy:
-                all_pathes.append(path)
+                all_pathes.append(_path)

         for i in range(len(lines)):
             print(i, lines[i])
         dfs(root, [], 0)
-        print("sSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS", json.dumps(root, ensure_ascii=False, indent=2))
+        if root["texts"]:
+            all_pathes.insert(0, root["texts"])

         if from_upstream.output_format in ["markdown", "text", "html"]:
             cks = []
             for path in all_pathes:
@@ -161,7 +167,7 @@ class HierarchicalMerger(ProcessBase):
                 for i in path:
                     txt += lines[i] + "\n"
                     concat_img(img, id2image(section_images[i], partial(STORAGE_IMPL.get)))
-                cks.append(cks)
+                cks.append(txt)
                 images.append(img)

             cks = [
@@ -175,5 +181,6 @@ class HierarchicalMerger(ProcessBase):
         async with trio.open_nursery() as nursery:
             for d in cks:
                 nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), get_uuid())
+        self.set_output("chunks", cks)
         self.callback(1, "Done.")

View File

@@ -235,8 +235,8 @@ class Parser(ProcessBase):
         self.set_output("output_format", conf["output_format"])
         spreadsheet_parser = ExcelParser()
         if conf.get("output_format") == "html":
-            html = spreadsheet_parser.html(blob, 1000000000)
-            self.set_output("html", html)
+            htmls = spreadsheet_parser.html(blob, 1000000000)
+            self.set_output("html", htmls[0])
         elif conf.get("output_format") == "json":
             self.set_output("json", [{"text": txt} for txt in spreadsheet_parser(blob) if txt])
         elif conf.get("output_format") == "markdown":

View File

@@ -75,7 +75,6 @@ class Pipeline(Graph):
                     "trace": [{"progress": progress, "message": message, "datetime": datetime.datetime.now().strftime("%H:%M:%S"), "timestamp": timestamp, "elapsed_time": 0}],
                 }
             ]
-            REDIS_CONN.set_obj(log_key, obj, 60 * 30)
             if component_name != "END" and self._doc_id and self.task_id:
                 percentage = 1.0 / len(self.components.items())
                 finished = 0.0
@@ -94,6 +93,10 @@ class Pipeline(Graph):
                     t = obj[-1]["trace"][-1]
                     msg += "%s: %s\n" % (t["datetime"], t["message"])
                 TaskService.update_progress(self.task_id, {"progress": finished, "progress_msg": msg})
+            elif component_name == "END" and not self._doc_id:
+                obj[-1]["trace"][-1]["dsl"] = json.loads(str(self))
+            REDIS_CONN.set_obj(log_key, obj, 60 * 30)
         except Exception as e:
             logging.exception(e)

View File

@@ -102,7 +102,7 @@ class Splitter(ProcessBase):
                 "image": img,
                 "positions": [[pos[0][-1], *pos[1:]] for pos in RAGFlowPdfParser.extract_positions(c)],
             }
-            for c, img in zip(chunks, images)
+            for c, img in zip(chunks, images) if c.strip()
         ]
         async with trio.open_nursery() as nursery:
             for d in cks:

View File

@@ -40,12 +40,14 @@ class TokenizerFromUpstream(BaseModel):
         if self.chunks:
             return self
-        if self.output_format in {"markdown", "text"}:
+        if self.output_format in {"markdown", "text", "html"}:
             if self.output_format == "markdown" and not self.markdown_result:
                 raise ValueError("output_format=markdown requires a markdown payload (field: 'markdown' or 'markdown_result').")
             if self.output_format == "text" and not self.text_result:
                 raise ValueError("output_format=text requires a text payload (field: 'text' or 'text_result').")
+            if self.output_format == "html" and not self.html_result:
+                raise ValueError("output_format=text requires a html payload (field: 'html' or 'html_result').")
         else:
-            if not self.json_result:
+            if not self.json_result and not self.chunks:
                 raise ValueError("When no chunks are provided and output_format is not markdown/text, a JSON list payload is required (field: 'json' or 'json_result').")
         return self

View File

@@ -137,7 +137,7 @@ class Tokenizer(ProcessBase):
             payload = from_upstream.markdown_result
         elif from_upstream.output_format == "text":
             payload = from_upstream.text_result
-        else: # == "html"
+        else:
             payload = from_upstream.html_result

         if not payload:

View File

@@ -245,7 +245,7 @@ async def collect():
         task_type = msg.get("task_type", "")
         task["task_type"] = task_type
-        if task_type == "dataflow":
+        if task_type[:8] == "dataflow":
             task["tenant_id"] = msg["tenant_id"]
             task["dataflow_id"] = msg["dataflow_id"]
             task["kb_id"] = msg.get("kb_id", "")
@@ -491,6 +491,7 @@ async def run_dataflow(task: dict):
         e, pipeline_log = PipelineOperationLogService.get_by_id(dataflow_id)
         assert e, "Pipeline log not found."
         dsl = pipeline_log.dsl
+        dataflow_id = pipeline_log.pipeline_id
     pipeline = Pipeline(dsl, tenant_id=task["tenant_id"], doc_id=doc_id, task_id=task_id, flow_id=dataflow_id)
     chunks = await pipeline.run(file=task["file"]) if task.get("file") else await pipeline.run()
     if doc_id == CANVAS_DEBUG_DOC_ID:
@@ -652,7 +653,7 @@ async def run_raptor_for_kb(row, kb_parser_config, chat_mdl, embd_mdl, vector_si
         raptor_config["threshold"],
     )
     original_length = len(chunks)
-    chunks = await raptor(chunks, row["parser_config"]["raptor"]["random_seed"], callback)
+    chunks = await raptor(chunks, row["kb_parser_config"]["raptor"]["random_seed"], callback)
     doc = {
         "doc_id": fake_doc_id,
         "kb_id": [str(row["kb_id"])],