diff --git a/api/apps/connector_app.py b/api/apps/connector_app.py index dc45c0c0e..a2ae4465f 100644 --- a/api/apps/connector_app.py +++ b/api/apps/connector_app.py @@ -40,9 +40,9 @@ def set_connector(): "source": req["source"], "input_type": InputType.POLL, "config": req["config"], - "refresh_freq": int(req["refresh_freq"]), - "prune_freq": int(req["prune_freq"]), - "timeout_secs": int(req["timeout_secs"]), + "refresh_freq": int(req.get("refresh_freq", 30)), + "prune_freq": int(req.get("prune_freq", 720)), + "timeout_secs": int(req.get("timeout_secs", 60*29)), "status": TaskStatus.SCHEDULE } conn["status"] = TaskStatus.SCHEDULE diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py index a4f96a032..ff3a102f7 100644 --- a/api/apps/kb_app.py +++ b/api/apps/kb_app.py @@ -22,6 +22,7 @@ from flask_login import login_required, current_user import numpy as np from api.db import LLMType +from api.db.services.connector_service import Connector2KbService from api.db.services.llm_service import LLMBundle from api.db.services.document_service import DocumentService, queue_raptor_o_graphrag_tasks from api.db.services.file2document_service import File2DocumentService @@ -147,6 +148,8 @@ def detail(): return get_data_error_result( message="Can't find this knowledgebase!") kb["size"] = DocumentService.get_total_size_by_kb_id(kb_id=kb["id"],keywords="", run_status=[], types=[]) + kb["connectors"] = Connector2KbService.list_connectors(kb_id) + for key in ["graphrag_task_finish_at", "raptor_task_finish_at", "mindmap_task_finish_at"]: if finish_at := kb.get(key): kb[key] = finish_at.strftime("%Y-%m-%d %H:%M:%S") @@ -720,26 +723,31 @@ def delete_kb_task(): if not pipeline_task_type or pipeline_task_type not in [PipelineTaskType.GRAPH_RAG, PipelineTaskType.RAPTOR, PipelineTaskType.MINDMAP]: return get_error_data_result(message="Invalid task type") + def cancel_task(task_id): + REDIS_CONN.set(f"{task_id}-cancel", "x") + match pipeline_task_type: case PipelineTaskType.GRAPH_RAG: - 
settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), kb_id) kb_task_id_field = "graphrag_task_id" task_id = kb.graphrag_task_id kb_task_finish_at = "graphrag_task_finish_at" + cancel_task(task_id) + settings.docStoreConn.delete({"knowledge_graph_kwd": ["graph", "subgraph", "entity", "relation"]}, search.index_name(kb.tenant_id), kb_id) case PipelineTaskType.RAPTOR: kb_task_id_field = "raptor_task_id" task_id = kb.raptor_task_id kb_task_finish_at = "raptor_task_finish_at" + cancel_task(task_id) + settings.docStoreConn.delete({"raptor_kwd": ["raptor"]}, search.index_name(kb.tenant_id), kb_id) case PipelineTaskType.MINDMAP: kb_task_id_field = "mindmap_task_id" task_id = kb.mindmap_task_id kb_task_finish_at = "mindmap_task_finish_at" + cancel_task(task_id) case _: return get_error_data_result(message="Internal Error: Invalid task type") - def cancel_task(task_id): - REDIS_CONN.set(f"{task_id}-cancel", "x") - cancel_task(task_id) + ok = KnowledgebaseService.update_by_id(kb_id, {kb_task_id_field: "", kb_task_finish_at: None}) if not ok: @@ -883,4 +891,16 @@ def check_embedding(): } if summary["avg_cos_sim"] > 0.99: return get_json_result(data={"summary": summary, "results": results}) - return get_json_result(code=RetCode.NOT_EFFECTIVE, message="failed", data={"summary": summary, "results": results}) + return get_json_result(code=settings.RetCode.NOT_EFFECTIVE, message="failed", data={"summary": summary, "results": results}) + + +@manager.route("/<kb_id>/link", methods=["POST"]) # noqa: F821 +@validate_request("connector_ids") +@login_required +def link_connector(kb_id): + req = request.json + errors = Connector2KbService.link_connectors(kb_id, req["connector_ids"], current_user.id) + if errors: + return get_json_result(data=False, message=errors, code=settings.RetCode.SERVER_ERROR) + return get_json_result(data=True) + diff --git a/api/db/services/connector_service.py
b/api/db/services/connector_service.py index ccf855d23..c51cb2e23 100644 --- a/api/db/services/connector_service.py +++ b/api/db/services/connector_service.py @@ -57,9 +57,9 @@ class ConnectorService(CommonService): cls.model.source, cls.model.status ] - return cls.model.select(*fields).where( + return list(cls.model.select(*fields).where( cls.model.tenant_id == tenant_id - ).dicts() + ).dicts()) class SyncLogsService(CommonService): @@ -218,3 +218,45 @@ class Connector2KbService(CommonService): errs.append(err) return "\n".join(errs) + @classmethod + def link_connectors(cls, kb_id:str, connector_ids: list[str], tenant_id:str): + arr = cls.query(kb_id=kb_id) + old_conn_ids = [a.connector_id for a in arr] + for conn_id in connector_ids: + if conn_id in old_conn_ids: + continue + cls.save(**{ + "id": get_uuid(), + "connector_id": conn_id, + "kb_id": kb_id + }) + SyncLogsService.schedule(conn_id, kb_id, reindex=True) + + errs = [] + for conn_id in old_conn_ids: + if conn_id in connector_ids: + continue + cls.filter_delete([cls.model.kb_id==kb_id, cls.model.connector_id==conn_id]) + e, conn = ConnectorService.get_by_id(conn_id) + SyncLogsService.filter_update([SyncLogs.connector_id==conn_id, SyncLogs.kb_id==kb_id, SyncLogs.status==TaskStatus.SCHEDULE], {"status": TaskStatus.CANCEL}) + docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}") + err = FileService.delete_docs([d.id for d in docs], tenant_id) + if err: + errs.append(err) + return "\n".join(errs) + + @classmethod + def list_connectors(cls, kb_id): + fields = [ + Connector.id, + Connector.source, + Connector.name, + Connector.status + ] + return list(cls.model.select(*fields)\ + .join(Connector, on=(cls.model.connector_id==Connector.id))\ + .where( + cls.model.kb_id==kb_id + ).dicts() + ) + diff --git a/common/base64_image.py b/common/base64_image.py index e8e622408..15794944c 100644 --- a/common/base64_image.py +++ b/common/base64_image.py @@ -30,7 +30,10 @@ async def image2id(d: dict, 
storage_put_func: partial, objname:str, bucket:str=" from io import BytesIO import trio from rag.svr.task_executor import minio_limiter - if not d.get("image"): + if "image" not in d: + return + if not d["image"]: + del d["image"] return with BytesIO() as output_buffer: diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 5bc267ecd..fb3efac10 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -701,7 +701,8 @@ async def run_raptor_for_kb(row, kb_parser_config, chat_mdl, embd_mdl, vector_si "doc_id": fake_doc_id, "kb_id": [str(row["kb_id"])], "docnm_kwd": row["name"], - "title_tks": rag_tokenizer.tokenize(row["name"]) + "title_tks": rag_tokenizer.tokenize(row["name"]), + "raptor_kwd": "raptor" } if row["pagerank"]: doc[PAGERANK_FLD] = int(row["pagerank"])