Refa: refactor metadata filter (#11907)

### What problem does this PR solve?

Refactor metadata filter.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring

---------

Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>
This commit is contained in:
Yongteng Lei
2025-12-12 17:12:38 +08:00
committed by GitHub
parent 0fcb1680fd
commit 0f0fb53256
10 changed files with 229 additions and 269 deletions

View File

@ -32,6 +32,7 @@ from api.db.services.document_service import DocumentService
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.langfuse_service import TenantLangfuseService
from api.db.services.llm_service import LLMBundle
from common.metadata_utils import apply_meta_data_filter
from api.db.services.tenant_llm_service import TenantLLMService
from common.time_utils import current_timestamp, datetime_format
from graphrag.general.mind_map_extractor import MindMapExtractor
@ -39,7 +40,7 @@ from rag.app.resume import forbidden_select_fields4resume
from rag.app.tag import label_question
from rag.nlp.search import index_name
from rag.prompts.generator import chunks_format, citation_prompt, cross_languages, full_question, kb_prompt, keyword_extraction, message_fit_in, \
gen_meta_filter, PROMPT_JINJA_ENV, ASK_SUMMARY
PROMPT_JINJA_ENV, ASK_SUMMARY
from common.token_utils import num_tokens_from_string
from rag.utils.tavily_conn import Tavily
from common.string_utils import remove_redundant_spaces
@ -277,77 +278,6 @@ def repair_bad_citation_formats(answer: str, kbinfos: dict, idx: set):
return answer, idx
def convert_conditions(metadata_condition):
if metadata_condition is None:
metadata_condition = {}
op_mapping = {
"is": "=",
"not is": ""
}
return [
{
"op": op_mapping.get(cond["comparison_operator"], cond["comparison_operator"]),
"key": cond["name"],
"value": cond["value"]
}
for cond in metadata_condition.get("conditions", [])
]
def meta_filter(metas: dict, filters: list[dict], logic: str = "and"):
doc_ids = set([])
def filter_out(v2docs, operator, value):
ids = []
for input, docids in v2docs.items():
if operator in ["=", "", ">", "<", "", ""]:
try:
input = float(input)
value = float(value)
except Exception:
input = str(input)
value = str(value)
for conds in [
(operator == "contains", str(value).lower() in str(input).lower()),
(operator == "not contains", str(value).lower() not in str(input).lower()),
(operator == "in", str(input).lower() in str(value).lower()),
(operator == "not in", str(input).lower() not in str(value).lower()),
(operator == "start with", str(input).lower().startswith(str(value).lower())),
(operator == "end with", str(input).lower().endswith(str(value).lower())),
(operator == "empty", not input),
(operator == "not empty", input),
(operator == "=", input == value),
(operator == "", input != value),
(operator == ">", input > value),
(operator == "<", input < value),
(operator == "", input >= value),
(operator == "", input <= value),
]:
try:
if all(conds):
ids.extend(docids)
break
except Exception:
pass
return ids
for k, v2docs in metas.items():
for f in filters:
if k != f["key"]:
continue
ids = filter_out(v2docs, f["op"], f["value"])
if not doc_ids:
doc_ids = set(ids)
else:
if logic == "and":
doc_ids = doc_ids & set(ids)
else:
doc_ids = doc_ids | set(ids)
if not doc_ids:
return []
return list(doc_ids)
async def async_chat(dialog, messages, stream=True, **kwargs):
assert messages[-1]["role"] == "user", "The last content of this conversation is not from user."
if not dialog.kb_ids and not dialog.prompt_config.get("tavily_api_key"):
@ -420,25 +350,13 @@ async def async_chat(dialog, messages, stream=True, **kwargs):
if dialog.meta_data_filter:
metas = DocumentService.get_meta_by_kbs(dialog.kb_ids)
if dialog.meta_data_filter.get("method") == "auto":
filters: dict = await gen_meta_filter(chat_mdl, metas, questions[-1])
attachments.extend(meta_filter(metas, filters["conditions"], filters.get("logic", "and")))
if not attachments:
attachments = None
elif dialog.meta_data_filter.get("method") == "semi_auto":
selected_keys = dialog.meta_data_filter.get("semi_auto", [])
if selected_keys:
filtered_metas = {key: metas[key] for key in selected_keys if key in metas}
if filtered_metas:
filters: dict = await gen_meta_filter(chat_mdl, filtered_metas, questions[-1])
attachments.extend(meta_filter(metas, filters["conditions"], filters.get("logic", "and")))
if not attachments:
attachments = None
elif dialog.meta_data_filter.get("method") == "manual":
conds = dialog.meta_data_filter["manual"]
attachments.extend(meta_filter(metas, conds, dialog.meta_data_filter.get("logic", "and")))
if conds and not attachments:
attachments = ["-999"]
attachments = await apply_meta_data_filter(
dialog.meta_data_filter,
metas,
questions[-1],
chat_mdl,
attachments,
)
if prompt_config.get("keyword", False):
questions[-1] += await keyword_extraction(chat_mdl, questions[-1])
@ -838,24 +756,7 @@ async def async_ask(question, kb_ids, tenant_id, chat_llm_name=None, search_conf
if meta_data_filter:
metas = DocumentService.get_meta_by_kbs(kb_ids)
if meta_data_filter.get("method") == "auto":
filters: dict = await gen_meta_filter(chat_mdl, metas, question)
doc_ids.extend(meta_filter(metas, filters["conditions"], filters.get("logic", "and")))
if not doc_ids:
doc_ids = None
elif meta_data_filter.get("method") == "semi_auto":
selected_keys = meta_data_filter.get("semi_auto", [])
if selected_keys:
filtered_metas = {key: metas[key] for key in selected_keys if key in metas}
if filtered_metas:
filters: dict = await gen_meta_filter(chat_mdl, filtered_metas, question)
doc_ids.extend(meta_filter(metas, filters["conditions"], filters.get("logic", "and")))
if not doc_ids:
doc_ids = None
elif meta_data_filter.get("method") == "manual":
doc_ids.extend(meta_filter(metas, meta_data_filter["manual"], meta_data_filter.get("logic", "and")))
if meta_data_filter["manual"] and not doc_ids:
doc_ids = ["-999"]
doc_ids = await apply_meta_data_filter(meta_data_filter, metas, question, chat_mdl, doc_ids)
kbinfos = retriever.retrieval(
question=question,
@ -922,24 +823,7 @@ async def gen_mindmap(question, kb_ids, tenant_id, search_config={}):
if meta_data_filter:
metas = DocumentService.get_meta_by_kbs(kb_ids)
if meta_data_filter.get("method") == "auto":
filters: dict = await gen_meta_filter(chat_mdl, metas, question)
doc_ids.extend(meta_filter(metas, filters["conditions"], filters.get("logic", "and")))
if not doc_ids:
doc_ids = None
elif meta_data_filter.get("method") == "semi_auto":
selected_keys = meta_data_filter.get("semi_auto", [])
if selected_keys:
filtered_metas = {key: metas[key] for key in selected_keys if key in metas}
if filtered_metas:
filters: dict = await gen_meta_filter(chat_mdl, filtered_metas, question)
doc_ids.extend(meta_filter(metas, filters["conditions"], filters.get("logic", "and")))
if not doc_ids:
doc_ids = None
elif meta_data_filter.get("method") == "manual":
doc_ids.extend(meta_filter(metas, meta_data_filter["manual"], meta_data_filter.get("logic", "and")))
if meta_data_filter["manual"] and not doc_ids:
doc_ids = ["-999"]
doc_ids = await apply_meta_data_filter(meta_data_filter, metas, question, chat_mdl, doc_ids)
ranks = settings.retriever.retrieval(
question=question,