From 968ffc7ef3f1e60eaf4eec8f2386ec065746479d Mon Sep 17 00:00:00 2001 From: Liu An Date: Mon, 9 Jun 2025 13:29:56 +0800 Subject: [PATCH] Refa: dataset operations to simplify error handling (#8132) ### What problem does this PR solve? - Consolidate database operations within single try-except blocks in the methods ### Type of change - [x] Refactoring --- api/apps/sdk/dataset.py | 174 ++++++++++++++++------------------------ 1 file changed, 68 insertions(+), 106 deletions(-) diff --git a/api/apps/sdk/dataset.py b/api/apps/sdk/dataset.py index d49a2c07c..ea6540155 100644 --- a/api/apps/sdk/dataset.py +++ b/api/apps/sdk/dataset.py @@ -124,48 +124,36 @@ def create(tenant_id): try: if KnowledgebaseService.get_or_none(name=req["name"], tenant_id=tenant_id, status=StatusEnum.VALID.value): return get_error_operating_result(message=f"Dataset name '{req['name']}' already exists") - except OperationalError as e: - logging.exception(e) - return get_error_data_result(message="Database operation failed") - req["parser_config"] = get_parser_config(req["parser_id"], req["parser_config"]) - req["id"] = get_uuid() - req["tenant_id"] = tenant_id - req["created_by"] = tenant_id + req["parser_config"] = get_parser_config(req["parser_id"], req["parser_config"]) + req["id"] = get_uuid() + req["tenant_id"] = tenant_id + req["created_by"] = tenant_id - try: ok, t = TenantService.get_by_id(tenant_id) if not ok: return get_error_permission_result(message="Tenant not found") - except OperationalError as e: - logging.exception(e) - return get_error_data_result(message="Database operation failed") - if not req.get("embd_id"): - req["embd_id"] = t.embd_id - else: - ok, err = verify_embedding_availability(req["embd_id"], tenant_id) - if not ok: - return err + if not req.get("embd_id"): + req["embd_id"] = t.embd_id + else: + ok, err = verify_embedding_availability(req["embd_id"], tenant_id) + if not ok: + return err - try: if not KnowledgebaseService.save(**req): return get_error_data_result(message="Create dataset error.(Database error)") - except OperationalError as e: - logging.exception(e) - return get_error_data_result(message="Database operation failed") - try: ok, k = KnowledgebaseService.get_by_id(req["id"]) if not ok: return get_error_data_result(message="Dataset created failed") + + response_data = remap_dictionary_keys(k.to_dict()) + return get_result(data=response_data) except OperationalError as e: logging.exception(e) return get_error_data_result(message="Database operation failed") - response_data = remap_dictionary_keys(k.to_dict()) - return get_result(data=response_data) - @manager.route("/datasets", methods=["DELETE"]) # noqa: F821 @token_required @@ -211,34 +199,27 @@ def delete(tenant_id): if err is not None: return get_error_argument_result(err) - kb_id_instance_pairs = [] - if req["ids"] is None: - try: + try: + kb_id_instance_pairs = [] + if req["ids"] is None: kbs = KnowledgebaseService.query(tenant_id=tenant_id) for kb in kbs: kb_id_instance_pairs.append((kb.id, kb)) - except OperationalError as e: - logging.exception(e) - return get_error_data_result(message="Database operation failed") - else: - error_kb_ids = [] - for kb_id in req["ids"]: - try: + + else: + error_kb_ids = [] + for kb_id in req["ids"]: kb = KnowledgebaseService.get_or_none(id=kb_id, tenant_id=tenant_id) if kb is None: error_kb_ids.append(kb_id) continue kb_id_instance_pairs.append((kb_id, kb)) - except OperationalError as e: - logging.exception(e) - return get_error_data_result(message="Database operation failed") - if len(error_kb_ids) > 0: - return get_error_permission_result(message=f"""User '{tenant_id}' lacks permission for datasets: '{", ".join(error_kb_ids)}'""") + if len(error_kb_ids) > 0: + return get_error_permission_result(message=f"""User '{tenant_id}' lacks permission for datasets: '{", ".join(error_kb_ids)}'""") - errors = [] - success_count = 0 - for kb_id, kb in kb_id_instance_pairs: - try: + errors = [] + success_count = 0 + for kb_id, kb in kb_id_instance_pairs: for doc in DocumentService.query(kb_id=kb_id): if not DocumentService.remove_document(doc, tenant_id): errors.append(f"Remove document '{doc.id}' error for dataset '{kb_id}'") @@ -256,18 +237,18 @@ def delete(tenant_id): errors.append(f"Delete dataset error for {kb_id}") continue success_count += 1 - except OperationalError as e: - logging.exception(e) - return get_error_data_result(message="Database operation failed") - if not errors: - return get_result() + if not errors: + return get_result() - error_message = f"Successfully deleted {success_count} datasets, {len(errors)} failed. Details: {'; '.join(errors)[:128]}..." - if success_count == 0: - return get_error_data_result(message=error_message) + error_message = f"Successfully deleted {success_count} datasets, {len(errors)} failed. Details: {'; '.join(errors)[:128]}..." + if success_count == 0: + return get_error_data_result(message=error_message) - return get_result(data={"success_count": success_count, "errors": errors[:5]}, message=error_message) + return get_result(data={"success_count": success_count, "errors": errors[:5]}, message=error_message) + except OperationalError as e: + logging.exception(e) + return get_error_data_result(message="Database operation failed") @manager.route("/datasets/", methods=["PUT"]) # noqa: F821 @@ -349,53 +330,41 @@ def update(tenant_id, dataset_id): kb = KnowledgebaseService.get_or_none(id=dataset_id, tenant_id=tenant_id) if kb is None: return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{dataset_id}'") - except OperationalError as e: - logging.exception(e) - return get_error_data_result(message="Database operation failed") - if req.get("parser_config"): - req["parser_config"] = deep_merge(kb.parser_config, req["parser_config"]) + if req.get("parser_config"): + req["parser_config"] = deep_merge(kb.parser_config, req["parser_config"]) - if (chunk_method := req.get("parser_id")) and chunk_method != kb.parser_id: - if not req.get("parser_config"): - req["parser_config"] = get_parser_config(chunk_method, None) - elif "parser_config" in req and not req["parser_config"]: - del req["parser_config"] + if (chunk_method := req.get("parser_id")) and chunk_method != kb.parser_id: + if not req.get("parser_config"): + req["parser_config"] = get_parser_config(chunk_method, None) + elif "parser_config" in req and not req["parser_config"]: + del req["parser_config"] - if "name" in req and req["name"].lower() != kb.name.lower(): - try: + if "name" in req and req["name"].lower() != kb.name.lower(): exists = KnowledgebaseService.get_or_none(name=req["name"], tenant_id=tenant_id, status=StatusEnum.VALID.value) if exists: return get_error_data_result(message=f"Dataset name '{req['name']}' already exists") - except OperationalError as e: - logging.exception(e) - return get_error_data_result(message="Database operation failed") - if "embd_id" in req: - if kb.chunk_num != 0 and req["embd_id"] != kb.embd_id: - return get_error_data_result(message=f"When chunk_num ({kb.chunk_num}) > 0, embedding_model must remain {kb.embd_id}") - ok, err = verify_embedding_availability(req["embd_id"], tenant_id) - if not ok: - return err + if "embd_id" in req: + if kb.chunk_num != 0 and req["embd_id"] != kb.embd_id: + return get_error_data_result(message=f"When chunk_num ({kb.chunk_num}) > 0, embedding_model must remain {kb.embd_id}") + ok, err = verify_embedding_availability(req["embd_id"], tenant_id) + if not ok: + return err - try: if not KnowledgebaseService.update_by_id(kb.id, req): return get_error_data_result(message="Update dataset error.(Database error)") - except OperationalError as e: - logging.exception(e) - return get_error_data_result(message="Database operation failed") - try: ok, k = KnowledgebaseService.get_by_id(kb.id) if not ok: return get_error_data_result(message="Dataset created failed") + + response_data = remap_dictionary_keys(k.to_dict()) + return get_result(data=response_data) except OperationalError as e: logging.exception(e) return get_error_data_result(message="Database operation failed") - response_data = remap_dictionary_keys(k.to_dict()) - return get_result(data=response_data) - @manager.route("/datasets", methods=["GET"]) # noqa: F821 @token_required @@ -459,26 +428,19 @@ def list_datasets(tenant_id): if err is not None: return get_error_argument_result(err) - kb_id = request.args.get("id") - name = args.get("name") - if kb_id: - try: - kbs = KnowledgebaseService.get_kb_by_id(kb_id, tenant_id) - except OperationalError as e: - logging.exception(e) - return get_error_data_result(message="Database operation failed") - if not kbs: - return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{kb_id}'") - if name: - try: - kbs = KnowledgebaseService.get_kb_by_name(name, tenant_id) - except OperationalError as e: - logging.exception(e) - return get_error_data_result(message="Database operation failed") - if not kbs: - return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{name}'") - try: + kb_id = request.args.get("id") + name = args.get("name") + if kb_id: + kbs = KnowledgebaseService.get_kb_by_id(kb_id, tenant_id) + + if not kbs: + return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{kb_id}'") + if name: + kbs = KnowledgebaseService.get_kb_by_name(name, tenant_id) + if not kbs: + return get_error_permission_result(message=f"User '{tenant_id}' lacks permission for dataset '{name}'") + tenants = TenantService.get_joined_tenants_by_user_id(tenant_id) kbs = KnowledgebaseService.get_list( [m["tenant_id"] for m in tenants], @@ -490,11 +452,11 @@ def list_datasets(tenant_id): kb_id, name, ) + + response_data_list = [] + for kb in kbs: + response_data_list.append(remap_dictionary_keys(kb)) + return get_result(data=response_data_list) except OperationalError as e: logging.exception(e) return get_error_data_result(message="Database operation failed") - - response_data_list = [] - for kb in kbs: - response_data_list.append(remap_dictionary_keys(kb)) - return get_result(data=response_data_list)