diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py
index c9420c1ff..d2c2e35e2 100644
--- a/api/apps/kb_app.py
+++ b/api/apps/kb_app.py
@@ -22,10 +22,11 @@ from api.db.services import duplicate_name
 from api.db.services.document_service import DocumentService
 from api.db.services.file2document_service import File2DocumentService
 from api.db.services.file_service import FileService
+from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
 from api.db.services.user_service import TenantService, UserTenantService
 from api.utils.api_utils import server_error_response, get_data_error_result, validate_request, not_allowed_parameters
 from api.utils import get_uuid
-from api.db import StatusEnum, FileSource
+from api.db import StatusEnum, FileSource, VALID_FILE_TYPES
 from api.db.services.knowledgebase_service import KnowledgebaseService
 from api.db.db_models import File
 from api.utils.api_utils import get_json_result
@@ -35,7 +36,6 @@ from api.constants import DATASET_NAME_LIMIT
 from rag.settings import PAGERANK_FLD
 from rag.utils.storage_factory import STORAGE_IMPL
 
-
 @manager.route('/create', methods=['post'])  # noqa: F821
 @login_required
 @validate_request("name")
@@ -395,3 +395,84 @@ def get_basic_info():
     basic_info = DocumentService.knowledgebase_basic_info(kb_id)
 
     return get_json_result(data=basic_info)
+
+
+@manager.route("/list_pipeline_logs", methods=["POST"])  # noqa: F821
+@login_required
+def list_pipeline_logs():
+    kb_id = request.args.get("kb_id")
+    if not kb_id:
+        return get_json_result(data=False, message='Lack of "KB ID"', code=settings.RetCode.ARGUMENT_ERROR)
+
+    keywords = request.args.get("keywords", "")
+
+    page_number = int(request.args.get("page", 0))
+    items_per_page = int(request.args.get("page_size", 0))
+    orderby = request.args.get("orderby", "create_time")
+    if request.args.get("desc", "true").lower() == "false":
+        desc = False
+    else:
+        desc = True
+    create_time_from = int(request.args.get("create_time_from", 0))
+    create_time_to = int(request.args.get("create_time_to", 0))
+
+    req = request.get_json()
+
+    operation_status = req.get("operation_status", [])
+    if operation_status:
+        invalid_status = {s for s in operation_status if s not in ["success", "failed", "running", "pending"]}
+        if invalid_status:
+            return get_data_error_result(message=f"Invalid filter operation_status conditions: {', '.join(invalid_status)}")
+
+    types = req.get("types", [])
+    if types:
+        invalid_types = {t for t in types if t not in VALID_FILE_TYPES}
+        if invalid_types:
+            return get_data_error_result(message=f"Invalid filter conditions: {', '.join(invalid_types)} type{'s' if len(invalid_types) > 1 else ''}")
+
+    suffix = req.get("suffix", [])
+
+    try:
+        docs, tol = PipelineOperationLogService.get_by_kb_id(kb_id, page_number, items_per_page, orderby, desc, keywords, operation_status, types, suffix)
+
+        if create_time_from or create_time_to:
+            filtered_docs = []
+            for doc in docs:
+                doc_create_time = doc.get("create_time", 0)
+                if (create_time_from == 0 or doc_create_time >= create_time_from) and (create_time_to == 0 or doc_create_time <= create_time_to):
+                    filtered_docs.append(doc)
+            docs = filtered_docs
+
+
+        return get_json_result(data={"total": tol, "docs": docs})
+    except Exception as e:
+        return server_error_response(e)
+
+
+@manager.route("/delete_pipeline_logs", methods=["POST"])  # noqa: F821
+@login_required
+def delete_pipeline_logs():
+    kb_id = request.args.get("kb_id")
+    if not kb_id:
+        return get_json_result(data=False, message='Lack of "KB ID"', code=settings.RetCode.ARGUMENT_ERROR)
+
+    req = request.get_json()
+    log_ids = req.get("log_ids", [])
+
+    PipelineOperationLogService.delete_by_ids(log_ids)
+
+    return get_json_result(data=True)
+
+
+@manager.route("/pipeline_log_detail", methods=["GET"])  # noqa: F821
+@login_required
+def pipeline_log_detail():
+    log_id = request.args.get("log_id")
+    if not log_id:
+        return get_json_result(data=False, message='Lack of "Pipeline log ID"', code=settings.RetCode.ARGUMENT_ERROR)
+
+    ok, log = PipelineOperationLogService.get_by_id(log_id)
+    if not ok:
+        return get_data_error_result(message="Invalid pipeline log ID")
+
+    return get_json_result(data=log.to_dict())
diff --git a/api/db/__init__.py b/api/db/__init__.py
index 186e9bcba..9bdfc726d 100644
--- a/api/db/__init__.py
+++ b/api/db/__init__.py
@@ -122,4 +122,12 @@ class MCPServerType(StrEnum):
 
 VALID_MCP_SERVER_TYPES = {MCPServerType.SSE, MCPServerType.STREAMABLE_HTTP}
 
+class PipelineTaskType(StrEnum):
+    PARSE = "Parse"
+    DOWNLOAD = "DOWNLOAD"
+
+
+VALID_PIPELINE_TASK_TYPES = {PipelineTaskType.PARSE, PipelineTaskType.DOWNLOAD}
+
+
 KNOWLEDGEBASE_FOLDER_NAME=".knowledgebase"
diff --git a/api/db/db_models.py b/api/db/db_models.py
index 98db65be3..07702c2e8 100644
--- a/api/db/db_models.py
+++ b/api/db/db_models.py
@@ -906,6 +906,32 @@ class Search(DataBaseModel):
         db_table = "search"
 
 
+class PipelineOperationLog(DataBaseModel):
+    id = CharField(max_length=32, primary_key=True)
+    document_id = CharField(max_length=32, index=True)
+    tenant_id = CharField(max_length=32, null=False, index=True)
+    kb_id = CharField(max_length=32, null=False, index=True)
+    pipeline_id = CharField(max_length=32, null=True, help_text="Pipeline ID", index=True)
+    pipeline_title = CharField(max_length=32, null=True, help_text="Pipeline title", index=True)
+    parser_id = CharField(max_length=32, null=False, help_text="Parser ID", index=True)
+    document_name = CharField(max_length=255, null=False, help_text="File name")
+    document_suffix = CharField(max_length=255, null=False, help_text="File suffix")
+    document_type = CharField(max_length=255, null=False, help_text="Document type")
+    source_from = CharField(max_length=255, null=False, help_text="Source")
+    progress = FloatField(default=0, index=True)
+    progress_msg = TextField(null=True, help_text="process message", default="")
+    process_begin_at = DateTimeField(null=True, index=True)
+    process_duration = FloatField(default=0)
+    dsl = JSONField(null=True, default=dict)
+    task_type = CharField(max_length=32, null=False, default="")
+    operation_status = CharField(max_length=32, null=False, help_text="Operation status")
+    avatar = TextField(null=True, help_text="avatar base64 string")
+    status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)
+
+    class Meta:
+        db_table = "pipeline_operation_log"
+
+
 def migrate_db():
     logging.disable(logging.ERROR)
     migrator = DatabaseMigrator[settings.DATABASE_TYPE.upper()].value(DB)
diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py
index 23eef474f..71bed0847 100644
--- a/api/db/services/document_service.py
+++ b/api/db/services/document_service.py
@@ -597,6 +597,22 @@ class DocumentService(CommonService):
     @DB.connection_context()
     def update_progress(cls):
         docs = cls.get_unfinished_docs()
+
+        cls._sync_progress(docs)
+
+
+    @classmethod
+    @DB.connection_context()
+    def update_progress_immediately(cls, docs: list[dict]):
+        if not docs:
+            return
+
+        cls._sync_progress(docs)
+
+
+    @classmethod
+    @DB.connection_context()
+    def _sync_progress(cls, docs: list[dict]):
         for d in docs:
             try:
                 tsks = Task.query(doc_id=d["id"], order_by=Task.create_time)
diff --git a/api/db/services/pipeline_operation_log_service.py b/api/db/services/pipeline_operation_log_service.py
new file mode 100644
index 000000000..c547a6a06
--- /dev/null
+++ b/api/db/services/pipeline_operation_log_service.py
@@ -0,0 +1,163 @@
+#
+#  Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+import json
+from datetime import datetime
+
+from peewee import fn
+
+from api.db import VALID_PIPELINE_TASK_TYPES
+from api.db.db_models import DB, PipelineOperationLog
+from api.db.services.canvas_service import UserCanvasService
+from api.db.services.common_service import CommonService
+from api.db.services.document_service import DocumentService
+from api.db.services.knowledgebase_service import KnowledgebaseService
+from api.utils import current_timestamp, datetime_format, get_uuid
+
+
+class PipelineOperationLogService(CommonService):
+    model = PipelineOperationLog
+
+    @classmethod
+    def get_cls_model_fields(cls):
+        return [
+            cls.model.id,
+            cls.model.document_id,
+            cls.model.tenant_id,
+            cls.model.kb_id,
+            cls.model.pipeline_id,
+            cls.model.pipeline_title,
+            cls.model.parser_id,
+            cls.model.document_name,
+            cls.model.document_suffix,
+            cls.model.document_type,
+            cls.model.source_from,
+            cls.model.progress,
+            cls.model.progress_msg,
+            cls.model.process_begin_at,
+            cls.model.process_duration,
+            cls.model.dsl,
+            cls.model.task_type,
+            cls.model.operation_status,
+            cls.model.avatar,
+            cls.model.status,
+            cls.model.create_time,
+            cls.model.create_date,
+            cls.model.update_time,
+            cls.model.update_date,
+        ]
+
+    @classmethod
+    @DB.connection_context()
+    def create(cls, document_id, pipeline_id, task_type):
+        from rag.flow.pipeline import Pipeline
+
+        tenant_id = ""
+        title = ""
+        avatar = ""
+        dsl = ""
+        operation_status = ""
+
+        ok, document = DocumentService.get_by_id(document_id)
+        if not ok:
+            raise RuntimeError(f"Document {document_id} not found")
+        DocumentService.update_progress_immediately([document.to_dict()])
+        ok, document = DocumentService.get_by_id(document_id)
+        if not ok:
+            raise RuntimeError(f"Document {document_id} not found")
+        operation_status = document.run
+
+        if pipeline_id:
+            ok, user_pipeline = UserCanvasService.get_by_id(pipeline_id)
+            if not ok:
+                raise RuntimeError(f"Pipeline {pipeline_id} not found")
+
+            pipeline = Pipeline(dsl=json.dumps(user_pipeline.dsl), tenant_id=user_pipeline.user_id, doc_id=document_id, task_id="", flow_id=pipeline_id)
+
+            tenant_id = user_pipeline.user_id
+            title = user_pipeline.title
+            avatar = user_pipeline.avatar
+            dsl = json.loads(str(pipeline))
+        else:
+            ok, kb_info = KnowledgebaseService.get_by_id(document.kb_id)
+            if not ok:
+                raise RuntimeError(f"Cannot find knowledge base {document.kb_id} for document {document_id}")
+
+            tenant_id = kb_info.tenant_id
+            title = document.name
+            avatar = document.thumbnail
+
+        if task_type not in VALID_PIPELINE_TASK_TYPES:
+            raise ValueError(f"Invalid task type: {task_type}")
+
+        log = dict(
+            id=get_uuid(),
+            document_id=document_id,
+            tenant_id=tenant_id,
+            kb_id=document.kb_id,
+            pipeline_id=pipeline_id,
+            pipeline_title=title,
+            parser_id=document.parser_id,
+            document_name=document.name,
+            document_suffix=document.suffix,
+            document_type=document.type,
+            source_from="",  # TODO: add in the future
+            progress=document.progress,
+            progress_msg=document.progress_msg,
+            process_begin_at=document.process_begin_at,
+            process_duration=document.process_duration,
+            dsl=dsl,
+            task_type=task_type,
+            operation_status=operation_status,
+            avatar=avatar,
+        )
+        log["create_time"] = current_timestamp()
+        log["create_date"] = datetime_format(datetime.now())
+        log["update_time"] = current_timestamp()
+        log["update_date"] = datetime_format(datetime.now())
+        obj = cls.save(**log)
+        return obj
+
+    @classmethod
+    @DB.connection_context()
+    def record_pipeline_operation(cls, document_id, pipeline_id, task_type):
+        return cls.create(document_id=document_id, pipeline_id=pipeline_id, task_type=task_type)
+
+    @classmethod
+    @DB.connection_context()
+    def get_by_kb_id(cls, kb_id, page_number, items_per_page, orderby, desc, keywords, operation_status, types, suffix):
+        fields = cls.get_cls_model_fields()
+        if keywords:
+            logs = cls.model.select(*fields).where((cls.model.kb_id == kb_id), (fn.LOWER(cls.model.document_name).contains(keywords.lower())))
+        else:
+            logs = cls.model.select(*fields).where(cls.model.kb_id == kb_id)
+
+        if operation_status:
+            logs = logs.where(cls.model.operation_status.in_(operation_status))
+        if types:
+            logs = logs.where(cls.model.document_type.in_(types))
+        if suffix:
+            logs = logs.where(cls.model.document_suffix.in_(suffix))
+
+        count = logs.count()
+        if desc:
+            logs = logs.order_by(cls.model.getter_by(orderby).desc())
+        else:
+            logs = logs.order_by(cls.model.getter_by(orderby).asc())
+
+        if page_number and items_per_page:
+            logs = logs.paginate(page_number, items_per_page)
+
+        return list(logs.dicts()), count
diff --git a/rag/flow/parser/parser.py b/rag/flow/parser/parser.py
index 2d5eeb4a3..d15bed7de 100644
--- a/rag/flow/parser/parser.py
+++ b/rag/flow/parser/parser.py
@@ -231,7 +231,6 @@ class Parser(ProcessBase):
 
         if conf.get("output_format") == "json":
             self.set_output("json", bboxes)
-
         if conf.get("output_format") == "markdown":
             mkdn = ""
             for b in bboxes:
@@ -295,6 +294,7 @@ class Parser(ProcessBase):
 
     def _markdown(self, name, blob):
         from functools import reduce
+
         from rag.app.naive import Markdown as naive_markdown_parser
         from rag.nlp import concat_img
 
@@ -346,7 +346,7 @@ class Parser(ProcessBase):
 
         else:
             # use VLM to describe the picture
-            cv_model = LLMBundle(self._canvas.get_tenant_id(), LLMType.IMAGE2TEXT, llm_name=conf["llm_id"],lang=lang)
+            cv_model = LLMBundle(self._canvas.get_tenant_id(), LLMType.IMAGE2TEXT, llm_name=conf["llm_id"], lang=lang)
             img_binary = io.BytesIO()
             img.save(img_binary, format="JPEG")
             img_binary.seek(0)
diff --git a/rag/flow/pipeline.py b/rag/flow/pipeline.py
index 659892f10..cf020f68c 100644
--- a/rag/flow/pipeline.py
+++ b/rag/flow/pipeline.py
@@ -19,10 +19,13 @@ import logging
 import random
 import time
 from timeit import default_timer as timer
+
 import trio
 
 from agent.canvas import Graph
+from api.db import PipelineTaskType
 from api.db.services.document_service import DocumentService
+from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
 from rag.utils.redis_conn import REDIS_CONN
 
 
@@ -44,22 +47,40 @@ class Pipeline(Graph):
         obj = json.loads(bin.encode("utf-8"))
         if obj:
             if obj[-1]["component_id"] == component_name:
-                obj[-1]["trace"].append({"progress": progress, "message": message, "datetime": datetime.datetime.now().strftime("%H:%M:%S"), "timestamp": timestamp, "elapsed_time": timestamp-obj[-1]["trace"][-1]["timestamp"]})
+                obj[-1]["trace"].append(
+                    {
+                        "progress": progress,
+                        "message": message,
+                        "datetime": datetime.datetime.now().strftime("%H:%M:%S"),
+                        "timestamp": timestamp,
+                        "elapsed_time": timestamp - obj[-1]["trace"][-1]["timestamp"],
+                    }
+                )
             else:
-                obj.append({"component_id": component_name, "trace": [{"progress": progress, "message": message, "datetime": datetime.datetime.now().strftime("%H:%M:%S"), "timestamp": timestamp, "elapsed_time": 0}]})
+                obj.append(
+                    {
+                        "component_id": component_name,
+                        "trace": [{"progress": progress, "message": message, "datetime": datetime.datetime.now().strftime("%H:%M:%S"), "timestamp": timestamp, "elapsed_time": 0}],
+                    }
+                )
         else:
-            obj = [{"component_id": component_name, "trace": [{"progress": progress, "message": message, "datetime": datetime.datetime.now().strftime("%H:%M:%S"), "timestamp": timestamp, "elapsed_time": 0}]}]
+            obj = [
+                {
+                    "component_id": component_name,
+                    "trace": [{"progress": progress, "message": message, "datetime": datetime.datetime.now().strftime("%H:%M:%S"), "timestamp": timestamp, "elapsed_time": 0}],
+                }
+            ]
         REDIS_CONN.set_obj(log_key, obj, 60 * 30)
         if self._doc_id:
-            percentage = 1./len(self.components.items())
+            percentage = 1.0 / len(self.components.items())
             msg = ""
-            finished = 0.
+            finished = 0.0
             for o in obj:
-                if o['component_id'] == "END":
+                if o["component_id"] == "END":
                     continue
                 msg += f"\n[{o['component_id']}]:\n"
                 for t in o["trace"]:
-                    msg += "%s: %s\n"%(t["datetime"], t["message"])
+                    msg += "%s: %s\n" % (t["datetime"], t["message"])
                     if t["progress"] < 0:
                         finished = -1
                         break
@@ -129,8 +150,13 @@ class Pipeline(Graph):
             self.callback("END", 1, json.dumps(self.get_component_obj(self.path[-1]).output(), ensure_ascii=False))
 
         if self._doc_id:
-            DocumentService.update_by_id(self._doc_id,{
-                "progress": 1 if not self.error else -1,
-                "progress_msg": "Pipeline finished...\n" + self.error,
-                "process_duration": time.perf_counter() - st
-            })
+            DocumentService.update_by_id(
+                self._doc_id,
+                {
+                    "progress": 1 if not self.error else -1,
+                    "progress_msg": "Pipeline finished...\n" + self.error,
+                    "process_duration": time.perf_counter() - st,
+                },
+            )
+
+            PipelineOperationLogService.create(document_id=self._doc_id, pipeline_id=self._flow_id, task_type=PipelineTaskType.PARSE)
diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py
index 8ce5f0339..fca3c6ccc 100644
--- a/rag/svr/task_executor.py
+++ b/rag/svr/task_executor.py
@@ -21,6 +21,7 @@ import sys
 import threading
 import time
 from api.db.services.canvas_service import UserCanvasService
+from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
 from api.utils.api_utils import timeout
 from api.utils.base64_image import image2id
 from api.utils.log_utils import init_root_logger, get_project_base_directory
@@ -45,7 +46,7 @@ import exceptiongroup
 import faulthandler
 import numpy as np
 from peewee import DoesNotExist
-from api.db import LLMType, ParserType
+from api.db import LLMType, ParserType, PipelineTaskType
 from api.db.services.document_service import DocumentService
 from api.db.services.llm_service import LLMBundle
 from api.db.services.task_service import TaskService, has_canceled
@@ -650,6 +651,7 @@ async def do_handle_task(task):
             timer() - start_ts))
 
     DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, token_count, chunk_count, 0)
+    PipelineOperationLogService.record_pipeline_operation(document_id=task_doc_id, pipeline_id="", task_type=PipelineTaskType.PARSE)
 
     time_cost = timer() - start_ts
     task_time_cost = timer() - task_start_ts
@@ -685,6 +687,7 @@ async def handle_task():
         except Exception:
             pass
         logging.exception(f"handle_task got exception for task {json.dumps(task)}")
+        PipelineOperationLogService.record_pipeline_operation(document_id=task["doc_id"], pipeline_id=task.get("dataflow_id", "") or "", task_type=PipelineTaskType.PARSE)
     redis_msg.ack()
 
 
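
As a rough, non-authoritative illustration of how a client might exercise the endpoints added above: the routes, query parameters, and JSON bodies are taken from the kb_app.py hunk, while the host, the "/v1/kb" URL prefix, the authenticated session, and the exact response envelope are assumptions made only for this sketch.

# illustrative_pipeline_log_client.py -- hypothetical usage sketch, not part of the change set
import requests

BASE = "http://localhost:9380/v1/kb"  # assumed host and blueprint prefix
session = requests.Session()          # assumed to already carry an authenticated login cookie

# List pipeline operation logs for a dataset: pagination and ordering go in the query string,
# while list filters (status, type, suffix) go in the JSON body, mirroring list_pipeline_logs above.
resp = session.post(
    f"{BASE}/list_pipeline_logs",
    params={"kb_id": "<kb_id>", "page": 1, "page_size": 20, "orderby": "create_time", "desc": "true"},
    json={"operation_status": ["success", "failed"], "types": [], "suffix": []},
)
logs = resp.json()["data"]["docs"]  # assumed envelope: {"code": ..., "data": {"total": ..., "docs": [...]}}

if logs:
    # Inspect one log in detail, then delete it by ID.
    detail = session.get(f"{BASE}/pipeline_log_detail", params={"log_id": logs[0]["id"]}).json()
    session.post(f"{BASE}/delete_pipeline_logs", params={"kb_id": "<kb_id>"}, json={"log_ids": [logs[0]["id"]]})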