Integration with Infinity (#2894)

### What problem does this PR solve?

Integration with Infinity

- Replaced ELASTICSEARCH with dataStoreConn
- Renamed deleteByQuery with delete
- Renamed bulk to upsertBulk
- getHighlight, getAggregation
- Fix KGSearch.search
- Moved Dealer.sql_retrieval to es_conn.py


### Type of change

- [x] Refactoring
This commit is contained in:
Zhichang Yu
2024-11-12 14:59:41 +08:00
committed by GitHub
parent 00b6000b76
commit f4c52371ab
42 changed files with 2647 additions and 1878 deletions

View File

@ -30,7 +30,6 @@ from api.db.services.task_service import TaskService, queue_tasks
from api.utils.api_utils import server_error_response
from api.utils.api_utils import get_result, get_error_data_result
from io import BytesIO
from elasticsearch_dsl import Q
from flask import request, send_file
from api.db import FileSource, TaskStatus, FileType
from api.db.db_models import File
@ -42,7 +41,7 @@ from api.settings import RetCode, retrievaler
from api.utils.api_utils import construct_json_result, get_parser_config
from rag.nlp import search
from rag.utils import rmSpace
from rag.utils.es_conn import ELASTICSEARCH
from api.settings import docStoreConn
from rag.utils.storage_factory import STORAGE_IMPL
import os
@ -293,9 +292,7 @@ def update_doc(tenant_id, dataset_id, document_id):
)
if not e:
return get_error_data_result(message="Document not found!")
ELASTICSEARCH.deleteByQuery(
Q("match", doc_id=doc.id), idxnm=search.index_name(tenant_id)
)
docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), dataset_id)
return get_result()
@ -647,9 +644,7 @@ def parse(tenant_id, dataset_id):
info["chunk_num"] = 0
info["token_num"] = 0
DocumentService.update_by_id(id, info)
ELASTICSEARCH.deleteByQuery(
Q("match", doc_id=id), idxnm=search.index_name(tenant_id)
)
docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), dataset_id)
TaskService.filter_delete([Task.doc_id == id])
e, doc = DocumentService.get_by_id(id)
doc = doc.to_dict()
@ -713,9 +708,7 @@ def stop_parsing(tenant_id, dataset_id):
)
info = {"run": "2", "progress": 0, "chunk_num": 0}
DocumentService.update_by_id(id, info)
ELASTICSEARCH.deleteByQuery(
Q("match", doc_id=id), idxnm=search.index_name(tenant_id)
)
docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), dataset_id)
return get_result()
@ -812,7 +805,6 @@ def list_chunks(tenant_id, dataset_id, document_id):
"question": question,
"sort": True,
}
sres = retrievaler.search(query, search.index_name(tenant_id), highlight=True)
key_mapping = {
"chunk_num": "chunk_count",
"kb_id": "dataset_id",
@ -833,51 +825,56 @@ def list_chunks(tenant_id, dataset_id, document_id):
renamed_doc[new_key] = value
if key == "run":
renamed_doc["run"] = run_mapping.get(str(value))
res = {"total": sres.total, "chunks": [], "doc": renamed_doc}
origin_chunks = []
sign = 0
for id in sres.ids:
d = {
"chunk_id": id,
"content_with_weight": (
rmSpace(sres.highlight[id])
if question and id in sres.highlight
else sres.field[id].get("content_with_weight", "")
),
"doc_id": sres.field[id]["doc_id"],
"docnm_kwd": sres.field[id]["docnm_kwd"],
"important_kwd": sres.field[id].get("important_kwd", []),
"img_id": sres.field[id].get("img_id", ""),
"available_int": sres.field[id].get("available_int", 1),
"positions": sres.field[id].get("position_int", "").split("\t"),
}
if len(d["positions"]) % 5 == 0:
poss = []
for i in range(0, len(d["positions"]), 5):
poss.append(
[
float(d["positions"][i]),
float(d["positions"][i + 1]),
float(d["positions"][i + 2]),
float(d["positions"][i + 3]),
float(d["positions"][i + 4]),
]
)
d["positions"] = poss
origin_chunks.append(d)
res = {"total": 0, "chunks": [], "doc": renamed_doc}
origin_chunks = []
if docStoreConn.indexExist(search.index_name(tenant_id), dataset_id):
sres = retrievaler.search(query, search.index_name(tenant_id), [dataset_id], emb_mdl=None, highlight=True)
res["total"] = sres.total
sign = 0
for id in sres.ids:
d = {
"id": id,
"content_with_weight": (
rmSpace(sres.highlight[id])
if question and id in sres.highlight
else sres.field[id].get("content_with_weight", "")
),
"doc_id": sres.field[id]["doc_id"],
"docnm_kwd": sres.field[id]["docnm_kwd"],
"important_kwd": sres.field[id].get("important_kwd", []),
"img_id": sres.field[id].get("img_id", ""),
"available_int": sres.field[id].get("available_int", 1),
"positions": sres.field[id].get("position_int", "").split("\t"),
}
if len(d["positions"]) % 5 == 0:
poss = []
for i in range(0, len(d["positions"]), 5):
poss.append(
[
float(d["positions"][i]),
float(d["positions"][i + 1]),
float(d["positions"][i + 2]),
float(d["positions"][i + 3]),
float(d["positions"][i + 4]),
]
)
d["positions"] = poss
origin_chunks.append(d)
if req.get("id"):
if req.get("id") == id:
origin_chunks.clear()
origin_chunks.append(d)
sign = 1
break
if req.get("id"):
if req.get("id") == id:
origin_chunks.clear()
origin_chunks.append(d)
sign = 1
break
if req.get("id"):
if sign == 0:
return get_error_data_result(f"Can't find this chunk {req.get('id')}")
if sign == 0:
return get_error_data_result(f"Can't find this chunk {req.get('id')}")
for chunk in origin_chunks:
key_mapping = {
"chunk_id": "id",
"id": "id",
"content_with_weight": "content",
"doc_id": "document_id",
"important_kwd": "important_keywords",
@ -996,9 +993,9 @@ def add_chunk(tenant_id, dataset_id, document_id):
)
d["create_time"] = str(datetime.datetime.now()).replace("T", " ")[:19]
d["create_timestamp_flt"] = datetime.datetime.now().timestamp()
d["kb_id"] = [doc.kb_id]
d["kb_id"] = dataset_id
d["docnm_kwd"] = doc.name
d["doc_id"] = doc.id
d["doc_id"] = document_id
embd_id = DocumentService.get_embd_id(document_id)
embd_mdl = TenantLLMService.model_instance(
tenant_id, LLMType.EMBEDDING.value, embd_id
@ -1006,14 +1003,12 @@ def add_chunk(tenant_id, dataset_id, document_id):
v, c = embd_mdl.encode([doc.name, req["content"]])
v = 0.1 * v[0] + 0.9 * v[1]
d["q_%d_vec" % len(v)] = v.tolist()
ELASTICSEARCH.upsert([d], search.index_name(tenant_id))
docStoreConn.insert([d], search.index_name(tenant_id), dataset_id)
DocumentService.increment_chunk_num(doc.id, doc.kb_id, c, 1, 0)
d["chunk_id"] = chunk_id
d["kb_id"] = doc.kb_id
# rename keys
key_mapping = {
"chunk_id": "id",
"id": "id",
"content_with_weight": "content",
"doc_id": "document_id",
"important_kwd": "important_keywords",
@ -1079,36 +1074,16 @@ def rm_chunk(tenant_id, dataset_id, document_id):
"""
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
doc = DocumentService.query(id=document_id, kb_id=dataset_id)
if not doc:
return get_error_data_result(
message=f"You don't own the document {document_id}."
)
doc = doc[0]
req = request.json
if not req.get("chunk_ids"):
return get_error_data_result("`chunk_ids` is required")
query = {"doc_ids": [doc.id], "page": 1, "size": 1024, "question": "", "sort": True}
sres = retrievaler.search(query, search.index_name(tenant_id), highlight=True)
if not req:
chunk_ids = None
else:
chunk_ids = req.get("chunk_ids")
if not chunk_ids:
chunk_list = sres.ids
else:
chunk_list = chunk_ids
for chunk_id in chunk_list:
if chunk_id not in sres.ids:
return get_error_data_result(f"Chunk {chunk_id} not found")
if not ELASTICSEARCH.deleteByQuery(
Q("ids", values=chunk_list), search.index_name(tenant_id)
):
return get_error_data_result(message="Index updating failure")
deleted_chunk_ids = chunk_list
chunk_number = len(deleted_chunk_ids)
DocumentService.decrement_chunk_num(doc.id, doc.kb_id, 1, chunk_number, 0)
return get_result()
condition = {"doc_id": document_id}
if "chunk_ids" in req:
condition["id"] = req["chunk_ids"]
chunk_number = docStoreConn.delete(condition, search.index_name(tenant_id), dataset_id)
if chunk_number != 0:
DocumentService.decrement_chunk_num(document_id, dataset_id, 1, chunk_number, 0)
if "chunk_ids" in req and chunk_number != len(req["chunk_ids"]):
return get_error_data_result(message=f"rm_chunk deleted chunks {chunk_number}, expect {len(req["chunk_ids"])}")
return get_result(message=f"deleted {chunk_number} chunks")
@manager.route(
@ -1168,9 +1143,8 @@ def update_chunk(tenant_id, dataset_id, document_id, chunk_id):
schema:
type: object
"""
try:
res = ELASTICSEARCH.get(chunk_id, search.index_name(tenant_id))
except Exception:
chunk = docStoreConn.get(chunk_id, search.index_name(tenant_id), [dataset_id])
if chunk is None:
return get_error_data_result(f"Can't find this chunk {chunk_id}")
if not KnowledgebaseService.accessible(kb_id=dataset_id, user_id=tenant_id):
return get_error_data_result(message=f"You don't own the dataset {dataset_id}.")
@ -1180,19 +1154,12 @@ def update_chunk(tenant_id, dataset_id, document_id, chunk_id):
message=f"You don't own the document {document_id}."
)
doc = doc[0]
query = {
"doc_ids": [document_id],
"page": 1,
"size": 1024,
"question": "",
"sort": True,
}
sres = retrievaler.search(query, search.index_name(tenant_id), highlight=True)
if chunk_id not in sres.ids:
return get_error_data_result(f"You don't own the chunk {chunk_id}")
req = request.json
content = res["_source"].get("content_with_weight")
d = {"id": chunk_id, "content_with_weight": req.get("content", content)}
if "content" in req:
content = req["content"]
else:
content = chunk.get("content_with_weight", "")
d = {"id": chunk_id, "content_with_weight": content}
d["content_ltks"] = rag_tokenizer.tokenize(d["content_with_weight"])
d["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(d["content_ltks"])
if "important_keywords" in req:
@ -1220,7 +1187,7 @@ def update_chunk(tenant_id, dataset_id, document_id, chunk_id):
v, c = embd_mdl.encode([doc.name, d["content_with_weight"]])
v = 0.1 * v[0] + 0.9 * v[1] if doc.parser_id != ParserType.QA else v[1]
d["q_%d_vec" % len(v)] = v.tolist()
ELASTICSEARCH.upsert([d], search.index_name(tenant_id))
docStoreConn.update({"id": chunk_id}, d, search.index_name(tenant_id), dataset_id)
return get_result()