Fix: add ES re-connect on request timeout. (#8678)

### What problem does this PR solve?

#8669

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
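
The change is mechanical but repeated across every request method, so it is easier to read as a pattern: connection setup moves into a `_connect()` helper, and each request loop gains an `except ConnectionTimeout` branch that (usually after a short sleep) rebuilds the client and retries. Below is a minimal standalone sketch of that pattern; the `StoreClient` class, `index_exists` method, and default host are illustrative stand-ins, not RAGFlow code, while `ATTEMPT_TIME` and the handler bodies mirror the diff below.

```python
import logging
import time

from elasticsearch import Elasticsearch
from elastic_transport import ConnectionTimeout  # elasticsearch.exceptions.ConnectionTimeout on 7.x clients

ATTEMPT_TIME = 2
logger = logging.getLogger(__name__)


class StoreClient:
    """Hypothetical stand-in for ESConnection, reduced to the reconnect pattern."""

    def __init__(self, hosts: str = "http://localhost:9200"):
        self.hosts = hosts
        self.info = {}
        self._connect()

    def _connect(self) -> bool:
        # Discard whatever client we had and build a fresh one.
        self.es = Elasticsearch(self.hosts.split(","), verify_certs=False)
        if self.es:
            self.info = self.es.info()  # round-trip to confirm the cluster answers
            return True
        return False

    def index_exists(self, name: str) -> bool:
        for _ in range(ATTEMPT_TIME):
            try:
                return bool(self.es.indices.exists(index=name))
            except ConnectionTimeout:
                # Request timed out: back off, rebuild the connection, retry.
                logger.exception("ES request timeout")
                time.sleep(3)
                self._connect()
                continue
            except Exception:
                logger.exception("index_exists got exception")
                break
        return False
```

As far as the diff shows, the rationale for rebuilding the client rather than reusing it is simply to throw away a possibly wedged connection so the next attempt starts from a fresh transport.
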
Kevin Hu, 2025-07-07 09:22:25 +08:00 (committed by GitHub)
commit 1e6bda735a, parent ebf827a956


```diff
@@ -44,19 +44,12 @@ class ESConnection(DocStoreConnection):
         logger.info(f"Use Elasticsearch {settings.ES['hosts']} as the doc engine.")
         for _ in range(ATTEMPT_TIME):
             try:
-                self.es = Elasticsearch(
-                    settings.ES["hosts"].split(","),
-                    basic_auth=(settings.ES["username"], settings.ES[
-                        "password"]) if "username" in settings.ES and "password" in settings.ES else None,
-                    verify_certs=False,
-                    timeout=600
-                )
-                if self.es:
-                    self.info = self.es.info()
+                if self._connect():
                     break
             except Exception as e:
                 logger.warning(f"{str(e)}. Waiting Elasticsearch {settings.ES['hosts']} to be healthy.")
                 time.sleep(5)
         if not self.es.ping():
             msg = f"Elasticsearch {settings.ES['hosts']} is unhealthy in 120s."
             logger.error(msg)
```
```diff
@@ -75,6 +68,19 @@ class ESConnection(DocStoreConnection):
         self.mapping = json.load(open(fp_mapping, "r"))
         logger.info(f"Elasticsearch {settings.ES['hosts']} is healthy.")
 
+    def _connect(self):
+        self.es = Elasticsearch(
+            settings.ES["hosts"].split(","),
+            basic_auth=(settings.ES["username"], settings.ES[
+                "password"]) if "username" in settings.ES and "password" in settings.ES else None,
+            verify_certs=False,
+            timeout=600
+        )
+        if self.es:
+            self.info = self.es.info()
+            return True
+        return False
+
     """
     Database operations
     """
```
```diff
@@ -118,10 +124,13 @@ class ESConnection(DocStoreConnection):
         for i in range(ATTEMPT_TIME):
             try:
                 return s.exists()
+            except ConnectionTimeout:
+                logger.exception("ES request timeout")
+                time.sleep(3)
+                self._connect()
+                continue
             except Exception as e:
-                logger.exception("ESConnection.indexExist got exception")
-                if str(e).find("Timeout") > 0 or str(e).find("Conflict") > 0:
-                    continue
+                logger.exception(e)
                 break
         return False
```
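
The new handlers rely on two details the hunks do not show. First, `ConnectionTimeout` has to be imported at the top of the module; depending on the client generation it is exposed as `elasticsearch.exceptions.ConnectionTimeout` (7.x) or `elastic_transport.ConnectionTimeout` (8.x), and which one this file imports is not visible in the diff. Second, ordering matters: `ConnectionTimeout` is an ordinary `Exception` subclass, so its handler must come before the broad `except Exception` branch, otherwise the reconnect path would never run. A small illustration of the ordering (not project code):

```python
# Illustration only: handler order decides which branch catches a timeout.
from elastic_transport import ConnectionTimeout  # or elasticsearch.exceptions on 7.x clients


def classify(exc: Exception) -> str:
    try:
        raise exc
    except ConnectionTimeout:   # must precede the broad handler
        return "reconnect and retry"
    except Exception:
        return "log and give up"


print(classify(ConnectionTimeout("request timed out")))  # reconnect and retry
print(classify(RuntimeError("mapping conflict")))        # log and give up
```
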
```diff
@@ -249,11 +258,14 @@ class ESConnection(DocStoreConnection):
                     raise Exception("Es Timeout.")
                 logger.debug(f"ESConnection.search {str(indexNames)} res: " + str(res))
                 return res
+            except ConnectionTimeout:
+                logger.exception("ES request timeout")
+                self._connect()
+                continue
             except Exception as e:
-                logger.exception(f"ESConnection.search {str(indexNames)} query: " + str(q))
-                if str(e).find("Timeout") > 0:
-                    continue
+                logger.exception(f"ESConnection.search {str(indexNames)} query: " + str(q) + str(e))
                 raise e
         logger.error(f"ESConnection.search timeout for {ATTEMPT_TIME} times!")
         raise Exception("ESConnection.search timeout.")
```
```diff
@@ -271,8 +283,6 @@ class ESConnection(DocStoreConnection):
                 return None
             except Exception as e:
                 logger.exception(f"ESConnection.get({chunkId}) got exception")
-                if str(e).find("Timeout") > 0:
-                    continue
                 raise e
         logger.error(f"ESConnection.get timeout for {ATTEMPT_TIME} times!")
         raise Exception("ESConnection.get timeout.")
```
```diff
@@ -304,14 +314,15 @@ class ESConnection(DocStoreConnection):
                         if action in item and "error" in item[action]:
                             res.append(str(item[action]["_id"]) + ":" + str(item[action]["error"]))
                 return res
+            except ConnectionTimeout:
+                logger.exception("ES request timeout")
+                time.sleep(3)
+                self._connect()
+                continue
             except Exception as e:
                 res.append(str(e))
                 logger.warning("ESConnection.insert got exception: " + str(e))
-                res = []
-                if re.search(r"(Timeout|time out)", str(e), re.IGNORECASE):
-                    res.append(str(e))
-                    time.sleep(3)
-                    continue
         return res
 
     def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool:
```
```diff
@@ -334,9 +345,7 @@ class ESConnection(DocStoreConnection):
                     return True
                 except Exception as e:
                     logger.exception(
-                        f"ESConnection.update(index={indexName}, id={chunkId}, doc={json.dumps(condition, ensure_ascii=False)}) got exception")
-                    if re.search(r"(timeout|connection)", str(e).lower()):
-                        continue
+                        f"ESConnection.update(index={indexName}, id={chunkId}, doc={json.dumps(condition, ensure_ascii=False)}) got exception: "+str(e))
                     break
             return False
```
```diff
@@ -398,10 +407,13 @@ class ESConnection(DocStoreConnection):
             try:
                 _ = ubq.execute()
                 return True
+            except ConnectionTimeout:
+                logger.exception("ES request timeout")
+                time.sleep(3)
+                self._connect()
+                continue
             except Exception as e:
                 logger.error("ESConnection.update got exception: " + str(e) + "\n".join(scripts))
-                if re.search(r"(timeout|connection|conflict)", str(e).lower()):
-                    continue
                 break
         return False
```
```diff
@@ -438,17 +450,18 @@ class ESConnection(DocStoreConnection):
         logger.debug("ESConnection.delete query: " + json.dumps(qry.to_dict()))
         for _ in range(ATTEMPT_TIME):
             try:
-                #print(Search().query(qry).to_dict(), flush=True)
                 res = self.es.delete_by_query(
                     index=indexName,
                     body=Search().query(qry).to_dict(),
                     refresh=True)
                 return res["deleted"]
+            except ConnectionTimeout:
+                logger.exception("ES request timeout")
+                time.sleep(3)
+                self._connect()
+                continue
             except Exception as e:
                 logger.warning("ESConnection.delete got exception: " + str(e))
-                if re.search(r"(timeout|connection)", str(e).lower()):
-                    time.sleep(3)
-                    continue
                 if re.search(r"(not_found)", str(e), re.IGNORECASE):
                     return 0
         return 0
```
```diff
@@ -557,10 +570,12 @@ class ESConnection(DocStoreConnection):
                                         request_timeout="2s")
                 return res
             except ConnectionTimeout:
-                logger.exception("ESConnection.sql timeout")
+                logger.exception("ES request timeout")
+                time.sleep(3)
+                self._connect()
                 continue
             except Exception:
                 logger.exception("ESConnection.sql got exception")
-                return None
+                break
         logger.error(f"ESConnection.sql timeout for {ATTEMPT_TIME} times!")
         return None
```
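
A side effect worth noting in the last hunk: on a non-timeout error, `ESConnection.sql` now breaks out of the retry loop instead of returning `None` from inside the handler, so both failure paths exit through the final `return None`. To convince yourself the retry-and-reconnect loop behaves as intended without a live cluster, a stub like the following works; it is purely illustrative, and the real `_connect` rebuilds an `Elasticsearch` client rather than counting calls.

```python
import time

from elastic_transport import ConnectionTimeout  # or elasticsearch.exceptions on 7.x clients

ATTEMPT_TIME = 3


class FlakyES:
    """Stands in for self.es: the first ping times out, the second succeeds."""

    def __init__(self):
        self.calls = 0

    def ping(self):
        self.calls += 1
        if self.calls == 1:
            raise ConnectionTimeout("request timed out")
        return True


class Conn:
    def __init__(self):
        self.es = FlakyES()
        self.reconnects = 0

    def _connect(self):
        self.reconnects += 1  # the real helper rebuilds Elasticsearch(...) here
        return True

    def healthy(self) -> bool:
        for _ in range(ATTEMPT_TIME):
            try:
                return self.es.ping()
            except ConnectionTimeout:
                time.sleep(0)  # the patch sleeps 3 s at this point
                self._connect()
                continue
            except Exception:
                break
        return False


c = Conn()
assert c.healthy() is True and c.reconnects == 1
```
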