Fix memory issue on Infinity 0.6.15 (#12258)

### What problem does this PR solve?

1. Remove unused columns
2. Check for an empty database
3. Re-enable the order-by expression

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai
2025-12-27 20:25:06 +08:00
committed by GitHub
parent 050534e743
commit ef5341b664
3 changed files with 14 additions and 27 deletions

View File

@ -228,14 +228,14 @@ class ESConnection(ESConnectionBase):
if str(res.get("timed_out", "")).lower() == "true":
raise Exception("Es Timeout.")
self.logger.debug(f"ESConnection.search {str(index_names)} res: " + str(res))
return res
return res, self.get_total(res)
except ConnectionTimeout:
self.logger.exception("ES request timeout")
self._connect()
continue
except NotFoundError as e:
self.logger.debug(f"ESConnection.search {str(index_names)} query: " + str(q) + str(e))
return None
return None, 0
except Exception as e:
self.logger.exception(f"ESConnection.search {str(index_names)} query: " + str(q) + str(e))
raise e

View File

@ -22,7 +22,6 @@ from infinity.errors import ErrorCode
from common.decorator import singleton
import pandas as pd
from common.constants import PAGERANK_FLD, TAG_FLD
from common.doc_store.doc_store_base import MatchExpr, MatchTextExpr, MatchDenseExpr, FusionExpr, OrderByExpr
from common.doc_store.infinity_conn_base import InfinityConnectionBase
from common.time_utils import date_string_to_timestamp
@ -150,8 +149,6 @@ class InfinityConnection(InfinityConnectionBase):
if match_expressions:
if score_func not in output:
output.append(score_func)
if PAGERANK_FLD not in output:
output.append(PAGERANK_FLD)
output = [f for f in output if f != "_score"]
if limit <= 0:
# ElasticSearch default limit is 10000
@ -192,17 +189,6 @@ class InfinityConnection(InfinityConnectionBase):
str_minimum_should_match = str(int(minimum_should_match * 100)) + "%"
matchExpr.extra_options["minimum_should_match"] = str_minimum_should_match
# Add rank_feature support
if rank_feature and "rank_features" not in matchExpr.extra_options:
# Convert rank_feature dict to Infinity's rank_features string format
# Format: "field^feature_name^weight,field^feature_name^weight"
rank_features_list = []
for feature_name, weight in rank_feature.items():
# Use TAG_FLD as the field containing rank features
rank_features_list.append(f"{TAG_FLD}^{feature_name}^{weight}")
if rank_features_list:
matchExpr.extra_options["rank_features"] = ",".join(rank_features_list)
for k, v in matchExpr.extra_options.items():
if not isinstance(v, str):
matchExpr.extra_options[k] = str(v)
@ -225,14 +211,13 @@ class InfinityConnection(InfinityConnectionBase):
self.logger.debug(f"INFINITY search FusionExpr: {json.dumps(matchExpr.__dict__)}")
order_by_expr_list = list()
# todo use order_by after infinity fixed bug
# if order_by.fields:
# for order_field in order_by.fields:
# order_field_name = self.convert_condition_and_order_field(order_field[0])
# if order_field[1] == 0:
# order_by_expr_list.append((order_field_name, SortType.Asc))
# else:
# order_by_expr_list.append((order_field_name, SortType.Desc))
if order_by.fields:
for order_field in order_by.fields:
order_field_name = self.convert_condition_and_order_field(order_field[0])
if order_field[1] == 0:
order_by_expr_list.append((order_field_name, SortType.Asc))
else:
order_by_expr_list.append((order_field_name, SortType.Desc))
total_hits_count = 0
# Scatter search tables and gather the results
@ -284,7 +269,7 @@ class InfinityConnection(InfinityConnectionBase):
self.connPool.release_conn(inf_conn)
res = self.concat_dataframes(df_list, output)
if match_expressions:
res["_score"] = res[score_column] + res[PAGERANK_FLD]
res["_score"] = res[score_column]
res = res.sort_values(by="_score", ascending=False).reset_index(drop=True)
res = res.head(limit)
self.logger.debug(f"INFINITY search final result: {str(res)}")