diff --git a/api/apps/HEALTHCHECK_TESTING.md b/api/apps/HEALTHCHECK_TESTING.md
deleted file mode 100644
index a97a03c0e..000000000
--- a/api/apps/HEALTHCHECK_TESTING.md
+++ /dev/null
@@ -1,105 +0,0 @@
-# Health Checks and Kubernetes Probes, in Brief
-
-This document explains what K8s probes are, how to use `/v1/system/healthz` for health checks, and what the key terms in the use case below mean.
-
-## What is a K8s probe?
-- A probe is the mechanism K8s uses to check whether a container is healthy and able to serve traffic.
-- Three common kinds:
-  - livenessProbe: liveness probe. On failure, K8s restarts the container; used to self-heal when the application hangs or loses its connections.
-  - readinessProbe: readiness probe. On failure, the endpoint is not added to the Service load balancer; used to keep traffic away from an application that is not ready yet.
-  - startupProbe: startup probe. Gives slow-starting applications a longer initialization window, during which liveness/readiness checks are not run.
-- Probes usually issue an HTTP GET against a public, lightweight health endpoint (no authentication) and judge the result by status code: 200 = pass; 5xx or timeout = fail.
-
-## The health endpoint in this project
-- Implemented: `GET /v1/system/healthz` (no authentication required).
-- Semantics:
-  - 200: all critical dependencies are healthy.
-  - 500: any critical dependency is unhealthy (currently decided by DB or Chat).
-  - Response body: JSON with at least the fields `status, db, chat`, plus observability items such as `redis, doc_engine, storage`. Failed items carry `error/elapsed` under `_meta`.
-- Example (DB failure):
-```json
-{"status":"nok","chat":"ok","db":"nok"}
-```
-
-## Problem / use case
-- Current situation: Ragflow runs on K8s with an AWS RDS Postgres database whose credentials are managed by Secret Manager and rotated every 7 days. After a rotation the application's connections go stale, and the Pod has to be restarted by hand to reconnect.
-- Goal: use K8s probes to detect and restart unhealthy Pods automatically, reducing manual intervention.
-- Requirement: a public health endpoint that needs no authentication, returns a non-200 status (e.g. 500) when a dependency is down, and includes JSON details.
-- Status: `/v1/system/healthz` was designed for exactly this.
-
-## Key terms (matching the description above)
-- Ragflow instance: the Ragflow service deployed on K8s.
-- AWS RDS Postgres: the managed PostgreSQL database instance.
-- Secret Manager rotation: secrets are rotated periodically (every 7 days), which invalidates old connections.
-- Probes (K8s): liveness/readiness probes used to automatically restart or remove unhealthy instances.
-- Public endpoint without API key: an HTTP route that requires no Authorization header, so probes can hit it directly.
-- Dependencies statuses: the health of each dependency (db, chat, redis, doc_engine, storage, and so on).
-- HTTP 500 with JSON: on a dependency failure, return 500 together with JSON stating which subsystem failed.
-
-## Quick tests
-- Healthy:
-```bash
-curl -i http:///v1/system/healthz
-```
-- Simulate a DB failure (docker-compose example):
-```bash
-docker compose stop db && curl -i http:///v1/system/healthz
-```
-(Expect 500, with `db:"nok"` in the JSON.)
-
-## A more complete test checklist
-### 1) HTTP status code only
-```bash
-curl -s -o /dev/null -w "%{http_code}\n" http:///v1/system/healthz
-```
-Expected: `200` or `500`.
-
-### 2) Windows PowerShell
-```powershell
-# Status code
-(Invoke-WebRequest -Uri "http:///v1/system/healthz" -Method GET -TimeoutSec 3 -ErrorAction SilentlyContinue).StatusCode
-# Full response
-Invoke-RestMethod -Uri "http:///v1/system/healthz" -Method GET
-```
-
-### 3) Local test via kubectl port-forward
-```bash
-# Adjust the exposed frontend/gateway port to your environment
-kubectl port-forward deploy/ 8080:80 -n 
-curl -i http://127.0.0.1:8080/v1/system/healthz
-```
-
-### 4) Simulating common failures
-- DB failure (recommended):
-```bash
-docker compose stop db
-curl -i http:///v1/system/healthz  # expect 500
-```
-- Chat failure (optional): set `factory`/`base_url` in `CHAT_CFG` to an invalid value and restart the backend; the request should then return 500 with `chat:"nok"`.
-- Redis / storage / doc engine: stop the corresponding service and request again; the matching field in the JSON turns `"nok"` (this does not affect the 200/500 decision).
-
-### 5) Browser check
-- Open `http:///v1/system/healthz` directly and watch for 200/500 in DevTools Network; the page body is the JSON itself.
-- Reverse-proxy note: if a custom 500 error page is configured, disable error-page interception for `/healthz` (e.g. `proxy_intercept_errors off;`).
-
-## K8s probe example
-```yaml
-readinessProbe:
-  httpGet:
-    path: /v1/system/healthz
-    port: 80
-  initialDelaySeconds: 5
-  periodSeconds: 10
-  timeoutSeconds: 2
-  failureThreshold: 1
-livenessProbe:
-  httpGet:
-    path: /v1/system/healthz
-    port: 80
-  initialDelaySeconds: 10
-  periodSeconds: 10
-  timeoutSeconds: 2
-  failureThreshold: 3
-```
-
-Tip: if a reverse proxy (Nginx) serves a custom 500 error page, disable error-page interception for `/healthz` so that the JSON body is preserved.
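The checklist above sticks to curl, PowerShell and kubectl. For an exec-style probe or a CI smoke test, the same 200-vs-500 contract can be exercised from Python; a minimal sketch (the host and port below are placeholders, not project defaults):

```python
# Minimal health probe against /v1/system/healthz, mirroring the semantics
# described above: exit 0 on HTTP 200, exit 1 on 5xx, timeout or refusal.
import json
import sys
import urllib.error
import urllib.request

HEALTHZ_URL = "http://127.0.0.1:9380/v1/system/healthz"  # placeholder host/port


def main() -> int:
    try:
        with urllib.request.urlopen(HEALTHZ_URL, timeout=2) as resp:
            print("healthy:", json.loads(resp.read().decode("utf-8")))
            return 0
    except urllib.error.HTTPError as e:
        # a 500 still carries the JSON body naming the failed subsystem, e.g. "db": "nok"
        print("unhealthy:", e.read().decode("utf-8"), file=sys.stderr)
        return 1
    except Exception as e:  # timeout, connection refused, DNS failure, ...
        print("unreachable:", e, file=sys.stderr)
        return 1


if __name__ == "__main__":
    sys.exit(main())
```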
diff --git a/api/apps/canvas_app.py b/api/apps/canvas_app.py
index 1c5390826..3d7b13d8d 100644
--- a/api/apps/canvas_app.py
+++ b/api/apps/canvas_app.py
@@ -174,6 +174,16 @@ def run():
     return resp
 
 
+@manager.route('/cancel/<task_id>', methods=['PUT'])  # noqa: F821
+@login_required
+def cancel(task_id):
+    try:
+        REDIS_CONN.set(f"{task_id}-cancel", "x")
+    except Exception as e:
+        logging.exception(e)
+    return get_json_result(data=True)
+
+
 @manager.route('/reset', methods=['POST'])  # noqa: F821
 @validate_request("id")
 @login_required
diff --git a/api/apps/kb_app.py b/api/apps/kb_app.py
index d2c2e35e2..9ee62d1e0 100644
--- a/api/apps/kb_app.py
+++ b/api/apps/kb_app.py
@@ -61,6 +61,8 @@ def create():
     req["name"] = dataset_name
     req["tenant_id"] = current_user.id
     req["created_by"] = current_user.id
+    if not req.get("parser_id"):
+        req["parser_id"] = "naive"
     e, t = TenantService.get_by_id(current_user.id)
     if not e:
         return get_data_error_result(message="Tenant not found.")
diff --git a/api/db/services/document_service.py b/api/db/services/document_service.py
index 71bed0847..605f14049 100644
--- a/api/db/services/document_service.py
+++ b/api/db/services/document_service.py
@@ -24,12 +24,12 @@ from io import BytesIO
 
 import trio
 import xxhash
-from peewee import fn, Case
+from peewee import fn, Case, JOIN
 
 from api import settings
 from api.constants import IMG_BASE64_PREFIX, FILE_NAME_LEN_LIMIT
-from api.db import FileType, LLMType, ParserType, StatusEnum, TaskStatus, UserTenantRole
-from api.db.db_models import DB, Document, Knowledgebase, Task, Tenant, UserTenant, File2Document, File
+from api.db import FileType, LLMType, ParserType, StatusEnum, TaskStatus, UserTenantRole, CanvasCategory
+from api.db.db_models import DB, Document, Knowledgebase, Task, Tenant, UserTenant, File2Document, File, UserCanvas
 from api.db.db_utils import bulk_insert_into_db
 from api.db.services.common_service import CommonService
 from api.db.services.knowledgebase_service import KnowledgebaseService
@@ -51,6 +51,7 @@ class DocumentService(CommonService):
         cls.model.thumbnail,
         cls.model.kb_id,
         cls.model.parser_id,
+        cls.model.pipeline_id,
         cls.model.parser_config,
         cls.model.source_type,
         cls.model.type,
@@ -79,7 +80,10 @@ def get_list(cls, kb_id, page_number, items_per_page, orderby, desc, keywords, id, name):
         fields = cls.get_cls_model_fields()
-        docs = cls.model.select(*fields).join(File2Document, on = (File2Document.document_id == cls.model.id)).join(File, on = (File.id == File2Document.file_id)).where(cls.model.kb_id == kb_id)
+        docs = cls.model.select(*[*fields, UserCanvas.title]).join(File2Document, on = (File2Document.document_id == cls.model.id))\
+            .join(File, on = (File.id == File2Document.file_id))\
+            .join(UserCanvas, on = ((cls.model.pipeline_id == UserCanvas.id) & (UserCanvas.canvas_category == CanvasCategory.DataFlow.value)), join_type=JOIN.LEFT_OUTER)\
+            .where(cls.model.kb_id == kb_id)
         if id:
             docs = docs.where(
                 cls.model.id == id)
diff --git a/api/utils/base64_image.py b/api/utils/base64_image.py
index ed01d2708..aa24ac63a 100644
--- a/api/utils/base64_image.py
+++ b/api/utils/base64_image.py
@@ -7,7 +7,8 @@ from PIL import Image
 test_image_base64 = "iVBORw0KGgoAAAANSUhEUgAAAGQAAABkCAIAAAD/gAIDAAAA6ElEQVR4nO3QwQ3AIBDAsIP9d25XIC+EZE8QZc18w5l9O+AlZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBWYFZgVmBT+IYAHHLHkdEgAAAABJRU5ErkJggg=="
 test_image = base64.b64decode(test_image_base64)
 
-async def image2id(d: dict, storage_put_func: partial, bucket:str, objname:str):
+
+async def image2id(d: dict, storage_put_func: partial, objname:str, bucket:str="IMAGETEMPS"):
     import logging
     from io import BytesIO
     import trio
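The `image2id` signature change above reorders the parameters: the object name now comes before the bucket, and the bucket defaults to `"IMAGETEMPS"`. A sketch of the two call styles this enables, mirroring the call sites updated later in this diff (the import paths are assumptions, not taken from this patch):

```python
# Sketch only: the two image2id call styles after this change.
from functools import partial

from api.utils import get_uuid                        # assumed import path
from api.utils.base64_image import image2id
from rag.utils.storage_factory import STORAGE_IMPL    # assumed import path


async def store_chunk_images(chunks: list[dict], kb_id: str | None = None) -> None:
    for d in chunks:
        if kb_id:
            # explicit bucket, object name first (task_executor style)
            await image2id(d, partial(STORAGE_IMPL.put), d["id"], kb_id)
        else:
            # fall back to the new default bucket "IMAGETEMPS" (flow-component style)
            await image2id(d, partial(STORAGE_IMPL.put), get_uuid())
```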
diff --git a/rag/flow/hierarchical_merger/hierarchical_merger.py b/rag/flow/hierarchical_merger/hierarchical_merger.py
index ee85c581c..4931ed619 100644
--- a/rag/flow/hierarchical_merger/hierarchical_merger.py
+++ b/rag/flow/hierarchical_merger/hierarchical_merger.py
@@ -173,6 +173,6 @@ class HierarchicalMerger(ProcessBase):
         ]
         async with trio.open_nursery() as nursery:
             for d in cks:
-                nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), "_image_temps", get_uuid())
+                nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), get_uuid())
 
         self.callback(1, "Done.")
diff --git a/rag/flow/parser/parser.py b/rag/flow/parser/parser.py
index d15bed7de..919a98dfd 100644
--- a/rag/flow/parser/parser.py
+++ b/rag/flow/parser/parser.py
@@ -59,11 +59,8 @@
         "image": [
             "text"
         ],
-        "email": [
-            "text",
-            "json"
-        ],
-        "text": [
+        "email": ["text", "json"],
+        "text&markdown": [
             "text",
             "json"
         ],
@@ -102,7 +99,6 @@
             "output_format": "json",
         },
         "slides": {
-            "parse_method": "presentation",
             "suffix": [
                 "pptx",
             ],
@@ -122,12 +118,6 @@
             "fields": ["from", "to", "cc", "bcc", "date", "subject", "body", "attachments", "metadata"],
             "output_format": "json",
         },
-        "text": {
-            "suffix": [
-                "txt"
-            ],
-            "output_format": "json",
-        },
         "audio": {
             "suffix":[
                 "da",
@@ -168,10 +158,10 @@
             spreadsheet_output_format = spreadsheet_config.get("output_format", "")
             self.check_valid_value(spreadsheet_output_format, "Spreadsheet output format abnormal.", self.allowed_output_format["spreadsheet"])
 
-        doc_config = self.setups.get("doc", "")
+        doc_config = self.setups.get("word", "")
         if doc_config:
             doc_output_format = doc_config.get("output_format", "")
-            self.check_valid_value(doc_output_format, "Word processer document output format abnormal.", self.allowed_output_format["doc"])
+            self.check_valid_value(doc_output_format, "Word processor document output format abnormal.", self.allowed_output_format["word"])
 
         slides_config = self.setups.get("slides", "")
         if slides_config:
@@ -181,17 +171,13 @@
         image_config = self.setups.get("image", "")
         if image_config:
             image_parse_method = image_config.get("parse_method", "")
-            self.check_valid_value(image_parse_method.lower(), "Parse method abnormal.", ["ocr", "vlm"])
             if image_parse_method not in ["ocr"]:
-                self.check_empty(image_config.get("llm_id"), "VLM")
+                self.check_empty(image_config.get("lang", ""), "Language")
 
-            image_language = image_config.get("lang", "")
-            self.check_empty(image_language, "Language")
-
-        text_config = self.setups.get("text", "")
+        text_config = self.setups.get("text&markdown", "")
         if text_config:
             text_output_format = text_config.get("output_format", "")
-            self.check_valid_value(text_output_format, "Text output format abnormal.", self.allowed_output_format["text"])
+            self.check_valid_value(text_output_format, "Text output format abnormal.", self.allowed_output_format["text&markdown"])
 
         audio_config = self.setups.get("audio", "")
         if audio_config:
@@ -216,9 +202,9 @@ class Parser(ProcessBase):
             conf = self._param.setups["pdf"]
             self.set_output("output_format", conf["output_format"])
 
-            if conf.get("parse_method") == "deepdoc":
+            if conf.get("parse_method").lower() == "deepdoc":
                 bboxes = RAGFlowPdfParser().parse_into_bboxes(blob, callback=self.callback)
-            elif conf.get("parse_method") == "plain_text":
+            elif conf.get("parse_method").lower() == "plain_text":
                 lines, _ = PlainParser()(blob)
                 bboxes = [{"text": t} for t, _ in lines]
             else:
@@ -299,7 +285,7 @@
         from rag.nlp import concat_img
 
         self.callback(random.randint(1, 5) / 100.0, "Start to work on a markdown.")
-        conf = self._param.setups["markdown"]
+        conf = self._param.setups["text&markdown"]
         self.set_output("output_format", conf["output_format"])
 
         markdown_parser = naive_markdown_parser()
@@ -326,25 +312,22 @@
 
         self.set_output("text", "\n".join([section_text for section_text, _ in sections]))
 
-    def _image(self, from_upstream: ParserFromUpstream):
+    def _image(self, name, blob):
         from deepdoc.vision import OCR
 
         self.callback(random.randint(1, 5) / 100.0, "Start to work on an image.")
-
-        blob = from_upstream.blob
         conf = self._param.setups["image"]
         self.set_output("output_format", conf["output_format"])
 
         img = Image.open(io.BytesIO(blob)).convert("RGB")
-        lang = conf["lang"]
 
         if conf["parse_method"] == "ocr":
            # use ocr, recognize chars only
             ocr = OCR()
             bxs = ocr(np.array(img))  # return boxes and recognize result
             txt = "\n".join([t[0] for _, t in bxs if t[0]])
-        else:
+            lang = conf["lang"]
             # use VLM to describe the picture
             cv_model = LLMBundle(self._canvas.get_tenant_id(), LLMType.IMAGE2TEXT, llm_name=conf["llm_id"], lang=lang)
             img_binary = io.BytesIO()
@@ -519,13 +502,18 @@
         else:
             blob = FileService.get_blob(from_upstream.file["created_by"], from_upstream.file["id"])
 
+        done = False
         for p_type, conf in self._param.setups.items():
             if from_upstream.name.split(".")[-1].lower() not in conf.get("suffix", []):
                 continue
             await trio.to_thread.run_sync(function_map[p_type], name, blob)
+            done = True
             break
 
+        if not done:
+            raise Exception("No suitable parser for file extension: `.%s`" % from_upstream.name.split(".")[-1].lower())
+
         outs = self.output()
         async with trio.open_nursery() as nursery:
             for d in outs.get("json", []):
-                nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), "_image_temps", get_uuid())
+                nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), get_uuid())
diff --git a/rag/flow/pipeline.py b/rag/flow/pipeline.py
index cf020f68c..36ac531dc 100644
--- a/rag/flow/pipeline.py
+++ b/rag/flow/pipeline.py
@@ -19,29 +19,38 @@ import logging
 import random
 import time
 from timeit import default_timer as timer
-
 import trio
 
 from agent.canvas import Graph
 from api.db import PipelineTaskType
 from api.db.services.document_service import DocumentService
+from api.db.services.task_service import has_canceled
 from api.db.services.pipeline_operation_log_service import PipelineOperationLogService
 from rag.utils.redis_conn import REDIS_CONN
 
 
 class Pipeline(Graph):
-    def __init__(self, dsl: str, tenant_id=None, doc_id=None, task_id=None, flow_id=None):
+    def __init__(self, dsl: str|dict, tenant_id=None, doc_id=None, task_id=None, flow_id=None):
+        if isinstance(dsl, dict):
+            dsl = json.dumps(dsl, ensure_ascii=False)
         super().__init__(dsl, tenant_id, task_id)
+        if self._doc_id == "x":
+            self._doc_id = None
         self._doc_id = doc_id
         self._flow_id = flow_id
         self._kb_id = None
-        if doc_id:
+        if self._doc_id:
             self._kb_id = DocumentService.get_knowledgebase_id(doc_id)
-            assert self._kb_id, f"Can't find KB of this document: {doc_id}"
+            if not self._kb_id:
+                self._doc_id = None
 
     def callback(self, component_name: str, progress: float | int | None = None, message: str = "") -> None:
+        from rag.svr.task_executor import TaskCanceledException
         log_key = f"{self._flow_id}-{self.task_id}-logs"
         timestamp = timer()
+        if has_canceled(self.task_id):
+            progress = -1
+            message += "[CANCEL]"
         try:
             bin = REDIS_CONN.get(log_key)
             obj = json.loads(bin.encode("utf-8"))
@@ -91,6 +100,9 @@ class Pipeline(Graph):
         except Exception as e:
             logging.exception(e)
 
+        if has_canceled(self.task_id):
+            raise TaskCanceledException(message)
+
     def fetch_logs(self):
         log_key = f"{self._flow_id}-{self.task_id}-logs"
         try:
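Taken together with the new `/cancel/<task_id>` route in `canvas_app.py`, the pipeline changes above outline a simple cancellation contract: the route writes a `{task_id}-cancel` key into Redis, and `has_canceled` (imported from `api.db.services.task_service`, not shown in this diff) is polled from the callback, which then raises `TaskCanceledException`. An illustration of that contract, not the real implementation:

```python
# Illustration only: the cancellation handshake implied by this diff.
# The real has_canceled lives in api.db.services.task_service; this sketch
# assumes it reduces to checking the "<task_id>-cancel" key set by the route.
from rag.utils.redis_conn import REDIS_CONN  # same connection both sides use


def request_cancel(task_id: str) -> None:
    # what PUT /cancel/<task_id> does in canvas_app.py
    REDIS_CONN.set(f"{task_id}-cancel", "x")


def looks_canceled(task_id: str) -> bool:
    # hypothetical stand-in for has_canceled(); the pipeline callback reacts by
    # forcing progress to -1, appending "[CANCEL]" and raising TaskCanceledException
    return bool(REDIS_CONN.get(f"{task_id}-cancel"))
```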
diff --git a/rag/flow/splitter/splitter.py b/rag/flow/splitter/splitter.py
index 7cf902a05..10976bb53 100644
--- a/rag/flow/splitter/splitter.py
+++ b/rag/flow/splitter/splitter.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import json
 import random
 from functools import partial
 
@@ -78,7 +77,7 @@ class Splitter(ProcessBase):
                 deli,
                 self._param.overlapped_percent,
             )
-            self.set_output("chunks", [{"text": c} for c in cks])
+            self.set_output("chunks", [{"text": c.strip()} for c in cks if c.strip()])
             self.callback(1, "Done.")
             return
 
@@ -106,7 +105,6 @@
         ]
         async with trio.open_nursery() as nursery:
             for d in cks:
-                nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), "_image_temps", get_uuid())
-                print("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n", json.dumps(cks, ensure_ascii=False, indent=2))
+                nursery.start_soon(image2id, d, partial(STORAGE_IMPL.put), get_uuid())
         self.set_output("chunks", cks)
         self.callback(1, "Done.")
diff --git a/rag/flow/tokenizer/tokenizer.py b/rag/flow/tokenizer/tokenizer.py
index a382b6993..bdc4b9adc 100644
--- a/rag/flow/tokenizer/tokenizer.py
+++ b/rag/flow/tokenizer/tokenizer.py
@@ -114,9 +114,8 @@ class Tokenizer(ProcessBase):
         if from_upstream.chunks:
             chunks = from_upstream.chunks
             for i, ck in enumerate(chunks):
-                if ck.get("docnm_kwd"):  # from presentation method
-                    ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", ck["docnm_kwd"]))
-                    ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
+                ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", from_upstream.name))
+                ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
                 if ck.get("questions"):
                     ck["question_tks"] = rag_tokenizer.tokenize("\n".join(ck["questions"]))
                 if ck.get("keywords"):
@@ -125,6 +124,7 @@
                 ck["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(ck["content_ltks"])
                 if i % 100 == 99:
                     self.callback(i * 1.0 / len(chunks) / parts)
+
         elif from_upstream.output_format in ["markdown", "text", "html"]:
             if from_upstream.output_format == "markdown":
                 payload = from_upstream.markdown_result
@@ -138,18 +138,16 @@
 
             ck = {"text": payload}
             if "full_text" in self._param.search_method:
-                if ck.get("docnm_kwd"):  # from presentation method
-                    ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", ck["docnm_kwd"]))
-                    ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
-                ck["content_ltks"] = rag_tokenizer.tokenize(kwargs.get(kwargs["output_format"], ""))
+                ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", from_upstream.name))
+                ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
+                ck["content_ltks"] = rag_tokenizer.tokenize(payload)
                 ck["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(ck["content_ltks"])
             chunks = [ck]
         else:
             chunks = from_upstream.json_result
             for i, ck in enumerate(chunks):
-                if ck.get("docnm_kwd"):  # from presentation method
-                    ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", ck["docnm_kwd"]))
-                    ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
+                ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", from_upstream.name))
+                ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"])
                 ck["content_ltks"] = rag_tokenizer.tokenize(ck["text"])
                 ck["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(ck["content_ltks"])
                 if i % 100 == 99:
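With the `docnm_kwd` special case gone, the chunk title is now always derived from the upstream file name; the extension stripping is just the regex shown above. A tiny self-contained illustration:

```python
# Same pattern the Tokenizer uses: drop a trailing alphabetic extension
# before tokenizing the name into title tokens.
import re

for name in ("quarterly_report.pdf", "notes.md", "archive.tar.gz"):
    print(re.sub(r"\.[a-zA-Z]+$", "", name))
# -> quarterly_report, notes, archive.tar  (only the last, alphabetic suffix is removed)
```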
ck["docnm_kwd"])) - ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"]) + ck["title_tks"] = rag_tokenizer.tokenize(re.sub(r"\.[a-zA-Z]+$", "", from_upstream.name)) + ck["title_sm_tks"] = rag_tokenizer.fine_grained_tokenize(ck["title_tks"]) ck["content_ltks"] = rag_tokenizer.tokenize(ck["text"]) ck["content_sm_ltks"] = rag_tokenizer.fine_grained_tokenize(ck["content_ltks"]) if i % 100 == 99: diff --git a/rag/nlp/__init__.py b/rag/nlp/__init__.py index 2424ba033..4acf5da44 100644 --- a/rag/nlp/__init__.py +++ b/rag/nlp/__init__.py @@ -522,7 +522,9 @@ def naive_merge(sections: str | list, chunk_token_num=128, delimiter="\n。; from deepdoc.parser.pdf_parser import RAGFlowPdfParser if not sections: return [] - if isinstance(sections[0], type("")): + if isinstance(sections, str): + sections = [sections] + if isinstance(sections[0], str): sections = [(s, "") for s in sections] cks = [""] tk_nums = [0] diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index fca3c6ccc..b6ccce655 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -301,7 +301,7 @@ async def build_chunks(task, progress_callback): d["img_id"] = "" docs.append(d) return - await image2id(d, partial(STORAGE_IMPL.put), task["kb_id"], d["id"]) + await image2id(d, partial(STORAGE_IMPL.put), d["id"], task["kb_id"]) docs.append(d) except Exception: logging.exception( @@ -531,7 +531,6 @@ async def do_handle_task(task): task_parser_config = task["parser_config"] task_start_ts = timer() - # prepare the progress callback function progress_callback = partial(set_progress, task_id, task_from_page, task_to_page)