Mirror of https://github.com/infiniflow/ragflow.git, synced 2025-12-08 12:32:30 +08:00
Refa: link connector to KB. (#10991)
### What problem does this PR solve?

#10953

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
@@ -40,9 +40,9 @@ def set_connector():
             "source": req["source"],
             "input_type": InputType.POLL,
             "config": req["config"],
-            "refresh_freq": int(req["refresh_freq"]),
-            "prune_freq": int(req["prune_freq"]),
-            "timeout_secs": int(req["timeout_secs"]),
+            "refresh_freq": int(req.get("refresh_freq", 30)),
+            "prune_freq": int(req.get("prune_freq", 720)),
+            "timeout_secs": int(req.get("timeout_secs", 60*29)),
             "status": TaskStatus.SCHEDULE
         }
         conn["status"] = TaskStatus.SCHEDULE
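The three scheduling fields become optional in the request: when omitted they fall back to defaults, and the connector is queued with `TaskStatus.SCHEDULE`. A minimal sketch of the new defaulting behaviour, assuming `req` stands in for `request.json` (the surrounding handler and the `"s3"` source value are illustrative, not taken from this diff):

```python
# Standalone sketch: how the .get() defaults behave when the client omits the fields.
req = {"source": "s3", "config": {}}  # scheduling fields omitted by the client

conn = {
    "refresh_freq": int(req.get("refresh_freq", 30)),       # default when absent
    "prune_freq": int(req.get("prune_freq", 720)),          # default when absent
    "timeout_secs": int(req.get("timeout_secs", 60 * 29)),  # default when absent
}
print(conn)  # {'refresh_freq': 30, 'prune_freq': 720, 'timeout_secs': 1740}
```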
@@ -22,6 +22,7 @@ from flask_login import login_required, current_user
 import numpy as np
 
 from api.db import LLMType
+from api.db.services.connector_service import Connector2KbService
 from api.db.services.llm_service import LLMBundle
 from api.db.services.document_service import DocumentService, queue_raptor_o_graphrag_tasks
 from api.db.services.file2document_service import File2DocumentService
@@ -147,6 +148,8 @@ def detail():
         return get_data_error_result(
             message="Can't find this knowledgebase!")
     kb["size"] = DocumentService.get_total_size_by_kb_id(kb_id=kb["id"],keywords="", run_status=[], types=[])
+    kb["connectors"] = Connector2KbService.list_connectors(kb_id)
+
     for key in ["graphrag_task_finish_at", "raptor_task_finish_at", "mindmap_task_finish_at"]:
         if finish_at := kb.get(key):
             kb[key] = finish_at.strftime("%Y-%m-%d %H:%M:%S")
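With `list_connectors` wired in, the `/detail` response now carries the connectors linked to the knowledge base. A hypothetical shape of the enriched payload, using the fields selected by `Connector2KbService.list_connectors()` further down in this diff (all values are placeholders):

```python
# Illustrative only; not copied from the codebase.
kb_detail = {
    "id": "<kb_id>",
    "size": 0,  # aggregated by DocumentService.get_total_size_by_kb_id
    "connectors": [
        {"id": "<connector_id>", "source": "<source>", "name": "<name>", "status": "<status>"},
    ],
}
```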
@@ -720,26 +723,31 @@ def delete_kb_task():
     if not pipeline_task_type or pipeline_task_type not in [PipelineTaskType.GRAPH_RAG, PipelineTaskType.RAPTOR, PipelineTaskType.MINDMAP]:
         return get_error_data_result(message="Invalid task type")
 
+    def cancel_task(task_id):
+        REDIS_CONN.set(f"{task_id}-cancel", "x")
+
     match pipeline_task_type:
         case PipelineTaskType.GRAPH_RAG:
-            settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), kb_id)
             kb_task_id_field = "graphrag_task_id"
             task_id = kb.graphrag_task_id
             kb_task_finish_at = "graphrag_task_finish_at"
+            cancel_task(task_id)
+            settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), kb_id)
         case PipelineTaskType.RAPTOR:
             kb_task_id_field = "raptor_task_id"
             task_id = kb.raptor_task_id
             kb_task_finish_at = "raptor_task_finish_at"
+            cancel_task(task_id)
+            settings.docStoreConn.delete({"raptor_kwd": ["raptor"]}, search.index_name(kb.tenant_id), kb_id)
         case PipelineTaskType.MINDMAP:
             kb_task_id_field = "mindmap_task_id"
             task_id = kb.mindmap_task_id
             kb_task_finish_at = "mindmap_task_finish_at"
+            cancel_task(task_id)
         case _:
             return get_error_data_result(message="Internal Error: Invalid task type")
 
-    def cancel_task(task_id):
-        REDIS_CONN.set(f"{task_id}-cancel", "x")
-    cancel_task(task_id)
+
 
     ok = KnowledgebaseService.update_by_id(kb_id, {kb_task_id_field: "", kb_task_finish_at: None})
     if not ok:
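This hunk mostly moves code: `cancel_task` is now defined once before the `match`, each branch cancels its own task before the doc-store cleanup, and the RAPTOR branch gains a delete keyed on `raptor_kwd`. A compact sketch of the resulting flow, reconstructed from the diff rather than copied verbatim from the file (validation and the final response are omitted):

```python
# Sketch of delete_kb_task() after this change.
def cancel_task(task_id):
    REDIS_CONN.set(f"{task_id}-cancel", "x")

match pipeline_task_type:
    case PipelineTaskType.GRAPH_RAG:
        cancel_task(kb.graphrag_task_id)
        settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]},
                                     search.index_name(kb.tenant_id), kb_id)
    case PipelineTaskType.RAPTOR:
        cancel_task(kb.raptor_task_id)
        settings.docStoreConn.delete({"raptor_kwd": ["raptor"]}, search.index_name(kb.tenant_id), kb_id)
    case PipelineTaskType.MINDMAP:
        cancel_task(kb.mindmap_task_id)  # no doc-store cleanup for mind maps
```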
@@ -883,4 +891,16 @@ def check_embedding():
         }
     if summary["avg_cos_sim"] > 0.99:
         return get_json_result(data={"summary": summary, "results": results})
-    return get_json_result(code=RetCode.NOT_EFFECTIVE, message="failed", data={"summary": summary, "results": results})
+    return get_json_result(code=settings.RetCode.NOT_EFFECTIVE, message="failed", data={"summary": summary, "results": results})
+
+
+@manager.route("/<kb_id>/link", methods=["POST"]) # noqa: F821
+@validate_request("connector_ids")
+@login_required
+def link_connector(kb_id):
+    req = request.json
+    errors = Connector2KbService.link_connectors(kb_id, req["connector_ids"], current_user.id)
+    if errors:
+        return get_json_result(data=False, message=errors, code=settings.RetCode.SERVER_ERROR)
+    return get_json_result(data=True)
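The new route links a set of connectors to a knowledge base in one call. A hedged client-side sketch: only the path, method, and `connector_ids` body field come from the diff; the base URL, path prefix, and auth header are assumptions about the deployment:

```python
import requests

BASE_URL = "http://localhost:9380/v1/kb"                    # prefix is an assumption
headers = {"Authorization": "Bearer <api_or_session_token>"}  # auth scheme is an assumption

resp = requests.post(
    f"{BASE_URL}/<kb_id>/link",
    json={"connector_ids": ["<connector_id_1>", "<connector_id_2>"]},
    headers=headers,
)
# On success the payload's "data" field is true; otherwise a SERVER_ERROR code is
# returned with the error messages collected by link_connectors().
print(resp.json())
```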
@@ -57,9 +57,9 @@ class ConnectorService(CommonService):
             cls.model.source,
             cls.model.status
         ]
-        return cls.model.select(*fields).where(
+        return list(cls.model.select(*fields).where(
             cls.model.tenant_id == tenant_id
-        ).dicts()
+        ).dicts())


 class SyncLogsService(CommonService):
@@ -218,3 +218,45 @@ class Connector2KbService(CommonService):
                 errs.append(err)
         return "\n".join(errs)
 
+    @classmethod
+    def link_connectors(cls, kb_id:str, connector_ids: list[str], tenant_id:str):
+        arr = cls.query(kb_id=kb_id)
+        old_conn_ids = [a.connector_id for a in arr]
+        for conn_id in connector_ids:
+            if conn_id in old_conn_ids:
+                continue
+            cls.save(**{
+                "id": get_uuid(),
+                "connector_id": conn_id,
+                "kb_id": kb_id
+            })
+            SyncLogsService.schedule(conn_id, kb_id, reindex=True)
+
+        errs = []
+        for conn_id in old_conn_ids:
+            if conn_id in connector_ids:
+                continue
+            cls.filter_delete([cls.model.kb_id==kb_id, cls.model.connector_id==conn_id])
+            e, conn = ConnectorService.get_by_id(conn_id)
+            SyncLogsService.filter_update([SyncLogs.connector_id==conn_id, SyncLogs.kb_id==kb_id, SyncLogs.status==TaskStatus.SCHEDULE], {"status": TaskStatus.CANCEL})
+            docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}")
+            err = FileService.delete_docs([d.id for d in docs], tenant_id)
+            if err:
+                errs.append(err)
+        return "\n".join(errs)
+
+    @classmethod
+    def list_connectors(cls, kb_id):
+        fields = [
+            Connector.id,
+            Connector.source,
+            Connector.name,
+            Connector.status
+        ]
+        return list(cls.model.select(*fields)\
+            .join(Connector, on=(cls.model.connector_id==Connector.id))\
+            .where(
+                cls.model.kb_id==kb_id
+            ).dicts()
+        )
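`link_connectors` reconciles the requested connector list against the existing links: connectors not yet linked are saved and scheduled for a full re-index, while links missing from the request are deleted, their pending syncs cancelled, and the documents they synced removed. A minimal, self-contained sketch of just that reconciliation step (the function and variable names here are illustrative, not part of the service API):

```python
def reconcile(existing: list[str], requested: list[str]) -> tuple[list[str], list[str]]:
    """Split connector ids into those to link (schedule re-index) and those to unlink (cancel + clean up)."""
    to_link = [c for c in requested if c not in existing]
    to_unlink = [c for c in existing if c not in requested]
    return to_link, to_unlink

print(reconcile(existing=["a", "b"], requested=["b", "c"]))  # (['c'], ['a'])
```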
@@ -30,7 +30,10 @@ async def image2id(d: dict, storage_put_func: partial, objname:str, bucket:str="
     from io import BytesIO
     import trio
     from rag.svr.task_executor import minio_limiter
-    if not d.get("image"):
+    if "image" not in d:
         return
+    if not d["image"]:
+        del d["image"]
+        return
 
     with BytesIO() as output_buffer:
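The old guard `if not d.get("image")` returned early for both a missing key and a present-but-empty value, leaving stale falsy `image` entries in the chunk dict. The new guard distinguishes the two cases and strips the empty field. A standalone sketch of the behaviour (the helper name is illustrative, and the upload path taken for a real image is omitted):

```python
def normalize_image_field(d: dict) -> dict:
    """Mirror of the new guard: leave missing keys alone, drop empty image values."""
    if "image" not in d:
        return d
    if not d["image"]:
        del d["image"]
    return d

print(normalize_image_field({"text": "x"}))                   # {'text': 'x'}
print(normalize_image_field({"text": "x", "image": None}))    # {'text': 'x'}
print(normalize_image_field({"text": "x", "image": "obj1"}))  # unchanged; real helper would upload it
```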
@@ -701,7 +701,8 @@ async def run_raptor_for_kb(row, kb_parser_config, chat_mdl, embd_mdl, vector_si
         "doc_id": fake_doc_id,
         "kb_id": [str(row["kb_id"])],
         "docnm_kwd": row["name"],
-        "title_tks": rag_tokenizer.tokenize(row["name"])
+        "title_tks": rag_tokenizer.tokenize(row["name"]),
+        "raptor_kwd": "raptor"
     }
     if row["pagerank"]:
         doc[PAGERANK_FLD] = int(row["pagerank"])
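Tagging the synthetic RAPTOR document with `"raptor_kwd": "raptor"` is what makes it addressable by the RAPTOR cleanup added to `delete_kb_task` earlier in this diff, which deletes by the filter `{"raptor_kwd": ["raptor"]}`. A tiny illustration of how the tag and the filter line up (placeholder values only):

```python
doc = {"doc_id": "<fake_doc_id>", "raptor_kwd": "raptor"}  # tag written at index time
delete_filter = {"raptor_kwd": ["raptor"]}                 # filter used by delete_kb_task
assert doc["raptor_kwd"] in delete_filter["raptor_kwd"]    # cleanup matches the tag
```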