From 9a5208976c8283197a659648c7bbb3f297fcb36e Mon Sep 17 00:00:00 2001 From: qinling0210 <88864212+qinling0210@users.noreply.github.com> Date: Wed, 28 Jan 2026 13:29:34 +0800 Subject: [PATCH] Put document metadata in ES/Infinity (#12826) ### What problem does this PR solve? Put document metadata in ES/Infinity. Index name of meta data: ragflow_doc_meta_{tenant_id} ### Type of change - [x] Refactoring --- agent/tools/retrieval.py | 4 +- api/apps/chunk_app.py | 3 +- api/apps/document_app.py | 21 +- api/apps/kb_app.py | 3 +- api/apps/sdk/dify_retrieval.py | 3 +- api/apps/sdk/doc.py | 14 +- api/apps/sdk/session.py | 8 +- api/db/db_models.py | 2 - api/db/joint_services/user_account_service.py | 24 + api/db/services/connector_service.py | 3 +- api/db/services/dialog_service.py | 8 +- api/db/services/doc_metadata_service.py | 1073 +++++++++++++++++ api/db/services/document_service.py | 277 +---- common/doc_store/es_conn_base.py | 24 +- common/doc_store/infinity_conn_base.py | 69 +- common/metadata_utils.py | 4 +- conf/doc_meta_es_mapping.json | 29 + conf/doc_meta_infinity_mapping.json | 5 + rag/prompts/generator.py | 8 +- rag/svr/task_executor.py | 21 +- rag/utils/es_conn.py | 6 +- rag/utils/infinity_conn.py | 36 +- .../test_metadata_retrieval.py | 184 +++ .../test_metadata_summary.py | 4 +- 24 files changed, 1529 insertions(+), 304 deletions(-) create mode 100644 api/db/services/doc_metadata_service.py create mode 100644 conf/doc_meta_es_mapping.json create mode 100644 conf/doc_meta_infinity_mapping.json create mode 100644 test/testcases/test_http_api/test_file_management_within_dataset/test_metadata_retrieval.py diff --git a/agent/tools/retrieval.py b/agent/tools/retrieval.py index 2a19b74ef..29bddde23 100644 --- a/agent/tools/retrieval.py +++ b/agent/tools/retrieval.py @@ -21,7 +21,7 @@ import re from abc import ABC from agent.tools.base import ToolParamBase, ToolBase, ToolMeta from common.constants import LLMType -from api.db.services.document_service import DocumentService +from api.db.services.doc_metadata_service import DocMetadataService from common.metadata_utils import apply_meta_data_filter from api.db.services.knowledgebase_service import KnowledgebaseService from api.db.services.llm_service import LLMBundle @@ -125,7 +125,7 @@ class Retrieval(ToolBase, ABC): doc_ids = [] if self._param.meta_data_filter != {}: - metas = DocumentService.get_meta_by_kbs(kb_ids) + metas = DocMetadataService.get_flatted_meta_by_kbs(kb_ids) def _resolve_manual_filter(flt: dict) -> dict: pat = re.compile(self.variable_ref_patt) diff --git a/api/apps/chunk_app.py b/api/apps/chunk_app.py index 18e9998d3..c1be1ef88 100644 --- a/api/apps/chunk_app.py +++ b/api/apps/chunk_app.py @@ -22,6 +22,7 @@ import xxhash from quart import request from api.db.services.document_service import DocumentService +from api.db.services.doc_metadata_service import DocMetadataService from api.db.services.knowledgebase_service import KnowledgebaseService from api.db.services.llm_service import LLMBundle from common.metadata_utils import apply_meta_data_filter @@ -381,7 +382,7 @@ async def retrieval_test(): chat_mdl = LLMBundle(user_id, LLMType.CHAT) if meta_data_filter: - metas = DocumentService.get_meta_by_kbs(kb_ids) + metas = DocMetadataService.get_flatted_meta_by_kbs(kb_ids) local_doc_ids = await apply_meta_data_filter(meta_data_filter, metas, question, chat_mdl, local_doc_ids) tenants = UserTenantService.query(user_id=user_id) diff --git a/api/apps/document_app.py b/api/apps/document_app.py index 5d86c5692..776db58c7 100644 --- 
a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -26,6 +26,7 @@ from api.db import VALID_FILE_TYPES, FileType from api.db.db_models import Task from api.db.services import duplicate_name from api.db.services.document_service import DocumentService, doc_upload_and_parse +from api.db.services.doc_metadata_service import DocMetadataService from common.metadata_utils import meta_filter, convert_conditions, turn2jsonschema from api.db.services.file2document_service import File2DocumentService from api.db.services.file_service import FileService @@ -281,7 +282,7 @@ async def list_docs(): doc_ids_filter = None metas = None if metadata_condition or metadata: - metas = DocumentService.get_flatted_meta_by_kbs([kb_id]) + metas = DocMetadataService.get_flatted_meta_by_kbs([kb_id]) if metadata_condition: doc_ids_filter = set(meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and"))) @@ -401,7 +402,11 @@ async def doc_infos(): if not DocumentService.accessible(doc_id, current_user.id): return get_json_result(data=False, message="No authorization.", code=RetCode.AUTHENTICATION_ERROR) docs = DocumentService.get_by_ids(doc_ids) - return get_json_result(data=list(docs.dicts())) + docs_list = list(docs.dicts()) + # Add meta_fields for each document + for doc in docs_list: + doc["meta_fields"] = DocMetadataService.get_document_metadata(doc["id"]) + return get_json_result(data=docs_list) @manager.route("/metadata/summary", methods=["POST"]) # noqa: F821 @@ -421,7 +426,7 @@ async def metadata_summary(): return get_json_result(data=False, message="Only owner of dataset authorized for this operation.", code=RetCode.OPERATING_ERROR) try: - summary = DocumentService.get_metadata_summary(kb_id, doc_ids) + summary = DocMetadataService.get_metadata_summary(kb_id, doc_ids) return get_json_result(data={"summary": summary}) except Exception as e: return server_error_response(e) @@ -432,10 +437,14 @@ async def metadata_summary(): @validate_request("doc_ids") async def metadata_update(): req = await get_request_json() + kb_id = req.get("kb_id") document_ids = req.get("doc_ids") updates = req.get("updates", []) or [] deletes = req.get("deletes", []) or [] + if not kb_id: + return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR) + if not isinstance(updates, list) or not isinstance(deletes, list): return get_json_result(data=False, message="updates and deletes must be lists.", code=RetCode.ARGUMENT_ERROR) @@ -446,8 +455,8 @@ async def metadata_update(): if not isinstance(d, dict) or not d.get("key"): return get_json_result(data=False, message="Each delete requires key.", code=RetCode.ARGUMENT_ERROR) - updated = DocumentService.batch_update_metadata(None, document_ids, updates, deletes) - return get_json_result(data={"updated": updated}) + updated = DocMetadataService.batch_update_metadata(kb_id, document_ids, updates, deletes) + return get_json_result(data={"updated": updated, "matched_docs": len(document_ids)}) @manager.route("/update_metadata_setting", methods=["POST"]) # noqa: F821 @@ -905,7 +914,7 @@ async def set_meta(): if not e: return get_data_error_result(message="Document not found!") - if not DocumentService.update_by_id(req["doc_id"], {"meta_fields": meta}): + if not DocMetadataService.update_document_metadata(req["doc_id"], meta): return get_data_error_result(message="Database error (meta updates)!") return get_json_result(data=True) diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index c8207d007..efb028bf1 100644 --- 
a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -25,6 +25,7 @@ import numpy as np from api.db.services.connector_service import Connector2KbService from api.db.services.llm_service import LLMBundle from api.db.services.document_service import DocumentService, queue_raptor_o_graphrag_tasks +from api.db.services.doc_metadata_service import DocMetadataService from api.db.services.file2document_service import File2DocumentService from api.db.services.file_service import FileService from api.db.services.pipeline_operation_log_service import PipelineOperationLogService @@ -467,7 +468,7 @@ def get_meta(): message='No authorization.', code=RetCode.AUTHENTICATION_ERROR ) - return get_json_result(data=DocumentService.get_meta_by_kbs(kb_ids)) + return get_json_result(data=DocMetadataService.get_flatted_meta_by_kbs(kb_ids)) @manager.route("/basic_info", methods=["GET"]) # noqa: F821 diff --git a/api/apps/sdk/dify_retrieval.py b/api/apps/sdk/dify_retrieval.py index 64752fee4..881614e5d 100644 --- a/api/apps/sdk/dify_retrieval.py +++ b/api/apps/sdk/dify_retrieval.py @@ -18,6 +18,7 @@ import logging from quart import jsonify from api.db.services.document_service import DocumentService +from api.db.services.doc_metadata_service import DocMetadataService from api.db.services.knowledgebase_service import KnowledgebaseService from api.db.services.llm_service import LLMBundle from common.metadata_utils import meta_filter, convert_conditions @@ -121,7 +122,7 @@ async def retrieval(tenant_id): similarity_threshold = float(retrieval_setting.get("score_threshold", 0.0)) top = int(retrieval_setting.get("top_k", 1024)) metadata_condition = req.get("metadata_condition", {}) or {} - metas = DocumentService.get_meta_by_kbs([kb_id]) + metas = DocMetadataService.get_meta_by_kbs([kb_id]) doc_ids = [] try: diff --git a/api/apps/sdk/doc.py b/api/apps/sdk/doc.py index 1bf495607..44cb07735 100644 --- a/api/apps/sdk/doc.py +++ b/api/apps/sdk/doc.py @@ -29,6 +29,7 @@ from api.constants import FILE_NAME_LEN_LIMIT from api.db import FileType from api.db.db_models import File, Task from api.db.services.document_service import DocumentService +from api.db.services.doc_metadata_service import DocMetadataService from api.db.services.file2document_service import File2DocumentService from api.db.services.file_service import FileService from api.db.services.knowledgebase_service import KnowledgebaseService @@ -255,7 +256,8 @@ async def update_doc(tenant_id, dataset_id, document_id): if "meta_fields" in req: if not isinstance(req["meta_fields"], dict): return get_error_data_result(message="meta_fields must be a dictionary") - DocumentService.update_meta_fields(document_id, req["meta_fields"]) + if not DocMetadataService.update_document_metadata(document_id, req["meta_fields"]): + return get_error_data_result(message="Failed to update metadata") if "name" in req and req["name"] != doc.name: if len(req["name"].encode("utf-8")) > FILE_NAME_LEN_LIMIT: @@ -568,7 +570,7 @@ def list_docs(dataset_id, tenant_id): doc_ids_filter = None if metadata_condition: - metas = DocumentService.get_flatted_meta_by_kbs([dataset_id]) + metas = DocMetadataService.get_flatted_meta_by_kbs([dataset_id]) doc_ids_filter = meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and")) if metadata_condition.get("conditions") and not doc_ids_filter: return get_result(data={"total": 0, "docs": []}) @@ -611,7 +613,7 @@ async def metadata_summary(dataset_id, tenant_id): return get_error_data_result(message=f"You don't own the dataset 
{dataset_id}. ") req = await get_request_json() try: - summary = DocumentService.get_metadata_summary(dataset_id, req.get("doc_ids")) + summary = DocMetadataService.get_metadata_summary(dataset_id, req.get("doc_ids")) return get_result(data={"summary": summary}) except Exception as e: return server_error_response(e) @@ -657,14 +659,14 @@ async def metadata_batch_update(dataset_id, tenant_id): target_doc_ids = set(document_ids) if metadata_condition: - metas = DocumentService.get_flatted_meta_by_kbs([dataset_id]) + metas = DocMetadataService.get_flatted_meta_by_kbs([dataset_id]) filtered_ids = set(meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and"))) target_doc_ids = target_doc_ids & filtered_ids if metadata_condition.get("conditions") and not target_doc_ids: return get_result(data={"updated": 0, "matched_docs": 0}) target_doc_ids = list(target_doc_ids) - updated = DocumentService.batch_update_metadata(dataset_id, target_doc_ids, updates, deletes) + updated = DocMetadataService.batch_update_metadata(dataset_id, target_doc_ids, updates, deletes) return get_result(data={"updated": updated, "matched_docs": len(target_doc_ids)}) @manager.route("/datasets//documents", methods=["DELETE"]) # noqa: F821 @@ -1534,7 +1536,7 @@ async def retrieval_test(tenant_id): if not doc_ids: metadata_condition = req.get("metadata_condition") if metadata_condition: - metas = DocumentService.get_meta_by_kbs(kb_ids) + metas = DocMetadataService.get_meta_by_kbs(kb_ids) doc_ids = meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and")) # If metadata_condition has conditions but no docs match, return empty result if not doc_ids and metadata_condition.get("conditions"): diff --git a/api/apps/sdk/session.py b/api/apps/sdk/session.py index 3b10ff3d8..29466694d 100644 --- a/api/apps/sdk/session.py +++ b/api/apps/sdk/session.py @@ -35,7 +35,7 @@ from api.db.services.conversation_service import ConversationService from api.db.services.conversation_service import async_iframe_completion as iframe_completion from api.db.services.conversation_service import async_completion as rag_completion from api.db.services.dialog_service import DialogService, async_ask, async_chat, gen_mindmap -from api.db.services.document_service import DocumentService +from api.db.services.doc_metadata_service import DocMetadataService from api.db.services.knowledgebase_service import KnowledgebaseService from api.db.services.llm_service import LLMBundle from common.metadata_utils import apply_meta_data_filter, convert_conditions, meta_filter @@ -147,7 +147,7 @@ async def chat_completion(tenant_id, chat_id): return get_error_data_result(message="metadata_condition must be an object.") if metadata_condition and req.get("question"): - metas = DocumentService.get_meta_by_kbs(dia.kb_ids or []) + metas = DocMetadataService.get_flatted_meta_by_kbs(dia.kb_ids or []) filtered_doc_ids = meta_filter( metas, convert_conditions(metadata_condition), @@ -279,7 +279,7 @@ async def chat_completion_openai_like(tenant_id, chat_id): doc_ids_str = None if metadata_condition: - metas = DocumentService.get_meta_by_kbs(dia.kb_ids or []) + metas = DocMetadataService.get_flatted_meta_by_kbs(dia.kb_ids or []) filtered_doc_ids = meta_filter( metas, convert_conditions(metadata_condition), @@ -1084,7 +1084,7 @@ async def retrieval_test_embedded(): chat_mdl = LLMBundle(tenant_id, LLMType.CHAT) if meta_data_filter: - metas = DocumentService.get_meta_by_kbs(kb_ids) + metas = 
DocMetadataService.get_flatted_meta_by_kbs(kb_ids) local_doc_ids = await apply_meta_data_filter(meta_data_filter, metas, _question, chat_mdl, local_doc_ids) tenants = UserTenantService.query(user_id=tenant_id) diff --git a/api/db/db_models.py b/api/db/db_models.py index 4c71c36f1..c9db474b8 100644 --- a/api/db/db_models.py +++ b/api/db/db_models.py @@ -795,7 +795,6 @@ class Document(DataBaseModel): progress_msg = TextField(null=True, help_text="process message", default="") process_begin_at = DateTimeField(null=True, index=True) process_duration = FloatField(default=0) - meta_fields = JSONField(null=True, default={}) suffix = CharField(max_length=32, null=False, help_text="The real file extension suffix", index=True) run = CharField(max_length=1, null=True, help_text="start to run processing or cancel.(1: run it; 2: cancel)", default="0", index=True) @@ -1267,7 +1266,6 @@ def migrate_db(): alter_db_add_column(migrator, "task", "digest", TextField(null=True, help_text="task digest", default="")) alter_db_add_column(migrator, "task", "chunk_ids", LongTextField(null=True, help_text="chunk ids", default="")) alter_db_add_column(migrator, "conversation", "user_id", CharField(max_length=255, null=True, help_text="user_id", index=True)) - alter_db_add_column(migrator, "document", "meta_fields", JSONField(null=True, default={})) alter_db_add_column(migrator, "task", "task_type", CharField(max_length=32, null=False, default="")) alter_db_add_column(migrator, "task", "priority", IntegerField(default=0)) alter_db_add_column(migrator, "user_canvas", "permission", CharField(max_length=16, null=False, help_text="me|team", default="me", index=True)) diff --git a/api/db/joint_services/user_account_service.py b/api/db/joint_services/user_account_service.py index 2e4dfeaab..7490c9bad 100644 --- a/api/db/joint_services/user_account_service.py +++ b/api/db/joint_services/user_account_service.py @@ -23,6 +23,7 @@ from api.db.services.canvas_service import UserCanvasService from api.db.services.conversation_service import ConversationService from api.db.services.dialog_service import DialogService from api.db.services.document_service import DocumentService +from api.db.services.doc_metadata_service import DocMetadataService from api.db.services.file2document_service import File2DocumentService from api.db.services.knowledgebase_service import KnowledgebaseService from api.db.services.langfuse_service import TenantLangfuseService @@ -107,6 +108,11 @@ def create_new_user(user_info: dict) -> dict: except Exception as create_error: logging.exception(create_error) # rollback + try: + metadata_index_name = DocMetadataService._get_doc_meta_index_name(user_id) + settings.docStoreConn.delete_idx(metadata_index_name, "") + except Exception as e: + logging.exception(e) try: TenantService.delete_by_id(user_id) except Exception as e: @@ -165,6 +171,12 @@ def delete_user_data(user_id: str) -> dict: # step1.1.2 delete file and document info in db doc_ids = DocumentService.get_all_doc_ids_by_kb_ids(kb_ids) if doc_ids: + for doc in doc_ids: + try: + DocMetadataService.delete_document_metadata(doc["id"], skip_empty_check=True) + except Exception as e: + logging.warning(f"Failed to delete metadata for document {doc['id']}: {e}") + doc_delete_res = DocumentService.delete_by_ids([i["id"] for i in doc_ids]) done_msg += f"- Deleted {doc_delete_res} document records.\n" task_delete_res = TaskService.delete_by_doc_ids([i["id"] for i in doc_ids]) @@ -202,6 +214,13 @@ def delete_user_data(user_id: str) -> dict: done_msg += f"- Deleted 
{llm_delete_res} tenant-LLM records.\n" langfuse_delete_res = TenantLangfuseService.delete_ty_tenant_id(tenant_id) done_msg += f"- Deleted {langfuse_delete_res} langfuse records.\n" + try: + metadata_index_name = DocMetadataService._get_doc_meta_index_name(tenant_id) + settings.docStoreConn.delete_idx(metadata_index_name, "") + done_msg += f"- Deleted metadata table {metadata_index_name}.\n" + except Exception as e: + logging.warning(f"Failed to delete metadata table for tenant {tenant_id}: {e}") + done_msg += "- Warning: Failed to delete metadata table (continuing).\n" # step1.3 delete memory and messages user_memory = MemoryService.get_by_tenant_id(tenant_id) if user_memory: @@ -269,6 +288,11 @@ def delete_user_data(user_id: str) -> dict: # step2.1.5 delete document record doc_delete_res = DocumentService.delete_by_ids([d['id'] for d in created_documents]) done_msg += f"- Deleted {doc_delete_res} documents.\n" + for doc in created_documents: + try: + DocMetadataService.delete_document_metadata(doc['id']) + except Exception as e: + logging.warning(f"Failed to delete metadata for document {doc['id']}: {e}") # step2.1.6 update dataset doc&chunk&token cnt for kb_id, doc_num in kb_doc_info.items(): KnowledgebaseService.decrease_document_num_in_delete(kb_id, doc_num) diff --git a/api/db/services/connector_service.py b/api/db/services/connector_service.py index 0a7b5cb71..d2fcb1b41 100644 --- a/api/db/services/connector_service.py +++ b/api/db/services/connector_service.py @@ -25,6 +25,7 @@ from api.db import InputType from api.db.db_models import Connector, SyncLogs, Connector2Kb, Knowledgebase from api.db.services.common_service import CommonService from api.db.services.document_service import DocumentService +from api.db.services.doc_metadata_service import DocMetadataService from common.misc_utils import get_uuid from common.constants import TaskStatus from common.time_utils import current_timestamp, timestamp_to_date @@ -227,7 +228,7 @@ class SyncLogsService(CommonService): # Set metadata if available for this document if doc["name"] in metadata_map: - DocumentService.update_by_id(doc["id"], {"meta_fields": metadata_map[doc["name"]]}) + DocMetadataService.update_document_metadata(doc["id"], metadata_map[doc["name"]]) if not auto_parse or auto_parse == "0": continue diff --git a/api/db/services/dialog_service.py b/api/db/services/dialog_service.py index ae2ff758c..1a25a8308 100644 --- a/api/db/services/dialog_service.py +++ b/api/db/services/dialog_service.py @@ -28,7 +28,7 @@ from api.db.services.file_service import FileService from common.constants import LLMType, ParserType, StatusEnum from api.db.db_models import DB, Dialog from api.db.services.common_service import CommonService -from api.db.services.document_service import DocumentService +from api.db.services.doc_metadata_service import DocMetadataService from api.db.services.knowledgebase_service import KnowledgebaseService from api.db.services.langfuse_service import TenantLangfuseService from api.db.services.llm_service import LLMBundle @@ -355,7 +355,7 @@ async def async_chat(dialog, messages, stream=True, **kwargs): questions = [await cross_languages(dialog.tenant_id, dialog.llm_id, questions[0], prompt_config["cross_languages"])] if dialog.meta_data_filter: - metas = DocumentService.get_meta_by_kbs(dialog.kb_ids) + metas = DocMetadataService.get_flatted_meta_by_kbs(dialog.kb_ids) attachments = await apply_meta_data_filter( dialog.meta_data_filter, metas, @@ -1048,7 +1048,7 @@ async def async_ask(question, kb_ids, tenant_id,
chat_llm_name=None, search_conf tenant_ids = list(set([kb.tenant_id for kb in kbs])) if meta_data_filter: - metas = DocumentService.get_meta_by_kbs(kb_ids) + metas = DocMetadataService.get_flatted_meta_by_kbs(kb_ids) doc_ids = await apply_meta_data_filter(meta_data_filter, metas, question, chat_mdl, doc_ids) kbinfos = await retriever.retrieval( @@ -1124,7 +1124,7 @@ async def gen_mindmap(question, kb_ids, tenant_id, search_config={}): rerank_mdl = LLMBundle(tenant_id, LLMType.RERANK, rerank_id) if meta_data_filter: - metas = DocumentService.get_meta_by_kbs(kb_ids) + metas = DocMetadataService.get_flatted_meta_by_kbs(kb_ids) doc_ids = await apply_meta_data_filter(meta_data_filter, metas, question, chat_mdl, doc_ids) ranks = await settings.retriever.retrieval( diff --git a/api/db/services/doc_metadata_service.py b/api/db/services/doc_metadata_service.py new file mode 100644 index 000000000..98820f906 --- /dev/null +++ b/api/db/services/doc_metadata_service.py @@ -0,0 +1,1073 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +Document Metadata Service + +Manages document-level metadata storage in ES/Infinity. +This is the SOLE source of truth for document metadata - MySQL meta_fields column has been removed. +""" + +import json +import logging +from copy import deepcopy +from typing import Dict, List, Optional + +from api.db.db_models import DB, Document +from common import settings +from common.metadata_utils import dedupe_list + + +class DocMetadataService: + """Service for managing document metadata in ES/Infinity""" + + @staticmethod + def _get_doc_meta_index_name(tenant_id: str) -> str: + """ + Get the index name for document metadata. + + Args: + tenant_id: Tenant ID + + Returns: + Index name for document metadata + """ + return f"ragflow_doc_meta_{tenant_id}" + + @staticmethod + def _extract_metadata(flat_meta: Dict) -> Dict: + """ + Extract metadata from ES/Infinity document format. + + Args: + flat_meta: Raw document from ES/Infinity with meta_fields field + + Returns: + Simple metadata dictionary + """ + if not flat_meta: + return {} + + meta_fields = flat_meta.get('meta_fields') + if not meta_fields: + return {} + + # Parse JSON string if needed + if isinstance(meta_fields, str): + import json + try: + return json.loads(meta_fields) + except json.JSONDecodeError: + return {} + + # Already a dict, return as-is + if isinstance(meta_fields, dict): + return meta_fields + + return {} + + @staticmethod + def _extract_doc_id(doc: Dict, hit: Dict = None) -> str: + """ + Extract document ID from various formats. 
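+ + Illustrative (hypothetical values): an ES hit {"_id": "doc-1"} yields "doc-1"; an Infinity/DataFrame row such as {"doc_id": "doc-1"} or {"id": "doc-1"} yields the same.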
+ + Args: + doc: Document dictionary (from DataFrame or list format) + hit: Hit dictionary (from ES format with _id field) + + Returns: + Document ID or empty string + """ + if hit: + # ES format: doc is in _source, id is in _id + return hit.get('_id', '') + # DataFrame or list format: check multiple possible fields + return doc.get("doc_id") or doc.get("_id") or doc.get("id", "") + + @classmethod + def _iter_search_results(cls, results): + """ + Iterate over search results in various formats (DataFrame, ES, list). + + Yields: + Tuple of (doc_id, doc_dict) for each document + + Args: + results: Search results from ES/Infinity in any format + """ + # Handle tuple return from Infinity: (DataFrame, int) + # Check this FIRST because pandas DataFrames also have __getitem__ + if isinstance(results, tuple) and len(results) == 2: + results = results[0] # Extract DataFrame from tuple + + # Check if results is a pandas DataFrame (from Infinity) + if hasattr(results, 'iterrows'): + # Handle pandas DataFrame - use iterrows() to iterate over rows + for _, row in results.iterrows(): + doc = dict(row) # Convert Series to dict + doc_id = cls._extract_doc_id(doc) + if doc_id: + yield doc_id, doc + + # Check if ES format (has 'hits' key) + # Note: ES returns ObjectApiResponse which is dict-like but not isinstance(dict) + elif hasattr(results, '__getitem__') and 'hits' in results: + # ES format: {"hits": {"hits": [{"_source": {...}, "_id": "..."}]}} + hits = results.get('hits', {}).get('hits', []) + for hit in hits: + doc = hit.get('_source', {}) + doc_id = cls._extract_doc_id(doc, hit) + if doc_id: + yield doc_id, doc + + # Handle list of dicts or other formats + elif isinstance(results, list): + for res in results: + if isinstance(res, dict): + docs = [res] + else: + docs = res + + for doc in docs: + doc_id = cls._extract_doc_id(doc) + if doc_id: + yield doc_id, doc + + @classmethod + def _search_metadata(cls, kb_id: str, condition: Dict = None, limit: int = 10000): + """ + Common search logic for metadata queries. + + Args: + kb_id: Knowledge base ID + condition: Optional search condition (defaults to {"kb_id": kb_id}) + limit: Max results to return + + Returns: + Search results from ES/Infinity + """ + from api.db.db_models import Knowledgebase + from common.doc_store.doc_store_base import OrderByExpr + + kb = Knowledgebase.get_by_id(kb_id) + if not kb: + return None + + tenant_id = kb.tenant_id + index_name = cls._get_doc_meta_index_name(tenant_id) + + if condition is None: + condition = {"kb_id": kb_id} + + order_by = OrderByExpr() + + return settings.docStoreConn.search( + select_fields=["*"], + highlight_fields=[], + condition=condition, + match_expressions=[], + order_by=order_by, + offset=0, + limit=limit, + index_names=index_name, + knowledgebase_ids=[kb_id] + ) + + @classmethod + def _split_combined_values(cls, meta_fields: Dict) -> Dict: + """ + Post-process metadata to split combined values by common delimiters. + + For example: "关羽、孙权、张辽" -> ["关羽", "孙权", "张辽"] + This fixes LLM extraction where multiple values are extracted as one combined value. + Also removes duplicates after splitting. 
+ + Args: + meta_fields: Metadata dictionary + + Returns: + Processed metadata with split values + """ + import re + + if not meta_fields or not isinstance(meta_fields, dict): + return meta_fields + + processed = {} + for key, value in meta_fields.items(): + if isinstance(value, list): + # Process each item in the list + new_values = [] + for item in value: + if isinstance(item, str): + # Split by common delimiters: Chinese comma (、), regular comma (,), pipe (|), semicolon (;), Chinese semicolon (;) + # Also handle mixed delimiters and spaces + split_items = re.split(r'[、,,;;|]+', item.strip()) + # Trim whitespace and filter empty strings + split_items = [s.strip() for s in split_items if s.strip()] + if split_items: + new_values.extend(split_items) + else: + # Keep original if no split happened + new_values.append(item) + else: + new_values.append(item) + # Remove duplicates while preserving order + processed[key] = list(dict.fromkeys(new_values)) + else: + processed[key] = value + + if processed != meta_fields: + logging.debug(f"[METADATA SPLIT] Split combined values: {meta_fields} -> {processed}") + return processed + + @classmethod + @DB.connection_context() + def insert_document_metadata(cls, doc_id: str, meta_fields: Dict) -> bool: + """ + Insert document metadata into ES/Infinity. + + Args: + doc_id: Document ID + meta_fields: Metadata dictionary + + Returns: + True if successful, False otherwise + """ + try: + from api.db.db_models import Knowledgebase + + # Get document with tenant_id (need to join with Knowledgebase) + doc_query = Document.select(Document, Knowledgebase.tenant_id).join( + Knowledgebase, on=(Knowledgebase.id == Document.kb_id) + ).where(Document.id == doc_id) + + doc = doc_query.first() + if not doc: + logging.warning(f"Document {doc_id} not found for metadata insertion") + return False + + # Extract document fields + doc_obj = doc # This is the Document object + tenant_id = doc.knowledgebase.tenant_id # Get tenant_id from joined Knowledgebase + kb_id = doc_obj.kb_id + + # Prepare metadata document + doc_meta = { + "id": doc_obj.id, + "kb_id": kb_id, + } + + # Store metadata as JSON object in meta_fields column (same as MySQL structure) + if meta_fields: + # Post-process to split combined values by common delimiters + meta_fields = cls._split_combined_values(meta_fields) + doc_meta["meta_fields"] = meta_fields + else: + doc_meta["meta_fields"] = {} + + # Ensure index/table exists (per-tenant for both ES and Infinity) + index_name = cls._get_doc_meta_index_name(tenant_id) + + # Check if table exists + table_exists = settings.docStoreConn.index_exist(index_name, kb_id) + logging.debug(f"Metadata table exists check: {index_name} -> {table_exists}") + + # Create index if it doesn't exist + if not table_exists: + logging.debug(f"Creating metadata table: {index_name}") + # Both ES and Infinity now use per-tenant metadata tables + result = settings.docStoreConn.create_doc_meta_idx(index_name) + logging.debug(f"Table creation result: {result}") + else: + logging.debug(f"Metadata table already exists: {index_name}") + + # Insert into ES/Infinity + result = settings.docStoreConn.insert( + [doc_meta], + index_name, + kb_id + ) + + if result: + logging.error(f"Failed to insert metadata for document {doc_id}: {result}") + return False + + logging.debug(f"Successfully inserted metadata for document {doc_id}") + return True + + except Exception as e: + logging.error(f"Error inserting metadata for document {doc_id}: {e}") + return False + + @classmethod + @DB.connection_context() 
+ def update_document_metadata(cls, doc_id: str, meta_fields: Dict) -> bool: + """ + Update document metadata in ES/Infinity. + + For Elasticsearch: Uses partial update to directly update the meta_fields field. + For Infinity: Falls back to delete+insert (Infinity doesn't support partial updates well). + + Args: + doc_id: Document ID + meta_fields: Metadata dictionary + + Returns: + True if successful, False otherwise + """ + try: + from api.db.db_models import Knowledgebase + + # Get document with tenant_id + doc_query = Document.select(Document, Knowledgebase.tenant_id).join( + Knowledgebase, on=(Knowledgebase.id == Document.kb_id) + ).where(Document.id == doc_id) + + doc = doc_query.first() + if not doc: + logging.warning(f"Document {doc_id} not found for metadata update") + return False + + # Extract fields + doc_obj = doc + tenant_id = doc.knowledgebase.tenant_id + kb_id = doc_obj.kb_id + index_name = cls._get_doc_meta_index_name(tenant_id) + + # Post-process to split combined values + processed_meta = cls._split_combined_values(meta_fields) + + logging.debug(f"[update_document_metadata] Updating doc_id: {doc_id}, kb_id: {kb_id}, meta_fields: {processed_meta}") + + # For Elasticsearch, use efficient partial update + if not settings.DOC_ENGINE_INFINITY: + try: + # Use ES partial update API - much more efficient than delete+insert + settings.docStoreConn.es.update( + index=index_name, + id=doc_id, + refresh=True, # Make changes immediately visible + doc={"meta_fields": processed_meta} + ) + logging.debug(f"Successfully updated metadata for document {doc_id} using ES partial update") + return True + except Exception as e: + logging.error(f"ES partial update failed for document {doc_id}: {e}") + # Fall back to delete+insert if partial update fails + logging.info(f"Falling back to delete+insert for document {doc_id}") + + # For Infinity or as fallback: use delete+insert + logging.debug(f"[update_document_metadata] Using delete+insert method for doc_id: {doc_id}") + cls.delete_document_metadata(doc_id, skip_empty_check=True) + return cls.insert_document_metadata(doc_id, processed_meta) + + except Exception as e: + logging.error(f"Error updating metadata for document {doc_id}: {e}") + return False + + @classmethod + @DB.connection_context() + def delete_document_metadata(cls, doc_id: str, skip_empty_check: bool = False) -> bool: + """ + Delete document metadata from ES/Infinity. + Also drops the metadata table if it becomes empty (efficiently). + If document has no metadata in the table, this is a no-op. 
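+ Bulk callers (e.g. tenant deletion in user_account_service) pass skip_empty_check=True per document and then drop the whole ragflow_doc_meta_{tenant_id} index in a single delete_idx call.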
+ + Args: + doc_id: Document ID + skip_empty_check: If True, skip checking/dropping empty table (for bulk deletions) + + Returns: + True if successful (or no metadata to delete), False otherwise + """ + try: + from api.db.db_models import Knowledgebase + + logging.debug(f"[METADATA DELETE] Starting metadata deletion for document: {doc_id}") + + # Get document with tenant_id + doc_query = Document.select(Document, Knowledgebase.tenant_id).join( + Knowledgebase, on=(Knowledgebase.id == Document.kb_id) + ).where(Document.id == doc_id) + + doc = doc_query.first() + if not doc: + logging.warning(f"Document {doc_id} not found for metadata deletion") + return False + + tenant_id = doc.knowledgebase.tenant_id + kb_id = doc.kb_id + index_name = cls._get_doc_meta_index_name(tenant_id) + logging.debug(f"[delete_document_metadata] Deleting doc_id: {doc_id}, kb_id: {kb_id}, index: {index_name}") + + # Check if metadata table exists before attempting deletion + # This is the key optimization - no table = no metadata = nothing to delete + if not settings.docStoreConn.index_exist(index_name, ""): + logging.debug(f"Metadata table {index_name} does not exist, skipping metadata deletion for document {doc_id}") + return True # No metadata to delete is considered success + + # Try to get the metadata to confirm it exists before deleting + # This is more efficient than attempting delete on non-existent records + try: + existing_metadata = settings.docStoreConn.get( + doc_id, + index_name, + [""] # Empty list for metadata tables + ) + logging.debug(f"[METADATA DELETE] Get result: {existing_metadata is not None}") + if not existing_metadata: + logging.debug(f"[METADATA DELETE] Document {doc_id} has no metadata in table, skipping deletion") + # Only check/drop table if not skipped (tenant deletion will handle it) + if not skip_empty_check: + cls._drop_empty_metadata_table(index_name, tenant_id) + return True # No metadata to delete is success + except Exception as e: + # If get fails, document might not exist in metadata table, which is fine + logging.error(f"[METADATA DELETE] Get failed: {e}") + # Continue to check/drop table if needed + + # Delete from ES/Infinity (only if metadata exists) + # For metadata tables, pass kb_id for the delete operation + # The delete() method will detect it's a metadata table and skip the kb_id filter + logging.debug(f"[METADATA DELETE] Deleting metadata with condition: {{'id': '{doc_id}'}}") + deleted_count = settings.docStoreConn.delete( + {"id": doc_id}, + index_name, + kb_id # Pass actual kb_id (delete() will handle metadata tables correctly) + ) + logging.debug(f"[METADATA DELETE] Deleted count: {deleted_count}") + + # Only check if table should be dropped if not skipped (for bulk operations) + # Note: delete operation already uses refresh=True, so data is immediately available + if not skip_empty_check: + # Check by querying the actual metadata table (not MySQL) + cls._drop_empty_metadata_table(index_name, tenant_id) + + logging.debug(f"Successfully deleted metadata for document {doc_id}") + return True + + except Exception as e: + logging.error(f"Error deleting metadata for document {doc_id}: {e}") + return False + + @classmethod + def _drop_empty_metadata_table(cls, index_name: str, tenant_id: str) -> None: + """ + Check if metadata table is empty and drop it if so. + Uses optimized count query instead of full search. + This prevents accumulation of empty metadata tables. 
+ + Args: + index_name: Metadata table/index name + tenant_id: Tenant ID + """ + try: + logging.debug(f"[DROP EMPTY TABLE] Starting empty table check for: {index_name}") + + # Check if table exists first (cheap operation) + if not settings.docStoreConn.index_exist(index_name, ""): + logging.debug(f"[DROP EMPTY TABLE] Metadata table {index_name} does not exist, skipping") + return + + logging.debug(f"[DROP EMPTY TABLE] Table {index_name} exists, checking if empty...") + + # Use ES count API for accurate count + # Note: No need to refresh since delete operation already uses refresh=True + try: + count_response = settings.docStoreConn.es.count(index=index_name) + total_count = count_response['count'] + logging.debug(f"[DROP EMPTY TABLE] ES count API result: {total_count} documents") + is_empty = (total_count == 0) + except Exception as e: + logging.warning(f"[DROP EMPTY TABLE] Count API failed, falling back to search: {e}") + # Fallback to search if count fails + from common.doc_store.doc_store_base import OrderByExpr + + results = settings.docStoreConn.search( + select_fields=["id"], + highlight_fields=[], + condition={}, + match_expressions=[], + order_by=OrderByExpr(), + offset=0, + limit=1, # Only need 1 result to know if table is non-empty + index_names=index_name, + knowledgebase_ids=[""] # Metadata tables don't filter by KB + ) + + logging.debug(f"[DROP EMPTY TABLE] Search results type: {type(results)}, results: {results}") + + # Check if empty based on return type (fallback search only) + if isinstance(results, tuple) and len(results) == 2: + # Infinity returns (DataFrame, int) + df, total = results + logging.debug(f"[DROP EMPTY TABLE] Infinity format - total: {total}, df length: {len(df) if hasattr(df, '__len__') else 'N/A'}") + is_empty = (total == 0 or (hasattr(df, '__len__') and len(df) == 0)) + elif hasattr(results, 'get') and 'hits' in results: + # ES format - MUST check this before hasattr(results, '__len__') + # because ES response objects also have __len__ + total = results.get('hits', {}).get('total', {}) + hits = results.get('hits', {}).get('hits', []) + + # ES 7.x+: total is a dict like {'value': 0, 'relation': 'eq'} + # ES 6.x: total is an int + if isinstance(total, dict): + total_count = total.get('value', 0) + else: + total_count = total + + logging.debug(f"[DROP EMPTY TABLE] ES format - total: {total_count}, hits count: {len(hits)}") + is_empty = (total_count == 0 or len(hits) == 0) + elif hasattr(results, '__len__'): + # DataFrame or list (check this AFTER ES format) + result_len = len(results) + logging.debug(f"[DROP EMPTY TABLE] List/DataFrame format - length: {result_len}") + is_empty = result_len == 0 + else: + logging.warning(f"[DROP EMPTY TABLE] Unknown result format: {type(results)}") + is_empty = False + + if is_empty: + logging.debug(f"[DROP EMPTY TABLE] Metadata table {index_name} is empty, dropping it") + drop_result = settings.docStoreConn.delete_idx(index_name, "") + logging.debug(f"[DROP EMPTY TABLE] Drop result: {drop_result}") + else: + logging.debug(f"[DROP EMPTY TABLE] Metadata table {index_name} still has documents, keeping it") + + except Exception as e: + # Log but don't fail - metadata deletion was successful + logging.error(f"[DROP EMPTY TABLE] Failed to check/drop empty metadata table {index_name}: {e}") + + @classmethod + @DB.connection_context() + def get_document_metadata(cls, doc_id: str) -> Dict: + """ + Get document metadata from ES/Infinity. 
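+ + Illustrative (hypothetical values): if the stored meta_fields for the document is {"author": "alice"}, this returns {"author": "alice"}; it returns {} when the document or its metadata row does not exist.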
+ + Args: + doc_id: Document ID + + Returns: + Metadata dictionary, empty dict if not found + """ + try: + from api.db.db_models import Knowledgebase + + # Get document with tenant_id + doc_query = Document.select(Document, Knowledgebase.tenant_id).join( + Knowledgebase, on=(Knowledgebase.id == Document.kb_id) + ).where(Document.id == doc_id) + + doc = doc_query.first() + if not doc: + logging.warning(f"Document {doc_id} not found") + return {} + + # Extract fields + doc_obj = doc + tenant_id = doc.knowledgebase.tenant_id + kb_id = doc_obj.kb_id + index_name = cls._get_doc_meta_index_name(tenant_id) + + # Try to get metadata from ES/Infinity + metadata_doc = settings.docStoreConn.get( + doc_id, + index_name, + [kb_id] + ) + + if metadata_doc: + # Extract and unflatten metadata + return cls._extract_metadata(metadata_doc) + + return {} + + except Exception as e: + logging.error(f"Error getting metadata for document {doc_id}: {e}") + return {} + + @classmethod + @DB.connection_context() + def get_meta_by_kbs(cls, kb_ids: List[str]) -> Dict: + """ + Get metadata for documents in knowledge bases (Legacy). + + Legacy metadata aggregator (backward-compatible). + - Does NOT expand list values and a list is kept as one string key. + Example: {"tags": ["foo","bar"]} -> meta["tags"]["['foo', 'bar']"] = [doc_id] + - Expects meta_fields is a dict. + Use when existing callers rely on the old list-as-string semantics. + + Args: + kb_ids: List of knowledge base IDs + + Returns: + Metadata dictionary in format: {field_name: {value: [doc_ids]}} + """ + try: + from api.db.db_models import Knowledgebase + from common.doc_store.doc_store_base import OrderByExpr + + # Get tenant_id from first KB + kb = Knowledgebase.get_by_id(kb_ids[0]) + if not kb: + return {} + + tenant_id = kb.tenant_id + index_name = cls._get_doc_meta_index_name(tenant_id) + + condition = {"kb_id": kb_ids} + order_by = OrderByExpr() + + # Query with large limit + results = settings.docStoreConn.search( + select_fields=["*"], + highlight_fields=[], + condition=condition, + match_expressions=[], + order_by=order_by, + offset=0, + limit=10000, + index_names=index_name, + knowledgebase_ids=kb_ids + ) + + logging.debug(f"[get_meta_by_kbs] index_name: {index_name}, kb_ids: {kb_ids}") + + # Aggregate metadata (legacy: keeps lists as string keys) + meta = {} + + # Use helper to iterate over results in any format + for doc_id, doc in cls._iter_search_results(results): + # Extract metadata fields (exclude system fields) + doc_meta = cls._extract_metadata(doc) + + # Legacy: Keep lists as string keys (do NOT expand) + for k, v in doc_meta.items(): + if k not in meta: + meta[k] = {} + # If not list, make it a list + if not isinstance(v, list): + v = [v] + # Legacy: Use the entire list as a string key + # Skip nested lists/dicts + if isinstance(v, list) and any(isinstance(x, (list, dict)) for x in v): + continue + list_key = str(v) + if list_key not in meta[k]: + meta[k][list_key] = [] + meta[k][list_key].append(doc_id) + + logging.debug(f"[get_meta_by_kbs] KBs: {kb_ids}, Returning metadata: {meta}") + return meta + + except Exception as e: + logging.error(f"Error getting metadata for KBs {kb_ids}: {e}") + return {} + + @classmethod + @DB.connection_context() + def get_flatted_meta_by_kbs(cls, kb_ids: List[str]) -> Dict: + """ + Get flattened metadata for documents in knowledge bases. + + - Parses stringified JSON meta_fields when possible and skips non-dict or unparsable values. + - Expands list values into individual entries. 
+ Example: {"tags": ["foo","bar"], "author": "alice"} -> + meta["tags"]["foo"] = [doc_id], meta["tags"]["bar"] = [doc_id], meta["author"]["alice"] = [doc_id] + Prefer for metadata_condition filtering and scenarios that must respect list semantics. + + Args: + kb_ids: List of knowledge base IDs + + Returns: + Metadata dictionary in format: {field_name: {value: [doc_ids]}} + """ + try: + from api.db.db_models import Knowledgebase + from common.doc_store.doc_store_base import OrderByExpr + + # Get tenant_id from first KB + kb = Knowledgebase.get_by_id(kb_ids[0]) + if not kb: + return {} + + tenant_id = kb.tenant_id + index_name = cls._get_doc_meta_index_name(tenant_id) + + condition = {"kb_id": kb_ids} + order_by = OrderByExpr() + + # Query with large limit + results = settings.docStoreConn.search( + select_fields=["*"], # Get all fields + highlight_fields=[], + condition=condition, + match_expressions=[], + order_by=order_by, + offset=0, + limit=10000, + index_names=index_name, + knowledgebase_ids=kb_ids + ) + + logging.debug(f"[get_flatted_meta_by_kbs] index_name: {index_name}, kb_ids: {kb_ids}") + logging.debug(f"[get_flatted_meta_by_kbs] results type: {type(results)}") + + # Aggregate metadata + meta = {} + + # Use helper to iterate over results in any format + for doc_id, doc in cls._iter_search_results(results): + # Extract metadata fields (exclude system fields) + doc_meta = cls._extract_metadata(doc) + + for k, v in doc_meta.items(): + if k not in meta: + meta[k] = {} + + values = v if isinstance(v, list) else [v] + for vv in values: + if vv is None: + continue + sv = str(vv) + if sv not in meta[k]: + meta[k][sv] = [] + meta[k][sv].append(doc_id) + + logging.debug(f"[get_flatted_meta_by_kbs] KBs: {kb_ids}, Returning metadata: {meta}") + return meta + + except Exception as e: + logging.error(f"Error getting flattened metadata for KBs {kb_ids}: {e}") + return {} + + @classmethod + def get_metadata_for_documents(cls, doc_ids: Optional[List[str]], kb_id: str) -> Dict[str, Dict]: + """ + Get metadata fields for specific documents. + Returns a mapping of doc_id -> meta_fields + + Args: + doc_ids: List of document IDs (if None, gets all documents with metadata for the KB) + kb_id: Knowledge base ID + + Returns: + Dictionary mapping doc_id to meta_fields dict + """ + try: + results = cls._search_metadata(kb_id, condition={"kb_id": kb_id}) + if not results: + return {} + + # Build mapping: doc_id -> meta_fields + meta_mapping = {} + + # If doc_ids is provided, create a set for efficient lookup + doc_ids_set = set(doc_ids) if doc_ids else None + + # Use helper to iterate over results in any format + for doc_id, doc in cls._iter_search_results(results): + # Filter by doc_ids if provided + if doc_ids_set is not None and doc_id not in doc_ids_set: + continue + + # Extract metadata (handles both JSON strings and dicts) + doc_meta = cls._extract_metadata(doc) + if doc_meta: + meta_mapping[doc_id] = doc_meta + + logging.debug(f"[get_metadata_for_documents] Found metadata for {len(meta_mapping)}/{len(doc_ids) if doc_ids else 'all'} documents") + return meta_mapping + + except Exception as e: + logging.error(f"Error getting metadata for documents: {e}") + return {} + + @classmethod + @DB.connection_context() + def get_metadata_summary(cls, kb_id: str, doc_ids=None) -> Dict: + """ + Get metadata summary for documents in a knowledge base. 
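+ + Illustrative (hypothetical values): two documents with {"author": "alice"} and one with {"author": "bob"} yield {"author": {"type": "string", "values": [("alice", 2), ("bob", 1)]}}.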
+ + Args: + kb_id: Knowledge base ID + doc_ids: Optional list of document IDs to filter by + + Returns: + Dictionary with metadata field statistics in format: + { + "field_name": { + "type": "string" | "number" | "list", + "values": [("value1", count1), ("value2", count2), ...] # sorted by count desc + } + } + """ + def _meta_value_type(value): + """Determine the type of a metadata value.""" + if value is None: + return None + if isinstance(value, list): + return "list" + if isinstance(value, bool): + return "string" + if isinstance(value, (int, float)): + return "number" + return "string" + + try: + results = cls._search_metadata(kb_id, condition={"kb_id": kb_id}) + if not results: + return {} + + # If doc_ids are provided, we'll filter after the search + doc_ids_set = set(doc_ids) if doc_ids else None + + # Aggregate metadata + summary = {} + type_counter = {} + + logging.debug(f"[METADATA SUMMARY] KB: {kb_id}, doc_ids: {doc_ids}") + + # Use helper to iterate over results in any format + for doc_id, doc in cls._iter_search_results(results): + # Check doc_ids filter + if doc_ids_set and doc_id not in doc_ids_set: + continue + + doc_meta = cls._extract_metadata(doc) + + for k, v in doc_meta.items(): + # Track type counts for this field + value_type = _meta_value_type(v) + if value_type: + if k not in type_counter: + type_counter[k] = {} + type_counter[k][value_type] = type_counter[k].get(value_type, 0) + 1 + + # Aggregate value counts + values = v if isinstance(v, list) else [v] + for vv in values: + if not vv: + continue + sv = str(vv) + if k not in summary: + summary[k] = {} + summary[k][sv] = summary[k].get(sv, 0) + 1 + + # Build result with type information and sorted values + result = {} + for k, v in summary.items(): + values = sorted([(val, cnt) for val, cnt in v.items()], key=lambda x: x[1], reverse=True) + type_counts = type_counter.get(k, {}) + value_type = "string" + if type_counts: + value_type = max(type_counts.items(), key=lambda item: item[1])[0] + result[k] = {"type": value_type, "values": values} + + logging.debug(f"[METADATA SUMMARY] Final result: {result}") + return result + + except Exception as e: + logging.error(f"Error getting metadata summary for KB {kb_id}: {e}") + return {} + + @classmethod + @DB.connection_context() + def batch_update_metadata(cls, kb_id: str, doc_ids: List[str], updates=None, deletes=None) -> int: + """ + Batch update metadata for documents in a knowledge base. 
+ + Args: + kb_id: Knowledge base ID + doc_ids: List of document IDs to update + updates: List of update operations, each with: + - key: field name to update + - value: new value + - match (optional): only update if current value matches this + deletes: List of delete operations, each with: + - key: field name to delete from + - value (optional): specific value to delete (if not provided, deletes the entire field) + + Returns: + Number of documents updated + + Examples: + updates = [{"key": "author", "value": "John"}] + updates = [{"key": "tags", "value": "new", "match": "old"}] # Replace "old" with "new" in tags list + deletes = [{"key": "author"}] # Delete entire author field + deletes = [{"key": "tags", "value": "obsolete"}] # Remove "obsolete" from tags list + """ + updates = updates or [] + deletes = deletes or [] + if not doc_ids: + return 0 + + def _normalize_meta(meta): + """Normalize metadata to a dict.""" + if isinstance(meta, str): + try: + meta = json.loads(meta) + except Exception: + return {} + if not isinstance(meta, dict): + return {} + return deepcopy(meta) + + def _str_equal(a, b): + """Compare two values as strings.""" + return str(a) == str(b) + + def _apply_updates(meta): + """Apply update operations to metadata.""" + changed = False + for upd in updates: + key = upd.get("key") + if not key: + continue + + new_value = upd.get("value") + match_value = upd.get("match", None) + match_provided = match_value is not None and match_value != "" + + if key not in meta: + if match_provided: + continue + meta[key] = dedupe_list(new_value) if isinstance(new_value, list) else new_value + changed = True + continue + + if isinstance(meta[key], list): + if not match_provided: + # No match provided, append new_value to the list + if isinstance(new_value, list): + meta[key] = dedupe_list(meta[key] + new_value) + else: + meta[key] = dedupe_list(meta[key] + [new_value]) + changed = True + else: + # Replace items matching match_value with new_value + replaced = False + new_list = [] + for item in meta[key]: + if _str_equal(item, match_value): + new_list.append(new_value) + replaced = True + else: + new_list.append(item) + if replaced: + meta[key] = dedupe_list(new_list) + changed = True + else: + if not match_provided: + meta[key] = new_value + changed = True + else: + if _str_equal(meta[key], match_value): + meta[key] = new_value + changed = True + return changed + + def _apply_deletes(meta): + """Apply delete operations to metadata.""" + changed = False + for d in deletes: + key = d.get("key") + if not key or key not in meta: + continue + value = d.get("value", None) + if isinstance(meta[key], list): + if value is None: + del meta[key] + changed = True + continue + new_list = [item for item in meta[key] if not _str_equal(item, value)] + if len(new_list) != len(meta[key]): + if new_list: + meta[key] = new_list + else: + del meta[key] + changed = True + else: + if value is None or _str_equal(meta[key], value): + del meta[key] + changed = True + return changed + + try: + results = cls._search_metadata(kb_id, condition=None) + if not results: + results = [] # Treat as empty list if None + + updated_docs = 0 + doc_ids_set = set(doc_ids) + found_doc_ids = set() + + logging.debug(f"[batch_update_metadata] Searching for doc_ids: {doc_ids}") + + # Use helper to iterate over results in any format + for doc_id, doc in cls._iter_search_results(results): + # Filter to only process requested doc_ids + if doc_id not in doc_ids_set: + continue + + found_doc_ids.add(doc_id) + + # Get current metadata + 
current_meta = cls._extract_metadata(doc) + meta = _normalize_meta(current_meta) + original_meta = deepcopy(meta) + + logging.debug(f"[batch_update_metadata] Doc {doc_id}: current_meta={current_meta}, meta={meta}") + logging.debug(f"[batch_update_metadata] Updates to apply: {updates}, Deletes: {deletes}") + + # Apply updates and deletes + changed = _apply_updates(meta) + logging.debug(f"[batch_update_metadata] After _apply_updates: changed={changed}, meta={meta}") + changed = _apply_deletes(meta) or changed + logging.debug(f"[batch_update_metadata] After _apply_deletes: changed={changed}, meta={meta}") + + # Update if changed + if changed and meta != original_meta: + logging.debug(f"[batch_update_metadata] Updating doc_id: {doc_id}, meta: {meta}") + # If metadata is empty, delete the row entirely instead of keeping empty metadata + if not meta: + cls.delete_document_metadata(doc_id, skip_empty_check=True) + else: + cls.update_document_metadata(doc_id, meta) + updated_docs += 1 + + # Handle documents that don't have metadata rows yet + # These documents weren't in the search results, so we need to insert new metadata for them + missing_doc_ids = doc_ids_set - found_doc_ids + if missing_doc_ids and updates: + logging.debug(f"[batch_update_metadata] Inserting new metadata for documents without metadata rows: {missing_doc_ids}") + for doc_id in missing_doc_ids: + # Apply updates to create new metadata + meta = {} + _apply_updates(meta) + if meta: + # Only insert if there's actual metadata to add + cls.update_document_metadata(doc_id, meta) + updated_docs += 1 + logging.debug(f"[batch_update_metadata] Inserted metadata for doc_id: {doc_id}, meta: {meta}") + + logging.debug(f"[batch_update_metadata] KB: {kb_id}, doc_ids: {doc_ids}, updated: {updated_docs}") + return updated_docs + + except Exception as e: + logging.error(f"Error in batch_update_metadata for KB {kb_id}: {e}") + return 0 diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index b0527c646..99e0f0f8f 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -33,7 +33,7 @@ from api.db.db_models import DB, Document, Knowledgebase, Task, Tenant, UserTena from api.db.db_utils import bulk_insert_into_db from api.db.services.common_service import CommonService from api.db.services.knowledgebase_service import KnowledgebaseService -from common.metadata_utils import dedupe_list +from api.db.services.doc_metadata_service import DocMetadataService from common.misc_utils import get_uuid from common.time_utils import current_timestamp, get_format_time from common.constants import LLMType, ParserType, StatusEnum, TaskStatus, SVR_CONSUMER_GROUP_NAME @@ -67,7 +67,6 @@ class DocumentService(CommonService): cls.model.progress_msg, cls.model.process_begin_at, cls.model.process_duration, - cls.model.meta_fields, cls.model.suffix, cls.model.run, cls.model.status, @@ -154,8 +153,11 @@ class DocumentService(CommonService): docs = docs.where(cls.model.type.in_(types)) if suffix: docs = docs.where(cls.model.suffix.in_(suffix)) - if return_empty_metadata: - docs = docs.where(fn.COALESCE(fn.JSON_LENGTH(cls.model.meta_fields), 0) == 0) + + metadata_map = DocMetadataService.get_metadata_for_documents(None, kb_id) + doc_ids_with_metadata = set(metadata_map.keys()) + if return_empty_metadata and doc_ids_with_metadata: + docs = docs.where(cls.model.id.not_in(doc_ids_with_metadata)) count = docs.count() if desc: @@ -166,7 +168,14 @@ class DocumentService(CommonService): if page_number and 
items_per_page: docs = docs.paginate(page_number, items_per_page) - return list(docs.dicts()), count + docs_list = list(docs.dicts()) + if return_empty_metadata: + for doc in docs_list: + doc["meta_fields"] = {} + else: + for doc in docs_list: + doc["meta_fields"] = metadata_map.get(doc["id"], {}) + return docs_list, count @classmethod @DB.connection_context() @@ -212,7 +221,7 @@ class DocumentService(CommonService): if suffix: query = query.where(cls.model.suffix.in_(suffix)) - rows = query.select(cls.model.run, cls.model.suffix, cls.model.meta_fields) + rows = query.select(cls.model.run, cls.model.suffix, cls.model.id) total = rows.count() suffix_counter = {} @@ -220,10 +229,18 @@ class DocumentService(CommonService): metadata_counter = {} empty_metadata_count = 0 + doc_ids = [row.id for row in rows] + metadata = {} + if doc_ids: + try: + metadata = DocMetadataService.get_metadata_for_documents(doc_ids, kb_id) + except Exception as e: + logging.warning(f"Failed to fetch metadata from ES/Infinity: {e}") + for row in rows: suffix_counter[row.suffix] = suffix_counter.get(row.suffix, 0) + 1 run_status_counter[str(row.run)] = run_status_counter.get(str(row.run), 0) + 1 - meta_fields = row.meta_fields or {} + meta_fields = metadata.get(row.id, {}) if not meta_fields: empty_metadata_count += 1 continue @@ -374,6 +391,12 @@ class DocumentService(CommonService): except Exception as e: logging.error(f"Failed to delete chunks from doc store for document {doc.id}: {e}") + # Delete document metadata (non-critical, log and continue) + try: + DocMetadataService.delete_document_metadata(doc.id) + except Exception as e: + logging.warning(f"Failed to delete metadata for document {doc.id}: {e}") + # Cleanup knowledge graph references (non-critical, log and continue) try: graph_source = settings.docStoreConn.get_fields( @@ -707,246 +730,6 @@ class DocumentService(CommonService): cls.update_by_id(doc_id, info) - @classmethod - @DB.connection_context() - def update_meta_fields(cls, doc_id, meta_fields): - return cls.update_by_id(doc_id, {"meta_fields": meta_fields}) - - @classmethod - @DB.connection_context() - def get_meta_by_kbs(cls, kb_ids): - """ - Legacy metadata aggregator (backward-compatible). - - Does NOT expand list values and a list is kept as one string key. - Example: {"tags": ["foo","bar"]} -> meta["tags"]["['foo', 'bar']"] = [doc_id] - - Expects meta_fields is a dict. - Use when existing callers rely on the old list-as-string semantics. - """ - fields = [ - cls.model.id, - cls.model.meta_fields, - ] - meta = {} - for r in cls.model.select(*fields).where(cls.model.kb_id.in_(kb_ids)): - doc_id = r.id - for k,v in r.meta_fields.items(): - if k not in meta: - meta[k] = {} - if not isinstance(v, list): - v = [v] - for vv in v: - if vv not in meta[k]: - if isinstance(vv, list) or isinstance(vv, dict): - continue - meta[k][vv] = [] - meta[k][vv].append(doc_id) - return meta - - @classmethod - @DB.connection_context() - def get_flatted_meta_by_kbs(cls, kb_ids): - """ - - Parses stringified JSON meta_fields when possible and skips non-dict or unparsable values. - - Expands list values into individual entries. - Example: {"tags": ["foo","bar"], "author": "alice"} -> - meta["tags"]["foo"] = [doc_id], meta["tags"]["bar"] = [doc_id], meta["author"]["alice"] = [doc_id] - Prefer for metadata_condition filtering and scenarios that must respect list semantics. 
- """ - fields = [ - cls.model.id, - cls.model.meta_fields, - ] - meta = {} - for r in cls.model.select(*fields).where(cls.model.kb_id.in_(kb_ids)): - doc_id = r.id - meta_fields = r.meta_fields or {} - if isinstance(meta_fields, str): - try: - meta_fields = json.loads(meta_fields) - except Exception: - continue - if not isinstance(meta_fields, dict): - continue - for k, v in meta_fields.items(): - if k not in meta: - meta[k] = {} - values = v if isinstance(v, list) else [v] - for vv in values: - if vv is None: - continue - sv = str(vv) - if sv not in meta[k]: - meta[k][sv] = [] - meta[k][sv].append(doc_id) - return meta - - @classmethod - @DB.connection_context() - def get_metadata_summary(cls, kb_id, document_ids=None): - def _meta_value_type(value): - if value is None: - return None - if isinstance(value, list): - return "list" - if isinstance(value, bool): - return "string" - if isinstance(value, (int, float)): - return "number" - if re.match(r"\d{4}\-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}", str(value)): - return "time" - return "string" - - fields = [cls.model.id, cls.model.meta_fields] - summary = {} - type_counter = {} - query = cls.model.select(*fields).where(cls.model.kb_id == kb_id) - if document_ids: - query = query.where(cls.model.id.in_(document_ids)) - for r in query: - meta_fields = r.meta_fields or {} - if isinstance(meta_fields, str): - try: - meta_fields = json.loads(meta_fields) - except Exception: - continue - if not isinstance(meta_fields, dict): - continue - for k, v in meta_fields.items(): - value_type = _meta_value_type(v) - if value_type: - if k not in type_counter: - type_counter[k] = {} - type_counter[k][value_type] = type_counter[k].get(value_type, 0) + 1 - values = v if isinstance(v, list) else [v] - for vv in values: - if not vv: - continue - sv = str(vv) - if k not in summary: - summary[k] = {} - summary[k][sv] = summary[k].get(sv, 0) + 1 - result = {} - for k, v in summary.items(): - values = sorted([(val, cnt) for val, cnt in v.items()], key=lambda x: x[1], reverse=True) - type_counts = type_counter.get(k, {}) - value_type = "string" - if type_counts: - value_type = max(type_counts.items(), key=lambda item: item[1])[0] - result[k] = {"type": value_type, "values": values} - return result - - @classmethod - @DB.connection_context() - def batch_update_metadata(cls, kb_id, doc_ids, updates=None, deletes=None, adds=None): - updates = updates or [] - deletes = deletes or [] - if not doc_ids: - return 0 - - def _normalize_meta(meta): - if isinstance(meta, str): - try: - meta = json.loads(meta) - except Exception: - return {} - if not isinstance(meta, dict): - return {} - return deepcopy(meta) - - def _str_equal(a, b): - return str(a) == str(b) - - def _apply_updates(meta): - changed = False - for upd in updates: - key = upd.get("key") - if not key: - continue - - new_value = upd.get("value") - match_provided = upd.get("match") - if key not in meta: - if match_provided: - continue - meta[key] = dedupe_list(new_value) if isinstance(new_value, list) else new_value - changed = True - continue - - if isinstance(meta[key], list): - if not match_provided: - if isinstance(new_value, list): - meta[key] = dedupe_list(new_value) - else: - meta[key].append(new_value) - changed = True - else: - match_value = upd.get("match") - replaced = False - new_list = [] - for item in meta[key]: - if _str_equal(item, match_value): - new_list.append(new_value) - replaced = True - else: - new_list.append(item) - if replaced: - meta[key] = dedupe_list(new_list) - changed = True - else: - if not 
match_provided: - meta[key] = new_value - changed = True - else: - match_value = upd.get("match") - if _str_equal(meta[key], match_value): - meta[key] = new_value - changed = True - return changed - - def _apply_deletes(meta): - changed = False - for d in deletes: - key = d.get("key") - if not key or key not in meta: - continue - value = d.get("value", None) - if isinstance(meta[key], list): - if value is None: - del meta[key] - changed = True - continue - new_list = [item for item in meta[key] if not _str_equal(item, value)] - if len(new_list) != len(meta[key]): - if new_list: - meta[key] = new_list - else: - del meta[key] - changed = True - else: - if value is None or _str_equal(meta[key], value): - del meta[key] - changed = True - return changed - - updated_docs = 0 - with DB.atomic(): - rows = cls.model.select(cls.model.id, cls.model.meta_fields).where( - cls.model.id.in_(doc_ids) - ) - for r in rows: - meta = _normalize_meta(r.meta_fields or {}) - original_meta = deepcopy(meta) - changed = _apply_updates(meta) - changed = _apply_deletes(meta) or changed - if changed and meta != original_meta: - cls.model.update( - meta_fields=meta, - update_time=current_timestamp(), - update_date=get_format_time() - ).where(cls.model.id == r.id).execute() - updated_docs += 1 - return updated_docs - @classmethod @DB.connection_context() def update_progress(cls): diff --git a/common/doc_store/es_conn_base.py b/common/doc_store/es_conn_base.py index 3bbd8f7ca..6b725cdbe 100644 --- a/common/doc_store/es_conn_base.py +++ b/common/doc_store/es_conn_base.py @@ -24,6 +24,7 @@ from abc import abstractmethod from elasticsearch import NotFoundError from elasticsearch_dsl import Index from elastic_transport import ConnectionTimeout +from elasticsearch.client import IndicesClient from common.file_utils import get_project_base_directory from common.misc_utils import convert_bytes from common.doc_store.doc_store_base import DocStoreConnection, OrderByExpr, MatchExpr @@ -128,13 +129,34 @@ class ESConnectionBase(DocStoreConnection): if self.index_exist(index_name, dataset_id): return True try: - from elasticsearch.client import IndicesClient return IndicesClient(self.es).create(index=index_name, settings=self.mapping["settings"], mappings=self.mapping["mappings"]) except Exception: self.logger.exception("ESConnection.createIndex error %s" % index_name) + def create_doc_meta_idx(self, index_name: str): + """ + Create a document metadata index. + + Index name pattern: ragflow_doc_meta_{tenant_id} + - Per-tenant metadata index for storing document metadata fields + """ + if self.index_exist(index_name, ""): + return True + try: + fp_mapping = os.path.join(get_project_base_directory(), "conf", "doc_meta_es_mapping.json") + if not os.path.exists(fp_mapping): + self.logger.error(f"Document metadata mapping file not found at {fp_mapping}") + return False + + doc_meta_mapping = json.load(open(fp_mapping, "r")) + return IndicesClient(self.es).create(index=index_name, + settings=doc_meta_mapping["settings"], + mappings=doc_meta_mapping["mappings"]) + except Exception as e: + self.logger.exception(f"Error creating document metadata index {index_name}: {e}") + def delete_idx(self, index_name: str, dataset_id: str): if len(dataset_id) > 0: # The index need to be alive after any kb deletion since all kb under this tenant are in one index. 
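Both connectors expose create_doc_meta_idx(index_name) and share the ragflow_doc_meta_{tenant_id} naming convention (the Infinity counterpart follows below). A minimal usage sketch, assuming `settings` and its `docStoreConn` handle are importable from `common` as elsewhere in this PR; the helper name `ensure_doc_meta_index` is illustrative only and not part of the diff:

# Hedged sketch: derive the per-tenant metadata index name and make sure it exists.
from common import settings  # assumption: exposes the active docStoreConn used in this PR


def ensure_doc_meta_index(tenant_id: str) -> bool:
    # Naming convention introduced by this PR: one metadata index/table per tenant.
    index_name = f"ragflow_doc_meta_{tenant_id}"
    # create_doc_meta_idx() is effectively idempotent: the ES variant returns early when the
    # index exists, the Infinity variant creates the table with ConflictType.Ignore.
    return bool(settings.docStoreConn.create_doc_meta_idx(index_name))
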
diff --git a/common/doc_store/infinity_conn_base.py b/common/doc_store/infinity_conn_base.py
index 218f12552..d6418695d 100644
--- a/common/doc_store/infinity_conn_base.py
+++ b/common/doc_store/infinity_conn_base.py
@@ -285,8 +285,65 @@ class InfinityConnectionBase(DocStoreConnection):
         self.logger.info(f"INFINITY created table {table_name}, vector size {vector_size}")
         return True
+    def create_doc_meta_idx(self, index_name: str):
+        """
+        Create a document metadata table.
+
+        Table name pattern: ragflow_doc_meta_{tenant_id}
+        - Per-tenant metadata table for storing document metadata fields
+        """
+        table_name = index_name
+        inf_conn = self.connPool.get_conn()
+        inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
+        try:
+            fp_mapping = os.path.join(get_project_base_directory(), "conf", "doc_meta_infinity_mapping.json")
+            if not os.path.exists(fp_mapping):
+                self.logger.error(f"Document metadata mapping file not found at {fp_mapping}")
+                self.connPool.release_conn(inf_conn)
+                return False
+            schema = json.load(open(fp_mapping))
+            inf_db.create_table(
+                table_name,
+                schema,
+                ConflictType.Ignore,
+            )
+
+            # Create secondary indexes on id and kb_id for better query performance
+            inf_table = inf_db.get_table(table_name)
+
+            try:
+                inf_table.create_index(
+                    f"idx_{table_name}_id",
+                    IndexInfo("id", IndexType.Secondary),
+                    ConflictType.Ignore,
+                )
+                self.logger.debug(f"INFINITY created secondary index on id for table {table_name}")
+            except Exception as e:
+                self.logger.warning(f"Failed to create index on id for {table_name}: {e}")
+
+            try:
+                inf_table.create_index(
+                    f"idx_{table_name}_kb_id",
+                    IndexInfo("kb_id", IndexType.Secondary),
+                    ConflictType.Ignore,
+                )
+                self.logger.debug(f"INFINITY created secondary index on kb_id for table {table_name}")
+            except Exception as e:
+                self.logger.warning(f"Failed to create index on kb_id for {table_name}: {e}")
+
+            self.connPool.release_conn(inf_conn)
+            self.logger.debug(f"INFINITY created document metadata table {table_name} with secondary indexes")
+            return True
+
+        except Exception as e:
+            self.connPool.release_conn(inf_conn)
+            self.logger.exception(f"Error creating document metadata table {table_name}: {e}")
+            return False
+
     def delete_idx(self, index_name: str, dataset_id: str):
-        table_name = f"{index_name}_{dataset_id}"
+        if index_name.startswith("ragflow_doc_meta_"):
+            table_name = index_name
+        else:
+            table_name = f"{index_name}_{dataset_id}"
         inf_conn = self.connPool.get_conn()
         db_instance = inf_conn.get_database(self.dbName)
         db_instance.drop_table(table_name, ConflictType.Ignore)
@@ -294,7 +351,10 @@ class InfinityConnectionBase(DocStoreConnection):
         self.logger.info(f"INFINITY dropped table {table_name}")
 
     def index_exist(self, index_name: str, dataset_id: str) -> bool:
-        table_name = f"{index_name}_{dataset_id}"
+        if index_name.startswith("ragflow_doc_meta_"):
+            table_name = index_name
+        else:
+            table_name = f"{index_name}_{dataset_id}"
         try:
             inf_conn = self.connPool.get_conn()
             db_instance = inf_conn.get_database(self.dbName)
@@ -341,7 +401,10 @@ class InfinityConnectionBase(DocStoreConnection):
     def delete(self, condition: dict, index_name: str, dataset_id: str) -> int:
         inf_conn = self.connPool.get_conn()
         db_instance = inf_conn.get_database(self.dbName)
-        table_name = f"{index_name}_{dataset_id}"
+        if index_name.startswith("ragflow_doc_meta_"):
+            table_name = index_name
+        else:
+            table_name = f"{index_name}_{dataset_id}"
         try:
             table_instance = db_instance.get_table(table_name)
         except Exception:
diff --git a/common/metadata_utils.py b/common/metadata_utils.py
index 
989bc7d04..772606714 100644 --- a/common/metadata_utils.py +++ b/common/metadata_utils.py @@ -106,10 +106,10 @@ def meta_filter(metas: dict, filters: list[dict], logic: str = "and"): else: if logic == "and": doc_ids = doc_ids & set(ids) + if not doc_ids: + return [] else: doc_ids = doc_ids | set(ids) - if not doc_ids: - return [] return list(doc_ids) diff --git a/conf/doc_meta_es_mapping.json b/conf/doc_meta_es_mapping.json new file mode 100644 index 000000000..eeab3b985 --- /dev/null +++ b/conf/doc_meta_es_mapping.json @@ -0,0 +1,29 @@ +{ + "settings": { + "index": { + "number_of_shards": 2, + "number_of_replicas": 0, + "refresh_interval": "1000ms" + } + }, + "mappings": { + "_source": { + "enabled": true + }, + "dynamic": "runtime", + "properties": { + "id": { + "type": "keyword", + "store": true + }, + "kb_id": { + "type": "keyword", + "store": true + }, + "meta_fields": { + "type": "object", + "dynamic": true + } + } + } +} diff --git a/conf/doc_meta_infinity_mapping.json b/conf/doc_meta_infinity_mapping.json new file mode 100644 index 000000000..471912c6e --- /dev/null +++ b/conf/doc_meta_infinity_mapping.json @@ -0,0 +1,5 @@ +{ + "id": {"type": "varchar", "default": ""}, + "kb_id": {"type": "varchar", "default": ""}, + "meta_fields": {"type": "json", "default": "{}"} +} \ No newline at end of file diff --git a/rag/prompts/generator.py b/rag/prompts/generator.py index f65bfeab6..5f834d85a 100644 --- a/rag/prompts/generator.py +++ b/rag/prompts/generator.py @@ -98,6 +98,7 @@ def message_fit_in(msg, max_length=4000): def kb_prompt(kbinfos, max_tokens, hash_id=False): from api.db.services.document_service import DocumentService + from api.db.services.doc_metadata_service import DocMetadataService knowledges = [get_value(ck, "content", "content_with_weight") for ck in kbinfos["chunks"]] kwlg_len = len(knowledges) @@ -114,7 +115,12 @@ def kb_prompt(kbinfos, max_tokens, hash_id=False): break docs = DocumentService.get_by_ids([get_value(ck, "doc_id", "document_id") for ck in kbinfos["chunks"][:chunks_num]]) - docs = {d.id: d.meta_fields for d in docs} + + docs_with_meta = {} + for d in docs: + meta = DocMetadataService.get_document_metadata(d.id) + docs_with_meta[d.id] = meta if meta else {} + docs = docs_with_meta def draw_node(k, line): if line is not None and not isinstance(line, str): diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 3e406e95c..c414f21ee 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -61,6 +61,7 @@ import numpy as np from peewee import DoesNotExist from common.constants import LLMType, ParserType, PipelineTaskType from api.db.services.document_service import DocumentService +from api.db.services.doc_metadata_service import DocMetadataService from api.db.services.llm_service import LLMBundle from api.db.services.task_service import TaskService, has_canceled, CANVAS_DEBUG_DOC_ID, GRAPH_RAPTOR_FAKE_DOC_ID from api.db.services.file2document_service import File2DocumentService @@ -438,12 +439,10 @@ async def build_chunks(task, progress_callback): metadata = update_metadata_to(metadata, doc["metadata_obj"]) del doc["metadata_obj"] if metadata: - e, doc = DocumentService.get_by_id(task["doc_id"]) - if e: - if isinstance(doc.meta_fields, str): - doc.meta_fields = json.loads(doc.meta_fields) - metadata = update_metadata_to(metadata, doc.meta_fields) - DocumentService.update_by_id(task["doc_id"], {"meta_fields": metadata}) + existing_meta = DocMetadataService.get_document_metadata(task["doc_id"]) + existing_meta = existing_meta if 
isinstance(existing_meta, dict) else {} + metadata = update_metadata_to(metadata, existing_meta) + DocMetadataService.update_document_metadata(task["doc_id"], metadata) progress_callback(msg="Question generation {} chunks completed in {:.2f}s".format(len(docs), timer() - st)) if task["kb_parser_config"].get("tag_kb_ids", []): @@ -735,12 +734,10 @@ async def run_dataflow(task: dict): del ck["positions"] if metadata: - e, doc = DocumentService.get_by_id(doc_id) - if e: - if isinstance(doc.meta_fields, str): - doc.meta_fields = json.loads(doc.meta_fields) - metadata = update_metadata_to(metadata, doc.meta_fields) - DocumentService.update_by_id(doc_id, {"meta_fields": metadata}) + existing_meta = DocMetadataService.get_document_metadata(doc_id) + existing_meta = existing_meta if isinstance(existing_meta, dict) else {} + metadata = update_metadata_to(metadata, existing_meta) + DocMetadataService.update_document_metadata(doc_id, metadata) start_ts = timer() set_progress(task_id, prog=0.82, msg="[DOC Engine]:\nStart to index...") diff --git a/rag/utils/es_conn.py b/rag/utils/es_conn.py index d873a5af5..8c1e506b4 100644 --- a/rag/utils/es_conn.py +++ b/rag/utils/es_conn.py @@ -162,7 +162,11 @@ class ESConnection(ESConnectionBase): self._connect() continue except Exception as e: - self.logger.exception(f"ESConnection.search {str(index_names)} query: " + str(q) + str(e)) + # Only log debug for NotFoundError(accepted when metadata index doesn't exist) + if 'NotFound' in str(e): + self.logger.debug(f"ESConnection.search {str(index_names)} query: " + str(q) + " - " + str(e)) + else: + self.logger.exception(f"ESConnection.search {str(index_names)} query: " + str(q) + str(e)) raise e self.logger.error(f"ESConnection.search timeout for {ATTEMPT_TIME} times!") diff --git a/rag/utils/infinity_conn.py b/rag/utils/infinity_conn.py index 916f919ee..7d2107700 100644 --- a/rag/utils/infinity_conn.py +++ b/rag/utils/infinity_conn.py @@ -149,8 +149,11 @@ class InfinityConnection(InfinityConnectionBase): if condition: table_found = False for indexName in index_names: - for kb_id in knowledgebase_ids: - table_name = f"{indexName}_{kb_id}" + if indexName.startswith("ragflow_doc_meta_"): + table_names_to_search = [indexName] + else: + table_names_to_search = [f"{indexName}_{kb_id}" for kb_id in knowledgebase_ids] + for table_name in table_names_to_search: try: filter_cond = self.equivalent_condition_to_str(condition, db_instance.get_table(table_name)) table_found = True @@ -221,8 +224,11 @@ class InfinityConnection(InfinityConnectionBase): total_hits_count = 0 # Scatter search tables and gather the results for indexName in index_names: - for knowledgebaseId in knowledgebase_ids: - table_name = f"{indexName}_{knowledgebaseId}" + if indexName.startswith("ragflow_doc_meta_"): + table_names_to_search = [indexName] + else: + table_names_to_search = [f"{indexName}_{kb_id}" for kb_id in knowledgebase_ids] + for table_name in table_names_to_search: try: table_instance = db_instance.get_table(table_name) except Exception: @@ -276,8 +282,11 @@ class InfinityConnection(InfinityConnectionBase): df_list = list() assert isinstance(knowledgebase_ids, list) table_list = list() - for knowledgebaseId in knowledgebase_ids: - table_name = f"{index_name}_{knowledgebaseId}" + if index_name.startswith("ragflow_doc_meta_"): + table_names_to_search = [index_name] + else: + table_names_to_search = [f"{index_name}_{kb_id}" for kb_id in knowledgebase_ids] + for table_name in table_names_to_search: table_list.append(table_name) try: 
table_instance = db_instance.get_table(table_name) @@ -301,7 +310,10 @@ class InfinityConnection(InfinityConnectionBase): def insert(self, documents: list[dict], index_name: str, knowledgebase_id: str = None) -> list[str]: inf_conn = self.connPool.get_conn() db_instance = inf_conn.get_database(self.dbName) - table_name = f"{index_name}_{knowledgebase_id}" + if index_name.startswith("ragflow_doc_meta_"): + table_name = index_name + else: + table_name = f"{index_name}_{knowledgebase_id}" try: table_instance = db_instance.get_table(table_name) except InfinityException as e: @@ -405,6 +417,11 @@ class InfinityConnection(InfinityConnectionBase): elif k in ["page_num_int", "top_int"]: assert isinstance(v, list) d[k] = "_".join(f"{num:08x}" for num in v) + elif k == "meta_fields": + if isinstance(v, dict): + d[k] = json.dumps(v, ensure_ascii=False) + else: + d[k] = v if v else "{}" else: d[k] = v for k in ["docnm_kwd", "title_tks", "title_sm_tks", "important_kwd", "important_tks", "content_with_weight", @@ -434,7 +451,10 @@ class InfinityConnection(InfinityConnectionBase): # logger.info(f"update position_int: {newValue['position_int']}") inf_conn = self.connPool.get_conn() db_instance = inf_conn.get_database(self.dbName) - table_name = f"{index_name}_{knowledgebase_id}" + if index_name.startswith("ragflow_doc_meta_"): + table_name = index_name + else: + table_name = f"{index_name}_{knowledgebase_id}" table_instance = db_instance.get_table(table_name) # if "exists" in condition: # del condition["exists"] diff --git a/test/testcases/test_http_api/test_file_management_within_dataset/test_metadata_retrieval.py b/test/testcases/test_http_api/test_file_management_within_dataset/test_metadata_retrieval.py new file mode 100644 index 000000000..adc6435dd --- /dev/null +++ b/test/testcases/test_http_api/test_file_management_within_dataset/test_metadata_retrieval.py @@ -0,0 +1,184 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +End-to-end test for metadata filtering during retrieval. + +Tests that chunks are only retrieved from documents matching the metadata condition. 
+""" + +import pytest +import logging +from common import ( + create_dataset, + delete_datasets, + list_documents, + update_document, +) +from utils import wait_for + + +@wait_for(30, 1, "Document parsing timeout") +def _condition_parsing_complete(_auth, dataset_id): + res = list_documents(_auth, dataset_id) + if res["code"] != 0: + return False + + for doc in res["data"]["docs"]: + status = doc.get("run", "UNKNOWN") + if status == "FAILED": + pytest.fail(f"Document parsing failed: {doc}") + return False + if status != "DONE": + return False + return True + + +@pytest.fixture(scope="function") +def add_dataset_with_metadata(HttpApiAuth): + # First create the dataset + res = create_dataset(HttpApiAuth, { + "name": f"test_metadata_{int(__import__('time').time())}", + "chunk_method": "naive" + }) + + assert res["code"] == 0, f"Failed to create dataset: {res}" + dataset_id = res["data"]["id"] + + # Then configure metadata via the update_metadata_setting endpoint + import requests + from configs import HOST_ADDRESS, VERSION + + metadata_config = { + "type": "object", + "properties": { + "character": { + "description": "Historical figure name", + "type": "string" + }, + "era": { + "description": "Historical era", + "type": "string" + }, + "achievements": { + "description": "Major achievements", + "type": "array", + "items": { + "type": "string" + } + } + } + } + + res = requests.post( + url=f"{HOST_ADDRESS}/{VERSION}/kb/update_metadata_setting", + headers={"Content-Type": "application/json"}, + auth=HttpApiAuth, + json={ + "kb_id": dataset_id, + "metadata": metadata_config, + "enable_metadata": False + } + ).json() + + assert res["code"] == 0, f"Failed to configure metadata: {res}" + + yield dataset_id + + # Cleanup + delete_datasets(HttpApiAuth, {"ids": [dataset_id]}) + + +@pytest.mark.p2 +class TestMetadataWithRetrieval: + """Test retrieval with metadata filtering.""" + + def test_retrieval_with_metadata_filter(self, HttpApiAuth, add_dataset_with_metadata, tmp_path): + """ + Test that retrieval respects metadata filters. + + Verifies that chunks are only retrieved from documents matching the metadata condition. + """ + from common import upload_documents, parse_documents, retrieval_chunks + + dataset_id = add_dataset_with_metadata + + # Create two documents with different metadata + content_doc1 = "Document about Zhuge Liang who lived in Three Kingdoms period." + content_doc2 = "Document about Cao Cao who lived in Late Eastern Han Dynasty." 
+ + fp1 = tmp_path / "doc1_zhuge_liang.txt" + fp2 = tmp_path / "doc2_cao_cao.txt" + + with open(fp1, "w", encoding="utf-8") as f: + f.write(content_doc1) + with open(fp2, "w", encoding="utf-8") as f: + f.write(content_doc2) + + # Upload both documents + res = upload_documents(HttpApiAuth, dataset_id, [fp1, fp2]) + assert res["code"] == 0, f"Failed to upload documents: {res}" + + doc1_id = res["data"][0]["id"] + doc2_id = res["data"][1]["id"] + + # Add different metadata to each document + res = update_document(HttpApiAuth, dataset_id, doc1_id, { + "meta_fields": {"character": "Zhuge Liang", "era": "Three Kingdoms"} + }) + assert res["code"] == 0, f"Failed to update doc1 metadata: {res}" + + res = update_document(HttpApiAuth, dataset_id, doc2_id, { + "meta_fields": {"character": "Cao Cao", "era": "Late Eastern Han"} + }) + assert res["code"] == 0, f"Failed to update doc2 metadata: {res}" + + # Parse both documents + res = parse_documents(HttpApiAuth, dataset_id, {"document_ids": [doc1_id, doc2_id]}) + assert res["code"] == 0, f"Failed to trigger parsing: {res}" + + # Wait for parsing to complete + assert _condition_parsing_complete(HttpApiAuth, dataset_id), "Parsing timeout" + + # Test retrieval WITH metadata filter for "Zhuge Liang" + res = retrieval_chunks(HttpApiAuth, { + "question": "Zhuge Liang", + "dataset_ids": [dataset_id], + "metadata_condition": { + "logic": "and", + "conditions": [ + { + "name": "character", + "comparison_operator": "is", + "value": "Zhuge Liang" + } + ] + } + }) + assert res["code"] == 0, f"Retrieval with metadata filter failed: {res}" + + chunks_with_filter = res["data"]["chunks"] + doc_ids_with_filter = set(chunk.get("document_id", "") for chunk in chunks_with_filter) + + logging.info(f"✓ Retrieved {len(chunks_with_filter)} chunks from documents: {doc_ids_with_filter}") + + # Verify that filtered results only contain doc1 (Zhuge Liang) + if len(chunks_with_filter) > 0: + assert doc1_id in doc_ids_with_filter, f"Filtered results should contain doc1 (Zhuge Liang), but got: {doc_ids_with_filter}" + assert doc2_id not in doc_ids_with_filter, f"Filtered results should NOT contain doc2 (Cao Cao), but got: {doc_ids_with_filter}" + logging.info("Metadata filter correctly excluded chunks from non-matching documents") + else: + logging.warning("No chunks retrieved with filter - this might be due to embedding model not configured") diff --git a/test/testcases/test_http_api/test_file_management_within_dataset/test_metadata_summary.py b/test/testcases/test_http_api/test_file_management_within_dataset/test_metadata_summary.py index 931b48497..0791ead38 100644 --- a/test/testcases/test_http_api/test_file_management_within_dataset/test_metadata_summary.py +++ b/test/testcases/test_http_api/test_file_management_within_dataset/test_metadata_summary.py @@ -22,7 +22,9 @@ def _summary_to_counts(summary): counts = {} - for key, pairs in summary.items(): + for key, field_data in summary.items(): + # New format: {key: {"type": "...", "values": [[value, count], ...]}} + pairs = field_data["values"] counts[key] = {str(k): v for k, v in pairs} return counts
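The change above assumes get_metadata_summary() now returns {key: {"type": ..., "values": [[value, count], ...]}}. A self-contained sketch of that shape and the flattening performed by the updated helper; the field names and counts below are made up for illustration:

# Illustrative sample of the new summary payload and its conversion to plain counts.
summary = {
    "author": {"type": "string", "values": [["alice", 3], ["bob", 1]]},
    "tags": {"type": "list", "values": [["foo", 2], ["bar", 2]]},
}


def summary_to_counts(summary: dict) -> dict:
    counts = {}
    for key, field_data in summary.items():
        # field_data["values"] is a list of [value, count] pairs, sorted by count descending.
        counts[key] = {str(value): count for value, count in field_data["values"]}
    return counts


assert summary_to_counts(summary)["author"] == {"alice": 3, "bob": 1}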