Feat: add support for pipeline logs operation (#10207)

### What problem does this PR solve?

Add support for pipeline logs operation
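
Example of how the new endpoints might be called from a client. This is a minimal sketch for reviewers: the `/v1/kb` prefix, base URL, and auth header are assumptions about how the `kb_app` routes are mounted and are not part of this PR.

```python
# Hypothetical client sketch; base URL, route prefix and auth scheme are assumptions.
import requests

BASE = "http://localhost:9380/v1/kb"           # assumed mount point of kb_app routes
HEADERS = {"Authorization": "Bearer <token>"}  # assumed auth header

# List pipeline logs for a knowledge base, filtered by operation status.
resp = requests.post(
    f"{BASE}/list_pipeline_logs",
    params={"kb_id": "<kb_id>", "page": 1, "page_size": 20, "orderby": "create_time", "desc": "true"},
    json={"operation_status": ["success", "failed"], "types": [], "suffix": []},
    headers=HEADERS,
)
print(resp.json())

# Fetch one log in full, then delete a batch of logs.
detail = requests.get(f"{BASE}/pipeline_log_detail", params={"log_id": "<log_id>"}, headers=HEADERS)
requests.post(f"{BASE}/delete_pipeline_logs", params={"kb_id": "<kb_id>"}, json={"log_ids": ["<log_id>"]}, headers=HEADERS)
```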

### Type of change


- [x] New Feature (non-breaking change which adds functionality)

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Author: Yongteng Lei
Date: 2025-09-23 09:46:31 +08:00
Committed by: GitHub
Parent: d0bfe8b10c
Commit: 0c557e37ad
8 changed files with 340 additions and 17 deletions


@@ -22,10 +22,11 @@ from api.db.services import duplicate_name
from api.db.services.document_service import DocumentService
from api.db.services.file2document_service import File2DocumentService
from api.db.services.file_service import FileService
from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
from api.db.services.user_service import TenantService, UserTenantService
from api.utils.api_utils import server_error_response, get_data_error_result, validate_request, not_allowed_parameters
from api.utils import get_uuid
from api.db import StatusEnum, FileSource, VALID_FILE_TYPES
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.db_models import File
from api.utils.api_utils import get_json_result
@@ -35,7 +36,6 @@ from api.constants import DATASET_NAME_LIMIT
from rag.settings import PAGERANK_FLD
from rag.utils.storage_factory import STORAGE_IMPL

@manager.route('/create', methods=['post'])  # noqa: F821
@login_required
@validate_request("name")
@@ -395,3 +395,84 @@ def get_basic_info():
    basic_info = DocumentService.knowledgebase_basic_info(kb_id)
    return get_json_result(data=basic_info)


@manager.route("/list_pipeline_logs", methods=["POST"])  # noqa: F821
@login_required
def list_pipeline_logs():
    kb_id = request.args.get("kb_id")
    if not kb_id:
        return get_json_result(data=False, message='Lack of "KB ID"', code=settings.RetCode.ARGUMENT_ERROR)
    keywords = request.args.get("keywords", "")

    page_number = int(request.args.get("page", 0))
    items_per_page = int(request.args.get("page_size", 0))
    orderby = request.args.get("orderby", "create_time")
    if request.args.get("desc", "true").lower() == "false":
        desc = False
    else:
        desc = True
    create_time_from = int(request.args.get("create_time_from", 0))
    create_time_to = int(request.args.get("create_time_to", 0))

    req = request.get_json()

    operation_status = req.get("operation_status", [])
    if operation_status:
        invalid_status = {s for s in operation_status if s not in ["success", "failed", "running", "pending"]}
        if invalid_status:
            return get_data_error_result(message=f"Invalid filter operation_status conditions: {', '.join(invalid_status)}")

    types = req.get("types", [])
    if types:
        invalid_types = {t for t in types if t not in VALID_FILE_TYPES}
        if invalid_types:
            return get_data_error_result(message=f"Invalid filter conditions: {', '.join(invalid_types)} type{'s' if len(invalid_types) > 1 else ''}")

    suffix = req.get("suffix", [])

    try:
        docs, tol = PipelineOperationLogService.get_by_kb_id(kb_id, page_number, items_per_page, orderby, desc, keywords, operation_status, types, suffix)

        if create_time_from or create_time_to:
            filtered_docs = []
            for doc in docs:
                doc_create_time = doc.get("create_time", 0)
                if (create_time_from == 0 or doc_create_time >= create_time_from) and (create_time_to == 0 or doc_create_time <= create_time_to):
                    filtered_docs.append(doc)
            docs = filtered_docs

        return get_json_result(data={"total": tol, "docs": docs})
    except Exception as e:
        return server_error_response(e)


@manager.route("/delete_pipeline_logs", methods=["POST"])  # noqa: F821
@login_required
def delete_pipeline_logs():
    kb_id = request.args.get("kb_id")
    if not kb_id:
        return get_json_result(data=False, message='Lack of "KB ID"', code=settings.RetCode.ARGUMENT_ERROR)

    req = request.get_json()
    log_ids = req.get("log_ids", [])
    PipelineOperationLogService.delete_by_ids(log_ids)

    return get_json_result(data=True)


@manager.route("/pipeline_log_detail", methods=["GET"])  # noqa: F821
@login_required
def pipeline_log_detail():
    log_id = request.args.get("log_id")
    if not log_id:
        return get_json_result(data=False, message='Lack of "Pipeline log ID"', code=settings.RetCode.ARGUMENT_ERROR)

    ok, log = PipelineOperationLogService.get_by_id(log_id)
    if not ok:
        return get_data_error_result(message="Invalid pipeline log ID")

    return get_json_result(data=log.to_dict())


@@ -122,4 +122,12 @@ class MCPServerType(StrEnum):
VALID_MCP_SERVER_TYPES = {MCPServerType.SSE, MCPServerType.STREAMABLE_HTTP}


class PipelineTaskType(StrEnum):
    PARSE = "Parse"
    DOWNLOAD = "DOWNLOAD"


VALID_PIPELINE_TASK_TYPES = {PipelineTaskType.PARSE, PipelineTaskType.DOWNLOAD}

KNOWLEDGEBASE_FOLDER_NAME=".knowledgebase"


@@ -906,6 +906,32 @@ class Search(DataBaseModel):
        db_table = "search"


class PipelineOperationLog(DataBaseModel):
    id = CharField(max_length=32, primary_key=True)
    document_id = CharField(max_length=32, index=True)
    tenant_id = CharField(max_length=32, null=False, index=True)
    kb_id = CharField(max_length=32, null=False, index=True)
    pipeline_id = CharField(max_length=32, null=True, help_text="Pipeline ID", index=True)
    pipeline_title = CharField(max_length=32, null=True, help_text="Pipeline title", index=True)
    parser_id = CharField(max_length=32, null=False, help_text="Parser ID", index=True)
    document_name = CharField(max_length=255, null=False, help_text="File name")
    document_suffix = CharField(max_length=255, null=False, help_text="File suffix")
    document_type = CharField(max_length=255, null=False, help_text="Document type")
    source_from = CharField(max_length=255, null=False, help_text="Source")
    progress = FloatField(default=0, index=True)
    progress_msg = TextField(null=True, help_text="process message", default="")
    process_begin_at = DateTimeField(null=True, index=True)
    process_duration = FloatField(default=0)
    dsl = JSONField(null=True, default=dict)
    task_type = CharField(max_length=32, null=False, default="")
    operation_status = CharField(max_length=32, null=False, help_text="Operation status")
    avatar = TextField(null=True, help_text="avatar base64 string")
    status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)

    class Meta:
        db_table = "pipeline_operation_log"


def migrate_db():
    logging.disable(logging.ERROR)
    migrator = DatabaseMigrator[settings.DATABASE_TYPE.upper()].value(DB)


@@ -597,6 +597,22 @@ class DocumentService(CommonService):
    @DB.connection_context()
    def update_progress(cls):
        docs = cls.get_unfinished_docs()
        cls._sync_progress(docs)

    @classmethod
    @DB.connection_context()
    def update_progress_immediately(cls, docs: list[dict]):
        if not docs:
            return
        cls._sync_progress(docs)

    @classmethod
    @DB.connection_context()
    def _sync_progress(cls, docs: list[dict]):
        for d in docs:
            try:
                tsks = Task.query(doc_id=d["id"], order_by=Task.create_time)
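
The new `update_progress_immediately` helper shares the sync loop with the periodic `update_progress` sweep but lets a caller push progress for a specific set of documents on demand. A minimal sketch of the intended call pattern, mirroring how `PipelineOperationLogService.create` uses it below; the document id is a placeholder:

```python
# Sketch: force a progress sync for one document before reading its state back.
ok, document = DocumentService.get_by_id("<doc_id>")
if ok:
    DocumentService.update_progress_immediately([document.to_dict()])
    _, document = DocumentService.get_by_id("<doc_id>")  # re-read the refreshed row
```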


@@ -0,0 +1,163 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
from datetime import datetime
from peewee import fn
from api.db import VALID_PIPELINE_TASK_TYPES
from api.db.db_models import DB, PipelineOperationLog
from api.db.services.canvas_service import UserCanvasService
from api.db.services.common_service import CommonService
from api.db.services.document_service import DocumentService
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.utils import current_timestamp, datetime_format, get_uuid


class PipelineOperationLogService(CommonService):
    model = PipelineOperationLog

    @classmethod
    def get_cls_model_fields(cls):
        return [
            cls.model.id,
            cls.model.document_id,
            cls.model.tenant_id,
            cls.model.kb_id,
            cls.model.pipeline_id,
            cls.model.pipeline_title,
            cls.model.parser_id,
            cls.model.document_name,
            cls.model.document_suffix,
            cls.model.document_type,
            cls.model.source_from,
            cls.model.progress,
            cls.model.progress_msg,
            cls.model.process_begin_at,
            cls.model.process_duration,
            cls.model.dsl,
            cls.model.task_type,
            cls.model.operation_status,
            cls.model.avatar,
            cls.model.status,
            cls.model.create_time,
            cls.model.create_date,
            cls.model.update_time,
            cls.model.update_date,
        ]

    @classmethod
    @DB.connection_context()
    def create(cls, document_id, pipeline_id, task_type):
        from rag.flow.pipeline import Pipeline

        tenant_id = ""
        title = ""
        avatar = ""
        dsl = ""
        operation_status = ""

        ok, document = DocumentService.get_by_id(document_id)
        if not ok:
            raise RuntimeError(f"Document {document_id} not found")
        DocumentService.update_progress_immediately([document.to_dict()])
        ok, document = DocumentService.get_by_id(document_id)
        if not ok:
            raise RuntimeError(f"Document {document_id} not found")
        operation_status = document.run

        if pipeline_id:
            ok, user_pipeline = UserCanvasService.get_by_id(pipeline_id)
            if not ok:
                raise RuntimeError(f"Pipeline {pipeline_id} not found")
            pipeline = Pipeline(dsl=json.dumps(user_pipeline.dsl), tenant_id=user_pipeline.user_id, doc_id=document_id, task_id="", flow_id=pipeline_id)
            tenant_id = user_pipeline.user_id
            title = user_pipeline.title
            avatar = user_pipeline.avatar
            dsl = json.loads(str(pipeline))
        else:
            ok, kb_info = KnowledgebaseService.get_by_id(document.kb_id)
            if not ok:
                raise RuntimeError(f"Cannot find knowledge base {document.kb_id} for document {document_id}")
            tenant_id = kb_info.tenant_id
            title = document.name
            avatar = document.thumbnail

        if task_type not in VALID_PIPELINE_TASK_TYPES:
            raise ValueError(f"Invalid task type: {task_type}")

        log = dict(
            id=get_uuid(),
            document_id=document_id,
            tenant_id=tenant_id,
            kb_id=document.kb_id,
            pipeline_id=pipeline_id,
            pipeline_title=title,
            parser_id=document.parser_id,
            document_name=document.name,
            document_suffix=document.suffix,
            document_type=document.type,
            source_from="",  # TODO: add in the future
            progress=document.progress,
            progress_msg=document.progress_msg,
            process_begin_at=document.process_begin_at,
            process_duration=document.process_duration,
            dsl=dsl,
            task_type=task_type,
            operation_status=operation_status,
            avatar=avatar,
        )
        log["create_time"] = current_timestamp()
        log["create_date"] = datetime_format(datetime.now())
        log["update_time"] = current_timestamp()
        log["update_date"] = datetime_format(datetime.now())

        obj = cls.save(**log)
        return obj

    @classmethod
    @DB.connection_context()
    def record_pipeline_operation(cls, document_id, pipeline_id, task_type):
        return cls.create(document_id=document_id, pipeline_id=pipeline_id, task_type=task_type)

    @classmethod
    @DB.connection_context()
    def get_by_kb_id(cls, kb_id, page_number, items_per_page, orderby, desc, keywords, operation_status, types, suffix):
        fields = cls.get_cls_model_fields()
        if keywords:
            logs = cls.model.select(*fields).where((cls.model.kb_id == kb_id), (fn.LOWER(cls.model.document_name).contains(keywords.lower())))
        else:
            logs = cls.model.select(*fields).where(cls.model.kb_id == kb_id)

        if operation_status:
            logs = logs.where(cls.model.operation_status.in_(operation_status))
        if types:
            logs = logs.where(cls.model.document_type.in_(types))
        if suffix:
            logs = logs.where(cls.model.document_suffix.in_(suffix))

        count = logs.count()

        if desc:
            logs = logs.order_by(cls.model.getter_by(orderby).desc())
        else:
            logs = logs.order_by(cls.model.getter_by(orderby).asc())

        if page_number and items_per_page:
            logs = logs.paginate(page_number, items_per_page)

        return list(logs.dicts()), count
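
Callers record a log entry once a run finishes. A minimal sketch of the call, mirroring the usages added in the pipeline runner and the task executor below; the document id is a placeholder:

```python
# Sketch: record a parse operation for a document. pipeline_id may be ""
# when the built-in parser ran instead of a dataflow pipeline.
from api.db import PipelineTaskType
from api.db.services.pipeline_operation_log_service import PipelineOperationLogService

PipelineOperationLogService.record_pipeline_operation(
    document_id="<doc_id>",
    pipeline_id="",  # or a UserCanvas id for a dataflow run
    task_type=PipelineTaskType.PARSE,
)
```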


@@ -231,7 +231,6 @@ class Parser(ProcessBase):
        if conf.get("output_format") == "json":
            self.set_output("json", bboxes)
        if conf.get("output_format") == "markdown":
            mkdn = ""
            for b in bboxes:
@@ -295,6 +294,7 @@ class Parser(ProcessBase):
    def _markdown(self, name, blob):
        from functools import reduce
        from rag.app.naive import Markdown as naive_markdown_parser
        from rag.nlp import concat_img
@@ -346,7 +346,7 @@ class Parser(ProcessBase):
        else:
            # use VLM to describe the picture
            cv_model = LLMBundle(self._canvas.get_tenant_id(), LLMType.IMAGE2TEXT, llm_name=conf["llm_id"], lang=lang)
            img_binary = io.BytesIO()
            img.save(img_binary, format="JPEG")
            img_binary.seek(0)


@@ -19,10 +19,13 @@ import logging
import random
import time
from timeit import default_timer as timer

import trio

from agent.canvas import Graph
from api.db import PipelineTaskType
from api.db.services.document_service import DocumentService
from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
from rag.utils.redis_conn import REDIS_CONN
@@ -44,22 +47,40 @@ class Pipeline(Graph):
        obj = json.loads(bin.encode("utf-8"))
        if obj:
            if obj[-1]["component_id"] == component_name:
                obj[-1]["trace"].append(
                    {
                        "progress": progress,
                        "message": message,
                        "datetime": datetime.datetime.now().strftime("%H:%M:%S"),
                        "timestamp": timestamp,
                        "elapsed_time": timestamp - obj[-1]["trace"][-1]["timestamp"],
                    }
                )
            else:
                obj.append(
                    {
                        "component_id": component_name,
                        "trace": [{"progress": progress, "message": message, "datetime": datetime.datetime.now().strftime("%H:%M:%S"), "timestamp": timestamp, "elapsed_time": 0}],
                    }
                )
        else:
            obj = [
                {
                    "component_id": component_name,
                    "trace": [{"progress": progress, "message": message, "datetime": datetime.datetime.now().strftime("%H:%M:%S"), "timestamp": timestamp, "elapsed_time": 0}],
                }
            ]
        REDIS_CONN.set_obj(log_key, obj, 60 * 30)

        if self._doc_id:
            percentage = 1.0 / len(self.components.items())
            msg = ""
            finished = 0.0
            for o in obj:
                if o["component_id"] == "END":
                    continue
                msg += f"\n[{o['component_id']}]:\n"
                for t in o["trace"]:
                    msg += "%s: %s\n" % (t["datetime"], t["message"])
                    if t["progress"] < 0:
                        finished = -1
                        break
@@ -129,8 +150,13 @@ class Pipeline(Graph):
        self.callback("END", 1, json.dumps(self.get_component_obj(self.path[-1]).output(), ensure_ascii=False))

        if self._doc_id:
            DocumentService.update_by_id(
                self._doc_id,
                {
                    "progress": 1 if not self.error else -1,
                    "progress_msg": "Pipeline finished...\n" + self.error,
                    "process_duration": time.perf_counter() - st,
                },
            )
            PipelineOperationLogService.create(document_id=self._doc_id, pipeline_id=self._flow_id, task_type=PipelineTaskType.PARSE)


@@ -21,6 +21,7 @@ import sys
import threading
import time
from api.db.services.canvas_service import UserCanvasService
from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
from api.utils.api_utils import timeout
from api.utils.base64_image import image2id
from api.utils.log_utils import init_root_logger, get_project_base_directory
@@ -45,7 +46,7 @@ import exceptiongroup
import faulthandler
import numpy as np
from peewee import DoesNotExist
from api.db import LLMType, ParserType, PipelineTaskType
from api.db.services.document_service import DocumentService
from api.db.services.llm_service import LLMBundle
from api.db.services.task_service import TaskService, has_canceled
@@ -650,6 +651,7 @@ async def do_handle_task(task):
                 timer() - start_ts))
    DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)
    PipelineOperationLogService.record_pipeline_operation(document_id=task_doc_id, pipeline_id="", task_type=PipelineTaskType.PARSE)

    time_cost = timer() - start_ts
    task_time_cost = timer() - task_start_ts
@@ -685,6 +687,7 @@ async def handle_task():
        except Exception:
            pass
        logging.exception(f"handle_task got exception for task {json.dumps(task)}")
        PipelineOperationLogService.record_pipeline_operation(document_id=task["doc_id"], pipeline_id=task.get("dataflow_id", "") or "", task_type=PipelineTaskType.PARSE)
    redis_msg.ack()