Feat: add splitter (#10161)

### What problem does this PR solve?


### Type of change
- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Lynn <lynn_inf@hotmail.com>
Co-authored-by: chanx <1243304602@qq.com>
Co-authored-by: balibabu <cike8899@users.noreply.github.com>
Co-authored-by: 纷繁下的无奈 <zhileihuang@126.com>
Co-authored-by: huangzl <huangzl@shinemo.com>
Co-authored-by: writinwaters <93570324+writinwaters@users.noreply.github.com>
Co-authored-by: Wilmer <33392318@qq.com>
Co-authored-by: Adrian Weidig <adrianweidig@gmx.net>
Co-authored-by: Zhichang Yu <yuzhichang@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Yongteng Lei <yongtengrey@outlook.com>
Co-authored-by: Liu An <asiro@qq.com>
Co-authored-by: buua436 <66937541+buua436@users.noreply.github.com>
Co-authored-by: BadwomanCraZY <511528396@qq.com>
Co-authored-by: cucusenok <31804608+cucusenok@users.noreply.github.com>
Co-authored-by: Russell Valentine <russ@coldstonelabs.org>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Billy Bao <newyorkupperbay@gmail.com>
Co-authored-by: Zhedong Cen <cenzhedong2@126.com>
Co-authored-by: TensorNull <129579691+TensorNull@users.noreply.github.com>
Co-authored-by: TensorNull <tensor.null@gmail.com>
Kevin Hu · 2025-09-19 10:15:19 +08:00 · committed by GitHub
parent f9c7404bee · commit a1b947ffd6
81 changed files with 3083 additions and 799 deletions

View File

@ -0,0 +1,105 @@
# Health Checks and Kubernetes Probes: A Brief Guide
This document explains what K8s probes are, how to use `/v1/system/healthz` for health checks, and the meaning of the key terms in the use case below.
## What is a K8s probe?
- A probe is the mechanism K8s uses to test whether a container is healthy and able to serve traffic.
- The three common kinds:
  - livenessProbe: liveness probe. On failure, K8s restarts the container; used for self-healing when the application hangs or loses its connections.
  - readinessProbe: readiness probe. On failure, the endpoint is not added to the Service load balancer; used to keep traffic away from an application that is not ready yet.
  - startupProbe: startup probe. Gives slow-starting applications a longer initialization window, during which liveness/readiness probes are not executed.
- Probes typically issue an HTTP GET against a public, lightweight health endpoint (no authentication required) and judge the result by HTTP status code: 200 = pass, 5xx/timeout = fail.
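For reference, a minimal startupProbe sketch (port and thresholds are illustrative; this project's endpoint is introduced in the next section). The other probes stay suspended until it succeeds, for at most failureThreshold × periodSeconds:
```yaml
startupProbe:
  httpGet:
    path: /v1/system/healthz
    port: 80
  periodSeconds: 10
  failureThreshold: 30   # allow up to 300 s of initialization
```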
## This Project's Health Endpoint
- Implemented: `GET /v1/system/healthz` (no authentication required).
- Semantics:
  - 200: all critical dependencies are healthy.
  - 500: some critical dependency is unhealthy (currently determined by DB or Chat).
- Response body (JSON): minimal fields `status, db, chat`, plus observability fields such as `redis, doc_engine, storage`. Failing items carry `error/elapsed` details under `_meta`.
- Example (DB failure):
```json
{"status":"nok","chat":"ok","db":"nok"}
```
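For comparison, a fully healthy instance returns 200 with a body like this (optional observability fields included; key order may vary):
```json
{"db":"ok","chat":"ok","redis":"ok","doc_engine":"ok","storage":"ok","status":"ok"}
```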
## Use Case Background (Problem/use case)
- Current state: Ragflow runs on K8s; the database is AWS RDS Postgres, with credentials managed by Secret Manager and rotated every 7 days. After a rotation the application's connections become invalid, and the Pod must be restarted manually to reconnect.
- Goal: use K8s probes to detect and restart unhealthy Pods automatically, reducing manual operations.
- Requirement: a public health endpoint that needs no authentication, returns a non-200 status (e.g. 500) when a dependency is unhealthy, and provides JSON details.
- Now satisfied: `/v1/system/healthz` was designed for exactly this.
## Key Terms (for the description provided)
- Ragflow instance: the Ragflow service deployed on K8s.
- AWS RDS Postgres: the managed PostgreSQL database instance.
- Secret Manager rotation: secrets are rotated periodically (every 7 days), which invalidates existing connections.
- Probes: K8s probes (liveness/readiness) that automatically restart or remove unhealthy instances.
- Public endpoint without API key: an HTTP route that requires no Authorization header, so probes can reach it directly.
- Dependencies statuses: the health of each dependency (db, chat, redis, doc_engine, storage, etc.).
- HTTP 500 with JSON: on dependency failure, return 500 with a JSON body that identifies the failing subsystem.
## Quick Test
- Healthy:
```bash
curl -i http://<host>/v1/system/healthz
```
- Simulate a DB failure (docker-compose example):
```bash
docker compose stop db && curl -i http://<host>/v1/system/healthz
```
(expected: 500, with `db:"nok"` in the JSON)
## Fuller Test Checklist
### 1) Check the HTTP status code only
```bash
curl -s -o /dev/null -w "%{http_code}\n" http://<host>/v1/system/healthz
```
Expected: `200` or `500`.
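To poll the status code continuously while simulating a failure (interval is illustrative; assumes `watch` is available):
```bash
watch -n 5 'curl -s -o /dev/null -w "%{http_code}\n" http://<host>/v1/system/healthz'
```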
### 2) Windows PowerShell
```powershell
# Status code
(Invoke-WebRequest -Uri "http://<host>/v1/system/healthz" -Method GET -TimeoutSec 3 -ErrorAction SilentlyContinue).StatusCode
# Full response
Invoke-RestMethod -Uri "http://<host>/v1/system/healthz" -Method GET
```
### 3) Local testing via kubectl port-forward
```bash
# The exposed frontend/gateway port differs per environment; adjust as needed
kubectl port-forward deploy/<your-deploy> 8080:80 -n <ns>
curl -i http://127.0.0.1:8080/v1/system/healthz
```
### 4) Simulating common failure scenarios
- DB failure (recommended):
```bash
docker compose stop db
curl -i http://<host>/v1/system/healthz  # expect 500
```
- Chat failure (optional): set `factory`/`base_url` in `CHAT_CFG` to an invalid value and restart the backend; the next request should return 500 with `chat:"nok"`.
- Redis/storage/doc engine: stop the corresponding service and request again; the matching JSON field turns `"nok"` (this does not affect the 200/500 verdict). See the sketch after this list for pinpointing the failing subsystem.
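A sketch that filters the response for failing fields (assumes `jq` is installed; error details live under `_meta`):
```bash
curl -s http://<host>/v1/system/healthz \
  | jq -r 'to_entries[] | select(.value == "nok") | .key'
```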
### 5) Browser verification
- Open `http://<host>/v1/system/healthz` directly and check 200/500 in the DevTools Network tab; the page body is the JSON itself.
- Reverse-proxy caveat: if a custom 500 error page is configured, disable error-page interception for `/healthz` (e.g. `proxy_intercept_errors off;`).
## K8s Probe Examples
```yaml
readinessProbe:
httpGet:
path: /v1/system/healthz
port: 80
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 2
failureThreshold: 1
livenessProbe:
httpGet:
path: /v1/system/healthz
port: 80
initialDelaySeconds: 10
periodSeconds: 10
timeoutSeconds: 2
failureThreshold: 3
```
Tip: if a reverse proxy (Nginx) serves a custom 500 error page, disable error-page interception for `/healthz` so the JSON body is preserved; a minimal sketch follows.
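```nginx
location /v1/system/healthz {
    proxy_pass http://ragflow_backend;  # placeholder upstream name
    proxy_intercept_errors off;         # pass the 500 + JSON body through untouched
}
```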

View File

@ -28,6 +28,7 @@ from api.db import CanvasCategory, FileType
from api.db.services.canvas_service import CanvasTemplateService, UserCanvasService, API4ConversationService
from api.db.services.document_service import DocumentService
from api.db.services.file_service import FileService
from api.db.services.task_service import queue_dataflow
from api.db.services.user_service import TenantService
from api.db.services.user_canvas_version import UserCanvasVersionService
from api.settings import RetCode
@ -48,14 +49,6 @@ def templates():
return get_json_result(data=[c.to_dict() for c in CanvasTemplateService.query(canvas_category=CanvasCategory.Agent)])
@manager.route('/list', methods=['GET']) # noqa: F821
@login_required
def canvas_list():
return get_json_result(data=sorted([c.to_dict() for c in \
UserCanvasService.query(user_id=current_user.id, canvas_category=CanvasCategory.Agent)], key=lambda x: x["update_time"]*-1)
)
@manager.route('/rm', methods=['POST']) # noqa: F821
@validate_request("canvas_ids")
@login_required
@ -77,9 +70,10 @@ def save():
if not isinstance(req["dsl"], str):
req["dsl"] = json.dumps(req["dsl"], ensure_ascii=False)
req["dsl"] = json.loads(req["dsl"])
cate = req.get("canvas_category", CanvasCategory.Agent)
if "id" not in req:
req["user_id"] = current_user.id
if UserCanvasService.query(user_id=current_user.id, title=req["title"].strip(), canvas_category=CanvasCategory.Agent):
if UserCanvasService.query(user_id=current_user.id, title=req["title"].strip(), canvas_category=cate):
return get_data_error_result(message=f"{req['title'].strip()} already exists.")
req["id"] = get_uuid()
if not UserCanvasService.save(**req):
@ -148,6 +142,14 @@ def run():
if not isinstance(cvs.dsl, str):
cvs.dsl = json.dumps(cvs.dsl, ensure_ascii=False)
if cvs.canvas_category == CanvasCategory.DataFlow:
task_id = get_uuid()
flow_id = get_uuid()
ok, error_message = queue_dataflow(dsl=cvs.dsl, tenant_id=user_id, file=files[0], task_id=task_id, flow_id=flow_id, priority=0)
if not ok:
return server_error_response(error_message)
return get_json_result(data={"task_id": task_id, "message_id": flow_id})
try:
canvas = Canvas(cvs.dsl, current_user.id, req["id"])
except Exception as e:
@ -332,7 +334,7 @@ def test_db_connect():
if req["db_type"] in ["mysql", "mariadb"]:
db = MySQLDatabase(req["database"], user=req["username"], host=req["host"], port=req["port"],
password=req["password"])
elif req["db_type"] == 'postgresql':
elif req["db_type"] == 'postgres':
db = PostgresqlDatabase(req["database"], user=req["username"], host=req["host"], port=req["port"],
password=req["password"])
elif req["db_type"] == 'mssql':
@ -383,22 +385,31 @@ def getversion( version_id):
return get_json_result(data=f"Error getting history file: {e}")
@manager.route('/listteam', methods=['GET']) # noqa: F821
@manager.route('/list', methods=['GET']) # noqa: F821
@login_required
def list_canvas():
keywords = request.args.get("keywords", "")
page_number = int(request.args.get("page", 1))
items_per_page = int(request.args.get("page_size", 150))
orderby = request.args.get("orderby", "create_time")
desc = request.args.get("desc", True)
try:
canvas_category = request.args.get("canvas_category")
if request.args.get("desc", "true").lower() == "false":
desc = False
else:
desc = True
owner_ids = request.args.get("owner_ids", [])
if not owner_ids:
tenants = TenantService.get_joined_tenants_by_user_id(current_user.id)
tenants = [m["tenant_id"] for m in tenants]
canvas, total = UserCanvasService.get_by_tenant_ids(
[m["tenant_id"] for m in tenants], current_user.id, page_number,
items_per_page, orderby, desc, keywords, canvas_category=CanvasCategory.Agent)
return get_json_result(data={"canvas": canvas, "total": total})
except Exception as e:
return server_error_response(e)
tenants, current_user.id, page_number,
items_per_page, orderby, desc, keywords, canvas_category)
else:
tenants = owner_ids
canvas, total = UserCanvasService.get_by_tenant_ids(
tenants, current_user.id, 0,
0, orderby, desc, keywords, canvas_category)
return get_json_result(data={"canvas": canvas, "total": total})
@manager.route('/setting', methods=['POST']) # noqa: F821

View File

@ -182,6 +182,7 @@ def create():
"id": get_uuid(),
"kb_id": kb.id,
"parser_id": kb.parser_id,
"pipeline_id": kb.pipeline_id,
"parser_config": kb.parser_config,
"created_by": current_user.id,
"type": FileType.VIRTUAL,
@ -546,31 +547,22 @@ def get(doc_id):
@manager.route("/change_parser", methods=["POST"]) # noqa: F821
@login_required
@validate_request("doc_id", "parser_id")
@validate_request("doc_id")
def change_parser():
req = request.json
if not DocumentService.accessible(req["doc_id"], current_user.id):
return get_json_result(data=False, message="No authorization.", code=settings.RetCode.AUTHENTICATION_ERROR)
try:
e, doc = DocumentService.get_by_id(req["doc_id"])
if not e:
return get_data_error_result(message="Document not found!")
if doc.parser_id.lower() == req["parser_id"].lower():
if "parser_config" in req:
if req["parser_config"] == doc.parser_config:
return get_json_result(data=True)
else:
return get_json_result(data=True)
if (doc.type == FileType.VISUAL and req["parser_id"] != "picture") or (re.search(r"\.(ppt|pptx|pages)$", doc.name) and req["parser_id"] != "presentation"):
return get_data_error_result(message="Not supported yet!")
e, doc = DocumentService.get_by_id(req["doc_id"])
if not e:
return get_data_error_result(message="Document not found!")
def reset_doc():
nonlocal doc
e = DocumentService.update_by_id(doc.id, {"parser_id": req["parser_id"], "progress": 0, "progress_msg": "", "run": TaskStatus.UNSTART.value})
if not e:
return get_data_error_result(message="Document not found!")
if "parser_config" in req:
DocumentService.update_parser_config(doc.id, req["parser_config"])
if doc.token_num > 0:
e = DocumentService.increment_chunk_num(doc.id, doc.kb_id, doc.token_num * -1, doc.chunk_num * -1, doc.process_duration * -1)
if not e:
@ -581,6 +573,26 @@ def change_parser():
if settings.docStoreConn.indexExist(search.index_name(tenant_id), doc.kb_id):
settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)
try:
if req.get("pipeline_id"):
if doc.pipeline_id == req["pipeline_id"]:
return get_json_result(data=True)
DocumentService.update_by_id(doc.id, {"pipeline_id": req["pipeline_id"]})
reset_doc()
return get_json_result(data=True)
if doc.parser_id.lower() == req["parser_id"].lower():
if "parser_config" in req:
if req["parser_config"] == doc.parser_config:
return get_json_result(data=True)
else:
return get_json_result(data=True)
if (doc.type == FileType.VISUAL and req["parser_id"] != "picture") or (re.search(r"\.(ppt|pptx|pages)$", doc.name) and req["parser_id"] != "presentation"):
return get_data_error_result(message="Not supported yet!")
if "parser_config" in req:
DocumentService.update_parser_config(doc.id, req["parser_config"])
reset_doc()
return get_json_result(data=True)
except Exception as e:
return server_error_response(e)

View File

@ -64,7 +64,7 @@ def create():
e, t = TenantService.get_by_id(current_user.id)
if not e:
return get_data_error_result(message="Tenant not found.")
req["embd_id"] = t.embd_id
#req["embd_id"] = t.embd_id
if not KnowledgebaseService.save(**req):
return get_data_error_result()
return get_json_result(data={"kb_id": req["id"]})
@ -379,3 +379,19 @@ def get_meta():
code=settings.RetCode.AUTHENTICATION_ERROR
)
return get_json_result(data=DocumentService.get_meta_by_kbs(kb_ids))
@manager.route("/basic_info", methods=["GET"]) # noqa: F821
@login_required
def get_basic_info():
kb_id = request.args.get("kb_id", "")
if not KnowledgebaseService.accessible(kb_id, current_user.id):
return get_json_result(
data=False,
message='No authorization.',
code=settings.RetCode.AUTHENTICATION_ERROR
)
basic_info = DocumentService.knowledgebase_basic_info(kb_id)
return get_json_result(data=basic_info)

View File

@ -414,7 +414,7 @@ def agents_completion_openai_compatibility(tenant_id, agent_id):
tenant_id,
agent_id,
question,
session_id=req.get("session_id", req.get("id", "") or req.get("metadata", {}).get("id", "")),
session_id=req.pop("session_id", req.get("id", "")) or req.get("metadata", {}).get("id", ""),
stream=True,
**req,
),
@ -432,7 +432,7 @@ def agents_completion_openai_compatibility(tenant_id, agent_id):
tenant_id,
agent_id,
question,
session_id=req.get("session_id", req.get("id", "") or req.get("metadata", {}).get("id", "")),
session_id=req.pop("session_id", req.get("id", "")) or req.get("metadata", {}).get("id", ""),
stream=False,
**req,
)

View File

@ -36,6 +36,8 @@ from rag.utils.storage_factory import STORAGE_IMPL, STORAGE_IMPL_TYPE
from timeit import default_timer as timer
from rag.utils.redis_conn import REDIS_CONN
from flask import jsonify
from api.utils.health import run_health_checks
@manager.route("/version", methods=["GET"]) # noqa: F821
@login_required
@ -169,6 +171,12 @@ def status():
return get_json_result(data=res)
@manager.route("/healthz", methods=["GET"]) # noqa: F821
def healthz():
result, all_ok = run_health_checks()
return jsonify(result), (200 if all_ok else 500)
@manager.route("/new_token", methods=["POST"]) # noqa: F821
@login_required
def new_token():

View File

@ -646,6 +646,7 @@ class Knowledgebase(DataBaseModel):
vector_similarity_weight = FloatField(default=0.3, index=True)
parser_id = CharField(max_length=32, null=False, help_text="default parser ID", default=ParserType.NAIVE.value, index=True)
pipeline_id = CharField(max_length=32, null=True, help_text="Pipeline ID", index=True)
parser_config = JSONField(null=False, default={"pages": [[1, 1000000]]})
pagerank = IntegerField(default=0, index=False)
status = CharField(max_length=1, null=True, help_text="is it validate(0: wasted, 1: validate)", default="1", index=True)
@ -662,6 +663,7 @@ class Document(DataBaseModel):
thumbnail = TextField(null=True, help_text="thumbnail base64 string")
kb_id = CharField(max_length=256, null=False, index=True)
parser_id = CharField(max_length=32, null=False, help_text="default parser ID", index=True)
pipeline_id = CharField(max_length=32, null=True, help_text="pipeline ID", index=True)
parser_config = JSONField(null=False, default={"pages": [[1, 1000000]]})
source_type = CharField(max_length=128, null=False, default="local", help_text="where does this document come from", index=True)
type = CharField(max_length=32, null=False, help_text="file extension", index=True)
@ -1020,7 +1022,6 @@ def migrate_db():
migrate(migrator.add_column("dialog", "meta_data_filter", JSONField(null=True, default={})))
except Exception:
pass
try:
migrate(migrator.alter_column_type("canvas_template", "title", JSONField(null=True, default=dict, help_text="Canvas title")))
except Exception:
@ -1037,4 +1038,12 @@ def migrate_db():
migrate(migrator.add_column("canvas_template", "canvas_category", CharField(max_length=32, null=False, default="agent_canvas", help_text="agent_canvas|dataflow_canvas", index=True)))
except Exception:
pass
try:
migrate(migrator.add_column("knowledgebase", "pipeline_id", CharField(max_length=32, null=True, help_text="default parser ID", index=True)))
except Exception:
pass
try:
migrate(migrator.add_column("document", "pipeline_id", CharField(max_length=32, null=True, help_text="default parser ID", index=True)))
except Exception:
pass
logging.disable(logging.NOTSET)

View File

@ -95,7 +95,7 @@ class UserCanvasService(CommonService):
@DB.connection_context()
def get_by_tenant_ids(cls, joined_tenant_ids, user_id,
page_number, items_per_page,
orderby, desc, keywords, canvas_category=CanvasCategory.Agent,
orderby, desc, keywords, canvas_category=None
):
fields = [
cls.model.id,
@ -122,7 +122,8 @@ class UserCanvasService(CommonService):
TenantPermission.TEAM.value)) | (
cls.model.user_id == user_id))
)
agents = agents.where(cls.model.canvas_category == canvas_category)
if canvas_category:
agents = agents.where(cls.model.canvas_category == canvas_category)
if desc:
agents = agents.order_by(cls.model.getter_by(orderby).desc())
else:

View File

@ -24,7 +24,7 @@ from io import BytesIO
import trio
import xxhash
from peewee import fn
from peewee import fn, Case
from api import settings
from api.constants import IMG_BASE64_PREFIX, FILE_NAME_LEN_LIMIT
@ -674,6 +674,53 @@ class DocumentService(CommonService):
return False
@classmethod
@DB.connection_context()
def knowledgebase_basic_info(cls, kb_id: str) -> dict[str, int]:
# cancelled: run == "2" but progress can vary
cancelled = (
cls.model.select(fn.COUNT(1))
.where((cls.model.kb_id == kb_id) & (cls.model.run == TaskStatus.CANCEL))
.scalar()
)
row = (
cls.model.select(
# finished: progress == 1
fn.COALESCE(fn.SUM(Case(None, [(cls.model.progress == 1, 1)], 0)), 0).alias("finished"),
# failed: progress == -1
fn.COALESCE(fn.SUM(Case(None, [(cls.model.progress == -1, 1)], 0)), 0).alias("failed"),
# processing: 0 <= progress < 1
fn.COALESCE(
fn.SUM(
Case(
None,
[
(((cls.model.progress == 0) | ((cls.model.progress > 0) & (cls.model.progress < 1))), 1),
],
0,
)
),
0,
).alias("processing"),
)
.where(
(cls.model.kb_id == kb_id)
& ((cls.model.run.is_null(True)) | (cls.model.run != TaskStatus.CANCEL))
)
.dicts()
.get()
)
return {
"processing": int(row["processing"]),
"finished": int(row["finished"]),
"failed": int(row["failed"]),
"cancelled": int(cancelled),
}
def queue_raptor_o_graphrag_tasks(doc, ty, priority):
chunking_config = DocumentService.get_chunking_config(doc["id"])
hasher = xxhash.xxh64()
@ -702,6 +749,8 @@ def queue_raptor_o_graphrag_tasks(doc, ty, priority):
def get_queue_length(priority):
group_info = REDIS_CONN.queue_info(get_svr_queue_name(priority), SVR_CONSUMER_GROUP_NAME)
if not group_info:
return 0
return int(group_info.get("lag", 0) or 0)
@ -847,3 +896,4 @@ def doc_upload_and_parse(conversation_id, file_objs, user_id):
doc_id, kb.id, token_counts[doc_id], chunk_counts[doc_id], 0)
return [d["id"] for d, _ in files]

View File

@ -440,6 +440,7 @@ class FileService(CommonService):
"id": doc_id,
"kb_id": kb.id,
"parser_id": self.get_parser(filetype, filename, kb.parser_id),
"pipeline_id": kb.pipeline_id,
"parser_config": kb.parser_config,
"created_by": user_id,
"type": filetype,

View File

@ -472,19 +472,19 @@ def has_canceled(task_id):
return False
def queue_dataflow(dsl:str, tenant_id:str, doc_id:str, task_id:str, flow_id:str, priority: int, callback=None) -> tuple[bool, str]:
def queue_dataflow(dsl:str, tenant_id:str, task_id:str, flow_id:str=None, doc_id:str=None, file:dict=None, priority: int=0, callback=None) -> tuple[bool, str]:
"""
Returns a tuple (success: bool, error_message: str).
"""
_ = callback
task = dict(
id=get_uuid() if not task_id else task_id,
doc_id=doc_id,
from_page=0,
to_page=100000000,
task_type="dataflow",
priority=priority,
id=get_uuid() if not task_id else task_id,
doc_id=doc_id,
from_page=0,
to_page=100000000,
task_type="dataflow",
priority=priority,
)
TaskService.model.delete().where(TaskService.model.id == task["id"]).execute()
@ -499,6 +499,7 @@ def queue_dataflow(dsl:str, tenant_id:str, doc_id:str, task_id:str, flow_id:str,
task["task_type"] = "dataflow"
task["dsl"] = dsl
task["dataflow_id"] = get_uuid() if not flow_id else flow_id
task["file"] = file
if not REDIS_CONN.queue_product(
get_svr_queue_name(priority), message=task

View File

@ -1,3 +1,51 @@
import base64
from functools import partial
from io import BytesIO
from PIL import Image
test_image_base64 = "iVBORw0KGgoAAAANSUhEUgAAAGQAAABkCAIAAAD/gAIDAAAA6ElEQVR4nO3QwQ3AIBDAsIP9d25XIC+EZE8QZc18w5l9O+AlZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBT+IYAHHLHkdEgAAAABJRU5ErkJggg=="
test_image = base64.b64decode(test_image_base64)
async def image2id(d: dict, storage_put_func: partial, bucket:str, objname:str):
import logging
from io import BytesIO
import trio
from rag.svr.task_executor import minio_limiter
if not d.get("image"):
return
with BytesIO() as output_buffer:
if isinstance(d["image"], bytes):
output_buffer.write(d["image"])
output_buffer.seek(0)
else:
# If the image is in RGBA mode, convert it to RGB mode before saving it in JPEG format.
if d["image"].mode in ("RGBA", "P"):
converted_image = d["image"].convert("RGB")
d["image"] = converted_image
try:
d["image"].save(output_buffer, format='JPEG')
except OSError as e:
logging.warning(
"Saving image exception, ignore: {}".format(str(e)))
async with minio_limiter:
await trio.to_thread.run_sync(lambda: storage_put_func(bucket=bucket, fnm=objname, binary=output_buffer.getvalue()))
d["img_id"] = f"{bucket}-{objname}"
if not isinstance(d["image"], bytes):
d["image"].close()
del d["image"] # Remove image reference
def id2image(image_id:str|None, storage_get_func: partial):
if not image_id:
return
arr = image_id.split("-")
if len(arr) != 2:
return
bkt, nm = image_id.split("-")
blob = storage_get_func(bucket=bkt, filename=nm)
if not blob:
return
return Image.open(BytesIO(blob))

api/utils/health.py · 104 lines · Normal file
View File

@ -0,0 +1,104 @@
from timeit import default_timer as timer
from api import settings
from api.db.db_models import DB
from rag.utils.redis_conn import REDIS_CONN
from rag.utils.storage_factory import STORAGE_IMPL
def _ok_nok(ok: bool) -> str:
return "ok" if ok else "nok"
def check_db() -> tuple[bool, dict]:
st = timer()
try:
# lightweight probe; works for MySQL/Postgres
DB.execute_sql("SELECT 1")
return True, {"elapsed": f"{(timer() - st) * 1000.0:.1f}"}
except Exception as e:
return False, {"elapsed": f"{(timer() - st) * 1000.0:.1f}", "error": str(e)}
def check_redis() -> tuple[bool, dict]:
st = timer()
try:
ok = bool(REDIS_CONN.health())
return ok, {"elapsed": f"{(timer() - st) * 1000.0:.1f}"}
except Exception as e:
return False, {"elapsed": f"{(timer() - st) * 1000.0:.1f}", "error": str(e)}
def check_doc_engine() -> tuple[bool, dict]:
st = timer()
try:
meta = settings.docStoreConn.health()
# treat any successful call as ok
return True, {"elapsed": f"{(timer() - st) * 1000.0:.1f}", **(meta or {})}
except Exception as e:
return False, {"elapsed": f"{(timer() - st) * 1000.0:.1f}", "error": str(e)}
def check_storage() -> tuple[bool, dict]:
st = timer()
try:
STORAGE_IMPL.health()
return True, {"elapsed": f"{(timer() - st) * 1000.0:.1f}"}
except Exception as e:
return False, {"elapsed": f"{(timer() - st) * 1000.0:.1f}", "error": str(e)}
def check_chat() -> tuple[bool, dict]:
st = timer()
try:
cfg = getattr(settings, "CHAT_CFG", None)
ok = bool(cfg and cfg.get("factory"))
return ok, {"elapsed": f"{(timer() - st) * 1000.0:.1f}"}
except Exception as e:
return False, {"elapsed": f"{(timer() - st) * 1000.0:.1f}", "error": str(e)}
def run_health_checks() -> tuple[dict, bool]:
result: dict[str, str | dict] = {}
db_ok, db_meta = check_db()
chat_ok, chat_meta = check_chat()
result["db"] = _ok_nok(db_ok)
if not db_ok:
result.setdefault("_meta", {})["db"] = db_meta
result["chat"] = _ok_nok(chat_ok)
if not chat_ok:
result.setdefault("_meta", {})["chat"] = chat_meta
# Optional probes (do not change minimal contract but exposed for observability)
try:
redis_ok, redis_meta = check_redis()
result["redis"] = _ok_nok(redis_ok)
if not redis_ok:
result.setdefault("_meta", {})["redis"] = redis_meta
except Exception:
result["redis"] = "nok"
try:
doc_ok, doc_meta = check_doc_engine()
result["doc_engine"] = _ok_nok(doc_ok)
if not doc_ok:
result.setdefault("_meta", {})["doc_engine"] = doc_meta
except Exception:
result["doc_engine"] = "nok"
try:
sto_ok, sto_meta = check_storage()
result["storage"] = _ok_nok(sto_ok)
if not sto_ok:
result.setdefault("_meta", {})["storage"] = sto_meta
except Exception:
result["storage"] = "nok"
all_ok = (result.get("db") == "ok") and (result.get("chat") == "ok")
result["status"] = "ok" if all_ok else "nok"
return result, all_ok