Fix: optimize init memory_size (#12254)

### What problem does this PR solve?

Handle 404 exception when init memory size from es.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Co-authored-by: Liu An <asiro@qq.com>
This commit is contained in:
Lynn
2025-12-26 21:18:44 +08:00
committed by GitHub
parent a1ed4430ce
commit 3364cf96cf
3 changed files with 13 additions and 13 deletions

View File

@ -223,13 +223,8 @@ def init_memory_size_cache():
if not memory_list: if not memory_list:
logging.info("No memory found, no need to init memory size.") logging.info("No memory found, no need to init memory size.")
else: else:
memory_size_map = MessageService.calculate_memory_size( for m in memory_list:
memory_ids=[m.id for m in memory_list], get_memory_size_cache(m.id, m.tenant_id)
uid_list=[m.tenant_id for m in memory_list],
)
for memory in memory_list:
memory_size = memory_size_map.get(memory.id, 0)
set_memory_size_cache(memory.id, memory_size)
logging.info("Memory size cache init done.") logging.info("Memory size cache init done.")

View File

@ -198,7 +198,7 @@ class MessageService:
message_list = settings.msgStoreConn.get_fields(res, select_fields) message_list = settings.msgStoreConn.get_fields(res, select_fields)
current_size = 0 current_size = 0
ids_to_remove = [] ids_to_remove = []
for message in message_list: for message in message_list.values():
if current_size < size_to_delete: if current_size < size_to_delete:
current_size += cls.calculate_message_size(message) current_size += cls.calculate_message_size(message)
ids_to_remove.append(message["message_id"]) ids_to_remove.append(message["message_id"])
@ -210,7 +210,7 @@ class MessageService:
order_by = OrderByExpr() order_by = OrderByExpr()
order_by.asc("valid_at") order_by.asc("valid_at")
res = settings.msgStoreConn.search( res = settings.msgStoreConn.search(
select_fields=["memory_id", "content", "content_embed"], select_fields=select_fields,
highlight_fields=[], highlight_fields=[],
condition={}, condition={},
match_expressions=[], match_expressions=[],
@ -222,7 +222,7 @@ class MessageService:
for doc in docs.values(): for doc in docs.values():
if current_size < size_to_delete: if current_size < size_to_delete:
current_size += cls.calculate_message_size(doc) current_size += cls.calculate_message_size(doc)
ids_to_remove.append(doc["memory_id"]) ids_to_remove.append(doc["message_id"])
else: else:
return ids_to_remove, current_size return ids_to_remove, current_size
return ids_to_remove, current_size return ids_to_remove, current_size

View File

@ -127,6 +127,11 @@ class ESConnection(ESConnectionBase):
index_names = index_names.split(",") index_names = index_names.split(",")
assert isinstance(index_names, list) and len(index_names) > 0 assert isinstance(index_names, list) and len(index_names) > 0
assert "_id" not in condition assert "_id" not in condition
exist_index_list = [idx for idx in index_names if self.index_exist(idx)]
if not exist_index_list:
return None
bool_query = Q("bool", must=[], must_not=[]) bool_query = Q("bool", must=[], must_not=[])
if hide_forgotten: if hide_forgotten:
# filter not forget # filter not forget
@ -214,7 +219,7 @@ class ESConnection(ESConnectionBase):
for i in range(ATTEMPT_TIME): for i in range(ATTEMPT_TIME):
try: try:
#print(json.dumps(q, ensure_ascii=False)) #print(json.dumps(q, ensure_ascii=False))
res = self.es.search(index=index_names, res = self.es.search(index=exist_index_list,
body=q, body=q,
timeout="600s", timeout="600s",
# search_type="dfs_query_then_fetch", # search_type="dfs_query_then_fetch",
@ -239,8 +244,8 @@ class ESConnection(ESConnectionBase):
raise Exception("ESConnection.search timeout.") raise Exception("ESConnection.search timeout.")
def get_forgotten_messages(self, select_fields: list[str], index_name: str, memory_id: str, limit: int=512): def get_forgotten_messages(self, select_fields: list[str], index_name: str, memory_id: str, limit: int=512):
bool_query = Q("bool", must_not=[]) bool_query = Q("bool", must=[])
bool_query.must_not.append(Q("term", forget_at=None)) bool_query.must.append(Q("exists", field="forget_at"))
bool_query.filter.append(Q("term", memory_id=memory_id)) bool_query.filter.append(Q("term", memory_id=memory_id))
# from old to new # from old to new
order_by = OrderByExpr() order_by = OrderByExpr()