Feat: Alter flask to Quart for async API serving. (#11275)

### What problem does this PR solve?

#11277

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Kevin Hu
2025-11-18 17:05:16 +08:00
committed by GitHub
parent c2b7c305fa
commit d1716d865a
49 changed files with 4120 additions and 3888 deletions

View File

@ -18,11 +18,9 @@ import logging
import random
import re
from flask import request
from flask_login import login_required, current_user
from quart import request
import numpy as np
from api.db.services.connector_service import Connector2KbService
from api.db.services.llm_service import LLMBundle
from api.db.services.document_service import DocumentService, queue_raptor_o_graphrag_tasks
@ -31,7 +29,8 @@ from api.db.services.file_service import FileService
from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
from api.db.services.task_service import TaskService, GRAPH_RAPTOR_FAKE_DOC_ID
from api.db.services.user_service import TenantService, UserTenantService
from api.utils.api_utils import get_error_data_result, server_error_response, get_data_error_result, validate_request, not_allowed_parameters
from api.utils.api_utils import get_error_data_result, server_error_response, get_data_error_result, validate_request, not_allowed_parameters, \
request_json
from api.db import VALID_FILE_TYPES
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.db_models import File
@ -42,27 +41,28 @@ from rag.utils.redis_conn import REDIS_CONN
from rag.utils.doc_store_conn import OrderByExpr
from common.constants import RetCode, PipelineTaskType, StatusEnum, VALID_TASK_STATUS, FileSource, LLMType, PAGERANK_FLD
from common import settings
from api.apps import login_required, current_user
@manager.route('/create', methods=['post']) # noqa: F821
@login_required
@validate_request("name")
def create():
req = request.json
req = KnowledgebaseService.create_with_name(
async def create():
req = await request_json()
e, res = KnowledgebaseService.create_with_name(
name = req.pop("name", None),
tenant_id = current_user.id,
parser_id = req.pop("parser_id", None),
**req
)
code = req.get("code")
if code:
return get_data_error_result(code=code, message=req.get("message"))
if not e:
return res
try:
if not KnowledgebaseService.save(**req):
if not KnowledgebaseService.save(**res):
return get_data_error_result()
return get_json_result(data={"kb_id":req["id"]})
return get_json_result(data={"kb_id":res["id"]})
except Exception as e:
return server_error_response(e)
@ -71,8 +71,8 @@ def create():
@login_required
@validate_request("kb_id", "name", "description", "parser_id")
@not_allowed_parameters("id", "tenant_id", "created_by", "create_time", "update_time", "create_date", "update_date", "created_by")
def update():
req = request.json
async def update():
req = await request_json()
if not isinstance(req["name"], str):
return get_data_error_result(message="Dataset name must be string.")
if req["name"].strip() == "":
@ -170,18 +170,19 @@ def detail():
@manager.route('/list', methods=['POST']) # noqa: F821
@login_required
def list_kbs():
keywords = request.args.get("keywords", "")
page_number = int(request.args.get("page", 0))
items_per_page = int(request.args.get("page_size", 0))
parser_id = request.args.get("parser_id")
orderby = request.args.get("orderby", "create_time")
if request.args.get("desc", "true").lower() == "false":
async def list_kbs():
args = request.args
keywords = args.get("keywords", "")
page_number = int(args.get("page", 0))
items_per_page = int(args.get("page_size", 0))
parser_id = args.get("parser_id")
orderby = args.get("orderby", "create_time")
if args.get("desc", "true").lower() == "false":
desc = False
else:
desc = True
req = request.get_json()
req = await request_json()
owner_ids = req.get("owner_ids", [])
try:
if not owner_ids:
@ -203,11 +204,12 @@ def list_kbs():
except Exception as e:
return server_error_response(e)
@manager.route('/rm', methods=['post']) # noqa: F821
@login_required
@validate_request("kb_id")
def rm():
req = request.json
async def rm():
req = await request_json()
if not KnowledgebaseService.accessible4deletion(req["kb_id"], current_user.id):
return get_json_result(
data=False,
@ -283,8 +285,8 @@ def list_tags_from_kbs():
@manager.route('/<kb_id>/rm_tags', methods=['POST']) # noqa: F821
@login_required
def rm_tags(kb_id):
req = request.json
async def rm_tags(kb_id):
req = await request_json()
if not KnowledgebaseService.accessible(kb_id, current_user.id):
return get_json_result(
data=False,
@ -303,8 +305,8 @@ def rm_tags(kb_id):
@manager.route('/<kb_id>/rename_tag', methods=['POST']) # noqa: F821
@login_required
def rename_tags(kb_id):
req = request.json
async def rename_tags(kb_id):
req = await request_json()
if not KnowledgebaseService.accessible(kb_id, current_user.id):
return get_json_result(
data=False,
@ -407,7 +409,7 @@ def get_basic_info():
@manager.route("/list_pipeline_logs", methods=["POST"]) # noqa: F821
@login_required
def list_pipeline_logs():
async def list_pipeline_logs():
kb_id = request.args.get("kb_id")
if not kb_id:
return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
@ -426,7 +428,7 @@ def list_pipeline_logs():
if create_date_to > create_date_from:
return get_data_error_result(message="Create data filter is abnormal.")
req = request.get_json()
req = await request_json()
operation_status = req.get("operation_status", [])
if operation_status:
@ -451,7 +453,7 @@ def list_pipeline_logs():
@manager.route("/list_pipeline_dataset_logs", methods=["POST"]) # noqa: F821
@login_required
def list_pipeline_dataset_logs():
async def list_pipeline_dataset_logs():
kb_id = request.args.get("kb_id")
if not kb_id:
return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
@ -468,7 +470,7 @@ def list_pipeline_dataset_logs():
if create_date_to > create_date_from:
return get_data_error_result(message="Create data filter is abnormal.")
req = request.get_json()
req = await request_json()
operation_status = req.get("operation_status", [])
if operation_status:
@ -485,12 +487,12 @@ def list_pipeline_dataset_logs():
@manager.route("/delete_pipeline_logs", methods=["POST"]) # noqa: F821
@login_required
def delete_pipeline_logs():
async def delete_pipeline_logs():
kb_id = request.args.get("kb_id")
if not kb_id:
return get_json_result(data=False, message='Lack of "KB ID"', code=RetCode.ARGUMENT_ERROR)
req = request.get_json()
req = await request_json()
log_ids = req.get("log_ids", [])
PipelineOperationLogService.delete_by_ids(log_ids)
@ -514,8 +516,8 @@ def pipeline_log_detail():
@manager.route("/run_graphrag", methods=["POST"]) # noqa: F821
@login_required
def run_graphrag():
req = request.json
async def run_graphrag():
req = await request_json()
kb_id = req.get("kb_id", "")
if not kb_id:
@ -583,8 +585,8 @@ def trace_graphrag():
@manager.route("/run_raptor", methods=["POST"]) # noqa: F821
@login_required
def run_raptor():
req = request.json
async def run_raptor():
req = await request_json()
kb_id = req.get("kb_id", "")
if not kb_id:
@ -652,8 +654,8 @@ def trace_raptor():
@manager.route("/run_mindmap", methods=["POST"]) # noqa: F821
@login_required
def run_mindmap():
req = request.json
async def run_mindmap():
req = await request_json()
kb_id = req.get("kb_id", "")
if not kb_id:
@ -768,7 +770,7 @@ def delete_kb_task():
@manager.route("/check_embedding", methods=["post"]) # noqa: F821
@login_required
def check_embedding():
async def check_embedding():
def _guess_vec_field(src: dict) -> str | None:
for k in src or {}:
@ -859,7 +861,7 @@ def check_embedding():
def _clean(s: str) -> str:
s = re.sub(r"</?(table|td|caption|tr|th)( [^<>]{0,12})?>", " ", s or "")
return s if s else "None"
req = request.json
req = await request_json()
kb_id = req.get("kb_id", "")
embd_id = req.get("embd_id", "")
n = int(req.get("check_num", 5))