ragflow/api/db/services/doc_metadata_service.py
qinling0210 9a5208976c Put document metadata in ES/Infinity (#12826)
### What problem does this PR solve?

Put document metadata in ES/Infinity.

Index name of metadata: ragflow_doc_meta_{tenant_id}

### Type of change

- [x] Refactoring
2026-01-28 13:29:34 +08:00

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Document Metadata Service
Manages document-level metadata storage in ES/Infinity.
This is the SOLE source of truth for document metadata - MySQL meta_fields column has been removed.
"""
import json
import logging
from copy import deepcopy
from typing import Dict, List, Optional
from api.db.db_models import DB, Document
from common import settings
from common.metadata_utils import dedupe_list
class DocMetadataService:
"""Service for managing document metadata in ES/Infinity"""
@staticmethod
def _get_doc_meta_index_name(tenant_id: str) -> str:
"""
Get the index name for document metadata.
Args:
tenant_id: Tenant ID
Returns:
Index name for document metadata
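        Example (illustrative tenant ID):
            _get_doc_meta_index_name("tenant_1") -> "ragflow_doc_meta_tenant_1"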
"""
return f"ragflow_doc_meta_{tenant_id}"
@staticmethod
def _extract_metadata(flat_meta: Dict) -> Dict:
"""
Extract metadata from ES/Infinity document format.
Args:
flat_meta: Raw document from ES/Infinity with meta_fields field
Returns:
Simple metadata dictionary
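        Example (illustrative; meta_fields may arrive as a JSON string or a dict):
            _extract_metadata({"meta_fields": '{"author": "alice"}'}) -> {"author": "alice"}
            _extract_metadata({"meta_fields": {"author": "alice"}})   -> {"author": "alice"}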
"""
if not flat_meta:
return {}
meta_fields = flat_meta.get('meta_fields')
if not meta_fields:
return {}
        # Parse JSON string if needed (json is imported at module level)
        if isinstance(meta_fields, str):
            try:
                return json.loads(meta_fields)
            except json.JSONDecodeError:
                return {}
# Already a dict, return as-is
if isinstance(meta_fields, dict):
return meta_fields
return {}
@staticmethod
    def _extract_doc_id(doc: Dict, hit: Optional[Dict] = None) -> str:
"""
Extract document ID from various formats.
Args:
doc: Document dictionary (from DataFrame or list format)
hit: Hit dictionary (from ES format with _id field)
Returns:
Document ID or empty string
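        Example (illustrative):
            _extract_doc_id({}, hit={"_id": "d1"})  -> "d1"
            _extract_doc_id({"doc_id": "d2"})       -> "d2"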
"""
if hit:
# ES format: doc is in _source, id is in _id
return hit.get('_id', '')
# DataFrame or list format: check multiple possible fields
return doc.get("doc_id") or doc.get("_id") or doc.get("id", "")
@classmethod
def _iter_search_results(cls, results):
"""
Iterate over search results in various formats (DataFrame, ES, list).
        Args:
            results: Search results from ES/Infinity in any format
        Yields:
            Tuple of (doc_id, doc_dict) for each document
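        Example (illustrative ES-shaped input):
            {"hits": {"hits": [{"_id": "d1", "_source": {"kb_id": "kb1"}}]}}
            yields ("d1", {"kb_id": "kb1"})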
"""
# Handle tuple return from Infinity: (DataFrame, int)
# Check this FIRST because pandas DataFrames also have __getitem__
if isinstance(results, tuple) and len(results) == 2:
results = results[0] # Extract DataFrame from tuple
# Check if results is a pandas DataFrame (from Infinity)
if hasattr(results, 'iterrows'):
# Handle pandas DataFrame - use iterrows() to iterate over rows
for _, row in results.iterrows():
doc = dict(row) # Convert Series to dict
doc_id = cls._extract_doc_id(doc)
if doc_id:
yield doc_id, doc
# Check if ES format (has 'hits' key)
# Note: ES returns ObjectApiResponse which is dict-like but not isinstance(dict)
elif hasattr(results, '__getitem__') and 'hits' in results:
# ES format: {"hits": {"hits": [{"_source": {...}, "_id": "..."}]}}
hits = results.get('hits', {}).get('hits', [])
for hit in hits:
doc = hit.get('_source', {})
doc_id = cls._extract_doc_id(doc, hit)
if doc_id:
yield doc_id, doc
# Handle list of dicts or other formats
elif isinstance(results, list):
for res in results:
if isinstance(res, dict):
docs = [res]
else:
docs = res
for doc in docs:
doc_id = cls._extract_doc_id(doc)
if doc_id:
yield doc_id, doc
@classmethod
    def _search_metadata(cls, kb_id: str, condition: Optional[Dict] = None, limit: int = 10000):
"""
Common search logic for metadata queries.
Args:
kb_id: Knowledge base ID
condition: Optional search condition (defaults to {"kb_id": kb_id})
limit: Max results to return
Returns:
Search results from ES/Infinity
"""
from api.db.db_models import Knowledgebase
from common.doc_store.doc_store_base import OrderByExpr
kb = Knowledgebase.get_by_id(kb_id)
if not kb:
return None
tenant_id = kb.tenant_id
index_name = cls._get_doc_meta_index_name(tenant_id)
if condition is None:
condition = {"kb_id": kb_id}
order_by = OrderByExpr()
return settings.docStoreConn.search(
select_fields=["*"],
highlight_fields=[],
condition=condition,
match_expressions=[],
order_by=order_by,
offset=0,
limit=limit,
index_names=index_name,
knowledgebase_ids=[kb_id]
)
@classmethod
def _split_combined_values(cls, meta_fields: Dict) -> Dict:
"""
Post-process metadata to split combined values by common delimiters.
For example: "关羽、孙权、张辽" -> ["关羽", "孙权", "张辽"]
        This fixes LLM extraction results where multiple values come back as one combined string.
Also removes duplicates after splitting.
Args:
meta_fields: Metadata dictionary
Returns:
Processed metadata with split values
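        Example (illustrative; also shows deduplication after splitting):
            {"tags": ["a、b", "b"]} -> {"tags": ["a", "b"]}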
"""
import re
if not meta_fields or not isinstance(meta_fields, dict):
return meta_fields
processed = {}
for key, value in meta_fields.items():
if isinstance(value, list):
# Process each item in the list
new_values = []
for item in value:
if isinstance(item, str):
                        # Split on common delimiters: Chinese enumeration comma (、),
                        # comma (,), semicolon (;), and pipe (|); runs of consecutive
                        # delimiters are collapsed by the `+` quantifier
                        split_items = re.split(r'[、,;|]+', item.strip())
# Trim whitespace and filter empty strings
split_items = [s.strip() for s in split_items if s.strip()]
if split_items:
new_values.extend(split_items)
else:
# Keep original if no split happened
new_values.append(item)
else:
new_values.append(item)
# Remove duplicates while preserving order
processed[key] = list(dict.fromkeys(new_values))
else:
processed[key] = value
if processed != meta_fields:
logging.debug(f"[METADATA SPLIT] Split combined values: {meta_fields} -> {processed}")
return processed
@classmethod
@DB.connection_context()
def insert_document_metadata(cls, doc_id: str, meta_fields: Dict) -> bool:
"""
Insert document metadata into ES/Infinity.
Args:
doc_id: Document ID
meta_fields: Metadata dictionary
Returns:
True if successful, False otherwise
"""
try:
from api.db.db_models import Knowledgebase
# Get document with tenant_id (need to join with Knowledgebase)
doc_query = Document.select(Document, Knowledgebase.tenant_id).join(
Knowledgebase, on=(Knowledgebase.id == Document.kb_id)
).where(Document.id == doc_id)
doc = doc_query.first()
if not doc:
logging.warning(f"Document {doc_id} not found for metadata insertion")
return False
# Extract document fields
doc_obj = doc # This is the Document object
tenant_id = doc.knowledgebase.tenant_id # Get tenant_id from joined Knowledgebase
kb_id = doc_obj.kb_id
# Prepare metadata document
doc_meta = {
"id": doc_obj.id,
"kb_id": kb_id,
}
            # Store metadata as a JSON object in meta_fields (mirrors the old MySQL meta_fields structure)
if meta_fields:
# Post-process to split combined values by common delimiters
meta_fields = cls._split_combined_values(meta_fields)
doc_meta["meta_fields"] = meta_fields
else:
doc_meta["meta_fields"] = {}
# Ensure index/table exists (per-tenant for both ES and Infinity)
index_name = cls._get_doc_meta_index_name(tenant_id)
# Check if table exists
table_exists = settings.docStoreConn.index_exist(index_name, kb_id)
logging.debug(f"Metadata table exists check: {index_name} -> {table_exists}")
# Create index if it doesn't exist
if not table_exists:
logging.debug(f"Creating metadata table: {index_name}")
# Both ES and Infinity now use per-tenant metadata tables
result = settings.docStoreConn.create_doc_meta_idx(index_name)
logging.debug(f"Table creation result: {result}")
else:
logging.debug(f"Metadata table already exists: {index_name}")
# Insert into ES/Infinity
result = settings.docStoreConn.insert(
[doc_meta],
index_name,
kb_id
)
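            # insert() returns error details on failure, so a truthy result means
            # the insert did not succeed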
if result:
logging.error(f"Failed to insert metadata for document {doc_id}: {result}")
return False
logging.debug(f"Successfully inserted metadata for document {doc_id}")
return True
except Exception as e:
logging.error(f"Error inserting metadata for document {doc_id}: {e}")
return False
@classmethod
@DB.connection_context()
def update_document_metadata(cls, doc_id: str, meta_fields: Dict) -> bool:
"""
Update document metadata in ES/Infinity.
For Elasticsearch: Uses partial update to directly update the meta_fields field.
For Infinity: Falls back to delete+insert (Infinity doesn't support partial updates well).
Args:
doc_id: Document ID
meta_fields: Metadata dictionary
Returns:
True if successful, False otherwise
"""
try:
from api.db.db_models import Knowledgebase
# Get document with tenant_id
doc_query = Document.select(Document, Knowledgebase.tenant_id).join(
Knowledgebase, on=(Knowledgebase.id == Document.kb_id)
).where(Document.id == doc_id)
doc = doc_query.first()
if not doc:
logging.warning(f"Document {doc_id} not found for metadata update")
return False
# Extract fields
doc_obj = doc
tenant_id = doc.knowledgebase.tenant_id
kb_id = doc_obj.kb_id
index_name = cls._get_doc_meta_index_name(tenant_id)
# Post-process to split combined values
processed_meta = cls._split_combined_values(meta_fields)
logging.debug(f"[update_document_metadata] Updating doc_id: {doc_id}, kb_id: {kb_id}, meta_fields: {processed_meta}")
# For Elasticsearch, use efficient partial update
if not settings.DOC_ENGINE_INFINITY:
try:
# Use ES partial update API - much more efficient than delete+insert
settings.docStoreConn.es.update(
index=index_name,
id=doc_id,
refresh=True, # Make changes immediately visible
doc={"meta_fields": processed_meta}
)
logging.debug(f"Successfully updated metadata for document {doc_id} using ES partial update")
return True
except Exception as e:
logging.error(f"ES partial update failed for document {doc_id}: {e}")
# Fall back to delete+insert if partial update fails
logging.info(f"Falling back to delete+insert for document {doc_id}")
# For Infinity or as fallback: use delete+insert
logging.debug(f"[update_document_metadata] Using delete+insert method for doc_id: {doc_id}")
cls.delete_document_metadata(doc_id, skip_empty_check=True)
return cls.insert_document_metadata(doc_id, processed_meta)
except Exception as e:
logging.error(f"Error updating metadata for document {doc_id}: {e}")
return False
@classmethod
@DB.connection_context()
def delete_document_metadata(cls, doc_id: str, skip_empty_check: bool = False) -> bool:
"""
Delete document metadata from ES/Infinity.
        Also drops the metadata table once it becomes empty.
        If the document has no metadata in the table, this is a no-op.
Args:
doc_id: Document ID
skip_empty_check: If True, skip checking/dropping empty table (for bulk deletions)
Returns:
True if successful (or no metadata to delete), False otherwise
"""
try:
from api.db.db_models import Knowledgebase
logging.debug(f"[METADATA DELETE] Starting metadata deletion for document: {doc_id}")
# Get document with tenant_id
doc_query = Document.select(Document, Knowledgebase.tenant_id).join(
Knowledgebase, on=(Knowledgebase.id == Document.kb_id)
).where(Document.id == doc_id)
doc = doc_query.first()
if not doc:
logging.warning(f"Document {doc_id} not found for metadata deletion")
return False
tenant_id = doc.knowledgebase.tenant_id
kb_id = doc.kb_id
index_name = cls._get_doc_meta_index_name(tenant_id)
logging.debug(f"[delete_document_metadata] Deleting doc_id: {doc_id}, kb_id: {kb_id}, index: {index_name}")
# Check if metadata table exists before attempting deletion
            # Key optimization: if the table does not exist, there is no metadata and nothing to delete
if not settings.docStoreConn.index_exist(index_name, ""):
logging.debug(f"Metadata table {index_name} does not exist, skipping metadata deletion for document {doc_id}")
return True # No metadata to delete is considered success
# Try to get the metadata to confirm it exists before deleting
# This is more efficient than attempting delete on non-existent records
try:
existing_metadata = settings.docStoreConn.get(
doc_id,
index_name,
[""] # Empty list for metadata tables
)
logging.debug(f"[METADATA DELETE] Get result: {existing_metadata is not None}")
if not existing_metadata:
logging.debug(f"[METADATA DELETE] Document {doc_id} has no metadata in table, skipping deletion")
# Only check/drop table if not skipped (tenant deletion will handle it)
if not skip_empty_check:
cls._drop_empty_metadata_table(index_name, tenant_id)
return True # No metadata to delete is success
except Exception as e:
# If get fails, document might not exist in metadata table, which is fine
logging.error(f"[METADATA DELETE] Get failed: {e}")
# Continue to check/drop table if needed
# Delete from ES/Infinity (only if metadata exists)
# For metadata tables, pass kb_id for the delete operation
# The delete() method will detect it's a metadata table and skip the kb_id filter
logging.debug(f"[METADATA DELETE] Deleting metadata with condition: {{'id': '{doc_id}'}}")
deleted_count = settings.docStoreConn.delete(
{"id": doc_id},
index_name,
kb_id # Pass actual kb_id (delete() will handle metadata tables correctly)
)
logging.debug(f"[METADATA DELETE] Deleted count: {deleted_count}")
# Only check if table should be dropped if not skipped (for bulk operations)
# Note: delete operation already uses refresh=True, so data is immediately available
if not skip_empty_check:
# Check by querying the actual metadata table (not MySQL)
cls._drop_empty_metadata_table(index_name, tenant_id)
logging.debug(f"Successfully deleted metadata for document {doc_id}")
return True
except Exception as e:
logging.error(f"Error deleting metadata for document {doc_id}: {e}")
return False
@classmethod
def _drop_empty_metadata_table(cls, index_name: str, tenant_id: str) -> None:
"""
Check if metadata table is empty and drop it if so.
Uses optimized count query instead of full search.
This prevents accumulation of empty metadata tables.
Args:
index_name: Metadata table/index name
tenant_id: Tenant ID
"""
try:
logging.debug(f"[DROP EMPTY TABLE] Starting empty table check for: {index_name}")
# Check if table exists first (cheap operation)
if not settings.docStoreConn.index_exist(index_name, ""):
logging.debug(f"[DROP EMPTY TABLE] Metadata table {index_name} does not exist, skipping")
return
logging.debug(f"[DROP EMPTY TABLE] Table {index_name} exists, checking if empty...")
            # Use the ES count API for an exact count. With Infinity there is no
            # `.es` attribute, so the AttributeError falls through to the
            # search-based fallback below.
            # Note: no explicit refresh needed; the delete path already uses refresh=True
try:
count_response = settings.docStoreConn.es.count(index=index_name)
total_count = count_response['count']
logging.debug(f"[DROP EMPTY TABLE] ES count API result: {total_count} documents")
is_empty = (total_count == 0)
except Exception as e:
logging.warning(f"[DROP EMPTY TABLE] Count API failed, falling back to search: {e}")
# Fallback to search if count fails
from common.doc_store.doc_store_base import OrderByExpr
results = settings.docStoreConn.search(
select_fields=["id"],
highlight_fields=[],
condition={},
match_expressions=[],
order_by=OrderByExpr(),
offset=0,
limit=1, # Only need 1 result to know if table is non-empty
index_names=index_name,
knowledgebase_ids=[""] # Metadata tables don't filter by KB
)
logging.debug(f"[DROP EMPTY TABLE] Search results type: {type(results)}, results: {results}")
# Check if empty based on return type (fallback search only)
if isinstance(results, tuple) and len(results) == 2:
# Infinity returns (DataFrame, int)
df, total = results
logging.debug(f"[DROP EMPTY TABLE] Infinity format - total: {total}, df length: {len(df) if hasattr(df, '__len__') else 'N/A'}")
is_empty = (total == 0 or (hasattr(df, '__len__') and len(df) == 0))
elif hasattr(results, 'get') and 'hits' in results:
# ES format - MUST check this before hasattr(results, '__len__')
# because ES response objects also have __len__
total = results.get('hits', {}).get('total', {})
hits = results.get('hits', {}).get('hits', [])
# ES 7.x+: total is a dict like {'value': 0, 'relation': 'eq'}
# ES 6.x: total is an int
if isinstance(total, dict):
total_count = total.get('value', 0)
else:
total_count = total
logging.debug(f"[DROP EMPTY TABLE] ES format - total: {total_count}, hits count: {len(hits)}")
is_empty = (total_count == 0 or len(hits) == 0)
elif hasattr(results, '__len__'):
# DataFrame or list (check this AFTER ES format)
result_len = len(results)
logging.debug(f"[DROP EMPTY TABLE] List/DataFrame format - length: {result_len}")
is_empty = result_len == 0
else:
logging.warning(f"[DROP EMPTY TABLE] Unknown result format: {type(results)}")
is_empty = False
if is_empty:
logging.debug(f"[DROP EMPTY TABLE] Metadata table {index_name} is empty, dropping it")
drop_result = settings.docStoreConn.delete_idx(index_name, "")
logging.debug(f"[DROP EMPTY TABLE] Drop result: {drop_result}")
else:
logging.debug(f"[DROP EMPTY TABLE] Metadata table {index_name} still has documents, keeping it")
except Exception as e:
# Log but don't fail - metadata deletion was successful
logging.error(f"[DROP EMPTY TABLE] Failed to check/drop empty metadata table {index_name}: {e}")
@classmethod
@DB.connection_context()
def get_document_metadata(cls, doc_id: str) -> Dict:
"""
Get document metadata from ES/Infinity.
Args:
doc_id: Document ID
Returns:
Metadata dictionary, empty dict if not found
"""
try:
from api.db.db_models import Knowledgebase
# Get document with tenant_id
doc_query = Document.select(Document, Knowledgebase.tenant_id).join(
Knowledgebase, on=(Knowledgebase.id == Document.kb_id)
).where(Document.id == doc_id)
doc = doc_query.first()
if not doc:
logging.warning(f"Document {doc_id} not found")
return {}
# Extract fields
doc_obj = doc
tenant_id = doc.knowledgebase.tenant_id
kb_id = doc_obj.kb_id
index_name = cls._get_doc_meta_index_name(tenant_id)
# Try to get metadata from ES/Infinity
metadata_doc = settings.docStoreConn.get(
doc_id,
index_name,
[kb_id]
)
if metadata_doc:
# Extract and unflatten metadata
return cls._extract_metadata(metadata_doc)
return {}
except Exception as e:
logging.error(f"Error getting metadata for document {doc_id}: {e}")
return {}
@classmethod
@DB.connection_context()
def get_meta_by_kbs(cls, kb_ids: List[str]) -> Dict:
"""
        Get metadata for documents in knowledge bases (legacy aggregator, backward-compatible).
        - Does NOT expand list values; a list is kept as one string key.
          Example: {"tags": ["foo","bar"]} -> meta["tags"]["['foo', 'bar']"] = [doc_id]
        - Expects meta_fields to be a dict.
        Use when existing callers rely on the old list-as-string semantics.
Args:
kb_ids: List of knowledge base IDs
Returns:
Metadata dictionary in format: {field_name: {value: [doc_ids]}}
"""
try:
from api.db.db_models import Knowledgebase
from common.doc_store.doc_store_base import OrderByExpr
# Get tenant_id from first KB
kb = Knowledgebase.get_by_id(kb_ids[0])
if not kb:
return {}
tenant_id = kb.tenant_id
index_name = cls._get_doc_meta_index_name(tenant_id)
condition = {"kb_id": kb_ids}
order_by = OrderByExpr()
# Query with large limit
results = settings.docStoreConn.search(
select_fields=["*"],
highlight_fields=[],
condition=condition,
match_expressions=[],
order_by=order_by,
offset=0,
limit=10000,
index_names=index_name,
knowledgebase_ids=kb_ids
)
logging.debug(f"[get_meta_by_kbs] index_name: {index_name}, kb_ids: {kb_ids}")
# Aggregate metadata (legacy: keeps lists as string keys)
meta = {}
# Use helper to iterate over results in any format
for doc_id, doc in cls._iter_search_results(results):
# Extract metadata fields (exclude system fields)
doc_meta = cls._extract_metadata(doc)
# Legacy: Keep lists as string keys (do NOT expand)
for k, v in doc_meta.items():
if k not in meta:
meta[k] = {}
# If not list, make it a list
if not isinstance(v, list):
v = [v]
# Legacy: Use the entire list as a string key
# Skip nested lists/dicts
if isinstance(v, list) and any(isinstance(x, (list, dict)) for x in v):
continue
list_key = str(v)
if list_key not in meta[k]:
meta[k][list_key] = []
meta[k][list_key].append(doc_id)
logging.debug(f"[get_meta_by_kbs] KBs: {kb_ids}, Returning metadata: {meta}")
return meta
except Exception as e:
logging.error(f"Error getting metadata for KBs {kb_ids}: {e}")
return {}
@classmethod
@DB.connection_context()
def get_flatted_meta_by_kbs(cls, kb_ids: List[str]) -> Dict:
"""
Get flattened metadata for documents in knowledge bases.
- Parses stringified JSON meta_fields when possible and skips non-dict or unparsable values.
- Expands list values into individual entries.
Example: {"tags": ["foo","bar"], "author": "alice"} ->
meta["tags"]["foo"] = [doc_id], meta["tags"]["bar"] = [doc_id], meta["author"]["alice"] = [doc_id]
Prefer for metadata_condition filtering and scenarios that must respect list semantics.
Args:
kb_ids: List of knowledge base IDs
Returns:
Metadata dictionary in format: {field_name: {value: [doc_ids]}}
"""
try:
from api.db.db_models import Knowledgebase
from common.doc_store.doc_store_base import OrderByExpr
# Get tenant_id from first KB
kb = Knowledgebase.get_by_id(kb_ids[0])
if not kb:
return {}
tenant_id = kb.tenant_id
index_name = cls._get_doc_meta_index_name(tenant_id)
condition = {"kb_id": kb_ids}
order_by = OrderByExpr()
# Query with large limit
results = settings.docStoreConn.search(
select_fields=["*"], # Get all fields
highlight_fields=[],
condition=condition,
match_expressions=[],
order_by=order_by,
offset=0,
limit=10000,
index_names=index_name,
knowledgebase_ids=kb_ids
)
logging.debug(f"[get_flatted_meta_by_kbs] index_name: {index_name}, kb_ids: {kb_ids}")
logging.debug(f"[get_flatted_meta_by_kbs] results type: {type(results)}")
# Aggregate metadata
meta = {}
# Use helper to iterate over results in any format
for doc_id, doc in cls._iter_search_results(results):
# Extract metadata fields (exclude system fields)
doc_meta = cls._extract_metadata(doc)
for k, v in doc_meta.items():
if k not in meta:
meta[k] = {}
values = v if isinstance(v, list) else [v]
for vv in values:
if vv is None:
continue
sv = str(vv)
if sv not in meta[k]:
meta[k][sv] = []
meta[k][sv].append(doc_id)
logging.debug(f"[get_flatted_meta_by_kbs] KBs: {kb_ids}, Returning metadata: {meta}")
return meta
except Exception as e:
logging.error(f"Error getting flattened metadata for KBs {kb_ids}: {e}")
return {}
@classmethod
def get_metadata_for_documents(cls, doc_ids: Optional[List[str]], kb_id: str) -> Dict[str, Dict]:
"""
Get metadata fields for specific documents.
Returns a mapping of doc_id -> meta_fields
Args:
doc_ids: List of document IDs (if None, gets all documents with metadata for the KB)
kb_id: Knowledge base ID
Returns:
Dictionary mapping doc_id to meta_fields dict
"""
try:
results = cls._search_metadata(kb_id, condition={"kb_id": kb_id})
if not results:
return {}
# Build mapping: doc_id -> meta_fields
meta_mapping = {}
# If doc_ids is provided, create a set for efficient lookup
doc_ids_set = set(doc_ids) if doc_ids else None
# Use helper to iterate over results in any format
for doc_id, doc in cls._iter_search_results(results):
# Filter by doc_ids if provided
if doc_ids_set is not None and doc_id not in doc_ids_set:
continue
# Extract metadata (handles both JSON strings and dicts)
doc_meta = cls._extract_metadata(doc)
if doc_meta:
meta_mapping[doc_id] = doc_meta
logging.debug(f"[get_metadata_for_documents] Found metadata for {len(meta_mapping)}/{len(doc_ids) if doc_ids else 'all'} documents")
return meta_mapping
except Exception as e:
logging.error(f"Error getting metadata for documents: {e}")
return {}
@classmethod
@DB.connection_context()
def get_metadata_summary(cls, kb_id: str, doc_ids=None) -> Dict:
"""
Get metadata summary for documents in a knowledge base.
Args:
kb_id: Knowledge base ID
doc_ids: Optional list of document IDs to filter by
Returns:
Dictionary with metadata field statistics in format:
{
"field_name": {
"type": "string" | "number" | "list",
"values": [("value1", count1), ("value2", count2), ...] # sorted by count desc
}
}
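        Example (illustrative result):
            {"author": {"type": "string", "values": [("alice", 3), ("bob", 1)]}}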
"""
def _meta_value_type(value):
"""Determine the type of a metadata value."""
if value is None:
return None
if isinstance(value, list):
return "list"
if isinstance(value, bool):
return "string"
if isinstance(value, (int, float)):
return "number"
return "string"
try:
results = cls._search_metadata(kb_id, condition={"kb_id": kb_id})
if not results:
return {}
# If doc_ids are provided, we'll filter after the search
doc_ids_set = set(doc_ids) if doc_ids else None
# Aggregate metadata
summary = {}
type_counter = {}
logging.debug(f"[METADATA SUMMARY] KB: {kb_id}, doc_ids: {doc_ids}")
# Use helper to iterate over results in any format
for doc_id, doc in cls._iter_search_results(results):
# Check doc_ids filter
if doc_ids_set and doc_id not in doc_ids_set:
continue
doc_meta = cls._extract_metadata(doc)
for k, v in doc_meta.items():
# Track type counts for this field
value_type = _meta_value_type(v)
if value_type:
if k not in type_counter:
type_counter[k] = {}
type_counter[k][value_type] = type_counter[k].get(value_type, 0) + 1
# Aggregate value counts
values = v if isinstance(v, list) else [v]
for vv in values:
if not vv:
continue
sv = str(vv)
if k not in summary:
summary[k] = {}
summary[k][sv] = summary[k].get(sv, 0) + 1
# Build result with type information and sorted values
result = {}
for k, v in summary.items():
values = sorted([(val, cnt) for val, cnt in v.items()], key=lambda x: x[1], reverse=True)
type_counts = type_counter.get(k, {})
value_type = "string"
if type_counts:
value_type = max(type_counts.items(), key=lambda item: item[1])[0]
result[k] = {"type": value_type, "values": values}
logging.debug(f"[METADATA SUMMARY] Final result: {result}")
return result
except Exception as e:
logging.error(f"Error getting metadata summary for KB {kb_id}: {e}")
return {}
@classmethod
@DB.connection_context()
def batch_update_metadata(cls, kb_id: str, doc_ids: List[str], updates=None, deletes=None) -> int:
"""
Batch update metadata for documents in a knowledge base.
Args:
kb_id: Knowledge base ID
doc_ids: List of document IDs to update
updates: List of update operations, each with:
- key: field name to update
- value: new value
- match (optional): only update if current value matches this
deletes: List of delete operations, each with:
- key: field name to delete from
- value (optional): specific value to delete (if not provided, deletes the entire field)
Returns:
Number of documents updated
Examples:
updates = [{"key": "author", "value": "John"}]
updates = [{"key": "tags", "value": "new", "match": "old"}] # Replace "old" with "new" in tags list
deletes = [{"key": "author"}] # Delete entire author field
deletes = [{"key": "tags", "value": "obsolete"}] # Remove "obsolete" from tags list
"""
updates = updates or []
deletes = deletes or []
if not doc_ids:
return 0
def _normalize_meta(meta):
"""Normalize metadata to a dict."""
if isinstance(meta, str):
try:
meta = json.loads(meta)
except Exception:
return {}
if not isinstance(meta, dict):
return {}
return deepcopy(meta)
def _str_equal(a, b):
"""Compare two values as strings."""
return str(a) == str(b)
def _apply_updates(meta):
"""Apply update operations to metadata."""
changed = False
for upd in updates:
key = upd.get("key")
if not key:
continue
new_value = upd.get("value")
match_value = upd.get("match", None)
match_provided = match_value is not None and match_value != ""
if key not in meta:
if match_provided:
continue
meta[key] = dedupe_list(new_value) if isinstance(new_value, list) else new_value
changed = True
continue
if isinstance(meta[key], list):
if not match_provided:
# No match provided, append new_value to the list
if isinstance(new_value, list):
meta[key] = dedupe_list(meta[key] + new_value)
else:
meta[key] = dedupe_list(meta[key] + [new_value])
changed = True
else:
# Replace items matching match_value with new_value
replaced = False
new_list = []
for item in meta[key]:
if _str_equal(item, match_value):
new_list.append(new_value)
replaced = True
else:
new_list.append(item)
if replaced:
meta[key] = dedupe_list(new_list)
changed = True
else:
if not match_provided:
meta[key] = new_value
changed = True
else:
if _str_equal(meta[key], match_value):
meta[key] = new_value
changed = True
return changed
def _apply_deletes(meta):
"""Apply delete operations to metadata."""
changed = False
for d in deletes:
key = d.get("key")
if not key or key not in meta:
continue
value = d.get("value", None)
if isinstance(meta[key], list):
if value is None:
del meta[key]
changed = True
continue
new_list = [item for item in meta[key] if not _str_equal(item, value)]
if len(new_list) != len(meta[key]):
if new_list:
meta[key] = new_list
else:
del meta[key]
changed = True
else:
if value is None or _str_equal(meta[key], value):
del meta[key]
changed = True
return changed
try:
results = cls._search_metadata(kb_id, condition=None)
if not results:
results = [] # Treat as empty list if None
updated_docs = 0
doc_ids_set = set(doc_ids)
found_doc_ids = set()
logging.debug(f"[batch_update_metadata] Searching for doc_ids: {doc_ids}")
# Use helper to iterate over results in any format
for doc_id, doc in cls._iter_search_results(results):
# Filter to only process requested doc_ids
if doc_id not in doc_ids_set:
continue
found_doc_ids.add(doc_id)
# Get current metadata
current_meta = cls._extract_metadata(doc)
meta = _normalize_meta(current_meta)
original_meta = deepcopy(meta)
logging.debug(f"[batch_update_metadata] Doc {doc_id}: current_meta={current_meta}, meta={meta}")
logging.debug(f"[batch_update_metadata] Updates to apply: {updates}, Deletes: {deletes}")
# Apply updates and deletes
changed = _apply_updates(meta)
logging.debug(f"[batch_update_metadata] After _apply_updates: changed={changed}, meta={meta}")
changed = _apply_deletes(meta) or changed
logging.debug(f"[batch_update_metadata] After _apply_deletes: changed={changed}, meta={meta}")
# Update if changed
if changed and meta != original_meta:
logging.debug(f"[batch_update_metadata] Updating doc_id: {doc_id}, meta: {meta}")
# If metadata is empty, delete the row entirely instead of keeping empty metadata
if not meta:
cls.delete_document_metadata(doc_id, skip_empty_check=True)
else:
cls.update_document_metadata(doc_id, meta)
updated_docs += 1
# Handle documents that don't have metadata rows yet
# These documents weren't in the search results, so we need to insert new metadata for them
missing_doc_ids = doc_ids_set - found_doc_ids
if missing_doc_ids and updates:
logging.debug(f"[batch_update_metadata] Inserting new metadata for documents without metadata rows: {missing_doc_ids}")
for doc_id in missing_doc_ids:
# Apply updates to create new metadata
meta = {}
_apply_updates(meta)
if meta:
# Only insert if there's actual metadata to add
cls.update_document_metadata(doc_id, meta)
updated_docs += 1
logging.debug(f"[batch_update_metadata] Inserted metadata for doc_id: {doc_id}, meta: {meta}")
logging.debug(f"[batch_update_metadata] KB: {kb_id}, doc_ids: {doc_ids}, updated: {updated_docs}")
return updated_docs
except Exception as e:
logging.error(f"Error in batch_update_metadata for KB {kb_id}: {e}")
return 0
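
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the service). Shows a typical
# metadata lifecycle for one document; "doc_1" and the {"author": ...} fields
# are hypothetical, and a configured doc store plus an existing Document row
# are assumed:
#
#   DocMetadataService.insert_document_metadata("doc_1", {"author": "alice"})
#   DocMetadataService.get_document_metadata("doc_1")    # -> {"author": "alice"}
#   DocMetadataService.update_document_metadata("doc_1", {"author": "bob"})
#   DocMetadataService.delete_document_metadata("doc_1")
# ---------------------------------------------------------------------------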