diff --git a/api/apps/document_app.py b/api/apps/document_app.py index ba52bd61c..1a987cd39 100644 --- a/api/apps/document_app.py +++ b/api/apps/document_app.py @@ -27,6 +27,7 @@ from api.db import VALID_FILE_TYPES, FileType from api.db.db_models import Task from api.db.services import duplicate_name from api.db.services.document_service import DocumentService, doc_upload_and_parse +from api.db.services.dialog_service import meta_filter, convert_conditions from api.db.services.file2document_service import File2DocumentService from api.db.services.file_service import FileService from api.db.services.knowledgebase_service import KnowledgebaseService @@ -246,9 +247,19 @@ async def list_docs(): return get_data_error_result(message=f"Invalid filter conditions: {', '.join(invalid_types)} type{'s' if len(invalid_types) > 1 else ''}") suffix = req.get("suffix", []) + metadata_condition = req.get("metadata_condition", {}) or {} + if metadata_condition and not isinstance(metadata_condition, dict): + return get_data_error_result(message="metadata_condition must be an object.") + + doc_ids_filter = None + if metadata_condition: + metas = DocumentService.get_flatted_meta_by_kbs([kb_id]) + doc_ids_filter = meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and")) + if metadata_condition.get("conditions") and not doc_ids_filter: + return get_json_result(data={"total": 0, "docs": []}) try: - docs, tol = DocumentService.get_by_kb_id(kb_id, page_number, items_per_page, orderby, desc, keywords, run_status, types, suffix) + docs, tol = DocumentService.get_by_kb_id(kb_id, page_number, items_per_page, orderby, desc, keywords, run_status, types, suffix, doc_ids_filter) if create_time_from or create_time_to: filtered_docs = [] @@ -319,6 +330,87 @@ async def doc_infos(): return get_json_result(data=list(docs.dicts())) +@manager.route("/metadata/summary", methods=["POST"]) # noqa: F821 +@login_required +async def metadata_summary(): + req = await get_request_json() + kb_id = req.get("kb_id") + if not kb_id: + return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR) + + tenants = UserTenantService.query(user_id=current_user.id) + for tenant in tenants: + if KnowledgebaseService.query(tenant_id=tenant.tenant_id, id=kb_id): + break + else: + return get_json_result(data=False, message="Only owner of knowledgebase authorized for this operation.", code=RetCode.OPERATING_ERROR) + + try: + summary = DocumentService.get_metadata_summary(kb_id) + return get_json_result(data={"summary": summary}) + except Exception as e: + return server_error_response(e) + + +@manager.route("/metadata/update", methods=["POST"]) # noqa: F821 +@login_required +async def metadata_update(): + req = await get_request_json() + kb_id = req.get("kb_id") + if not kb_id: + return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR) + + tenants = UserTenantService.query(user_id=current_user.id) + for tenant in tenants: + if KnowledgebaseService.query(tenant_id=tenant.tenant_id, id=kb_id): + break + else: + return get_json_result(data=False, message="Only owner of knowledgebase authorized for this operation.", code=RetCode.OPERATING_ERROR) + + selector = req.get("selector", {}) or {} + updates = req.get("updates", []) or [] + deletes = req.get("deletes", []) or [] + + if not isinstance(selector, dict): + return get_json_result(data=False, message="selector must be an object.", code=RetCode.ARGUMENT_ERROR) + if not isinstance(updates, list) or not isinstance(deletes, list): + return get_json_result(data=False, message="updates and deletes must be lists.", code=RetCode.ARGUMENT_ERROR) + + metadata_condition = selector.get("metadata_condition", {}) or {} + if metadata_condition and not isinstance(metadata_condition, dict): + return get_json_result(data=False, message="metadata_condition must be an object.", code=RetCode.ARGUMENT_ERROR) + + document_ids = selector.get("document_ids", []) or [] + if document_ids and not isinstance(document_ids, list): + return get_json_result(data=False, message="document_ids must be a list.", code=RetCode.ARGUMENT_ERROR) + + for upd in updates: + if not isinstance(upd, dict) or not upd.get("key") or "value" not in upd: + return get_json_result(data=False, message="Each update requires key and value.", code=RetCode.ARGUMENT_ERROR) + for d in deletes: + if not isinstance(d, dict) or not d.get("key"): + return get_json_result(data=False, message="Each delete requires key.", code=RetCode.ARGUMENT_ERROR) + + kb_doc_ids = KnowledgebaseService.list_documents_by_ids([kb_id]) + target_doc_ids = set(kb_doc_ids) + if document_ids: + invalid_ids = set(document_ids) - set(kb_doc_ids) + if invalid_ids: + return get_json_result(data=False, message=f"These documents do not belong to dataset {kb_id}: {', '.join(invalid_ids)}", code=RetCode.ARGUMENT_ERROR) + target_doc_ids = set(document_ids) + + if metadata_condition: + metas = DocumentService.get_flatted_meta_by_kbs([kb_id]) + filtered_ids = set(meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and"))) + target_doc_ids = target_doc_ids & filtered_ids + if metadata_condition.get("conditions") and not target_doc_ids: + return get_json_result(data={"updated": 0, "matched_docs": 0}) + + target_doc_ids = list(target_doc_ids) + updated = DocumentService.batch_update_metadata(kb_id, target_doc_ids, updates, deletes) + return get_json_result(data={"updated": updated, "matched_docs": len(target_doc_ids)}) + + @manager.route("/thumbnails", methods=["GET"]) # noqa: F821 # @login_required def thumbnails(): @@ -698,7 +790,10 @@ async def set_meta(): if not isinstance(meta, dict): return get_json_result(data=False, message="Only dictionary type supported.", code=RetCode.ARGUMENT_ERROR) for k, v in meta.items(): - if not isinstance(v, str) and not isinstance(v, int) and not isinstance(v, float): + if isinstance(v, list): + if not all(isinstance(i, (str, int, float)) for i in v): + return get_json_result(data=False, message=f"The type is not supported in list: {v}", code=RetCode.ARGUMENT_ERROR) + elif not isinstance(v, (str, int, float)): return get_json_result(data=False, message=f"The type is not supported: {v}", code=RetCode.ARGUMENT_ERROR) except Exception as e: return get_json_result(data=False, message=f"Json syntax error: {e}", code=RetCode.ARGUMENT_ERROR) diff --git a/api/apps/sdk/doc.py b/api/apps/sdk/doc.py index 0019e2a42..b65a20133 100644 --- a/api/apps/sdk/doc.py +++ b/api/apps/sdk/doc.py @@ -14,6 +14,7 @@ # limitations under the License. # import datetime +import json import logging import pathlib import re @@ -551,13 +552,29 @@ def list_docs(dataset_id, tenant_id): run_status = q.getlist("run") create_time_from = int(q.get("create_time_from", 0)) create_time_to = int(q.get("create_time_to", 0)) + metadata_condition_raw = q.get("metadata_condition") + metadata_condition = {} + if metadata_condition_raw: + try: + metadata_condition = json.loads(metadata_condition_raw) + except Exception: + return get_error_data_result(message="metadata_condition must be valid JSON.") + if metadata_condition and not isinstance(metadata_condition, dict): + return get_error_data_result(message="metadata_condition must be an object.") # map run status (text or numeric) - align with API parameter run_status_text_to_numeric = {"UNSTART": "0", "RUNNING": "1", "CANCEL": "2", "DONE": "3", "FAIL": "4"} run_status_converted = [run_status_text_to_numeric.get(v, v) for v in run_status] + doc_ids_filter = None + if metadata_condition: + metas = DocumentService.get_flatted_meta_by_kbs([dataset_id]) + doc_ids_filter = meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and")) + if metadata_condition.get("conditions") and not doc_ids_filter: + return get_result(data={"total": 0, "docs": []}) + docs, total = DocumentService.get_list( - dataset_id, page, page_size, orderby, desc, keywords, document_id, name, suffix, run_status_converted + dataset_id, page, page_size, orderby, desc, keywords, document_id, name, suffix, run_status_converted, doc_ids_filter ) # time range filter (0 means no bound) @@ -586,6 +603,70 @@ def list_docs(dataset_id, tenant_id): return get_result(data={"total": total, "docs": output_docs}) + +@manager.route("/datasets//metadata/summary", methods=["GET"]) # noqa: F821 +@token_required +def metadata_summary(dataset_id, tenant_id): + if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): + return get_error_data_result(message=f"You don't own the dataset {dataset_id}. ") + + try: + summary = DocumentService.get_metadata_summary(dataset_id) + return get_result(data={"summary": summary}) + except Exception as e: + return server_error_response(e) + + +@manager.route("/datasets//metadata/update", methods=["POST"]) # noqa: F821 +@token_required +async def metadata_batch_update(dataset_id, tenant_id): + if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id): + return get_error_data_result(message=f"You don't own the dataset {dataset_id}. ") + + req = await get_request_json() + selector = req.get("selector", {}) or {} + updates = req.get("updates", []) or [] + deletes = req.get("deletes", []) or [] + + if not isinstance(selector, dict): + return get_error_data_result(message="selector must be an object.") + if not isinstance(updates, list) or not isinstance(deletes, list): + return get_error_data_result(message="updates and deletes must be lists.") + + metadata_condition = selector.get("metadata_condition", {}) or {} + if metadata_condition and not isinstance(metadata_condition, dict): + return get_error_data_result(message="metadata_condition must be an object.") + + document_ids = selector.get("document_ids", []) or [] + if document_ids and not isinstance(document_ids, list): + return get_error_data_result(message="document_ids must be a list.") + + for upd in updates: + if not isinstance(upd, dict) or not upd.get("key") or "value" not in upd: + return get_error_data_result(message="Each update requires key and value.") + for d in deletes: + if not isinstance(d, dict) or not d.get("key"): + return get_error_data_result(message="Each delete requires key.") + + kb_doc_ids = KnowledgebaseService.list_documents_by_ids([dataset_id]) + target_doc_ids = set(kb_doc_ids) + if document_ids: + invalid_ids = set(document_ids) - set(kb_doc_ids) + if invalid_ids: + return get_error_data_result(message=f"These documents do not belong to dataset {dataset_id}: {', '.join(invalid_ids)}") + target_doc_ids = set(document_ids) + + if metadata_condition: + metas = DocumentService.get_flatted_meta_by_kbs([dataset_id]) + filtered_ids = set(meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and"))) + target_doc_ids = target_doc_ids & filtered_ids + if metadata_condition.get("conditions") and not target_doc_ids: + return get_result(data={"updated": 0, "matched_docs": 0}) + + target_doc_ids = list(target_doc_ids) + updated = DocumentService.batch_update_metadata(dataset_id, target_doc_ids, updates, deletes) + return get_result(data={"updated": updated, "matched_docs": len(target_doc_ids)}) + @manager.route("/datasets//documents", methods=["DELETE"]) # noqa: F821 @token_required async def delete(tenant_id, dataset_id): diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py index 43adf5d8e..6326ef5de 100644 --- a/api/db/services/document_service.py +++ b/api/db/services/document_service.py @@ -79,7 +79,7 @@ class DocumentService(CommonService): @classmethod @DB.connection_context() def get_list(cls, kb_id, page_number, items_per_page, - orderby, desc, keywords, id, name, suffix=None, run = None): + orderby, desc, keywords, id, name, suffix=None, run = None, doc_ids=None): fields = cls.get_cls_model_fields() docs = cls.model.select(*[*fields, UserCanvas.title]).join(File2Document, on = (File2Document.document_id == cls.model.id))\ .join(File, on = (File.id == File2Document.file_id))\ @@ -96,6 +96,8 @@ class DocumentService(CommonService): docs = docs.where( fn.LOWER(cls.model.name).contains(keywords.lower()) ) + if doc_ids: + docs = docs.where(cls.model.id.in_(doc_ids)) if suffix: docs = docs.where(cls.model.suffix.in_(suffix)) if run: @@ -123,7 +125,7 @@ class DocumentService(CommonService): @classmethod @DB.connection_context() def get_by_kb_id(cls, kb_id, page_number, items_per_page, - orderby, desc, keywords, run_status, types, suffix): + orderby, desc, keywords, run_status, types, suffix, doc_ids=None): fields = cls.get_cls_model_fields() if keywords: docs = cls.model.select(*[*fields, UserCanvas.title.alias("pipeline_name"), User.nickname])\ @@ -143,6 +145,8 @@ class DocumentService(CommonService): .join(User, on=(cls.model.created_by == User.id), join_type=JOIN.LEFT_OUTER)\ .where(cls.model.kb_id == kb_id) + if doc_ids: + docs = docs.where(cls.model.id.in_(doc_ids)) if run_status: docs = docs.where(cls.model.run.in_(run_status)) if types: @@ -644,6 +648,13 @@ class DocumentService(CommonService): @classmethod @DB.connection_context() def get_meta_by_kbs(cls, kb_ids): + """ + Legacy metadata aggregator (backward-compatible). + - Does NOT expand list values and a list is kept as one string key. + Example: {"tags": ["foo","bar"]} -> meta["tags"]["['foo', 'bar']"] = [doc_id] + - Expects meta_fields is a dict. + Use when existing callers rely on the old list-as-string semantics. + """ fields = [ cls.model.id, cls.model.meta_fields, @@ -660,6 +671,162 @@ class DocumentService(CommonService): meta[k][v].append(doc_id) return meta + @classmethod + @DB.connection_context() + def get_flatted_meta_by_kbs(cls, kb_ids): + """ + - Parses stringified JSON meta_fields when possible and skips non-dict or unparsable values. + - Expands list values into individual entries. + Example: {"tags": ["foo","bar"], "author": "alice"} -> + meta["tags"]["foo"] = [doc_id], meta["tags"]["bar"] = [doc_id], meta["author"]["alice"] = [doc_id] + Prefer for metadata_condition filtering and scenarios that must respect list semantics. + """ + fields = [ + cls.model.id, + cls.model.meta_fields, + ] + meta = {} + for r in cls.model.select(*fields).where(cls.model.kb_id.in_(kb_ids)): + doc_id = r.id + meta_fields = r.meta_fields or {} + if isinstance(meta_fields, str): + try: + meta_fields = json.loads(meta_fields) + except Exception: + continue + if not isinstance(meta_fields, dict): + continue + for k, v in meta_fields.items(): + if k not in meta: + meta[k] = {} + values = v if isinstance(v, list) else [v] + for vv in values: + if vv is None: + continue + sv = str(vv) + if sv not in meta[k]: + meta[k][sv] = [] + meta[k][sv].append(doc_id) + return meta + + @classmethod + @DB.connection_context() + def get_metadata_summary(cls, kb_id): + fields = [cls.model.id, cls.model.meta_fields] + summary = {} + for r in cls.model.select(*fields).where(cls.model.kb_id == kb_id): + meta_fields = r.meta_fields or {} + if isinstance(meta_fields, str): + try: + meta_fields = json.loads(meta_fields) + except Exception: + continue + if not isinstance(meta_fields, dict): + continue + for k, v in meta_fields.items(): + values = v if isinstance(v, list) else [v] + for vv in values: + if not vv: + continue + sv = str(vv) + if k not in summary: + summary[k] = {} + summary[k][sv] = summary[k].get(sv, 0) + 1 + return {k: sorted([(val, cnt) for val, cnt in v.items()], key=lambda x: x[1], reverse=True) for k, v in summary.items()} + + @classmethod + @DB.connection_context() + def batch_update_metadata(cls, kb_id, doc_ids, updates=None, deletes=None): + updates = updates or [] + deletes = deletes or [] + if not doc_ids: + return 0 + + def _normalize_meta(meta): + if isinstance(meta, str): + try: + meta = json.loads(meta) + except Exception: + return {} + if not isinstance(meta, dict): + return {} + return deepcopy(meta) + + def _str_equal(a, b): + return str(a) == str(b) + + def _apply_updates(meta): + changed = False + for upd in updates: + key = upd.get("key") + if not key or key not in meta: + continue + new_value = upd.get("value") + match_value = upd.get("match", new_value) + if isinstance(meta[key], list): + replaced = False + new_list = [] + for item in meta[key]: + if match_value and _str_equal(item, match_value): + new_list.append(new_value) + replaced = True + else: + new_list.append(item) + if replaced: + meta[key] = new_list + changed = True + else: + if not match_value: + continue + if _str_equal(meta[key], match_value): + meta[key] = new_value + changed = True + return changed + + def _apply_deletes(meta): + changed = False + for d in deletes: + key = d.get("key") + if not key or key not in meta: + continue + value = d.get("value", None) + if isinstance(meta[key], list): + if value is None: + del meta[key] + changed = True + continue + new_list = [item for item in meta[key] if not _str_equal(item, value)] + if len(new_list) != len(meta[key]): + if new_list: + meta[key] = new_list + else: + del meta[key] + changed = True + else: + if value is None or _str_equal(meta[key], value): + del meta[key] + changed = True + return changed + + updated_docs = 0 + with DB.atomic(): + rows = cls.model.select(cls.model.id, cls.model.meta_fields).where( + (cls.model.id.in_(doc_ids)) & (cls.model.kb_id == kb_id) + ) + for r in rows: + meta = _normalize_meta(r.meta_fields or {}) + original_meta = deepcopy(meta) + changed = _apply_updates(meta) + changed = _apply_deletes(meta) or changed + if changed and meta != original_meta: + cls.model.update( + meta_fields=meta, + update_time=current_timestamp(), + update_date=get_format_time() + ).where(cls.model.id == r.id).execute() + updated_docs += 1 + return updated_docs + @classmethod @DB.connection_context() def update_progress(cls): diff --git a/docs/references/http_api_reference.md b/docs/references/http_api_reference.md index e33919c7e..718760dda 100644 --- a/docs/references/http_api_reference.md +++ b/docs/references/http_api_reference.md @@ -1477,7 +1477,7 @@ Failure: ### List documents -**GET** `/api/v1/datasets/{dataset_id}/documents?page={page}&page_size={page_size}&orderby={orderby}&desc={desc}&keywords={keywords}&id={document_id}&name={document_name}&create_time_from={timestamp}&create_time_to={timestamp}&suffix={file_suffix}&run={run_status}` +**GET** `/api/v1/datasets/{dataset_id}/documents?page={page}&page_size={page_size}&orderby={orderby}&desc={desc}&keywords={keywords}&id={document_id}&name={document_name}&create_time_from={timestamp}&create_time_to={timestamp}&suffix={file_suffix}&run={run_status}&metadata_condition={json}` Lists documents in a specified dataset. @@ -1492,6 +1492,7 @@ Lists documents in a specified dataset. ##### Request examples **A basic request with pagination:** + ```bash curl --request GET \ --url http://{address}/api/v1/datasets/{dataset_id}/documents?page=1&page_size=10 \ @@ -1534,6 +1535,11 @@ curl --request GET \ - `3` / `DONE`: Document processing completed successfully - `4` / `FAIL`: Document processing failed Defaults to all statuses. +- `metadata_condition`: (*Filter parameter*), `object` (JSON in query) + Optional metadata filter applied to documents when `document_ids` is not provided. Uses the same structure as retrieval: + - `logic`: `"and"` (default) or `"or"` + - `conditions`: array of `{ "name": string, "comparison_operator": string, "value": string }` + - `comparison_operator` supports: `is`, `not is`, `contains`, `not contains`, `in`, `not in`, `start with`, `end with`, `>`, `<`, `≥`, `≤`, `empty`, `not empty` ##### Usage examples @@ -1545,6 +1551,15 @@ curl --request GET \ --header 'Authorization: Bearer ' ``` +**Filter by metadata (query JSON):** + +```bash +curl -G \ + --url "http://localhost:9222/api/v1/datasets/{{KB_ID}}/documents" \ + --header 'Authorization: Bearer ' \ + --data-urlencode 'metadata_condition={"logic":"and","conditions":[{"name":"tags","comparison_operator":"is","value":"bar"},{"name":"author","comparison_operator":"is","value":"alice"}]}' +``` + #### Response Success: @@ -2088,6 +2103,108 @@ Failure: --- +### Dataset metadata summary + +**GET** `/api/v1/datasets/{dataset_id}/metadata/summary` + +Aggregates metadata values across all documents in a dataset. + +#### Request + +- Method: GET +- URL: `/api/v1/datasets/{dataset_id}/metadata/summary` +- Headers: + - `'Authorization: Bearer '` + +##### Response + +Success: + +```json +{ + "code": 0, + "data": { + "summary": { + "tags": [["bar", 2], ["foo", 1], ["baz", 1]], + "author": [["alice", 2], ["bob", 1]] + } + } +} +``` + +--- + +### Dataset metadata update + +**POST** `/api/v1/datasets/{dataset_id}/metadata/update` + +Batch update or delete document-level metadata in a dataset. If both `document_ids` and `metadata_condition` are omitted, all documents in the dataset are selected. When both are provided, the intersection is used. + +#### Request + +- Method: POST +- URL: `/api/v1/datasets/{dataset_id}/metadata/update` +- Headers: + - `'content-Type: application/json'` + - `'Authorization: Bearer '` +- Body: + - `selector`: `object`, optional + - `document_ids`: `list[string]`, optional + - `metadata_condition`: `object`, optional + - `logic`: `"and"` (default) or `"or"` + - `conditions`: array of `{ "name": string, "comparison_operator": string, "value": string }` + - `comparison_operator` supports: `is`, `not is`, `contains`, `not contains`, `in`, `not in`, `start with`, `end with`, `>`, `<`, `≥`, `≤`, `empty`, `not empty` + - `updates`: `array`, optional + - items: `{ "key": string, "value": any, "match": any (optional) }` + - For lists: replace elements equal to `match` (or `value` when `match` omitted) with `value`. + - For scalars: replace when current value equals `match` (or `value` when `match` omitted). + - `deletes`: `array`, optional + - items: `{ "key": string, "value": any (optional) }` + - For lists: remove elements equal to `value`; if list becomes empty, remove the key. + - For scalars: remove the key when `value` matches or when `value` is omitted. + +##### Request example + +```bash +curl --request POST \ + --url http://{address}/api/v1/datasets/{dataset_id}/metadata/update \ + --header 'Content-Type: application/json' \ + --header 'Authorization: Bearer ' \ + --data '{ + "selector": { + "metadata_condition": { + "logic": "and", + "conditions": [ + {"name": "author", "comparison_operator": "is", "value": "alice"} + ] + } + }, + "updates": [ + {"key": "tags", "match": "foo", "value": "foo_new"} + ], + "deletes": [ + {"key": "obsolete_key"}, + {"key": "author", "value": "alice"} + ] + }' +``` + +##### Response + +Success: + +```json +{ + "code": 0, + "data": { + "updated": 1, + "matched_docs": 2 + } +} +``` + +--- + ### Retrieve chunks **POST** `/api/v1/retrieval` @@ -2117,6 +2234,7 @@ Retrieves chunks from specified datasets. - `"metadata_condition"`: `object` - `"use_kg"`: `boolean` - `"toc_enhance"`: `boolean` + ##### Request example ```bash @@ -2189,7 +2307,7 @@ curl --request POST \ - `"conditions"`: (*Body parameter*), `array` A list of metadata filter conditions. - `"name"`: `string` - The metadata field name to filter by, e.g., `"author"`, `"company"`, `"url"`. Ensure this parameter before use. See [Set metadata](../guides/dataset/set_metadata.md) for details. - - `comparison_operator`: `string` - The comparison operator. Can be one of: + - `comparison_operator`: `string` - The comparison operator. Can be one of: - `"contains"` - `"not contains"` - `"start with"` @@ -2203,7 +2321,6 @@ curl --request POST \ - `"≤"` - `"value"`: `string` - The value to compare. - #### Response Success: @@ -4450,7 +4567,9 @@ Failure: --- ### System + --- + ### Check system health **GET** `/v1/system/healthz` @@ -4519,6 +4638,7 @@ Content-Type: application/json ``` Explanation: + - Each service is reported as "ok" or "nok". - The top-level `status` reflects overall health. - If any service is "nok", detailed error info appears in `_meta`.