From ef5341b664559275c201224de70053733863c724 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Sat, 27 Dec 2025 20:25:06 +0800 Subject: [PATCH] Fix memory issue on Infinity 0.6.15 (#12258) ### What problem does this PR solve? 1. Remove unused columns 2. Check the empty database 3. Switch on the order by expression ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --------- Signed-off-by: Jin Hai --- memory/services/messages.py | 6 ++++-- memory/utils/es_conn.py | 4 ++-- memory/utils/infinity_conn.py | 31 ++++++++----------------------- 3 files changed, 14 insertions(+), 27 deletions(-) diff --git a/memory/services/messages.py b/memory/services/messages.py index 29f0f4da1..18b774432 100644 --- a/memory/services/messages.py +++ b/memory/services/messages.py @@ -168,7 +168,7 @@ class MessageService: order_by = OrderByExpr() order_by.desc("valid_at") - res = settings.msgStoreConn.search( + res, count = settings.msgStoreConn.search( select_fields=["memory_id", "content", "content_embed"], highlight_fields=[], condition={}, @@ -177,8 +177,10 @@ class MessageService: offset=0, limit=2048*len(memory_ids), index_names=index_names, memory_ids=memory_ids, agg_fields=[], hide_forgotten=False ) - if not res: + + if count == 0: return {} + docs = settings.msgStoreConn.get_fields(res, ["memory_id", "content", "content_embed"]) size_dict = {} for doc in docs.values(): diff --git a/memory/utils/es_conn.py b/memory/utils/es_conn.py index b75b9df56..2c635ac51 100644 --- a/memory/utils/es_conn.py +++ b/memory/utils/es_conn.py @@ -228,14 +228,14 @@ class ESConnection(ESConnectionBase): if str(res.get("timed_out", "")).lower() == "true": raise Exception("Es Timeout.") self.logger.debug(f"ESConnection.search {str(index_names)} res: " + str(res)) - return res + return res, self.get_total(res) except ConnectionTimeout: self.logger.exception("ES request timeout") self._connect() continue except NotFoundError as e: self.logger.debug(f"ESConnection.search {str(index_names)} query: " + str(q) + str(e)) - return None + return None, 0 except Exception as e: self.logger.exception(f"ESConnection.search {str(index_names)} query: " + str(q) + str(e)) raise e diff --git a/memory/utils/infinity_conn.py b/memory/utils/infinity_conn.py index 91bcbf2e9..932655a1d 100644 --- a/memory/utils/infinity_conn.py +++ b/memory/utils/infinity_conn.py @@ -22,7 +22,6 @@ from infinity.errors import ErrorCode from common.decorator import singleton import pandas as pd -from common.constants import PAGERANK_FLD, TAG_FLD from common.doc_store.doc_store_base import MatchExpr, MatchTextExpr, MatchDenseExpr, FusionExpr, OrderByExpr from common.doc_store.infinity_conn_base import InfinityConnectionBase from common.time_utils import date_string_to_timestamp @@ -150,8 +149,6 @@ class InfinityConnection(InfinityConnectionBase): if match_expressions: if score_func not in output: output.append(score_func) - if PAGERANK_FLD not in output: - output.append(PAGERANK_FLD) output = [f for f in output if f != "_score"] if limit <= 0: # ElasticSearch default limit is 10000 @@ -192,17 +189,6 @@ class InfinityConnection(InfinityConnectionBase): str_minimum_should_match = str(int(minimum_should_match * 100)) + "%" matchExpr.extra_options["minimum_should_match"] = str_minimum_should_match - # Add rank_feature support - if rank_feature and "rank_features" not in matchExpr.extra_options: - # Convert rank_feature dict to Infinity's rank_features string format - # Format: "field^feature_name^weight,field^feature_name^weight" - rank_features_list = [] - for feature_name, weight in rank_feature.items(): - # Use TAG_FLD as the field containing rank features - rank_features_list.append(f"{TAG_FLD}^{feature_name}^{weight}") - if rank_features_list: - matchExpr.extra_options["rank_features"] = ",".join(rank_features_list) - for k, v in matchExpr.extra_options.items(): if not isinstance(v, str): matchExpr.extra_options[k] = str(v) @@ -225,14 +211,13 @@ class InfinityConnection(InfinityConnectionBase): self.logger.debug(f"INFINITY search FusionExpr: {json.dumps(matchExpr.__dict__)}") order_by_expr_list = list() - # todo use order_by after infinity fixed bug - # if order_by.fields: - # for order_field in order_by.fields: - # order_field_name = self.convert_condition_and_order_field(order_field[0]) - # if order_field[1] == 0: - # order_by_expr_list.append((order_field_name, SortType.Asc)) - # else: - # order_by_expr_list.append((order_field_name, SortType.Desc)) + if order_by.fields: + for order_field in order_by.fields: + order_field_name = self.convert_condition_and_order_field(order_field[0]) + if order_field[1] == 0: + order_by_expr_list.append((order_field_name, SortType.Asc)) + else: + order_by_expr_list.append((order_field_name, SortType.Desc)) total_hits_count = 0 # Scatter search tables and gather the results @@ -284,7 +269,7 @@ class InfinityConnection(InfinityConnectionBase): self.connPool.release_conn(inf_conn) res = self.concat_dataframes(df_list, output) if match_expressions: - res["_score"] = res[score_column] + res[PAGERANK_FLD] + res["_score"] = res[score_column] res = res.sort_values(by="_score", ascending=False).reset_index(drop=True) res = res.head(limit) self.logger.debug(f"INFINITY search final result: {str(res)}")