task executor issues (#12006)

### What problem does this PR solve?

**Fixes #8706** - `InfinityException: TOO_MANY_CONNECTIONS` when running
multiple task executor workers

### Problem Description

When running RAGFlow with 8-16 task executor workers, most workers fail
to start properly. Checking logs revealed that workers were
stuck/hanging during Infinity connection initialization - only 1-2
workers would successfully register in Redis while the rest remained
blocked.

### Root Cause

The Infinity SDK `ConnectionPool` pre-allocates all connections in
`__init__`. With the default `max_size=32` and multiple workers (e.g.,
16), this creates 16×32=512 connections immediately on startup,
exceeding Infinity's default 128 connection limit. Workers hang while
waiting for connections that can never be established.

### Changes

1. **Prevent Infinity connection storm** (`rag/utils/infinity_conn.py`,
`rag/svr/task_executor.py`)
- Reduced ConnectionPool `max_size` from 32 to 4 (sufficient since
operations are synchronous)
- Added staggered startup delay (2s per worker) to spread connection
initialization

2. **Handle None children_delimiter** (`rag/app/naive.py`)
   - Use `or ""` to handle explicitly set None values from parser config

3. **MinerU parser robustness** (`deepdoc/parser/mineru_parser.py`)
   - Use `.get()` for optional output fields that may be missing
- Fix DISCARDED block handling: change `pass` to `continue` to skip
discarded blocks entirely

### Why `max_size=4` is sufficient

| Workers | Pool Size | Total Connections | Infinity Limit |
|---------|-----------|-------------------|----------------|
| 16      | 32        | 512               | 128          |
| 16      | 4         | 64                | 128          |
| 32      | 4         | 128               | 128          |

- All RAGFlow operations are synchronous: `get_conn()` → operation →
`release_conn()`
- No parallel `docStoreConn` operations in the codebase
- Maximum 1-2 concurrent connections needed per worker; 4 provides
safety margin

### MinerU DISCARDED block bug

When MinerU returns blocks with `type: "discarded"` (headers, footers,
watermarks, page numbers, artifacts), the previous code used `pass`
which left the `section` variable undefined, causing:

- **UnboundLocalError** if DISCARDED is the first block
- **Duplicate content** if DISCARDED follows another block (stale value
from previous iteration)

**Root cause confirmed via MinerU source code:**

From
[`mineru/utils/enum_class.py`](https://github.com/opendatalab/MinerU/blob/main/mineru/utils/enum_class.py#L14):
```python
class BlockType:
    DISCARDED = 'discarded'
    # VLM 2.5+ also has: HEADER, FOOTER, PAGE_NUMBER, ASIDE_TEXT, PAGE_FOOTNOTE
```

Per [MinerU
documentation](https://opendatalab.github.io/MinerU/reference/output_files/),
discarded blocks contain content that should be filtered out for clean
text extraction.

**Fix:** Changed `pass` to `continue` to skip discarded blocks entirely.

### Testing

- Verified all 16 workers now register successfully in Redis
- All workers heartbeating correctly
- Document parsing works as expected
- MinerU parsing with DISCARDED blocks no longer crashes

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Co-authored-by: user210 <user210@rt>
This commit is contained in:
concertdictate
2025-12-18 04:03:30 +02:00
committed by GitHub
parent 1a4822d6be
commit 4dd8cdc38b
4 changed files with 19 additions and 6 deletions

View File

@ -511,7 +511,7 @@ class MinerUParser(RAGFlowPdfParser):
for output in outputs:
match output["type"]:
case MinerUContentType.TEXT:
section = output["text"]
section = output.get("text", "")
case MinerUContentType.TABLE:
section = output.get("table_body", "") + "\n".join(output.get("table_caption", [])) + "\n".join(
output.get("table_footnote", []))
@ -521,13 +521,13 @@ class MinerUParser(RAGFlowPdfParser):
section = "".join(output.get("image_caption", [])) + "\n" + "".join(
output.get("image_footnote", []))
case MinerUContentType.EQUATION:
section = output["text"]
section = output.get("text", "")
case MinerUContentType.CODE:
section = output["code_body"] + "\n".join(output.get("code_caption", []))
section = output.get("code_body", "") + "\n".join(output.get("code_caption", []))
case MinerUContentType.LIST:
section = "\n".join(output.get("list_items", []))
case MinerUContentType.DISCARDED:
pass
continue # Skip discarded blocks entirely
if section and parse_method == "manual":
sections.append((section, output["type"], self._line_tag(output)))

View File

@ -651,7 +651,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca
"parser_config", {
"chunk_token_num": 512, "delimiter": "\n!?。;!?", "layout_recognize": "DeepDOC", "analyze_hyperlink": True})
child_deli = parser_config.get("children_delimiter", "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')
child_deli = (parser_config.get("children_delimiter") or "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8')
cust_child_deli = re.findall(r"`([^`]+)`", child_deli)
child_deli = "|".join(re.sub(r"`([^`]+)`", "", child_deli))
if cust_child_deli:

View File

@ -1165,6 +1165,19 @@ async def task_manager():
async def main():
# Stagger executor startup to prevent connection storm to Infinity
# Extract worker number from CONSUMER_NAME (e.g., "task_executor_abc123_5" -> 5)
try:
worker_num = int(CONSUMER_NAME.rsplit("_", 1)[-1])
# Add random delay: base delay + worker_num * 2.0s + random jitter
# This spreads out connection attempts over several seconds
startup_delay = worker_num * 2.0 + random.uniform(0, 0.5)
if startup_delay > 0:
logging.info(f"Staggering startup by {startup_delay:.2f}s to prevent connection storm")
await asyncio.sleep(startup_delay)
except (ValueError, IndexError):
pass # Non-standard consumer name, skip delay
logging.info(r"""
____ __ _
/ _/___ ____ ____ _____/ /_(_)___ ____ ________ ______ _____ _____

View File

@ -183,7 +183,7 @@ class InfinityConnection(DocStoreConnection):
logger.info(f"Use Infinity {infinity_uri} as the doc engine.")
for _ in range(24):
try:
connPool = ConnectionPool(infinity_uri, max_size=32)
connPool = ConnectionPool(infinity_uri, max_size=4)
inf_conn = connPool.get_conn()
res = inf_conn.show_current_node()
if res.error_code == ErrorCode.OK and res.server_status in ["started", "alive"]: