From 12979a3f21f5fe90c514255267f87e56826f5df2 Mon Sep 17 00:00:00 2001 From: Levi <81591061+levischd@users.noreply.github.com> Date: Wed, 26 Nov 2025 12:55:48 +0100 Subject: [PATCH] feat: improve metadata handling in connector service (#11421) ### What problem does this PR solve? - Update sync data source to handle metadata properly ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Co-authored-by: Kevin Hu --- api/db/services/connector_service.py | 12 ++++++++++++ common/data_source/confluence_connector.py | 1 + common/data_source/discord_connector.py | 1 + common/data_source/models.py | 1 + rag/svr/sync_data_source.py | 11 +++++++---- 5 files changed, 22 insertions(+), 4 deletions(-) diff --git a/api/db/services/connector_service.py b/api/db/services/connector_service.py index db8c713e2..660530c82 100644 --- a/api/db/services/connector_service.py +++ b/api/db/services/connector_service.py @@ -214,9 +214,21 @@ class SyncLogsService(CommonService): err, doc_blob_pairs = FileService.upload_document(kb, files, tenant_id, src) errs.extend(err) + # Create a mapping from filename to metadata for later use + metadata_map = {} + for d in docs: + if d.get("metadata"): + filename = d["semantic_identifier"]+(f"{d['extension']}" if d["semantic_identifier"][::-1].find(d['extension'][::-1])<0 else "") + metadata_map[filename] = d["metadata"] + kb_table_num_map = {} for doc, _ in doc_blob_pairs: doc_ids.append(doc["id"]) + + # Set metadata if available for this document + if doc["name"] in metadata_map: + DocumentService.update_by_id(doc["id"], {"meta_fields": metadata_map[doc["name"]]}) + if not auto_parse or auto_parse == "0": continue DocumentService.run(tenant_id, doc, kb_table_num_map) diff --git a/common/data_source/confluence_connector.py b/common/data_source/confluence_connector.py index 821f79862..a7935ff6d 100644 --- a/common/data_source/confluence_connector.py +++ b/common/data_source/confluence_connector.py @@ -1562,6 +1562,7 @@ class ConfluenceConnector( size_bytes=len(page_content.encode("utf-8")), # Calculate size in bytes doc_updated_at=datetime_from_string(page["version"]["when"]), primary_owners=primary_owners if primary_owners else None, + metadata=metadata if metadata else None, ) except Exception as e: logging.error(f"Error converting page {page.get('id', 'unknown')}: {e}") diff --git a/common/data_source/discord_connector.py b/common/data_source/discord_connector.py index 93a0477b0..46b23443c 100644 --- a/common/data_source/discord_connector.py +++ b/common/data_source/discord_connector.py @@ -65,6 +65,7 @@ def _convert_message_to_document( blob=message.content.encode("utf-8"), extension=".txt", size_bytes=len(message.content.encode("utf-8")), + metadata=metadata if metadata else None, ) diff --git a/common/data_source/models.py b/common/data_source/models.py index 032f26cc8..e956194b8 100644 --- a/common/data_source/models.py +++ b/common/data_source/models.py @@ -94,6 +94,7 @@ class Document(BaseModel): blob: bytes doc_updated_at: datetime size_bytes: int + metadata: Optional[dict[str, Any]] = None class BasicExpertInfo(BaseModel): diff --git a/rag/svr/sync_data_source.py b/rag/svr/sync_data_source.py index 87e9c2dac..5fb1f198b 100644 --- a/rag/svr/sync_data_source.py +++ b/rag/svr/sync_data_source.py @@ -75,8 +75,9 @@ class SyncBase: min_update = min([doc.doc_updated_at for doc in document_batch]) max_update = max([doc.doc_updated_at for doc in document_batch]) next_update = max([next_update, max_update]) - docs = [ - { + docs = [] + for doc in document_batch: + doc_dict = { "id": doc.id, "connector_id": task["connector_id"], "source": self.SOURCE_NAME, @@ -86,8 +87,10 @@ class SyncBase: "doc_updated_at": doc.doc_updated_at, "blob": doc.blob, } - for doc in document_batch - ] + # Add metadata if present + if doc.metadata: + doc_dict["metadata"] = doc.metadata + docs.append(doc_dict) try: e, kb = KnowledgebaseService.get_by_id(task["kb_id"])