From 1e6bda735a2b924674b040e56ea651f8b0d8a96f Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Mon, 7 Jul 2025 09:22:25 +0800 Subject: [PATCH] Fix: add ES re-connect once request timeout. (#8678) ### What problem does this PR solve? #8669 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- rag/utils/es_conn.py | 81 ++++++++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 33 deletions(-) diff --git a/rag/utils/es_conn.py b/rag/utils/es_conn.py index c66c7edff..47d4f1a4f 100644 --- a/rag/utils/es_conn.py +++ b/rag/utils/es_conn.py @@ -44,19 +44,12 @@ class ESConnection(DocStoreConnection): logger.info(f"Use Elasticsearch {settings.ES['hosts']} as the doc engine.") for _ in range(ATTEMPT_TIME): try: - self.es = Elasticsearch( - settings.ES["hosts"].split(","), - basic_auth=(settings.ES["username"], settings.ES[ - "password"]) if "username" in settings.ES and "password" in settings.ES else None, - verify_certs=False, - timeout=600 - ) - if self.es: - self.info = self.es.info() + if self._connect(): break except Exception as e: logger.warning(f"{str(e)}. Waiting Elasticsearch {settings.ES['hosts']} to be healthy.") time.sleep(5) + if not self.es.ping(): msg = f"Elasticsearch {settings.ES['hosts']} is unhealthy in 120s." logger.error(msg) @@ -75,6 +68,19 @@ class ESConnection(DocStoreConnection): self.mapping = json.load(open(fp_mapping, "r")) logger.info(f"Elasticsearch {settings.ES['hosts']} is healthy.") + def _connect(self): + self.es = Elasticsearch( + settings.ES["hosts"].split(","), + basic_auth=(settings.ES["username"], settings.ES[ + "password"]) if "username" in settings.ES and "password" in settings.ES else None, + verify_certs=False, + timeout=600 + ) + if self.es: + self.info = self.es.info() + return True + return False + """ Database operations """ @@ -118,10 +124,13 @@ class ESConnection(DocStoreConnection): for i in range(ATTEMPT_TIME): try: return s.exists() + except ConnectionTimeout: + logger.exception("ES request timeout") + time.sleep(3) + self._connect() + continue except Exception as e: - logger.exception("ESConnection.indexExist got exception") - if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0: - continue + logger.exception(e) break return False @@ -249,11 +258,14 @@ class ESConnection(DocStoreConnection): raise Exception("Es Timeout.") logger.debug(f"ESConnection.search {str(indexNames)} res: " + str(res)) return res + except ConnectionTimeout: + logger.exception("ES request timeout") + self._connect() + continue except Exception as e: - logger.exception(f"ESConnection.search {str(indexNames)} query: " + str(q)) - if str(e).find("Timeout") > 0: - continue + logger.exception(f"ESConnection.search {str(indexNames)} query: " + str(q) + str(e)) raise e + logger.error(f"ESConnection.search timeout for {ATTEMPT_TIME} times!") raise Exception("ESConnection.search timeout.") @@ -271,8 +283,6 @@ class ESConnection(DocStoreConnection): return None except Exception as e: logger.exception(f"ESConnection.get({chunkId}) got exception") - if str(e).find("Timeout") > 0: - continue raise e logger.error(f"ESConnection.get timeout for {ATTEMPT_TIME} times!") raise Exception("ESConnection.get timeout.") @@ -304,14 +314,15 @@ class ESConnection(DocStoreConnection): if action in item and "error" in item[action]: res.append(str(item[action]["_id"]) + ":" + str(item[action]["error"])) return res + except ConnectionTimeout: + logger.exception("ES request timeout") + time.sleep(3) + self._connect() + continue except Exception as e: res.append(str(e)) logger.warning("ESConnection.insert got exception: " + str(e)) - res = [] - if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE): - res.append(str(e)) - time.sleep(3) - continue + return res def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool: @@ -334,9 +345,7 @@ class ESConnection(DocStoreConnection): return True except Exception as e: logger.exception( - f"ESConnection.update(index={indexName}, id={chunkId}, doc={json.dumps(condition, ensure_ascii=False)}) got exception") - if re.search(r"(timeout|connection)", str(e).lower()): - continue + f"ESConnection.update(index={indexName}, id={chunkId}, doc={json.dumps(condition, ensure_ascii=False)}) got exception: "+str(e)) break return False @@ -398,10 +407,13 @@ class ESConnection(DocStoreConnection): try: _ = ubq.execute() return True + except ConnectionTimeout: + logger.exception("ES request timeout") + time.sleep(3) + self._connect() + continue except Exception as e: logger.error("ESConnection.update got exception: " + str(e) + "\n".join(scripts)) - if re.search(r"(timeout|connection|conflict)", str(e).lower()): - continue break return False @@ -438,17 +450,18 @@ class ESConnection(DocStoreConnection): logger.debug("ESConnection.delete query: " + json.dumps(qry.to_dict())) for _ in range(ATTEMPT_TIME): try: - #print(Search().query(qry).to_dict(), flush=True) res = self.es.delete_by_query( index=indexName, body=Search().query(qry).to_dict(), refresh=True) return res["deleted"] + except ConnectionTimeout: + logger.exception("ES request timeout") + time.sleep(3) + self._connect() + continue except Exception as e: logger.warning("ESConnection.delete got exception: " + str(e)) - if re.search(r"(timeout|connection)", str(e).lower()): - time.sleep(3) - continue if re.search(r"(not_found)", str(e), re.IGNORECASE): return 0 return 0 @@ -557,10 +570,12 @@ class ESConnection(DocStoreConnection): request_timeout="2s") return res except ConnectionTimeout: - logger.exception("ESConnection.sql timeout") + logger.exception("ES request timeout") + time.sleep(3) + self._connect() continue except Exception: logger.exception("ESConnection.sql got exception") - return None + break logger.error(f"ESConnection.sql timeout for {ATTEMPT_TIME} times!") return None