From 4dd8cdc38bbadfab6eea8fd4fb17dde405872460 Mon Sep 17 00:00:00 2001 From: concertdictate <49596780+concertdictate@users.noreply.github.com> Date: Thu, 18 Dec 2025 04:03:30 +0200 Subject: [PATCH] task executor issues (#12006) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? **Fixes #8706** - `InfinityException: TOO_MANY_CONNECTIONS` when running multiple task executor workers ### Problem Description When running RAGFlow with 8-16 task executor workers, most workers fail to start properly. Checking logs revealed that workers were stuck/hanging during Infinity connection initialization - only 1-2 workers would successfully register in Redis while the rest remained blocked. ### Root Cause The Infinity SDK `ConnectionPool` pre-allocates all connections in `__init__`. With the default `max_size=32` and multiple workers (e.g., 16), this creates 16×32=512 connections immediately on startup, exceeding Infinity's default 128 connection limit. Workers hang while waiting for connections that can never be established. ### Changes 1. **Prevent Infinity connection storm** (`rag/utils/infinity_conn.py`, `rag/svr/task_executor.py`) - Reduced ConnectionPool `max_size` from 32 to 4 (sufficient since operations are synchronous) - Added staggered startup delay (2s per worker) to spread connection initialization 2. **Handle None children_delimiter** (`rag/app/naive.py`) - Use `or ""` to handle explicitly set None values from parser config 3. **MinerU parser robustness** (`deepdoc/parser/mineru_parser.py`) - Use `.get()` for optional output fields that may be missing - Fix DISCARDED block handling: change `pass` to `continue` to skip discarded blocks entirely ### Why `max_size=4` is sufficient | Workers | Pool Size | Total Connections | Infinity Limit | |---------|-----------|-------------------|----------------| | 16 | 32 | 512 | 128 ❌ | | 16 | 4 | 64 | 128 ✅ | | 32 | 4 | 128 | 128 ✅ | - All RAGFlow operations are synchronous: `get_conn()` → operation → `release_conn()` - No parallel `docStoreConn` operations in the codebase - Maximum 1-2 concurrent connections needed per worker; 4 provides safety margin ### MinerU DISCARDED block bug When MinerU returns blocks with `type: "discarded"` (headers, footers, watermarks, page numbers, artifacts), the previous code used `pass` which left the `section` variable undefined, causing: - **UnboundLocalError** if DISCARDED is the first block - **Duplicate content** if DISCARDED follows another block (stale value from previous iteration) **Root cause confirmed via MinerU source code:** From [`mineru/utils/enum_class.py`](https://github.com/opendatalab/MinerU/blob/main/mineru/utils/enum_class.py#L14): ```python class BlockType: DISCARDED = 'discarded' # VLM 2.5+ also has: HEADER, FOOTER, PAGE_NUMBER, ASIDE_TEXT, PAGE_FOOTNOTE ``` Per [MinerU documentation](https://opendatalab.github.io/MinerU/reference/output_files/), discarded blocks contain content that should be filtered out for clean text extraction. **Fix:** Changed `pass` to `continue` to skip discarded blocks entirely. ### Testing - Verified all 16 workers now register successfully in Redis - All workers heartbeating correctly - Document parsing works as expected - MinerU parsing with DISCARDED blocks no longer crashes ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --------- Co-authored-by: user210 --- deepdoc/parser/mineru_parser.py | 8 ++++---- rag/app/naive.py | 2 +- rag/svr/task_executor.py | 13 +++++++++++++ rag/utils/infinity_conn.py | 2 +- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/deepdoc/parser/mineru_parser.py b/deepdoc/parser/mineru_parser.py index f22c6e48b..aba237dd1 100644 --- a/deepdoc/parser/mineru_parser.py +++ b/deepdoc/parser/mineru_parser.py @@ -511,7 +511,7 @@ class MinerUParser(RAGFlowPdfParser): for output in outputs: match output["type"]: case MinerUContentType.TEXT: - section = output["text"] + section = output.get("text", "") case MinerUContentType.TABLE: section = output.get("table_body", "") + "\n".join(output.get("table_caption", [])) + "\n".join( output.get("table_footnote", [])) @@ -521,13 +521,13 @@ class MinerUParser(RAGFlowPdfParser): section = "".join(output.get("image_caption", [])) + "\n" + "".join( output.get("image_footnote", [])) case MinerUContentType.EQUATION: - section = output["text"] + section = output.get("text", "") case MinerUContentType.CODE: - section = output["code_body"] + "\n".join(output.get("code_caption", [])) + section = output.get("code_body", "") + "\n".join(output.get("code_caption", [])) case MinerUContentType.LIST: section = "\n".join(output.get("list_items", [])) case MinerUContentType.DISCARDED: - pass + continue # Skip discarded blocks entirely if section and parse_method == "manual": sections.append((section, output["type"], self._line_tag(output))) diff --git a/rag/app/naive.py b/rag/app/naive.py index 7756f7a9a..1046a8f62 100644 --- a/rag/app/naive.py +++ b/rag/app/naive.py @@ -651,7 +651,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca "parser_config", { "chunk_token_num": 512, "delimiter": "\n!?。;!?", "layout_recognize": "DeepDOC", "analyze_hyperlink": True}) - child_deli = parser_config.get("children_delimiter", "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8') + child_deli = (parser_config.get("children_delimiter") or "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8') cust_child_deli = re.findall(r"`([^`]+)`", child_deli) child_deli = "|".join(re.sub(r"`([^`]+)`", "", child_deli)) if cust_child_deli: diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 8668d9178..37c6f5aec 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -1165,6 +1165,19 @@ async def task_manager(): async def main(): + # Stagger executor startup to prevent connection storm to Infinity + # Extract worker number from CONSUMER_NAME (e.g., "task_executor_abc123_5" -> 5) + try: + worker_num = int(CONSUMER_NAME.rsplit("_", 1)[-1]) + # Add random delay: base delay + worker_num * 2.0s + random jitter + # This spreads out connection attempts over several seconds + startup_delay = worker_num * 2.0 + random.uniform(0, 0.5) + if startup_delay > 0: + logging.info(f"Staggering startup by {startup_delay:.2f}s to prevent connection storm") + await asyncio.sleep(startup_delay) + except (ValueError, IndexError): + pass # Non-standard consumer name, skip delay + logging.info(r""" ____ __ _ / _/___ ____ ____ _____/ /_(_)___ ____ ________ ______ _____ _____ diff --git a/rag/utils/infinity_conn.py b/rag/utils/infinity_conn.py index 1ac2f7b33..1a0edd418 100644 --- a/rag/utils/infinity_conn.py +++ b/rag/utils/infinity_conn.py @@ -183,7 +183,7 @@ class InfinityConnection(DocStoreConnection): logger.info(f"Use Infinity {infinity_uri} as the doc engine.") for _ in range(24): try: - connPool = ConnectionPool(infinity_uri, max_size=32) + connPool = ConnectionPool(infinity_uri, max_size=4) inf_conn = connPool.get_conn() res = inf_conn.show_current_node() if res.error_code == ErrorCode.OK and res.server_status in ["started", "alive"]: