From 237e59532bfae5fd621fb6d81bc485464e4cde65 Mon Sep 17 00:00:00 2001 From: Yongteng Lei Date: Mon, 14 Jul 2025 14:36:56 +0800 Subject: [PATCH] Feat: refine create and list operations for MCP dashboard (#8823) ### What problem does this PR solve? Refine MCP dashboard create and list operations. ### Type of change - [x] Refactoring --- api/apps/mcp_server_app.py | 115 ++++++++++++++++++++++++++++--------- api/utils/mcp_server.py | 34 +++++++++++ 2 files changed, 122 insertions(+), 27 deletions(-) create mode 100644 api/utils/mcp_server.py diff --git a/api/apps/mcp_server_app.py b/api/apps/mcp_server_app.py index c01c63777..faa53328e 100644 --- a/api/apps/mcp_server_app.py +++ b/api/apps/mcp_server_app.py @@ -8,6 +8,7 @@ from api.db.services.user_service import TenantService from api.settings import RetCode from api.utils import get_uuid from api.utils.api_utils import get_data_error_result, get_json_result, server_error_response, validate_request +from api.utils.mcp_server import get_mcp_tools from api.utils.web_utils import get_float, safe_json_parse from rag.utils.mcp_tool_call_conn import MCPToolCallSession, close_multiple_mcp_toolcall_sessions @@ -27,9 +28,13 @@ def list_mcp() -> Response: req = request.get_json() mcp_ids = req.get("mcp_ids", []) try: - servers = MCPServerService.get_servers(current_user.id, mcp_ids, page_number, items_per_page, orderby, desc, keywords) or [] + servers = MCPServerService.get_servers(current_user.id, mcp_ids, 0, 0, orderby, desc, keywords) or [] + total = len(servers) - return get_json_result(data={"mcp_servers": servers, "total": len(servers)}) + if page_number and items_per_page: + servers = servers[(page_number - 1) * items_per_page : page_number * items_per_page] + + return get_json_result(data={"mcp_servers": servers, "total": total}) except Exception as e: return server_error_response(e) @@ -63,22 +68,43 @@ def create() -> Response: if not server_name or len(server_name.encode("utf-8")) > 255: return get_data_error_result(message=f"Invaild MCP name or length is {len(server_name)} which is large than 255.") - req["headers"] = safe_json_parse(req.get("headers", {})) - req["variables"] = safe_json_parse(req.get("variables", {})) + e, _ = MCPServerService.get_by_name_and_tenant(name=server_name, tenant_id=current_user.id) + if e: + return get_data_error_result(message="Duplicated MCP server name.") + + url = req.get("url", "") + if not url: + return get_data_error_result(message="Invaild url.") + + headers = safe_json_parse(req.get("headers", {})) + req["headers"] = headers + variables = safe_json_parse(req.get("variables", {})) + variables.pop("tools", None) + + timeout = get_float(req, "timeout", 10) try: req["id"] = get_uuid() req["tenant_id"] = current_user.id e, _ = TenantService.get_by_id(current_user.id) - if not e: return get_data_error_result(message="Tenant not found.") - if not MCPServerService.insert(**req): - return get_data_error_result() + mcp_server = MCPServer(id=server_name, name=server_name, url=url, server_type=server_type, variables=variables, headers=headers) + server_tools, err_message = get_mcp_tools([mcp_server], timeout) + if err_message: + return get_data_error_result(err_message) - return get_json_result(data={"id": req["id"]}) + tools = server_tools[server_name] + tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool} + variables["tools"] = tools + req["variables"] = variables + + if not MCPServerService.insert(**req): + return get_data_error_result("Failed to create MCP server.") + + return get_json_result(data=req) except Exception as e: return server_error_response(e) @@ -89,26 +115,44 @@ def create() -> Response: def update() -> Response: req = request.get_json() - server_type = req.get("server_type", "") - if server_type and server_type not in VALID_MCP_SERVER_TYPES: - return get_data_error_result(message="Unsupported MCP server type.") - server_name = req.get("name", "") - if server_name and len(server_name.encode("utf-8")) > 255: - return get_data_error_result(message=f"Invaild MCP name or length is {len(server_name)} which is large than 255.") - mcp_id = req.get("mcp_id", "") e, mcp_server = MCPServerService.get_by_id(mcp_id) if not e or mcp_server.tenant_id != current_user.id: return get_data_error_result(message=f"Cannot find MCP server {mcp_id} for user {current_user.id}") - req["headers"] = safe_json_parse(req.get("headers", mcp_server.headers)) - req["variables"] = safe_json_parse(req.get("variables", mcp_server.variables)) + server_type = req.get("server_type", mcp_server.server_type) + if server_type and server_type not in VALID_MCP_SERVER_TYPES: + return get_data_error_result(message="Unsupported MCP server type.") + server_name = req.get("name", mcp_server.name) + if server_name and len(server_name.encode("utf-8")) > 255: + return get_data_error_result(message=f"Invaild MCP name or length is {len(server_name)} which is large than 255.") + url = req.get("url", mcp_server.url) + if not url: + return get_data_error_result(message="Invaild url.") + + headers = safe_json_parse(req.get("headers", mcp_server.headers)) + req["headers"] = headers + + variables = safe_json_parse(req.get("variables", mcp_server.variables)) + variables.pop("tools", None) + + timeout = get_float(req, "timeout", 10) try: req["tenant_id"] = current_user.id req.pop("mcp_id", None) req["id"] = mcp_id + mcp_server = MCPServer(id=server_name, name=server_name, url=url, server_type=server_type, variables=variables, headers=headers) + server_tools, err_message = get_mcp_tools([mcp_server], timeout) + if err_message: + return get_data_error_result(err_message) + + tools = server_tools[server_name] + tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool} + variables["tools"] = tools + req["variables"] = variables + if not MCPServerService.filter_update([MCPServer.id == mcp_id, MCPServer.tenant_id == current_user.id], req): return get_data_error_result(message="Failed to updated MCP server.") @@ -145,17 +189,22 @@ def rm() -> Response: def import_multiple() -> Response: req = request.get_json() servers = req.get("mcpServers", {}) - if not servers: return get_data_error_result(message="No MCP servers provided.") + timeout = get_float(req, "timeout", 10) + results = [] try: for server_name, config in servers.items(): - if not all(key in config for key in ["type", "url"]): + if not all(key in config for key in {"type", "url"}): results.append({"server": server_name, "success": False, "message": "Missing required fields (type or url)"}) continue + if not server_name or len(server_name.encode("utf-8")) > 255: + results.append({"server": server_name, "success": False, "message": f"Invaild MCP name or length is {len(server_name)} which is large than 255."}) + continue + base_name = server_name new_name = base_name counter = 0 @@ -173,14 +222,25 @@ def import_multiple() -> Response: "name": new_name, "url": config["url"], "server_type": config["type"], - "variables": {"authorization_token": config.get("authorization_token", ""), "tool_configuration": config.get("tool_configuration", {})}, + "variables": {"authorization_token": config.get("authorization_token", "")}, } + headers = {"authorization_token": config["authorization_token"]} if "authorization_token" in config else {} + variables = {k: v for k, v in config.items() if k not in {"type", "url", "headers"}} + mcp_server = MCPServer(id=new_name, name=new_name, url=config["url"], server_type=config["type"], variables=variables, headers=headers) + server_tools, err_message = get_mcp_tools([mcp_server], timeout) + if err_message: + results.append({"server": base_name, "success": False, "message": err_message}) + continue + + tools = server_tools[new_name] + tools = {tool["name"]: tool for tool in tools if isinstance(tool, dict) and "name" in tool} + create_data["variables"]["tools"] = tools + if MCPServerService.insert(**create_data): result = {"server": server_name, "success": True, "action": "created", "id": create_data["id"], "new_name": new_name} if new_name != base_name: - result["message"] = f"Renamed from '{base_name}' to avoid duplication" - + result["message"] = f"Renamed from '{base_name}' to '{new_name}' avoid duplication" results.append(result) else: results.append({"server": server_name, "success": False, "message": "Failed to create MCP server."}) @@ -214,7 +274,7 @@ def export_multiple() -> Response: "url": mcp_server.url, "name": mcp_server.name, "authorization_token": mcp_server.variables.get("authorization_token", ""), - "tool_configuration": mcp_server.variables.get("tool_configuration", {}), + "tools": mcp_server.variables.get("tools", {}), } return get_json_result(data={"mcpServers": exported_servers}) @@ -256,16 +316,17 @@ def list_tools() -> Response: results[server_key] = [] for tool in tools: tool_dict = tool.model_dump() - cached_tool = cached_tools.get(tool_dict["name"]) + cached_tool = cached_tools.get(tool_dict["name"], {}) - tool_dict["enabled"] = cached_tool.get("enabled") if cached_tool and "enabled" in cached_tool else True + tool_dict["enabled"] = cached_tool.get("enabled", True) results[server_key].append(tool_dict) - # PERF: blocking call to close sessions — consider moving to background thread or task queue - close_multiple_mcp_toolcall_sessions(tool_call_sessions) return get_json_result(data=results) except Exception as e: return server_error_response(e) + finally: + # PERF: blocking call to close sessions — consider moving to background thread or task queue + close_multiple_mcp_toolcall_sessions(tool_call_sessions) @manager.route("/test_tool", methods=["POST"]) # noqa: F821 diff --git a/api/utils/mcp_server.py b/api/utils/mcp_server.py new file mode 100644 index 000000000..83b168711 --- /dev/null +++ b/api/utils/mcp_server.py @@ -0,0 +1,34 @@ +from api.db.db_models import MCPServer +from rag.utils.mcp_tool_call_conn import MCPToolCallSession, close_multiple_mcp_toolcall_sessions + + +def get_mcp_tools(mcp_servers: list[MCPServer], timeout: float | int = 10) -> tuple[dict, str]: + results = {} + tool_call_sessions = [] + try: + for mcp_server in mcp_servers: + server_key = mcp_server.id + + cached_tools = mcp_server.variables.get("tools", {}) + + tool_call_session = MCPToolCallSession(mcp_server, mcp_server.variables) + tool_call_sessions.append(tool_call_session) + + try: + tools = tool_call_session.get_tools(timeout) + except Exception: + tools = [] + + results[server_key] = [] + for tool in tools: + tool_dict = tool.model_dump() + cached_tool = cached_tools.get(tool_dict["name"], {}) + + tool_dict["enabled"] = cached_tool.get("enabled", True) + results[server_key].append(tool_dict) + + # PERF: blocking call to close sessions — consider moving to background thread or task queue + close_multiple_mcp_toolcall_sessions(tool_call_sessions) + return results, "" + except Exception as e: + return {}, str(e)