From b846a0f547252689b7b8fc2a334c01141bd4d905 Mon Sep 17 00:00:00 2001
From: Yongteng Lei
Date: Thu, 20 Nov 2025 15:35:09 +0800
Subject: [PATCH] Fix: incorrect retrieval total count with pagination enabled
 (#11400)

### What problem does this PR solve?

With pagination enabled, `retrieval` sliced the similarity scores down to the current page before counting matches, so `ranks["total"]` could never exceed `page_size`, no matter how many chunks actually passed the similarity threshold. This patch sorts and threshold-filters all candidate scores first, computes the total over the full filtered set, and only then slices out the requested page; `doc_aggs` is likewise aggregated over the full filtered set instead of a single page's overflow. A standalone sketch of the corrected flow follows the diff.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
---
 rag/nlp/search.py | 150 +++++++++++++++++++++++++++++-----------------
 1 file changed, 96 insertions(+), 54 deletions(-)

diff --git a/rag/nlp/search.py b/rag/nlp/search.py
index 4dbd9945c..a479e5d3f 100644
--- a/rag/nlp/search.py
+++ b/rag/nlp/search.py
@@ -355,75 +355,102 @@ class Dealer:
                                            rag_tokenizer.tokenize(ans).split(),
                                            rag_tokenizer.tokenize(inst).split())
 
-    def retrieval(self, question, embd_mdl, tenant_ids, kb_ids, page, page_size, similarity_threshold=0.2,
-                  vector_similarity_weight=0.3, top=1024, doc_ids=None, aggs=True,
-                  rerank_mdl=None, highlight=False,
-                  rank_feature: dict | None = {PAGERANK_FLD: 10}):
+    def retrieval(
+        self,
+        question,
+        embd_mdl,
+        tenant_ids,
+        kb_ids,
+        page,
+        page_size,
+        similarity_threshold=0.2,
+        vector_similarity_weight=0.3,
+        top=1024,
+        doc_ids=None,
+        aggs=True,
+        rerank_mdl=None,
+        highlight=False,
+        rank_feature: dict | None = {PAGERANK_FLD: 10},
+    ):
         ranks = {"total": 0, "chunks": [], "doc_aggs": {}}
         if not question:
             return ranks
 
         # Ensure RERANK_LIMIT is multiple of page_size
-        RERANK_LIMIT = math.ceil(64/page_size) * page_size if page_size>1 else 1
-        req = {"kb_ids": kb_ids, "doc_ids": doc_ids, "page": math.ceil(page_size*page/RERANK_LIMIT), "size": RERANK_LIMIT,
-               "question": question, "vector": True, "topk": top,
-               "similarity": similarity_threshold,
-               "available_int": 1}
-
+        RERANK_LIMIT = math.ceil(64 / page_size) * page_size if page_size > 1 else 1
+        req = {
+            "kb_ids": kb_ids,
+            "doc_ids": doc_ids,
+            "page": math.ceil(page_size * page / RERANK_LIMIT),
+            "size": RERANK_LIMIT,
+            "question": question,
+            "vector": True,
+            "topk": top,
+            "similarity": similarity_threshold,
+            "available_int": 1,
+        }
         if isinstance(tenant_ids, str):
             tenant_ids = tenant_ids.split(",")
 
-        sres = self.search(req, [index_name(tid) for tid in tenant_ids],
-                           kb_ids, embd_mdl, highlight, rank_feature=rank_feature)
+        sres = self.search(req, [index_name(tid) for tid in tenant_ids], kb_ids, embd_mdl, highlight, rank_feature=rank_feature)
 
         if rerank_mdl and sres.total > 0:
-            sim, tsim, vsim = self.rerank_by_model(rerank_mdl,
-                                                   sres, question, 1 - vector_similarity_weight,
-                                                   vector_similarity_weight,
-                                                   rank_feature=rank_feature)
+            sim, tsim, vsim = self.rerank_by_model(
+                rerank_mdl,
+                sres,
+                question,
+                1 - vector_similarity_weight,
+                vector_similarity_weight,
+                rank_feature=rank_feature,
+            )
         else:
-            lower_case_doc_engine = os.getenv('DOC_ENGINE', 'elasticsearch')
-            if lower_case_doc_engine in ["elasticsearch","opensearch"]:
+            lower_case_doc_engine = os.getenv("DOC_ENGINE", "elasticsearch")
+            if lower_case_doc_engine in ["elasticsearch", "opensearch"]:
                 # ElasticSearch doesn't normalize each way score before fusion.
                 sim, tsim, vsim = self.rerank(
-                    sres, question, 1 - vector_similarity_weight, vector_similarity_weight,
-                    rank_feature=rank_feature)
+                    sres,
+                    question,
+                    1 - vector_similarity_weight,
+                    vector_similarity_weight,
+                    rank_feature=rank_feature,
+                )
             else:
                 # Don't need rerank here since Infinity normalizes each way score before fusion.
                 sim = [sres.field[id].get("_score", 0.0) for id in sres.ids]
-                sim = [s if s is not None else 0. for s in sim]
+                sim = [s if s is not None else 0.0 for s in sim]
                 tsim = sim
                 vsim = sim
 
-        # Already paginated in search function
-        max_pages = RERANK_LIMIT // page_size
-        page_index = (page % max_pages) - 1
-        begin = max(page_index * page_size, 0)
-        sim = sim[begin : begin + page_size]
         sim_np = np.array(sim, dtype=np.float64)
-        idx = np.argsort(sim_np * -1)
+        if sim_np.size == 0:
+            return ranks
+
+        sorted_idx = np.argsort(sim_np * -1)
+
+        valid_idx = [int(i) for i in sorted_idx if sim_np[i] >= similarity_threshold]
+        filtered_count = len(valid_idx)
+        ranks["total"] = int(filtered_count)
+
+        if filtered_count == 0:
+            return ranks
+
+        max_pages = max(RERANK_LIMIT // max(page_size, 1), 1)
+        page_index = (page - 1) % max_pages
+        begin = page_index * page_size
+        end = begin + page_size
+        page_idx = valid_idx[begin:end]
+
         dim = len(sres.query_vector)
         vector_column = f"q_{dim}_vec"
         zero_vector = [0.0] * dim
-        filtered_count = (sim_np >= similarity_threshold).sum()
-        ranks["total"] = int(filtered_count) # Convert from np.int64 to Python int otherwise JSON serializable error
-        for i in idx:
-            if np.float64(sim[i]) < similarity_threshold:
-                break
+        for i in page_idx:
             id = sres.ids[i]
             chunk = sres.field[id]
             dnm = chunk.get("docnm_kwd", "")
             did = chunk.get("doc_id", "")
-            if len(ranks["chunks"]) >= page_size:
-                if aggs:
-                    if dnm not in ranks["doc_aggs"]:
-                        ranks["doc_aggs"][dnm] = {"doc_id": did, "count": 0}
-                    ranks["doc_aggs"][dnm]["count"] += 1
-                    continue
-                break
-
             position_int = chunk.get("position_int", [])
             d = {
                 "chunk_id": id,
@@ -434,12 +461,12 @@ class Dealer:
                 "kb_id": chunk["kb_id"],
                 "important_kwd": chunk.get("important_kwd", []),
                 "image_id": chunk.get("img_id", ""),
-                "similarity": sim[i],
-                "vector_similarity": vsim[i],
-                "term_similarity": tsim[i],
+                "similarity": float(sim_np[i]),
+                "vector_similarity": float(vsim[i]),
+                "term_similarity": float(tsim[i]),
                 "vector": chunk.get(vector_column, zero_vector),
                 "positions": position_int,
-                "doc_type_kwd": chunk.get("doc_type_kwd", "")
+                "doc_type_kwd": chunk.get("doc_type_kwd", ""),
             }
             if highlight and sres.highlight:
                 if id in sres.highlight:
@@ -447,15 +474,30 @@ class Dealer:
                 else:
                     d["highlight"] = d["content_with_weight"]
             ranks["chunks"].append(d)
-            if dnm not in ranks["doc_aggs"]:
-                ranks["doc_aggs"][dnm] = {"doc_id": did, "count": 0}
-            ranks["doc_aggs"][dnm]["count"] += 1
-        ranks["doc_aggs"] = [{"doc_name": k,
-                              "doc_id": v["doc_id"],
-                              "count": v["count"]} for k,
-                             v in sorted(ranks["doc_aggs"].items(),
-                                         key=lambda x: x[1]["count"] * -1)]
-        ranks["chunks"] = ranks["chunks"][:page_size]
+
+        if aggs:
+            for i in valid_idx:
+                id = sres.ids[i]
+                chunk = sres.field[id]
+                dnm = chunk.get("docnm_kwd", "")
+                did = chunk.get("doc_id", "")
+                if dnm not in ranks["doc_aggs"]:
+                    ranks["doc_aggs"][dnm] = {"doc_id": did, "count": 0}
+                ranks["doc_aggs"][dnm]["count"] += 1
+
+            ranks["doc_aggs"] = [
+                {
+                    "doc_name": k,
+                    "doc_id": v["doc_id"],
+                    "count": v["count"],
+                }
+                for k, v in sorted(
+                    ranks["doc_aggs"].items(),
+                    key=lambda x: x[1]["count"] * -1,
+                )
+            ]
+        else:
+            ranks["doc_aggs"] = []
 
         return ranks
 
@@ -564,7 +606,7 @@ class Dealer:
         ids = relevant_chunks_with_toc(query, toc, chat_mdl, topn*2)
         if not ids:
             return chunks
-        
+
         vector_size = 1024
         id2idx = {ck["chunk_id"]: i for i, ck in enumerate(chunks)}
        for cid, sim in ids:
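
### Sketch of the corrected pagination flow

The ordering the patch establishes is: sort by similarity, drop everything below the threshold, count the survivors as `total`, and only then slice out the requested page. The sketch below is a minimal, self-contained illustration of that ordering; the helper `paginate_scores` and its example inputs are hypothetical, invented for this note, and only the threshold/sort/slice logic mirrors the patched `Dealer.retrieval`.

```python
import math

import numpy as np


def paginate_scores(scores, page, page_size, similarity_threshold, rerank_limit=64):
    """Hypothetical helper mirroring the patched flow in Dealer.retrieval.

    Returns (total, page_indices): total counts ALL scores above the
    threshold, while page_indices holds only the requested page. The
    pre-fix code sliced the scores to one page before counting, so the
    reported total could never exceed page_size.
    """
    sim_np = np.array(scores, dtype=np.float64)
    if sim_np.size == 0:
        return 0, []

    # Sort indices by descending similarity, keep those above the threshold.
    sorted_idx = np.argsort(sim_np * -1)
    valid_idx = [int(i) for i in sorted_idx if sim_np[i] >= similarity_threshold]
    total = len(valid_idx)  # counted over the full filtered set, not one page

    # Round the rerank window up to a multiple of page_size, as the patch
    # does, then map the 1-based page number into that window.
    rerank_limit = math.ceil(rerank_limit / page_size) * page_size if page_size > 1 else 1
    max_pages = max(rerank_limit // max(page_size, 1), 1)
    begin = ((page - 1) % max_pages) * page_size
    return total, valid_idx[begin : begin + page_size]


# Six of the eight scores pass the 0.5 threshold, so total == 6 even
# though page 1 returns only the top two indices.
total, page_idx = paginate_scores(
    [0.9, 0.8, 0.75, 0.7, 0.6, 0.55, 0.3, 0.1],
    page=1, page_size=2, similarity_threshold=0.5,
)
assert (total, page_idx) == (6, [0, 1])
```

The page-index arithmetic matters too: the old `(page % max_pages) - 1` evaluated to `-1` whenever `page` was a multiple of `max_pages`, and the `max(page_index * page_size, 0)` clamp silently mapped that last page back to the first; the patched `(page - 1) % max_pages` keeps every page in the rerank window addressable.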