From 51210a17621df2216b4c1c0cfd975f970ac91aef Mon Sep 17 00:00:00 2001
From: akie <103188271+zpf121@users.noreply.github.com>
Date: Mon, 2 Feb 2026 13:22:29 +0800
Subject: [PATCH] Add secondary index to infinity (#12825)

Add secondary index:
1. kb_id
2. available_int

---------

Signed-off-by: zpf121 <1219290549@qq.com>
Co-authored-by: Yingfeng Zhang
---
 common/doc_store/infinity_conn_base.py | 142 ++++++++++++++-----------
 conf/infinity_mapping.json             |   4 +-
 2 files changed, 84 insertions(+), 62 deletions(-)

diff --git a/common/doc_store/infinity_conn_base.py b/common/doc_store/infinity_conn_base.py
index 4389289d9..294192bd6 100644
--- a/common/doc_store/infinity_conn_base.py
+++ b/common/doc_store/infinity_conn_base.py
@@ -33,7 +33,7 @@ from common.doc_store.doc_store_base import DocStoreConnection, MatchExpr, Order
 
 
 class InfinityConnectionBase(DocStoreConnection):
-    def __init__(self, mapping_file_name: str="infinity_mapping.json", logger_name: str="ragflow.infinity_conn"):
+    def __init__(self, mapping_file_name: str = "infinity_mapping.json", logger_name: str = "ragflow.infinity_conn"):
         from common.doc_store.infinity_conn_pool import INFINITY_CONN
 
         self.dbName = settings.INFINITY.get("db_name", "default_db")
@@ -85,22 +85,43 @@ class InfinityConnectionBase(DocStoreConnection):
         column_names = inf_table.show_columns()["name"]
         column_names = set(column_names)
         for field_name, field_info in schema.items():
-            if field_name in column_names:
-                continue
-            res = inf_table.add_columns({field_name: field_info})
-            assert res.error_code == infinity.ErrorCode.OK
-            self.logger.info(f"INFINITY added following column to table {table_name}: {field_name} {field_info}")
-            if field_info["type"] != "varchar" or "analyzer" not in field_info:
-                continue
-            analyzers = field_info["analyzer"]
-            if isinstance(analyzers, str):
-                analyzers = [analyzers]
-            for analyzer in analyzers:
-                inf_table.create_index(
-                    f"ft_{re.sub(r'[^a-zA-Z0-9]', '_', field_name)}_{re.sub(r'[^a-zA-Z0-9]', '_', analyzer)}",
-                    IndexInfo(field_name, IndexType.FullText, {"ANALYZER": analyzer}),
-                    ConflictType.Ignore,
-                )
+            is_new_column = field_name not in column_names
+            if is_new_column:
+                res = inf_table.add_columns({field_name: field_info})
+                assert res.error_code == infinity.ErrorCode.OK
+                self.logger.info(f"INFINITY added following column to table {table_name}: {field_name} {field_info}")
+
+                if field_info["type"] == "varchar" and "analyzer" in field_info:
+                    analyzers = field_info["analyzer"]
+                    if isinstance(analyzers, str):
+                        analyzers = [analyzers]
+                    for analyzer in analyzers:
+                        inf_table.create_index(
+                            f"ft_{re.sub(r'[^a-zA-Z0-9]', '_', field_name)}_{re.sub(r'[^a-zA-Z0-9]', '_', analyzer)}",
+                            IndexInfo(field_name, IndexType.FullText, {"ANALYZER": analyzer}),
+                            ConflictType.Ignore,
+                        )
+
+            if "index_type" in field_info:
+                index_config = field_info["index_type"]
+                if isinstance(index_config, str) and index_config == "secondary":
+                    inf_table.create_index(
+                        f"sec_{field_name}",
+                        IndexInfo(field_name, IndexType.Secondary),
+                        ConflictType.Ignore,
+                    )
+                    self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name}")
+                elif isinstance(index_config, dict):
+                    if index_config.get("type") == "secondary":
+                        params = {}
+                        if "cardinality" in index_config:
+                            params = {"cardinality": index_config["cardinality"]}
+                        inf_table.create_index(
+                            f"sec_{field_name}",
+                            IndexInfo(field_name, IndexType.Secondary, params),
+                            ConflictType.Ignore,
+                        )
+                        self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name} with params {params}")
 
     """
     Dataframe and fields convert
@@ -244,6 +265,7 @@ class InfinityConnectionBase(DocStoreConnection):
 
         if parser_id is not None:
             from common.constants import ParserType
+
             if parser_id == ParserType.TABLE.value:
                 # Table parser: add chunk_data JSON column to store table-specific fields
                 schema["chunk_data"] = {"type": "json", "default": "{}"}
@@ -282,6 +304,31 @@ class InfinityConnectionBase(DocStoreConnection):
                     IndexInfo(field_name, IndexType.FullText, {"ANALYZER": analyzer}),
                     ConflictType.Ignore,
                 )
+
+        # Create secondary indexes for fields with index_type
+        for field_name, field_info in schema.items():
+            if "index_type" not in field_info:
+                continue
+            index_config = field_info["index_type"]
+            if isinstance(index_config, str) and index_config == "secondary":
+                inf_table.create_index(
+                    f"sec_{field_name}",
+                    IndexInfo(field_name, IndexType.Secondary),
+                    ConflictType.Ignore,
+                )
+                self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name}")
+            elif isinstance(index_config, dict):
+                if index_config.get("type") == "secondary":
+                    params = {}
+                    if "cardinality" in index_config:
+                        params = {"cardinality": index_config["cardinality"]}
+                    inf_table.create_index(
+                        f"sec_{field_name}",
+                        IndexInfo(field_name, IndexType.Secondary, params),
+                        ConflictType.Ignore,
+                    )
+                    self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name} with params {params}")
+
         self.connPool.release_conn(inf_conn)
         self.logger.info(f"INFINITY created table {table_name}, vector size {vector_size}")
         return True
@@ -568,50 +615,27 @@ class InfinityConnectionBase(DocStoreConnection):
 
         # Apply field transformations
         for alias, actual in field_mapping.items():
-            select_clause = re.sub(
-                rf'(^|[, ]){alias}([, ]|$)',
-                rf'\1{actual}\2',
-                select_clause
-            )
+            select_clause = re.sub(rf"(^|[, ]){alias}([, ]|$)", rf"\1{actual}\2", select_clause)
 
-        sql = select_clause + from_clause + sql[select_match.end():]
+        sql = select_clause + from_clause + sql[select_match.end() :]
 
         # Also replace field names in WHERE, ORDER BY, GROUP BY, and HAVING clauses
         for alias, actual in field_mapping.items():
             # Transform in WHERE clause
-            sql = re.sub(
-                rf'(\bwhere\s+[^;]*?)(\b){re.escape(alias)}\b',
-                rf'\1{actual}',
-                sql,
-                flags=re.IGNORECASE
-            )
+            sql = re.sub(rf"(\bwhere\s+[^;]*?)(\b){re.escape(alias)}\b", rf"\1{actual}", sql, flags=re.IGNORECASE)
             # Transform in ORDER BY clause
-            sql = re.sub(
-                rf'(\border by\s+[^;]*?)(\b){re.escape(alias)}\b',
-                rf'\1{actual}',
-                sql,
-                flags=re.IGNORECASE
-            )
+            sql = re.sub(rf"(\border by\s+[^;]*?)(\b){re.escape(alias)}\b", rf"\1{actual}", sql, flags=re.IGNORECASE)
             # Transform in GROUP BY clause
-            sql = re.sub(
-                rf'(\bgroup by\s+[^;]*?)(\b){re.escape(alias)}\b',
-                rf'\1{actual}',
-                sql,
-                flags=re.IGNORECASE
-            )
+            sql = re.sub(rf"(\bgroup by\s+[^;]*?)(\b){re.escape(alias)}\b", rf"\1{actual}", sql, flags=re.IGNORECASE)
             # Transform in HAVING clause
-            sql = re.sub(
-                rf'(\bhaving\s+[^;]*?)(\b){re.escape(alias)}\b',
-                rf'\1{actual}',
-                sql,
-                flags=re.IGNORECASE
-            )
+            sql = re.sub(rf"(\bhaving\s+[^;]*?)(\b){re.escape(alias)}\b", rf"\1{actual}", sql, flags=re.IGNORECASE)
 
         self.logger.debug(f"InfinityConnection.sql to execute: {sql}")
 
         # Get connection parameters from the Infinity connection pool wrapper
         # We need to use INFINITY_CONN singleton, not the raw ConnectionPool
         from common.doc_store.infinity_conn_pool import INFINITY_CONN
+
         conn_info = INFINITY_CONN.get_conn_uri()
 
         # Parse host and port from conn_info
@@ -639,6 +663,7 @@ class InfinityConnectionBase(DocStoreConnection):
             psql_path = "/usr/bin/psql"
         # Check if psql exists at expected location, otherwise try to find it
         import shutil
+
         psql_from_path = shutil.which("psql")
         if psql_from_path:
             psql_path = psql_from_path
@@ -646,9 +671,12 @@ class InfinityConnectionBase(DocStoreConnection):
         # Execute SQL with psql to get both column names and data in one call
         psql_cmd = [
             psql_path,
-            "-h", host,
-            "-p", port,
-            "-c", sql,
+            "-h",
+            host,
+            "-p",
+            port,
+            "-c",
+            sql,
         ]
 
         self.logger.debug(f"Executing psql command: {' '.join(psql_cmd)}")
@@ -657,7 +685,7 @@ class InfinityConnectionBase(DocStoreConnection):
             psql_cmd,
             capture_output=True,
             text=True,
-            timeout=10  # 10 second timeout
+            timeout=10,  # 10 second timeout
         )
 
         if result.returncode != 0:
@@ -668,10 +696,7 @@ class InfinityConnectionBase(DocStoreConnection):
         output = result.stdout.strip()
         if not output:
             # No results
-            return {
-                "columns": [],
-                "rows": []
-            } if format == "json" else []
+            return {"columns": [], "rows": []} if format == "json" else []
 
         # Parse psql table output which has format:
         # col1 | col2 | col3
@@ -704,16 +729,13 @@ class InfinityConnectionBase(DocStoreConnection):
                 rows.append(row)
             elif len(row) > len(columns):
                 # Row has more cells than columns - truncate
-                rows.append(row[:len(columns)])
+                rows.append(row[: len(columns)])
             elif len(row) < len(columns):
                 # Row has fewer cells - pad with empty strings
                 rows.append(row + [""] * (len(columns) - len(row)))
 
         if format == "json":
-            result = {
-                "columns": columns,
-                "rows": rows[:fetch_size] if fetch_size > 0 else rows
-            }
+            result = {"columns": columns, "rows": rows[:fetch_size] if fetch_size > 0 else rows}
         else:
             result = rows[:fetch_size] if fetch_size > 0 else rows
 
diff --git a/conf/infinity_mapping.json b/conf/infinity_mapping.json
index 94909f8ff..83e3d5f98 100644
--- a/conf/infinity_mapping.json
+++ b/conf/infinity_mapping.json
@@ -1,7 +1,7 @@
 {
     "id": {"type": "varchar", "default": ""},
     "doc_id": {"type": "varchar", "default": ""},
-    "kb_id": {"type": "varchar", "default": ""},
+    "kb_id": {"type": "varchar", "default": "", "index_type": {"type": "secondary", "cardinality": "low"}},
     "mom_id": {"type": "varchar", "default": ""},
     "create_time": {"type": "varchar", "default": ""},
     "create_timestamp_flt": {"type": "float", "default": 0.0},
@@ -21,7 +21,7 @@
     "weight_flt": {"type": "float", "default": 0.0},
     "rank_int": {"type": "integer", "default": 0},
     "rank_flt": {"type": "float", "default": 0},
-    "available_int": {"type": "integer", "default": 1},
+    "available_int": {"type": "integer", "default": 1, "index_type": {"type": "secondary", "cardinality": "low"}},
     "knowledge_graph_kwd": {"type": "varchar", "default": ""},
     "entities_kwd": {"type": "varchar", "default": "", "analyzer": "whitespace-#"},
     "pagerank_fea": {"type": "integer", "default": 0},
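Usage note (illustrative, not part of the patch): per the handling code above, a field in
conf/infinity_mapping.json opts into a secondary index through the new "index_type" key,
either as the bare string "secondary" or as an object with "type": "secondary" and an
optional "cardinality" entry (the only parameter the code forwards to IndexInfo). A
minimal sketch of both forms, using a hypothetical field name "status_int":

    "status_int": {"type": "integer", "default": 0, "index_type": "secondary"}
    "status_int": {"type": "integer", "default": 0, "index_type": {"type": "secondary", "cardinality": "low"}}

Either form yields an index named sec_status_int, created with ConflictType.Ignore, so
re-running table creation or migration against an existing table is harmless.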