mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-12-19 20:16:49 +08:00
Feat: enhance metadata operation (#11874)
### What problem does this PR solve? Add metadata condition in document list. Add metadata bulk update. Add metadata summary. ### Type of change - [x] New Feature (non-breaking change which adds functionality) - [x] Documentation Update
This commit is contained in:
@ -79,7 +79,7 @@ class DocumentService(CommonService):
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def get_list(cls, kb_id, page_number, items_per_page,
|
||||
orderby, desc, keywords, id, name, suffix=None, run = None):
|
||||
orderby, desc, keywords, id, name, suffix=None, run = None, doc_ids=None):
|
||||
fields = cls.get_cls_model_fields()
|
||||
docs = cls.model.select(*[*fields, UserCanvas.title]).join(File2Document, on = (File2Document.document_id == cls.model.id))\
|
||||
.join(File, on = (File.id == File2Document.file_id))\
|
||||
@ -96,6 +96,8 @@ class DocumentService(CommonService):
|
||||
docs = docs.where(
|
||||
fn.LOWER(cls.model.name).contains(keywords.lower())
|
||||
)
|
||||
if doc_ids:
|
||||
docs = docs.where(cls.model.id.in_(doc_ids))
|
||||
if suffix:
|
||||
docs = docs.where(cls.model.suffix.in_(suffix))
|
||||
if run:
|
||||
@ -123,7 +125,7 @@ class DocumentService(CommonService):
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def get_by_kb_id(cls, kb_id, page_number, items_per_page,
|
||||
orderby, desc, keywords, run_status, types, suffix):
|
||||
orderby, desc, keywords, run_status, types, suffix, doc_ids=None):
|
||||
fields = cls.get_cls_model_fields()
|
||||
if keywords:
|
||||
docs = cls.model.select(*[*fields, UserCanvas.title.alias("pipeline_name"), User.nickname])\
|
||||
@ -143,6 +145,8 @@ class DocumentService(CommonService):
|
||||
.join(User, on=(cls.model.created_by == User.id), join_type=JOIN.LEFT_OUTER)\
|
||||
.where(cls.model.kb_id == kb_id)
|
||||
|
||||
if doc_ids:
|
||||
docs = docs.where(cls.model.id.in_(doc_ids))
|
||||
if run_status:
|
||||
docs = docs.where(cls.model.run.in_(run_status))
|
||||
if types:
|
||||
@ -644,6 +648,13 @@ class DocumentService(CommonService):
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def get_meta_by_kbs(cls, kb_ids):
|
||||
"""
|
||||
Legacy metadata aggregator (backward-compatible).
|
||||
- Does NOT expand list values and a list is kept as one string key.
|
||||
Example: {"tags": ["foo","bar"]} -> meta["tags"]["['foo', 'bar']"] = [doc_id]
|
||||
- Expects meta_fields is a dict.
|
||||
Use when existing callers rely on the old list-as-string semantics.
|
||||
"""
|
||||
fields = [
|
||||
cls.model.id,
|
||||
cls.model.meta_fields,
|
||||
@ -660,6 +671,162 @@ class DocumentService(CommonService):
|
||||
meta[k][v].append(doc_id)
|
||||
return meta
|
||||
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def get_flatted_meta_by_kbs(cls, kb_ids):
|
||||
"""
|
||||
- Parses stringified JSON meta_fields when possible and skips non-dict or unparsable values.
|
||||
- Expands list values into individual entries.
|
||||
Example: {"tags": ["foo","bar"], "author": "alice"} ->
|
||||
meta["tags"]["foo"] = [doc_id], meta["tags"]["bar"] = [doc_id], meta["author"]["alice"] = [doc_id]
|
||||
Prefer for metadata_condition filtering and scenarios that must respect list semantics.
|
||||
"""
|
||||
fields = [
|
||||
cls.model.id,
|
||||
cls.model.meta_fields,
|
||||
]
|
||||
meta = {}
|
||||
for r in cls.model.select(*fields).where(cls.model.kb_id.in_(kb_ids)):
|
||||
doc_id = r.id
|
||||
meta_fields = r.meta_fields or {}
|
||||
if isinstance(meta_fields, str):
|
||||
try:
|
||||
meta_fields = json.loads(meta_fields)
|
||||
except Exception:
|
||||
continue
|
||||
if not isinstance(meta_fields, dict):
|
||||
continue
|
||||
for k, v in meta_fields.items():
|
||||
if k not in meta:
|
||||
meta[k] = {}
|
||||
values = v if isinstance(v, list) else [v]
|
||||
for vv in values:
|
||||
if vv is None:
|
||||
continue
|
||||
sv = str(vv)
|
||||
if sv not in meta[k]:
|
||||
meta[k][sv] = []
|
||||
meta[k][sv].append(doc_id)
|
||||
return meta
|
||||
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def get_metadata_summary(cls, kb_id):
|
||||
fields = [cls.model.id, cls.model.meta_fields]
|
||||
summary = {}
|
||||
for r in cls.model.select(*fields).where(cls.model.kb_id == kb_id):
|
||||
meta_fields = r.meta_fields or {}
|
||||
if isinstance(meta_fields, str):
|
||||
try:
|
||||
meta_fields = json.loads(meta_fields)
|
||||
except Exception:
|
||||
continue
|
||||
if not isinstance(meta_fields, dict):
|
||||
continue
|
||||
for k, v in meta_fields.items():
|
||||
values = v if isinstance(v, list) else [v]
|
||||
for vv in values:
|
||||
if not vv:
|
||||
continue
|
||||
sv = str(vv)
|
||||
if k not in summary:
|
||||
summary[k] = {}
|
||||
summary[k][sv] = summary[k].get(sv, 0) + 1
|
||||
return {k: sorted([(val, cnt) for val, cnt in v.items()], key=lambda x: x[1], reverse=True) for k, v in summary.items()}
|
||||
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def batch_update_metadata(cls, kb_id, doc_ids, updates=None, deletes=None):
|
||||
updates = updates or []
|
||||
deletes = deletes or []
|
||||
if not doc_ids:
|
||||
return 0
|
||||
|
||||
def _normalize_meta(meta):
|
||||
if isinstance(meta, str):
|
||||
try:
|
||||
meta = json.loads(meta)
|
||||
except Exception:
|
||||
return {}
|
||||
if not isinstance(meta, dict):
|
||||
return {}
|
||||
return deepcopy(meta)
|
||||
|
||||
def _str_equal(a, b):
|
||||
return str(a) == str(b)
|
||||
|
||||
def _apply_updates(meta):
|
||||
changed = False
|
||||
for upd in updates:
|
||||
key = upd.get("key")
|
||||
if not key or key not in meta:
|
||||
continue
|
||||
new_value = upd.get("value")
|
||||
match_value = upd.get("match", new_value)
|
||||
if isinstance(meta[key], list):
|
||||
replaced = False
|
||||
new_list = []
|
||||
for item in meta[key]:
|
||||
if match_value and _str_equal(item, match_value):
|
||||
new_list.append(new_value)
|
||||
replaced = True
|
||||
else:
|
||||
new_list.append(item)
|
||||
if replaced:
|
||||
meta[key] = new_list
|
||||
changed = True
|
||||
else:
|
||||
if not match_value:
|
||||
continue
|
||||
if _str_equal(meta[key], match_value):
|
||||
meta[key] = new_value
|
||||
changed = True
|
||||
return changed
|
||||
|
||||
def _apply_deletes(meta):
|
||||
changed = False
|
||||
for d in deletes:
|
||||
key = d.get("key")
|
||||
if not key or key not in meta:
|
||||
continue
|
||||
value = d.get("value", None)
|
||||
if isinstance(meta[key], list):
|
||||
if value is None:
|
||||
del meta[key]
|
||||
changed = True
|
||||
continue
|
||||
new_list = [item for item in meta[key] if not _str_equal(item, value)]
|
||||
if len(new_list) != len(meta[key]):
|
||||
if new_list:
|
||||
meta[key] = new_list
|
||||
else:
|
||||
del meta[key]
|
||||
changed = True
|
||||
else:
|
||||
if value is None or _str_equal(meta[key], value):
|
||||
del meta[key]
|
||||
changed = True
|
||||
return changed
|
||||
|
||||
updated_docs = 0
|
||||
with DB.atomic():
|
||||
rows = cls.model.select(cls.model.id, cls.model.meta_fields).where(
|
||||
(cls.model.id.in_(doc_ids)) & (cls.model.kb_id == kb_id)
|
||||
)
|
||||
for r in rows:
|
||||
meta = _normalize_meta(r.meta_fields or {})
|
||||
original_meta = deepcopy(meta)
|
||||
changed = _apply_updates(meta)
|
||||
changed = _apply_deletes(meta) or changed
|
||||
if changed and meta != original_meta:
|
||||
cls.model.update(
|
||||
meta_fields=meta,
|
||||
update_time=current_timestamp(),
|
||||
update_date=get_format_time()
|
||||
).where(cls.model.id == r.id).execute()
|
||||
updated_docs += 1
|
||||
return updated_docs
|
||||
|
||||
@classmethod
|
||||
@DB.connection_context()
|
||||
def update_progress(cls):
|
||||
|
||||
Reference in New Issue
Block a user