Fix highlight with infinity (#10345)

Fix highlighting of retrieval results when Infinity is the document engine.
Fix installation on openSUSE Tumbleweed (jemalloc via zypper; pyicu bumped to >=2.15.3).

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
Authored by Zhichang Yu on 2025-09-30 19:15:01 +08:00, committed by GitHub.
parent aa61ae24dc · commit 518a00630e
5 changed files with 94 additions and 98 deletions


@@ -341,11 +341,13 @@ docker build --platform linux/amd64 -f Dockerfile -t infiniflow/ragflow:nightly
5. If your operating system does not have jemalloc, please install it as follows:
```bash
-# ubuntu
+# Ubuntu
sudo apt-get install libjemalloc-dev
-# centos
+# CentOS
sudo yum install jemalloc
-# mac
+# OpenSUSE
+sudo zypper install jemalloc
+# macOS
sudo brew install jemalloc
```
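A quick way to confirm the dynamic loader can actually find jemalloc afterwards; this check is a hedged addition, not part of the upstream docs:

```python
import ctypes.util

# Prints something like "libjemalloc.so.2" on Linux or a full dylib path on
# macOS; None means the loader cannot see jemalloc and the install step above
# needs revisiting.
print(ctypes.util.find_library("jemalloc"))
```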


@@ -6,7 +6,6 @@
# dependencies = [
# "huggingface-hub",
# "nltk",
# "argparse",
# ]
# ///


@@ -118,7 +118,7 @@ dependencies = [
"graspologic>=3.4.1,<4.0.0",
"mini-racer>=0.12.4,<0.13.0",
"pyodbc>=5.2.0,<6.0.0",
"pyicu>=2.13.1,<3.0.0",
"pyicu>=2.15.3,<3.0.0",
"flasgger>=0.9.7.1,<0.10.0",
"xxhash>=3.5.0,<4.0.0",
"trio>=0.29.0",


@@ -30,6 +30,7 @@ from rag.settings import PAGERANK_FLD, TAG_FLD
from rag.utils import singleton
import pandas as pd
from api.utils.file_utils import get_project_base_directory
+from rag.nlp import is_english
from rag.utils.doc_store_conn import (
DocStoreConnection,
@@ -40,13 +41,15 @@ from rag.utils.doc_store_conn import (
OrderByExpr,
)
-logger = logging.getLogger('ragflow.infinity_conn')
+logger = logging.getLogger("ragflow.infinity_conn")
def field_keyword(field_name: str):
# The "docnm_kwd" field is always a string, not list.
if field_name == "source_id" or (field_name.endswith("_kwd") and field_name != "docnm_kwd" and field_name != "knowledge_graph_kwd"):
return True
return False
# The "docnm_kwd" field is always a string, not list.
if field_name == "source_id" or (field_name.endswith("_kwd") and field_name != "docnm_kwd" and field_name != "knowledge_graph_kwd"):
return True
return False
def equivalent_condition_to_str(condition: dict, table_instance=None) -> str | None:
assert "_id" not in condition
@@ -74,7 +77,7 @@ def equivalent_condition_to_str(condition: dict, table_instance=None) -> str | None:
inCond = list()
for item in v:
if isinstance(item, str):
item = item.replace("'","''")
item = item.replace("'", "''")
inCond.append(f"filter_fulltext('{k}', '{item}')")
if inCond:
strInCond = " or ".join(inCond)
@@ -86,7 +89,7 @@ def equivalent_condition_to_str(condition: dict, table_instance=None) -> str | None:
inCond = list()
for item in v:
if isinstance(item, str):
item = item.replace("'","''")
item = item.replace("'", "''")
inCond.append(f"'{item}'")
else:
inCond.append(str(item))
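Both hunks above apply the same defensive escaping: embedded single quotes are doubled before the value is spliced into an Infinity filter string, the usual SQL-style quoting. A small illustration; the filter shown is hypothetical:

```python
def quote(item: str) -> str:
    # Double embedded single quotes, as in the two hunks above.
    return "'" + item.replace("'", "''") + "'"

in_cond = ", ".join(quote(v) for v in ["O'Brien", "plain"])
filter_str = f"doc_id IN ({in_cond})"
# filter_str == "doc_id IN ('O''Brien', 'plain')"
```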
@@ -115,10 +118,10 @@ def concat_dataframes(df_list: list[pd.DataFrame], selectFields: list[str]) -> pd.DataFrame:
schema = []
for field_name in selectFields:
-if field_name == 'score()': # Workaround: fix schema is changed to score()
-    schema.append('SCORE')
-elif field_name == 'similarity()': # Workaround: fix schema is changed to similarity()
-    schema.append('SIMILARITY')
+if field_name == "score()":  # Workaround: fix schema is changed to score()
+    schema.append("SCORE")
+elif field_name == "similarity()":  # Workaround: fix schema is changed to similarity()
+    schema.append("SIMILARITY")
else:
schema.append(field_name)
return pd.DataFrame(columns=schema)
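The renames matter because Infinity reports the `score()`/`similarity()` output columns as `SCORE`/`SIMILARITY`, so an empty placeholder frame built with the raw names would not line up under `pd.concat`. A toy illustration with invented data:

```python
import pandas as pd

hit = pd.DataFrame({"id": ["x"], "SCORE": [0.9]})  # shape of a real result frame
empty = pd.DataFrame(columns=["id", "score()"])    # placeholder with the raw name
# Mismatched names yield three columns instead of two:
print(pd.concat([hit, empty]).columns.tolist())    # ['id', 'SCORE', 'score()']
```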
@@ -158,9 +161,7 @@ class InfinityConnection(DocStoreConnection):
def _migrate_db(self, inf_conn):
inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
-fp_mapping = os.path.join(
-    get_project_base_directory(), "conf", "infinity_mapping.json"
-)
+fp_mapping = os.path.join(get_project_base_directory(), "conf", "infinity_mapping.json")
if not os.path.exists(fp_mapping):
raise Exception(f"Mapping file not found at {fp_mapping}")
schema = json.load(open(fp_mapping))
@@ -178,16 +179,12 @@ class InfinityConnection(DocStoreConnection):
continue
res = inf_table.add_columns({field_name: field_info})
assert res.error_code == infinity.ErrorCode.OK
-logger.info(
-    f"INFINITY added following column to table {table_name}: {field_name} {field_info}"
-)
+logger.info(f"INFINITY added following column to table {table_name}: {field_name} {field_info}")
if field_info["type"] != "varchar" or "analyzer" not in field_info:
continue
inf_table.create_index(
f"text_idx_{field_name}",
-IndexInfo(
-    field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
-),
+IndexInfo(field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}),
ConflictType.Ignore,
)
@@ -221,9 +218,7 @@ class InfinityConnection(DocStoreConnection):
inf_conn = self.connPool.get_conn()
inf_db = inf_conn.create_database(self.dbName, ConflictType.Ignore)
-fp_mapping = os.path.join(
-    get_project_base_directory(), "conf", "infinity_mapping.json"
-)
+fp_mapping = os.path.join(get_project_base_directory(), "conf", "infinity_mapping.json")
if not os.path.exists(fp_mapping):
raise Exception(f"Mapping file not found at {fp_mapping}")
schema = json.load(open(fp_mapping))
@@ -253,15 +248,11 @@ class InfinityConnection(DocStoreConnection):
continue
inf_table.create_index(
f"text_idx_{field_name}",
-IndexInfo(
-    field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}
-),
+IndexInfo(field_name, IndexType.FullText, {"ANALYZER": field_info["analyzer"]}),
ConflictType.Ignore,
)
self.connPool.release_conn(inf_conn)
-logger.info(
-    f"INFINITY created table {table_name}, vector size {vectorSize}"
-)
+logger.info(f"INFINITY created table {table_name}, vector size {vectorSize}")
def deleteIdx(self, indexName: str, knowledgebaseId: str):
table_name = f"{indexName}_{knowledgebaseId}"
@@ -288,20 +279,21 @@ class InfinityConnection(DocStoreConnection):
"""
def search(
-    self, selectFields: list[str],
-    highlightFields: list[str],
-    condition: dict,
-    matchExprs: list[MatchExpr],
-    orderBy: OrderByExpr,
-    offset: int,
-    limit: int,
-    indexNames: str | list[str],
-    knowledgebaseIds: list[str],
-    aggFields: list[str] = [],
-    rank_feature: dict | None = None
+    self,
+    selectFields: list[str],
+    highlightFields: list[str],
+    condition: dict,
+    matchExprs: list[MatchExpr],
+    orderBy: OrderByExpr,
+    offset: int,
+    limit: int,
+    indexNames: str | list[str],
+    knowledgebaseIds: list[str],
+    aggFields: list[str] = [],
+    rank_feature: dict | None = None,
) -> tuple[pd.DataFrame, int]:
"""
-TODO: Infinity doesn't provide highlight
+BUG: Infinity returns empty for a highlight field if the query string doesn't use that field.
"""
if isinstance(indexNames, str):
indexNames = indexNames.split(",")
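The BUG note is why highlighting is now reconstructed client-side after the query returns. A hedged sketch of the resulting call pattern; `conn`, `match_exprs`, `order_by`, and `kb_ids` are assumed to exist, and the call site itself is illustrative rather than quoted from the repo:

```python
res, total = conn.search(
    selectFields=["id", "content_with_weight"],
    highlightFields=["content_with_weight"],
    condition={"kb_id": kb_ids},
    matchExprs=match_exprs,
    orderBy=order_by,
    offset=0,
    limit=10,
    indexNames="ragflow",
    knowledgebaseIds=kb_ids,
)
# getHighlight re-derives <em> markup from the query keywords, covering fields
# for which Infinity returned an empty highlight.
highlights = conn.getHighlight(res, keywords=["infinity"], fieldnm="content_with_weight")
```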
@@ -438,9 +430,7 @@ class InfinityConnection(DocStoreConnection):
matchExpr.extra_options.copy(),
)
elif isinstance(matchExpr, FusionExpr):
-builder = builder.fusion(
-    matchExpr.method, matchExpr.topn, matchExpr.fusion_params
-)
+builder = builder.fusion(matchExpr.method, matchExpr.topn, matchExpr.fusion_params)
else:
if filter_cond and len(filter_cond) > 0:
builder.filter(filter_cond)
@@ -455,15 +445,13 @@ class InfinityConnection(DocStoreConnection):
self.connPool.release_conn(inf_conn)
res = concat_dataframes(df_list, output)
if matchExprs:
-res['Sum'] = res[score_column] + res[PAGERANK_FLD]
-res = res.sort_values(by='Sum', ascending=False).reset_index(drop=True).drop(columns=['Sum'])
+res["Sum"] = res[score_column] + res[PAGERANK_FLD]
+res = res.sort_values(by="Sum", ascending=False).reset_index(drop=True).drop(columns=["Sum"])
res = res.head(limit)
logger.debug(f"INFINITY search final result: {str(res)}")
return res, total_hits_count
-def get(
-    self, chunkId: str, indexName: str, knowledgebaseIds: list[str]
-) -> dict | None:
+def get(self, chunkId: str, indexName: str, knowledgebaseIds: list[str]) -> dict | None:
inf_conn = self.connPool.get_conn()
db_instance = inf_conn.get_database(self.dbName)
df_list = list()
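The reranking step in the hunk above folds per-document pagerank into the fused relevance score before truncating to `limit`. A worked toy example; the `PAGERANK_FLD` value is an assumption here, the real constant comes from `rag.settings`:

```python
import pandas as pd

PAGERANK_FLD = "pagerank_fea"  # assumed value for the sketch

res = pd.DataFrame({"id": ["a", "b"], "SCORE": [0.50, 0.45], PAGERANK_FLD: [0.0, 0.2]})
res["Sum"] = res["SCORE"] + res[PAGERANK_FLD]
res = res.sort_values(by="Sum", ascending=False).reset_index(drop=True).drop(columns=["Sum"])
print(res["id"].tolist())  # ['b', 'a']: pagerank lifted b above a
```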
@@ -476,8 +464,7 @@ class InfinityConnection(DocStoreConnection):
try:
table_instance = db_instance.get_table(table_name)
except Exception:
-logger.warning(
-    f"Table not found: {table_name}, this knowledge base isn't created in Infinity. Maybe it is created in other document engine.")
+logger.warning(f"Table not found: {table_name}, this knowledge base isn't created in Infinity. Maybe it is created in other document engine.")
continue
kb_res, _ = table_instance.output(["*"]).filter(f"id = '{chunkId}'").to_df()
logger.debug(f"INFINITY get table: {str(table_list)}, result: {str(kb_res)}")
@@ -487,9 +474,7 @@ class InfinityConnection(DocStoreConnection):
res_fields = self.getFields(res, res.columns.tolist())
return res_fields.get(chunkId, None)
-def insert(
-    self, documents: list[dict], indexName: str, knowledgebaseId: str = None
-) -> list[str]:
+def insert(self, documents: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]:
inf_conn = self.connPool.get_conn()
db_instance = inf_conn.get_database(self.dbName)
table_name = f"{indexName}_{knowledgebaseId}"
@@ -532,7 +517,7 @@ class InfinityConnection(DocStoreConnection):
d[k] = v
elif re.search(r"_feas$", k):
d[k] = json.dumps(v)
-elif k == 'kb_id':
+elif k == "kb_id":
if isinstance(d[k], list):
d[k] = d[k][0] # since d[k] is a list, but we need a str
elif k == "position_int":
@@ -561,16 +546,14 @@ class InfinityConnection(DocStoreConnection):
logger.debug(f"INFINITY inserted into {table_name} {str_ids}.")
return []
-def update(
-    self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str
-) -> bool:
+def update(self, condition: dict, newValue: dict, indexName: str, knowledgebaseId: str) -> bool:
# if 'position_int' in newValue:
# logger.info(f"update position_int: {newValue['position_int']}")
inf_conn = self.connPool.get_conn()
db_instance = inf_conn.get_database(self.dbName)
table_name = f"{indexName}_{knowledgebaseId}"
table_instance = db_instance.get_table(table_name)
#if "exists" in condition:
# if "exists" in condition:
# del condition["exists"]
clmns = {}
@@ -587,7 +570,7 @@ class InfinityConnection(DocStoreConnection):
newValue[k] = v
elif re.search(r"_feas$", k):
newValue[k] = json.dumps(v)
-elif k == 'kb_id':
+elif k == "kb_id":
if isinstance(newValue[k], list):
newValue[k] = newValue[k][0] # since d[k] is a list, but we need a str
elif k == "position_int":
@@ -612,10 +595,10 @@ class InfinityConnection(DocStoreConnection):
else:
newValue[k] = v
-remove_opt = {} # "[k,new_value]": [id_to_update, ...]
+remove_opt = {}  # "[k,new_value]": [id_to_update, ...]
if removeValue:
col_to_remove = list(removeValue.keys())
-row_to_opt = table_instance.output(col_to_remove + ['id']).filter(filter).to_df()
+row_to_opt = table_instance.output(col_to_remove + ["id"]).filter(filter).to_df()
logger.debug(f"INFINITY search table {str(table_name)}, filter {filter}, result: {str(row_to_opt[0])}")
row_to_opt = self.getFields(row_to_opt, col_to_remove)
for id, old_v in row_to_opt.items():
@@ -632,7 +615,7 @@ class InfinityConnection(DocStoreConnection):
logger.debug(f"INFINITY update table {table_name}, filter {filter}, newValue {newValue}.")
for update_kv, ids in remove_opt.items():
k, v = json.loads(update_kv)
table_instance.update(filter + " AND id in ({0})".format(",".join([f"'{id}'" for id in ids])), {k:"###".join(v)})
table_instance.update(filter + " AND id in ({0})".format(",".join([f"'{id}'" for id in ids])), {k: "###".join(v)})
table_instance.update(filter, newValue)
self.connPool.release_conn(inf_conn)
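The `remove_opt` map buckets row ids by their post-removal value, so each distinct (column, new value) pair costs exactly one targeted UPDATE. A condensed reconstruction with invented sample rows:

```python
import json

removeValue = {"source_id": "doc42"}  # entry to strip from a keyword column
rows = {
    "row1": {"source_id": ["doc42", "doc7"]},
    "row2": {"source_id": ["doc42"]},
}

remove_opt: dict[str, list[str]] = {}  # "[k, new_value]" -> [id_to_update, ...]
for row_id, old in rows.items():
    for col, unwanted in removeValue.items():
        new_v = [x for x in old[col] if x != unwanted]
        remove_opt.setdefault(json.dumps([col, new_v]), []).append(row_id)

for update_kv, ids in remove_opt.items():
    k, v = json.loads(update_kv)  # as in the loop above
    print(k, "###".join(v), ids)  # one UPDATE per (column, new value) bucket
```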
@@ -645,9 +628,7 @@ class InfinityConnection(DocStoreConnection):
try:
table_instance = db_instance.get_table(table_name)
except Exception:
-logger.warning(
-    f"Skipped deleting from table {table_name} since the table doesn't exist."
-)
+logger.warning(f"Skipped deleting from table {table_name} since the table doesn't exist.")
return 0
filter = equivalent_condition_to_str(condition, table_instance)
logger.debug(f"INFINITY delete table {table_name}, filter {filter}.")
@@ -675,32 +656,34 @@ class InfinityConnection(DocStoreConnection):
if not fields:
return {}
fieldsAll = fields.copy()
-fieldsAll.append('id')
+fieldsAll.append("id")
column_map = {col.lower(): col for col in res.columns}
-matched_columns = {column_map[col.lower()]:col for col in set(fieldsAll) if col.lower() in column_map}
+matched_columns = {column_map[col.lower()]: col for col in set(fieldsAll) if col.lower() in column_map}
none_columns = [col for col in set(fieldsAll) if col.lower() not in column_map]
res2 = res[matched_columns.keys()]
res2 = res2.rename(columns=matched_columns)
-res2.drop_duplicates(subset=['id'], inplace=True)
+res2.drop_duplicates(subset=["id"], inplace=True)
for column in res2.columns:
k = column.lower()
if field_keyword(k):
-res2[column] = res2[column].apply(lambda v:[kwd for kwd in v.split("###") if kwd])
+res2[column] = res2[column].apply(lambda v: [kwd for kwd in v.split("###") if kwd])
elif re.search(r"_feas$", k):
res2[column] = res2[column].apply(lambda v: json.loads(v) if v else {})
elif k == "position_int":
def to_position_int(v):
if v:
-arr = [int(hex_val, 16) for hex_val in v.split('_')]
-v = [arr[i:i + 5] for i in range(0, len(arr), 5)]
+arr = [int(hex_val, 16) for hex_val in v.split("_")]
+v = [arr[i : i + 5] for i in range(0, len(arr), 5)]
else:
v = []
return v
res2[column] = res2[column].apply(to_position_int)
elif k in ["page_num_int", "top_int"]:
-res2[column] = res2[column].apply(lambda v:[int(hex_val, 16) for hex_val in v.split('_')] if v else [])
+res2[column] = res2[column].apply(lambda v: [int(hex_val, 16) for hex_val in v.split("_")] if v else [])
else:
pass
for column in none_columns:
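`position_int` round-trips through a compact string: every integer is hex-encoded, joined with `_`, five numbers per bounding box. A hedged codec sketch; the five-field order (page, left, right, top, bottom) is an assumption:

```python
def encode_positions(boxes: list[list[int]]) -> str:
    # Hypothetical inverse of to_position_int above: hex-encode and join.
    return "_".join(f"{n:08x}" for box in boxes for n in box)

def decode_positions(v: str) -> list[list[int]]:
    # Same logic as to_position_int in the hunk above.
    if not v:
        return []
    arr = [int(hex_val, 16) for hex_val in v.split("_")]
    return [arr[i : i + 5] for i in range(0, len(arr), 5)]

boxes = [[1, 10, 20, 30, 40], [2, 11, 21, 31, 41]]
assert decode_positions(encode_positions(boxes)) == boxes
```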
@@ -719,23 +702,35 @@ class InfinityConnection(DocStoreConnection):
for i in range(num_rows):
id = column_id[i]
txt = res[fieldnm][i]
if re.search(r"<em>[^<>]+</em>", txt, flags=re.IGNORECASE | re.MULTILINE):
ans[id] = txt
continue
txt = re.sub(r"[\r\n]", " ", txt, flags=re.IGNORECASE | re.MULTILINE)
txts = []
for t in re.split(r"[.?!;\n]", txt):
-for w in keywords:
-    t = re.sub(
-        r"(^|[ .?/'\"\(\)!,:;-])(%s)([ .?/'\"\(\)!,:;-])"
-        % re.escape(w),
-        r"\1<em>\2</em>\3",
-        t,
-        flags=re.IGNORECASE | re.MULTILINE,
-    )
-if not re.search(
-    r"<em>[^<>]+</em>", t, flags=re.IGNORECASE | re.MULTILINE
-):
+if is_english([t]):
+    for w in keywords:
+        t = re.sub(
+            r"(^|[ .?/'\"\(\)!,:;-])(%s)([ .?/'\"\(\)!,:;-])" % re.escape(w),
+            r"\1<em>\2</em>\3",
+            t,
+            flags=re.IGNORECASE | re.MULTILINE,
+        )
+else:
+    for w in sorted(keywords, key=len, reverse=True):
+        t = re.sub(
+            re.escape(w),
+            f"<em>{w}</em>",
+            t,
+            flags=re.IGNORECASE | re.MULTILINE,
+        )
+if not re.search(r"<em>[^<>]+</em>", t, flags=re.IGNORECASE | re.MULTILINE):
continue
txts.append(t)
ans[id] = "...".join(txts)
if txts:
ans[id] = "...".join(txts)
else:
ans[id] = txt
return ans
def getAggregation(self, res: tuple[pd.DataFrame, int] | pd.DataFrame, fieldnm: str):
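The rewritten loop picks a highlighting strategy per sentence: English text keeps the boundary-anchored regex, while non-English (e.g. CJK) text, which has no word boundaries, gets plain substring wrapping, longest keywords first so a short keyword cannot split a longer match. A condensed restatement of just that branch:

```python
import re

def highlight_sentence(t: str, keywords: list[str], english: bool) -> str:
    # Condensed from the diff above, for illustration only.
    if english:
        for w in keywords:
            # Boundary characters keep "cat" from matching inside "concatenate".
            t = re.sub(
                r"(^|[ .?/'\"\(\)!,:;-])(%s)([ .?/'\"\(\)!,:;-])" % re.escape(w),
                r"\1<em>\2</em>\3",
                t,
                flags=re.IGNORECASE,
            )
    else:
        for w in sorted(keywords, key=len, reverse=True):
            t = re.sub(re.escape(w), f"<em>{w}</em>", t, flags=re.IGNORECASE)
    return t

print(highlight_sentence("the cat sat.", ["cat"], True))  # the <em>cat</em> sat.
```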

uv.lock (generated)

@@ -4854,9 +4854,9 @@ wheels = [
[[package]]
name = "pyicu"
version = "2.15.2"
version = "2.15.3"
source = { registry = "https://mirrors.aliyun.com/pypi/simple" }
sdist = { url = "https://mirrors.aliyun.com/pypi/packages/9f/57/9db810ab75133a1c87ac2e327fb59199d78d233f575fbb63bfd3492b769c/pyicu-2.15.2.tar.gz", hash = "sha256:561e77eedff17cec6839f26211f7a5ce3c071b776e8a0ec9d1207f46cbce598f" }
sdist = { url = "https://mirrors.aliyun.com/pypi/packages/88/b0/c8b61bac55424e2ff80e20d7251c3f002baff3c07c34cee3849e3505d8f5/pyicu-2.15.3.tar.gz", hash = "sha256:f32e78e1cb64d0aeb14f027e037a8944861d3114548818a6adf0081ef51aefc3" }
[[package]]
name = "pyjwt"
@@ -5514,7 +5514,7 @@ requires-dist = [
{ name = "psycopg2-binary", specifier = "==2.9.9" },
{ name = "pyclipper", specifier = "==1.3.0.post5" },
{ name = "pycryptodomex", specifier = "==3.20.0" },
{ name = "pyicu", specifier = ">=2.13.1,<3.0.0" },
{ name = "pyicu", specifier = ">=2.15.3,<3.0.0" },
{ name = "pymysql", specifier = ">=1.1.1,<2.0.0" },
{ name = "pyodbc", specifier = ">=5.2.0,<6.0.0" },
{ name = "pypdf", specifier = "==6.0.0" },