From 5d253e0a3445200d37d46c25799c394e9f6f248c Mon Sep 17 00:00:00 2001 From: alulala <61424174+Yue-Lyu123@users.noreply.github.com> Date: Wed, 16 Apr 2025 19:15:35 +0800 Subject: [PATCH] Fix: pymysql.err.InterfaceError: (0, '') during long time streaming chat responses (#6548) (#7057) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Related Issue: https://github.com/infiniflow/ragflow/issues/6548 ### Related PR: https://github.com/infiniflow/ragflow/pull/6861 ### Environment: Commit version: [[48730e0](https://github.com/infiniflow/ragflow/commit/48730e00a864606606a9d0778620d75411488740)] ### Bug Description: Unexpected `pymysql.err.InterfaceError: (0, '') `when using Peewee + PyMySQL + PooledMySQLDatabase after a long-running `chat streamly` operation. This is a common issue with Peewee + PyMySQL + connection pooling: you end up using a connection that was silently closed by the server, but Peewee doesn't realize it's dead. **I found that the error only occurs during longer streaming outputs** and is unrelated to the database connection context, so it's likely because: - The prolonged streaming response caused the database connection to time out - The original database connection might have been disconnected by the server during the streaming process ### Why This Happens This error happens even when using `@DB.connection_context() `after the stream is done. After investigation, I found this is caused by MySQL connection pools that appear to be open but are actually dead (expired due to` wait_timeout`). 1. `@DB.connection_context()` (as a decorator or context manager) pulls a connection from the pool. 2. If this connection was idle and expired on the MySQL server (e.g., due to `wait_timeout`), but not closed in Python, it will still be considered “open” (`DB.is_closed() == False`). 3. The real error will occur only when I execute a SQL command (such as .`get_or_none()`), and PyMySQL tries to send it to the server via a broken socket. ### Changes Made: 1. I implemented manual connection checks before executing SQL: ``` try: DB.execute_sql("SELECT 1") except Exception: print("Connection dead, reconnecting...") DB.close() DB.connect() ``` 2. Delayed the token count update until after the streaming response is completed to ensure the streaming output isn't interrupted by database operations. ``` total_tokens = 0 for txt in chat_streamly(system, history, gen_conf): if isinstance(txt, int): total_tokens = txt ...... break ...... if total_tokens > 0: if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, txt, self.llm_name): logging.error("LLMBundle.chat_streamly can't update token usage for {}/CHAT llm_name: {}, content: {}".format(self.tenant_id, self.llm_name, txt)) ``` --- api/db/services/llm_service.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/api/db/services/llm_service.py b/api/db/services/llm_service.py index ded4f7f3f..c368807f9 100644 --- a/api/db/services/llm_service.py +++ b/api/db/services/llm_service.py @@ -159,6 +159,12 @@ class TenantLLMService(CommonService): @classmethod @DB.connection_context() def increase_usage(cls, tenant_id, llm_type, used_tokens, llm_name=None): + try: + if not DB.is_connection_usable(): + DB.connect() + except Exception: + DB.close() + DB.connect() e, tenant = TenantService.get_by_id(tenant_id) if not e: logging.error(f"Tenant not found: {tenant_id}") @@ -356,21 +362,22 @@ class LLMBundle: ans = "" chat_streamly = self.mdl.chat_streamly - + total_tokens = 0 if self.is_tools and self.mdl.is_tools: chat_streamly = self.mdl.chat_streamly_with_tools for txt in chat_streamly(system, history, gen_conf): if isinstance(txt, int): + total_tokens = txt if self.langfuse: generation.end(output={"output": ans}) - - if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, txt, self.llm_name): - logging.error("LLMBundle.chat_streamly can't update token usage for {}/CHAT llm_name: {}, content: {}".format(self.tenant_id, self.llm_name, txt)) - return ans + break if txt.endswith(""): ans = ans.rstrip("") ans += txt yield ans + if total_tokens > 0: + if not TenantLLMService.increase_usage(self.tenant_id, self.llm_type, txt, self.llm_name): + logging.error("LLMBundle.chat_streamly can't update token usage for {}/CHAT llm_name: {}, content: {}".format(self.tenant_id, self.llm_name, txt))