From 34283d4db4fffeaf6b7d63bbbcdb737877c5349c Mon Sep 17 00:00:00 2001
From: Kevin Hu
Date: Fri, 7 Nov 2025 11:43:59 +0800
Subject: [PATCH] Feat: add data source to pipeline logs. (#11075)

### What problem does this PR solve?

#10953

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
---
 api/apps/connector_app.py              | 14 ++---
 api/apps/document_app.py               |  2 +
 api/db/db_models.py                    |  5 ++
 api/db/services/connector_service.py   | 63 ++++++++-----------
 .../pipeline_operation_log_service.py  |  2 +-
 common/data_source/notion_connector.py |  2 +
 rag/svr/sync_data_source.py            |  2 +-
 7 files changed, 45 insertions(+), 45 deletions(-)

diff --git a/api/apps/connector_app.py b/api/apps/connector_app.py
index ea234c89f..180e14c3b 100644
--- a/api/apps/connector_app.py
+++ b/api/apps/connector_app.py
@@ -19,7 +19,7 @@
 from flask import request
 from flask_login import login_required, current_user
 from api.db import InputType
-from api.db.services.connector_service import ConnectorService, Connector2KbService, SyncLogsService
+from api.db.services.connector_service import ConnectorService, SyncLogsService
 from api.utils.api_utils import get_json_result, validate_request, get_data_error_result
 from common.misc_utils import get_uuid
 from common.constants import RetCode, TaskStatus
@@ -88,14 +88,14 @@ def resume(connector_id):
     return get_json_result(data=True)
 
 
-@manager.route("/<connector_id>/link", methods=["POST"]) # noqa: F821
-@validate_request("kb_ids")
+@manager.route("/<connector_id>/rebuild", methods=["PUT"]) # noqa: F821
 @login_required
-def link_kb(connector_id):
+@validate_request("kb_id")
+def rebuild(connector_id):
     req = request.json
-    errors = Connector2KbService.link_kb(connector_id, req["kb_ids"], current_user.id)
-    if errors:
-        return get_json_result(data=False, message=errors, code=RetCode.SERVER_ERROR)
+    err = ConnectorService.rebuild(connector_id, req["kb_id"], current_user.id)
+    if err:
+        return get_json_result(data=False, message=err, code=RetCode.SERVER_ERROR)
     return get_json_result(data=True)
 
 
diff --git a/api/apps/document_app.py b/api/apps/document_app.py
index 4b871c876..c2e37598e 100644
--- a/api/apps/document_app.py
+++ b/api/apps/document_app.py
@@ -260,6 +260,8 @@ def list_docs():
         for doc_item in docs:
             if doc_item["thumbnail"] and not doc_item["thumbnail"].startswith(IMG_BASE64_PREFIX):
                 doc_item["thumbnail"] = f"/v1/document/image/{kb_id}-{doc_item['thumbnail']}"
+            if doc_item.get("source_type"):
+                doc_item["source_type"] = doc_item["source_type"].split("/")[0]
 
         return get_json_result(data={"total": tol, "docs": docs})
     except Exception as e:
diff --git a/api/db/db_models.py b/api/db/db_models.py
index ce9f29647..2ad1a22cc 100644
--- a/api/db/db_models.py
+++ b/api/db/db_models.py
@@ -1064,6 +1064,7 @@ class Connector2Kb(DataBaseModel):
     id = CharField(max_length=32, primary_key=True)
     connector_id = CharField(max_length=32, null=False, index=True)
     kb_id = CharField(max_length=32, null=False, index=True)
+    auto_parse = CharField(max_length=1, null=False, default="1", index=False)
 
     class Meta:
         db_table = "connector2kb"
@@ -1282,4 +1283,8 @@ def migrate_db():
         migrate(migrator.add_column("tenant_llm", "status", CharField(max_length=1, null=False, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)))
     except Exception:
         pass
+    try:
+        migrate(migrator.add_column("connector2kb", "auto_parse", CharField(max_length=1, null=False, default="1", index=False)))
+    except Exception:
+        pass
     logging.disable(logging.NOTSET)
diff --git a/api/db/services/connector_service.py b/api/db/services/connector_service.py
index 92719f887..daca5dd7a 100644
--- a/api/db/services/connector_service.py
+++ b/api/db/services/connector_service.py
@@ -54,7 +54,6 @@ class ConnectorService(CommonService):
         SyncLogsService.update_by_id(task["id"], task)
         ConnectorService.update_by_id(connector_id, {"status": status})
 
-
     @classmethod
     def list(cls, tenant_id):
         fields = [
@@ -67,6 +66,15 @@ class ConnectorService(CommonService):
             cls.model.tenant_id == tenant_id
         ).dicts())
 
+    @classmethod
+    def rebuild(cls, connector_id: str, kb_id: str, tenant_id: str):
+        e, conn = cls.get_by_id(connector_id)
+        if not e:
+            return
+        SyncLogsService.filter_delete([SyncLogs.connector_id==connector_id, SyncLogs.kb_id==kb_id])
+        docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}")
+        return FileService.delete_docs([d.id for d in docs], tenant_id)
+
 
 class SyncLogsService(CommonService):
     model = SyncLogs
@@ -91,6 +99,7 @@ class SyncLogsService(CommonService):
             Connector.timeout_secs,
             Knowledgebase.name.alias("kb_name"),
             Knowledgebase.avatar.alias("kb_avatar"),
+            Connector2Kb.auto_parse,
             cls.model.from_beginning.alias("reindex"),
             cls.model.status
         ]
@@ -179,7 +188,7 @@ class SyncLogsService(CommonService):
             .where(cls.model.id == id).execute()
 
     @classmethod
-    def duplicate_and_parse(cls, kb, docs, tenant_id, src):
+    def duplicate_and_parse(cls, kb, docs, tenant_id, src, auto_parse=True):
         if not docs:
             return None
 
@@ -191,14 +200,17 @@ class SyncLogsService(CommonService):
                 return self.blob
 
         errs = []
-        files = [FileObj(filename=d["semantic_identifier"]+f".{d['extension']}", blob=d["blob"]) for d in docs]
+        files = [FileObj(filename=d["semantic_identifier"]+(f"{d['extension']}" if d["semantic_identifier"][::-1].find(d['extension'][::-1])<0 else ""), blob=d["blob"]) for d in docs]
         doc_ids = []
         err, doc_blob_pairs = FileService.upload_document(kb, files, tenant_id, src)
         errs.extend(err)
 
+        kb_table_num_map = {}
         for doc, _ in doc_blob_pairs:
-            DocumentService.run(tenant_id, doc, kb_table_num_map)
             doc_ids.append(doc["id"])
+            if not auto_parse or auto_parse == "0":
+                continue
+            DocumentService.run(tenant_id, doc, kb_table_num_map)
 
         return errs, doc_ids
 
@@ -213,33 +225,6 @@ class SyncLogsService(CommonService):
 class Connector2KbService(CommonService):
     model = Connector2Kb
 
-    @classmethod
-    def link_kb(cls, conn_id:str, kb_ids: list[str], tenant_id:str):
-        arr = cls.query(connector_id=conn_id)
-        old_kb_ids = [a.kb_id for a in arr]
-        for kb_id in kb_ids:
-            if kb_id in old_kb_ids:
-                continue
-            cls.save(**{
-                "id": get_uuid(),
-                "connector_id": conn_id,
-                "kb_id": kb_id
-            })
-            SyncLogsService.schedule(conn_id, kb_id, reindex=True)
-
-        errs = []
-        e, conn = ConnectorService.get_by_id(conn_id)
-        for kb_id in old_kb_ids:
-            if kb_id in kb_ids:
-                continue
-            cls.filter_delete([cls.model.kb_id==kb_id, cls.model.connector_id==conn_id])
-            SyncLogsService.filter_update([SyncLogs.connector_id==conn_id, SyncLogs.kb_id==kb_id, SyncLogs.status==TaskStatus.SCHEDULE], {"status": TaskStatus.CANCEL})
-            docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}")
-            err = FileService.delete_docs([d.id for d in docs], tenant_id)
-            if err:
-                errs.append(err)
-        return "\n".join(errs)
-
     @classmethod
     def link_connectors(cls, kb_id:str, connector_ids: list[str], tenant_id:str):
         arr = cls.query(kb_id=kb_id)
@@ -260,11 +245,15 @@ class Connector2KbService(CommonService):
                 continue
             cls.filter_delete([cls.model.kb_id==kb_id, cls.model.connector_id==conn_id])
             e, conn = ConnectorService.get_by_id(conn_id)
-            SyncLogsService.filter_update([SyncLogs.connector_id==conn_id, SyncLogs.kb_id==kb_id, SyncLogs.status==TaskStatus.SCHEDULE], {"status": TaskStatus.CANCEL})
-            docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}")
-            err = FileService.delete_docs([d.id for d in docs], tenant_id)
-            if err:
-                errs.append(err)
+            if not e:
+                continue
+            #SyncLogsService.filter_delete([SyncLogs.connector_id==conn_id, SyncLogs.kb_id==kb_id])
+            # Do not delete docs while unlinking.
+            SyncLogsService.filter_update([SyncLogs.connector_id==conn_id, SyncLogs.kb_id==kb_id, SyncLogs.status.in_([TaskStatus.SCHEDULE, TaskStatus.RUNNING])], {"status": TaskStatus.CANCEL})
+            #docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}")
+            #err = FileService.delete_docs([d.id for d in docs], tenant_id)
+            #if err:
+            #    errs.append(err)
         return "\n".join(errs)
 
 
@@ -282,3 +271,5 @@ class Connector2KbService(CommonService):
             ).dicts()
         )
 
+
+
diff --git a/api/db/services/pipeline_operation_log_service.py b/api/db/services/pipeline_operation_log_service.py
index 2d5e71ca0..c3c333665 100644
--- a/api/db/services/pipeline_operation_log_service.py
+++ b/api/db/services/pipeline_operation_log_service.py
@@ -159,7 +159,7 @@ class PipelineOperationLogService(CommonService):
             document_name=document.name,
             document_suffix=document.suffix,
             document_type=document.type,
-            source_from="", # TODO: add in the future
+            source_from=document.source_type.split("/")[0],
             progress=document.progress,
             progress_msg=document.progress_msg,
             process_begin_at=document.process_begin_at,
diff --git a/common/data_source/notion_connector.py b/common/data_source/notion_connector.py
index 082caa87b..f77184e31 100644
--- a/common/data_source/notion_connector.py
+++ b/common/data_source/notion_connector.py
@@ -253,6 +253,8 @@ class NotionConnector(LoadConnector, PollConnector):
         all_child_page_ids: list[str] = []
 
         for page in pages:
+            if isinstance(page, dict):
+                page = NotionPage(**page)
             if page.id in self.indexed_pages:
                 logging.debug(f"Already indexed page with ID '{page.id}'. Skipping.")
                 continue
diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py
index f98693f6f..ba63da82a 100644
--- a/rag/svr/sync_data_source.py
+++ b/rag/svr/sync_data_source.py
@@ -78,7 +78,7 @@ class SyncBase:
             } for doc in document_batch]
 
             e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
-            err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}")
+            err, dids = SyncLogsService.duplicate_and_parse(kb, docs, task["tenant_id"], f"{self.SOURCE_NAME}/{task['connector_id']}", task["auto_parse"])
             SyncLogsService.increase_docs(task["id"], min_update, max_update, len(docs), "\n".join(err), len(err))
             doc_num += len(docs)
 
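Note for reviewers: below is a minimal smoke-test sketch for the new `PUT /<connector_id>/rebuild` route added in `connector_app.py`. Only the route path, HTTP method, and the required `kb_id` field come from the diff above; the base URL, the `/v1/connector` prefix, and the authorization header are assumptions for illustration (the endpoint is `@login_required`, so adapt the auth to your deployment).

```python
# Hypothetical client-side check of the rebuild endpoint introduced by this patch.
# Assumed (not part of the diff): host/port, the /v1/connector prefix, and bearer-token auth.
import requests

BASE_URL = "http://localhost:9380/v1/connector"   # assumed deployment address and prefix
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}  # assumed auth scheme; the route itself is @login_required

connector_id = "your_connector_id"  # placeholder
resp = requests.put(
    f"{BASE_URL}/{connector_id}/rebuild",
    json={"kb_id": "your_kb_id"},   # "kb_id" is required by @validate_request("kb_id")
    headers=HEADERS,
    timeout=30,
)
print(resp.status_code, resp.json())  # get_json_result returns {"code": 0, "data": true} on success
```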