Add secondary index to infinity (#12825)

Add secondary indexes on:
1. kb_id
2. available_int
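
Both columns are declared in the chunk schema with a new "index_type" entry (see the schema diff below), e.g.:

    "kb_id": {"type": "varchar", "default": "", "index_type": {"type": "secondary", "cardinality": "low"}}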

---------

Signed-off-by: zpf121 <1219290549@qq.com>
Co-authored-by: Yingfeng Zhang <yingfeng.zhang@gmail.com>
This commit is contained in:
akie
2026-02-02 13:22:29 +08:00
committed by GitHub
parent 01f9b98c3f
commit 51210a1762
2 changed files with 84 additions and 62 deletions

View File

@@ -85,13 +85,13 @@ class InfinityConnectionBase(DocStoreConnection):
         column_names = inf_table.show_columns()["name"]
         column_names = set(column_names)
         for field_name, field_info in schema.items():
-            if field_name in column_names:
-                continue
-            res = inf_table.add_columns({field_name: field_info})
-            assert res.error_code == infinity.ErrorCode.OK
-            self.logger.info(f"INFINITY added following column to table {table_name}: {field_name} {field_info}")
-            if field_info["type"] != "varchar" or "analyzer" not in field_info:
-                continue
-            analyzers = field_info["analyzer"]
-            if isinstance(analyzers, str):
-                analyzers = [analyzers]
+            is_new_column = field_name not in column_names
+            if is_new_column:
+                res = inf_table.add_columns({field_name: field_info})
+                assert res.error_code == infinity.ErrorCode.OK
+                self.logger.info(f"INFINITY added following column to table {table_name}: {field_name} {field_info}")
+            if field_info["type"] == "varchar" and "analyzer" in field_info:
+                analyzers = field_info["analyzer"]
+                if isinstance(analyzers, str):
+                    analyzers = [analyzers]
@@ -102,6 +102,27 @@ class InfinityConnectionBase(DocStoreConnection):
                         ConflictType.Ignore,
                     )
+            if "index_type" in field_info:
+                index_config = field_info["index_type"]
+                if isinstance(index_config, str) and index_config == "secondary":
+                    inf_table.create_index(
+                        f"sec_{field_name}",
+                        IndexInfo(field_name, IndexType.Secondary),
+                        ConflictType.Ignore,
+                    )
+                    self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name}")
+                elif isinstance(index_config, dict):
+                    if index_config.get("type") == "secondary":
+                        params = {}
+                        if "cardinality" in index_config:
+                            params = {"cardinality": index_config["cardinality"]}
+                        inf_table.create_index(
+                            f"sec_{field_name}",
+                            IndexInfo(field_name, IndexType.Secondary, params),
+                            ConflictType.Ignore,
+                        )
+                        self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name} with params {params}")
     """
     Dataframe and fields convert
     """
@@ -244,6 +265,7 @@ class InfinityConnectionBase(DocStoreConnection):
         if parser_id is not None:
             from common.constants import ParserType
+
             if parser_id == ParserType.TABLE.value:
                 # Table parser: add chunk_data JSON column to store table-specific fields
                 schema["chunk_data"] = {"type": "json", "default": "{}"}
@@ -282,6 +304,31 @@ class InfinityConnectionBase(DocStoreConnection):
                     IndexInfo(field_name, IndexType.FullText, {"ANALYZER": analyzer}),
                     ConflictType.Ignore,
                 )
+        # Create secondary indexes for fields with index_type
+        for field_name, field_info in schema.items():
+            if "index_type" not in field_info:
+                continue
+            index_config = field_info["index_type"]
+            if isinstance(index_config, str) and index_config == "secondary":
+                inf_table.create_index(
+                    f"sec_{field_name}",
+                    IndexInfo(field_name, IndexType.Secondary),
+                    ConflictType.Ignore,
+                )
+                self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name}")
+            elif isinstance(index_config, dict):
+                if index_config.get("type") == "secondary":
+                    params = {}
+                    if "cardinality" in index_config:
+                        params = {"cardinality": index_config["cardinality"]}
+                    inf_table.create_index(
+                        f"sec_{field_name}",
+                        IndexInfo(field_name, IndexType.Secondary, params),
+                        ConflictType.Ignore,
+                    )
+                    self.logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name} with params {params}")
         self.connPool.release_conn(inf_conn)
         self.logger.info(f"INFINITY created table {table_name}, vector size {vector_size}")
         return True
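
This block duplicates the one added in the column-migration path above; both call sites could share a small helper. A minimal sketch, assuming the infinity SDK's usual import paths (the helper itself is ours, not part of the diff):

    import logging

    from infinity.common import ConflictType
    from infinity.index import IndexInfo, IndexType

    def create_secondary_index(inf_table, field_name: str, index_config, logger: logging.Logger) -> None:
        # index_config is the schema's "index_type" value: either the string
        # "secondary" or a dict like {"type": "secondary", "cardinality": "low"}.
        params = None
        if isinstance(index_config, dict):
            if index_config.get("type") != "secondary":
                return
            if "cardinality" in index_config:
                params = {"cardinality": index_config["cardinality"]}
        elif index_config != "secondary":
            return
        if params:
            index_info = IndexInfo(field_name, IndexType.Secondary, params)
        else:
            index_info = IndexInfo(field_name, IndexType.Secondary)
        inf_table.create_index(f"sec_{field_name}", index_info, ConflictType.Ignore)
        logger.info(f"INFINITY created secondary index sec_{field_name} for field {field_name} with params {params}")

ConflictType.Ignore keeps the call idempotent, so repeated runs against a table that already has the index are safe.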
@@ -568,50 +615,27 @@ class InfinityConnectionBase(DocStoreConnection):
         # Apply field transformations
         for alias, actual in field_mapping.items():
-            select_clause = re.sub(
-                rf'(^|[, ]){alias}([, ]|$)',
-                rf'\1{actual}\2',
-                select_clause
-            )
+            select_clause = re.sub(rf"(^|[, ]){alias}([, ]|$)", rf"\1{actual}\2", select_clause)
         sql = select_clause + from_clause + sql[select_match.end() :]
         # Also replace field names in WHERE, ORDER BY, GROUP BY, and HAVING clauses
         for alias, actual in field_mapping.items():
             # Transform in WHERE clause
-            sql = re.sub(
-                rf'(\bwhere\s+[^;]*?)(\b){re.escape(alias)}\b',
-                rf'\1{actual}',
-                sql,
-                flags=re.IGNORECASE
-            )
+            sql = re.sub(rf"(\bwhere\s+[^;]*?)(\b){re.escape(alias)}\b", rf"\1{actual}", sql, flags=re.IGNORECASE)
             # Transform in ORDER BY clause
-            sql = re.sub(
-                rf'(\border by\s+[^;]*?)(\b){re.escape(alias)}\b',
-                rf'\1{actual}',
-                sql,
-                flags=re.IGNORECASE
-            )
+            sql = re.sub(rf"(\border by\s+[^;]*?)(\b){re.escape(alias)}\b", rf"\1{actual}", sql, flags=re.IGNORECASE)
             # Transform in GROUP BY clause
-            sql = re.sub(
-                rf'(\bgroup by\s+[^;]*?)(\b){re.escape(alias)}\b',
-                rf'\1{actual}',
-                sql,
-                flags=re.IGNORECASE
-            )
+            sql = re.sub(rf"(\bgroup by\s+[^;]*?)(\b){re.escape(alias)}\b", rf"\1{actual}", sql, flags=re.IGNORECASE)
             # Transform in HAVING clause
-            sql = re.sub(
-                rf'(\bhaving\s+[^;]*?)(\b){re.escape(alias)}\b',
-                rf'\1{actual}',
-                sql,
-                flags=re.IGNORECASE
-            )
+            sql = re.sub(rf"(\bhaving\s+[^;]*?)(\b){re.escape(alias)}\b", rf"\1{actual}", sql, flags=re.IGNORECASE)
         self.logger.debug(f"InfinityConnection.sql to execute: {sql}")
         # Get connection parameters from the Infinity connection pool wrapper
         # We need to use INFINITY_CONN singleton, not the raw ConnectionPool
         from common.doc_store.infinity_conn_pool import INFINITY_CONN
         conn_info = INFINITY_CONN.get_conn_uri()
         # Parse host and port from conn_info
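
The re.sub calls above were only collapsed by the formatter; they behave exactly as before. For a hypothetical alias mapping, the clause rewriting works like this:

    import re

    # Hypothetical alias mapping; the real field_mapping is built by the caller.
    field_mapping = {"score": "score_flt"}

    sql = "select id, score from t where score > 0.5 order by score"
    for alias, actual in field_mapping.items():
        sql = re.sub(rf"(\bwhere\s+[^;]*?)(\b){re.escape(alias)}\b", rf"\1{actual}", sql, flags=re.IGNORECASE)
        sql = re.sub(rf"(\border by\s+[^;]*?)(\b){re.escape(alias)}\b", rf"\1{actual}", sql, flags=re.IGNORECASE)
    print(sql)  # select id, score from t where score_flt > 0.5 order by score_flt

Note that each pattern rewrites only the first occurrence of the alias in its clause: group 1 consumes everything from the keyword up to that occurrence, and a further match would need the keyword to appear again. The select list is rewritten separately through select_clause, which is why "id, score" stays untouched in this demo.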
@@ -639,6 +663,7 @@ class InfinityConnectionBase(DocStoreConnection):
         psql_path = "/usr/bin/psql"
         # Check if psql exists at expected location, otherwise try to find it
         import shutil
+
         psql_from_path = shutil.which("psql")
         if psql_from_path:
             psql_path = psql_from_path
@@ -646,9 +671,12 @@ class InfinityConnectionBase(DocStoreConnection):
         # Execute SQL with psql to get both column names and data in one call
         psql_cmd = [
             psql_path,
-            "-h", host,
-            "-p", port,
-            "-c", sql,
+            "-h",
+            host,
+            "-p",
+            port,
+            "-c",
+            sql,
         ]
         self.logger.debug(f"Executing psql command: {' '.join(psql_cmd)}")
@@ -657,7 +685,7 @@ class InfinityConnectionBase(DocStoreConnection):
             psql_cmd,
             capture_output=True,
             text=True,
-            timeout=10  # 10 second timeout
+            timeout=10,  # 10 second timeout
         )
         if result.returncode != 0:
@@ -668,10 +696,7 @@ class InfinityConnectionBase(DocStoreConnection):
         output = result.stdout.strip()
         if not output:
             # No results
-            return {
-                "columns": [],
-                "rows": []
-            } if format == "json" else []
+            return {"columns": [], "rows": []} if format == "json" else []
         # Parse psql table output which has format:
         # col1 | col2 | col3
@@ -710,10 +735,7 @@ class InfinityConnectionBase(DocStoreConnection):
             rows.append(row + [""] * (len(columns) - len(row)))
         if format == "json":
-            result = {
-                "columns": columns,
-                "rows": rows[:fetch_size] if fetch_size > 0 else rows
-            }
+            result = {"columns": columns, "rows": rows[:fetch_size] if fetch_size > 0 else rows}
         else:
             result = rows[:fetch_size] if fetch_size > 0 else rows
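
For reference, psql's default aligned output that this code parses looks like:

     col1 | col2 | col3
    ------+------+------
     a    | b    | c
    (1 row)

A condensed sketch of the parsing approach (illustrative only; the real code above also honors fetch_size and the json format flag):

    import re

    def parse_psql_output(output: str) -> tuple[list[str], list[list[str]]]:
        lines = [ln for ln in output.splitlines() if ln.strip()]
        if not lines:
            return [], []
        columns = [c.strip() for c in lines[0].split("|")]
        rows = []
        for line in lines[2:]:  # skip the header and the ----+---- separator
            if re.fullmatch(r"\(\d+ rows?\)", line.strip()):
                break  # "(N rows)" footer
            row = [c.strip() for c in line.split("|")]
            rows.append(row + [""] * (len(columns) - len(row)))  # pad short rows
        return columns, rows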

View File

@@ -1,7 +1,7 @@
 {
     "id": {"type": "varchar", "default": ""},
     "doc_id": {"type": "varchar", "default": ""},
-    "kb_id": {"type": "varchar", "default": ""},
+    "kb_id": {"type": "varchar", "default": "", "index_type": {"type": "secondary", "cardinality": "low"}},
     "mom_id": {"type": "varchar", "default": ""},
     "create_time": {"type": "varchar", "default": ""},
     "create_timestamp_flt": {"type": "float", "default": 0.0},
@@ -21,7 +21,7 @@
     "weight_flt": {"type": "float", "default": 0.0},
     "rank_int": {"type": "integer", "default": 0},
     "rank_flt": {"type": "float", "default": 0},
-    "available_int": {"type": "integer", "default": 1},
+    "available_int": {"type": "integer", "default": 1, "index_type": {"type": "secondary", "cardinality": "low"}},
     "knowledge_graph_kwd": {"type": "varchar", "default": ""},
     "entities_kwd": {"type": "varchar", "default": "", "analyzer": "whitespace-#"},
     "pagerank_fea": {"type": "integer", "default": 0},