From 17ea5c1dee1122419fe90b06d0944af3c61d4eb2 Mon Sep 17 00:00:00 2001 From: Yongteng Lei Date: Wed, 5 Nov 2025 19:15:27 +0800 Subject: [PATCH] Fix: MCP cannot handle empty Auth field properly (#11034) ### What problem does this PR solve? Fix MCP cannot handle empty Auth field properly, then result in ```bash 2025-11-05 11:10:41,919 INFO 51209 Negotiated protocol version: 2025-06-18 2025-11-05 11:10:41,920 INFO 51209 client_session initialized successfully 2025-11-05 11:10:41,994 INFO 51209 127.0.0.1 - - [05/Nov/2025 11:10:41] "GET /api/v1/datasets?page=1&page_size=1000&orderby=create_time&desc=True HTTP/1.1" 200 - 2025-11-05 11:10:41,999 INFO 51209 Want to clean up 1 MCP sessions 2025-11-05 11:10:42,000 INFO 51209 1 MCP sessions has been cleaned up. 0 in global context. 2025-11-05 11:10:42,001 INFO 51209 127.0.0.1 - - [05/Nov/2025 11:10:42] "POST /v1/mcp_server/test_mcp HTTP/1.1" 200 - 2025-11-05 11:11:30,441 INFO 51209 Negotiated protocol version: 2025-06-18 2025-11-05 11:11:30,442 INFO 51209 client_session initialized successfully 2025-11-05 11:11:30,520 INFO 51209 127.0.0.1 - - [05/Nov/2025 11:11:30] "GET /api/v1/datasets?page=1&page_size=1000&orderby=create_time&desc=True HTTP/1.1" 200 - 2025-11-05 11:11:30,525 INFO 51209 Want to clean up 1 MCP sessions 2025-11-05 11:11:30,526 INFO 51209 1 MCP sessions has been cleaned up. 0 in global context. 2025-11-05 11:11:30,527 INFO 51209 127.0.0.1 - - [05/Nov/2025 11:11:30] "POST /v1/mcp_server/test_mcp HTTP/1.1" 200 - 2025-11-05 11:11:31,476 INFO 51209 Negotiated protocol version: 2025-06-18 2025-11-05 11:11:31,476 INFO 51209 client_session initialized successfully 2025-11-05 11:11:31,549 INFO 51209 127.0.0.1 - - [05/Nov/2025 11:11:31] "GET /api/v1/datasets?page=1&page_size=1000&orderby=create_time&desc=True HTTP/1.1" 200 - 2025-11-05 11:11:31,552 INFO 51209 Want to clean up 1 MCP sessions 2025-11-05 11:11:31,553 INFO 51209 1 MCP sessions has been cleaned up. 0 in global context. 2025-11-05 11:11:31,554 INFO 51209 127.0.0.1 - - [05/Nov/2025 11:11:31] "POST /v1/mcp_server/test_mcp HTTP/1.1" 200 - 2025-11-05 11:11:51,930 ERROR 51209 unhandled errors in a TaskGroup (1 sub-exception) + Exception Group Traceback (most recent call last): | File "/home/xxxxxxxxx/workspace/ragflow/rag/utils/mcp_tool_call_conn.py", line 86, in _mcp_server_loop | async with streamablehttp_client(url, headers) as (read_stream, write_stream, _): | File "/home/xxxxxxxxx/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/contextlib.py", line 217, in __aexit__ | await self.gen.athrow(typ, value, traceback) | File "/home/xxxxxxxxx/workspace/ragflow/.venv/lib/python3.10/site-packages/mcp/client/streamable_http.py", line 478, in streamablehttp_client | async with anyio.create_task_group() as tg: | File "/home/xxxxxxxxx/workspace/ragflow/.venv/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 781, in __aexit__ | raise BaseExceptionGroup( | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception) +-+---------------- 1 ---------------- | Traceback (most recent call last): | File "/home/xxxxxxxxx/workspace/ragflow/.venv/lib/python3.10/site-packages/mcp/client/streamable_http.py", line 409, in handle_request_async | await self._handle_post_request(ctx) | File "/home/xxxxxxxxx/workspace/ragflow/.venv/lib/python3.10/site-packages/mcp/client/streamable_http.py", line 278, in _handle_post_request | response.raise_for_status() | File "/home/xxxxxxxxx/workspace/ragflow/.venv/lib/python3.10/site-packages/httpx/_models.py", line 829, in raise_for_status | raise HTTPStatusError(message, request=request, response=self) | httpx.HTTPStatusError: Server error '502 Bad Gateway' for url 'http://192.168.1.38:9382/mcp' | For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/502 +------------------------------------ 2025-11-05 11:11:51,942 ERROR 51209 Error fetching tools from MCP server: streamable-http: http://192.168.1.38:9382/mcp Traceback (most recent call last): File "/home/xxxxxxxxx/workspace/ragflow/rag/utils/mcp_tool_call_conn.py", line 168, in get_tools return future.result(timeout=timeout) File "/home/xxxxxxxxx/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/concurrent/futures/_base.py", line 458, in result return self.__get_result() File "/home/xxxxxxxxx/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result raise self._exception File "<@beartype(rag.utils.mcp_tool_call_conn.MCPToolCallSession._get_tools_from_mcp_server) at 0x7d58f02e2c20>", line 40, in _get_tools_from_mcp_server File "/home/xxxxxxxxx/workspace/ragflow/rag/utils/mcp_tool_call_conn.py", line 160, in _get_tools_from_mcp_server result: ListToolsResult = await self._call_mcp_server("list_tools", timeout=timeout) File "<@beartype(rag.utils.mcp_tool_call_conn.MCPToolCallSession._call_mcp_server) at 0x7d58f02e2b00>", line 63, in _call_mcp_server File "/home/xxxxxxxxx/workspace/ragflow/rag/utils/mcp_tool_call_conn.py", line 139, in _call_mcp_server raise result ValueError: Connection failed (possibly due to auth error). Please check authentication settings first 2025-11-05 11:11:51,943 ERROR 51209 Test MCP error: Connection failed (possibly due to auth error). Please check authentication settings first Traceback (most recent call last): File "/home/xxxxxxxxx/workspace/ragflow/api/apps/mcp_server_app.py", line 429, in test_mcp tools = tool_call_session.get_tools(timeout) File "<@beartype(rag.utils.mcp_tool_call_conn.MCPToolCallSession.get_tools) at 0x7d58f02e2cb0>", line 40, in get_tools File "/home/xxxxxxxxx/workspace/ragflow/rag/utils/mcp_tool_call_conn.py", line 168, in get_tools return future.result(timeout=timeout) File "/home/xxxxxxxxx/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/concurrent/futures/_base.py", line 458, in result return self.__get_result() File "/home/xxxxxxxxx/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result raise self._exception File "<@beartype(rag.utils.mcp_tool_call_conn.MCPToolCallSession._get_tools_from_mcp_server) at 0x7d58f02e2c20>", line 40, in _get_tools_from_mcp_server File "/home/xxxxxxxxx/workspace/ragflow/rag/utils/mcp_tool_call_conn.py", line 160, in _get_tools_from_mcp_server result: ListToolsResult = await self._call_mcp_server("list_tools", timeout=timeout) File "<@beartype(rag.utils.mcp_tool_call_conn.MCPToolCallSession._call_mcp_server) at 0x7d58f02e2b00>", line 63, in _call_mcp_server File "/home/xxxxxxxxx/workspace/ragflow/rag/utils/mcp_tool_call_conn.py", line 139, in _call_mcp_server raise result ValueError: Connection failed (possibly due to auth error). Please check authentication settings first 2025-11-05 11:11:51,944 INFO 51209 Want to clean up 1 MCP sessions 2025-11-05 11:11:51,945 INFO 51209 1 MCP sessions has been cleaned up. 0 in global context. 2025-11-05 11:11:51,946 INFO 51209 127.0.0.1 - - [05/Nov/2025 11:11:51] "POST /v1/mcp_server/test_mcp HTTP/1.1" 200 - 2025-11-05 11:12:20,484 INFO 51209 Negotiated protocol version: 2025-06-18 2025-11-05 11:12:20,485 INFO 51209 client_session initialized successfully 2025-11-05 11:12:20,570 INFO 51209 127.0.0.1 - - [05/Nov/2025 11:12:20] "GET /api/v1/datasets?page=1&page_size=1000&orderby=create_time&desc=True HTTP/1.1" 200 - 2025-11-05 11:12:20,573 INFO 51209 Want to clean up 1 MCP sessions 2025-11-05 11:12:20,574 INFO 51209 1 MCP sessions has been cleaned up. 0 in global context. 2025-11-05 11:12:20,575 INFO 51209 127.0.0.1 - - [05/Nov/2025 11:12:20] "POST /v1/mcp_server/test_mcp HTTP/1.1" 200 - 2025-11-05 11:15:02,119 INFO 51209 127.0.0.1 - - [05/Nov/2025 11:15:02] "GET /api/v1/datasets?page=1&page_size=1000&orderby=create_time&desc=True HTTP/1.1" 200 - 2025-11-05 11:16:24,967 INFO 51209 127.0.0.1 - - [05/Nov/2025 11:16:24] "GET /api/v1/datasets?page=1&page_size=1000&orderby=create_time&desc=True HTTP/1.1" 200 - 2025-11-05 11:30:24,284 ERROR 51209 Task was destroyed but it is pending! task: :11> wait_for= cb=[_chain_future.._call_set_state() at /home/xxxxxxxxx/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/asyncio/futures.py:392]> 2025-11-05 11:30:24,285 ERROR 51209 Task was destroyed but it is pending! task: wait_for= cb=[_release_waiter()() at /home/xxxxxxxxx/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/asyncio/tasks.py:387]> Exception ignored in: Traceback (most recent call last): File "/home/xxxxxxxxx/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/asyncio/queues.py", line 161, in get getter.cancel() # Just in case getter is not done yet. File "/home/xxxxxxxxx/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/asyncio/base_events.py", line 753, in call_soon self._check_closed() File "/home/xxxxxxxxx/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed raise RuntimeError('Event loop is closed') RuntimeError: Event loop is closed ``` ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- rag/utils/mcp_tool_call_conn.py | 90 +++++++++++++++++++++++++++------ 1 file changed, 74 insertions(+), 16 deletions(-) diff --git a/rag/utils/mcp_tool_call_conn.py b/rag/utils/mcp_tool_call_conn.py index 93d678ae6..2093f7bc8 100644 --- a/rag/utils/mcp_tool_call_conn.py +++ b/rag/utils/mcp_tool_call_conn.py @@ -61,7 +61,8 @@ class MCPToolCallSession(ToolCallSession): for h, v in raw_headers.items(): nh = Template(h).safe_substitute(self._server_variables) nv = Template(v).safe_substitute(self._server_variables) - headers[nh] = nv + if nh.strip() and nv.strip().strip("Bearer"): + headers[nh] = nv if self._mcp_server.server_type == MCPServerType.SSE: # SSE transport @@ -76,6 +77,9 @@ class MCPToolCallSession(ToolCallSession): msg = f"Timeout initializing client_session for server {self._mcp_server.id}" logging.error(msg) await self._process_mcp_tasks(None, msg) + except asyncio.CancelledError: + logging.warning(f"SSE transport MCP session cancelled for server {self._mcp_server.id}") + return except Exception: msg = "Connection failed (possibly due to auth error). Please check authentication settings first" await self._process_mcp_tasks(None, msg) @@ -93,6 +97,9 @@ class MCPToolCallSession(ToolCallSession): msg = f"Timeout initializing client_session for server {self._mcp_server.id}" logging.error(msg) await self._process_mcp_tasks(None, msg) + except asyncio.CancelledError: + logging.warning(f"STREAMABLE_HTTP MCP session cancelled for server {self._mcp_server.id}") + return except Exception as e: logging.exception(e) msg = "Connection failed (possibly due to auth error). Please check authentication settings first" @@ -107,6 +114,8 @@ class MCPToolCallSession(ToolCallSession): mcp_task, arguments, result_queue = await asyncio.wait_for(self._queue.get(), timeout=1) except asyncio.TimeoutError: continue + except asyncio.CancelledError: + break logging.debug(f"Got MCP task {mcp_task} arguments {arguments}") @@ -114,7 +123,10 @@ class MCPToolCallSession(ToolCallSession): if not client_session or error_message: r = ValueError(error_message) - await result_queue.put(r) + try: + await result_queue.put(r) + except asyncio.CancelledError: + break continue try: @@ -126,10 +138,18 @@ class MCPToolCallSession(ToolCallSession): r = ValueError(f"Unknown MCP task {mcp_task}") except Exception as e: r = e + except asyncio.CancelledError: + break - await result_queue.put(r) + try: + await result_queue.put(r) + except asyncio.CancelledError: + break async def _call_mcp_server(self, task_type: MCPTaskType, timeout: float | int = 8, **kwargs) -> Any: + if self._close: + raise ValueError("Session is closed") + results = asyncio.Queue() await self._queue.put((task_type, kwargs, results)) @@ -163,6 +183,9 @@ class MCPToolCallSession(ToolCallSession): raise def get_tools(self, timeout: float | int = 10) -> list[Tool]: + if self._close: + raise ValueError("Session is closed") + future = asyncio.run_coroutine_threadsafe(self._get_tools_from_mcp_server(timeout=timeout), self._event_loop) try: return future.result(timeout=timeout) @@ -176,6 +199,9 @@ class MCPToolCallSession(ToolCallSession): @override def tool_call(self, name: str, arguments: dict[str, Any], timeout: float | int = 10) -> str: + if self._close: + return "Error: Session is closed" + future = asyncio.run_coroutine_threadsafe(self._call_mcp_tool(name, arguments), self._event_loop) try: return future.result(timeout=timeout) @@ -191,8 +217,29 @@ class MCPToolCallSession(ToolCallSession): return self._close = True - self._event_loop.call_soon_threadsafe(self._event_loop.stop) - self._thread_pool.shutdown(wait=True) + + while not self._queue.empty(): + try: + _, _, result_queue = self._queue.get_nowait() + try: + await result_queue.put(asyncio.CancelledError("Session is closing")) + except Exception: + pass + except asyncio.QueueEmpty: + break + except Exception: + break + + try: + self._event_loop.call_soon_threadsafe(self._event_loop.stop) + except Exception: + pass + + try: + self._thread_pool.shutdown(wait=True) + except Exception: + pass + self.__class__._ALL_INSTANCES.discard(self) def close_sync(self, timeout: float | int = 5) -> None: @@ -200,13 +247,16 @@ class MCPToolCallSession(ToolCallSession): logging.warning(f"Event loop already stopped for {self._mcp_server.id}") return - future = asyncio.run_coroutine_threadsafe(self.close(), self._event_loop) try: - future.result(timeout=timeout) - except FuturesTimeoutError: - logging.error(f"Timeout while closing session for server {self._mcp_server.id} (timeout={timeout})") + future = asyncio.run_coroutine_threadsafe(self.close(), self._event_loop) + try: + future.result(timeout=timeout) + except FuturesTimeoutError: + logging.error(f"Timeout while closing session for server {self._mcp_server.id} (timeout={timeout})") + except Exception: + logging.exception(f"Unexpected error during close_sync for {self._mcp_server.id}") except Exception: - logging.exception(f"Unexpected error during close_sync for {self._mcp_server.id}") + logging.exception(f"Exception while scheduling close for server {self._mcp_server.id}") def close_multiple_mcp_toolcall_sessions(sessions: list[MCPToolCallSession]) -> None: @@ -215,16 +265,24 @@ def close_multiple_mcp_toolcall_sessions(sessions: list[MCPToolCallSession]) -> async def _gather_and_stop() -> None: try: await asyncio.gather(*[s.close() for s in sessions if s is not None], return_exceptions=True) + except Exception: + logging.exception("Exception during MCP session cleanup") finally: - loop.call_soon_threadsafe(loop.stop) + try: + loop.call_soon_threadsafe(loop.stop) + except Exception: + pass - loop = asyncio.new_event_loop() - thread = threading.Thread(target=loop.run_forever, daemon=True) - thread.start() + try: + loop = asyncio.new_event_loop() + thread = threading.Thread(target=loop.run_forever, daemon=True) + thread.start() - asyncio.run_coroutine_threadsafe(_gather_and_stop(), loop).result() + asyncio.run_coroutine_threadsafe(_gather_and_stop(), loop).result() + thread.join() + except Exception: + logging.exception("Exception during MCP session cleanup thread management") - thread.join() logging.info(f"{len(sessions)} MCP sessions has been cleaned up. {len(list(MCPToolCallSession._ALL_INSTANCES))} in global context.")