diff --git a/deepdoc/parser/mineru_parser.py b/deepdoc/parser/mineru_parser.py index f22c6e48b..aba237dd1 100644 --- a/deepdoc/parser/mineru_parser.py +++ b/deepdoc/parser/mineru_parser.py @@ -511,7 +511,7 @@ class MinerUParser(RAGFlowPdfParser): for output in outputs: match output["type"]: case MinerUContentType.TEXT: - section = output["text"] + section = output.get("text", "") case MinerUContentType.TABLE: section = output.get("table_body", "") + "\n".join(output.get("table_caption", [])) + "\n".join( output.get("table_footnote", [])) @@ -521,13 +521,13 @@ class MinerUParser(RAGFlowPdfParser): section = "".join(output.get("image_caption", [])) + "\n" + "".join( output.get("image_footnote", [])) case MinerUContentType.EQUATION: - section = output["text"] + section = output.get("text", "") case MinerUContentType.CODE: - section = output["code_body"] + "\n".join(output.get("code_caption", [])) + section = output.get("code_body", "") + "\n".join(output.get("code_caption", [])) case MinerUContentType.LIST: section = "\n".join(output.get("list_items", [])) case MinerUContentType.DISCARDED: - pass + continue # Skip discarded blocks entirely if section and parse_method == "manual": sections.append((section, output["type"], self._line_tag(output))) diff --git a/rag/app/naive.py b/rag/app/naive.py index 7756f7a9a..1046a8f62 100644 --- a/rag/app/naive.py +++ b/rag/app/naive.py @@ -651,7 +651,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, lang="Chinese", ca "parser_config", { "chunk_token_num": 512, "delimiter": "\n!?。;!?", "layout_recognize": "DeepDOC", "analyze_hyperlink": True}) - child_deli = parser_config.get("children_delimiter", "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8') + child_deli = (parser_config.get("children_delimiter") or "").encode('utf-8').decode('unicode_escape').encode('latin1').decode('utf-8') cust_child_deli = re.findall(r"`([^`]+)`", child_deli) child_deli = "|".join(re.sub(r"`([^`]+)`", "", 
child_deli)) if cust_child_deli: diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index 8668d9178..37c6f5aec 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -1165,6 +1165,19 @@ async def task_manager(): async def main(): + # Stagger executor startup to prevent connection storm to Infinity + # Extract worker number from CONSUMER_NAME (e.g., "task_executor_abc123_5" -> 5) + try: + worker_num = int(CONSUMER_NAME.rsplit("_", 1)[-1]) + # Delay = worker_num * 2.0s plus up to 0.5s of random jitter + # This spreads out connection attempts over several seconds + startup_delay = worker_num * 2.0 + random.uniform(0, 0.5) + if startup_delay > 0: + logging.info(f"Staggering startup by {startup_delay:.2f}s to prevent connection storm") + await asyncio.sleep(startup_delay) + except (ValueError, IndexError): + pass # Non-standard consumer name, skip delay + logging.info(r""" ____ __ _ / _/___ ____ ____ _____/ /_(_)___ ____ ________ ______ _____ _____ diff --git a/rag/utils/infinity_conn.py b/rag/utils/infinity_conn.py index 1ac2f7b33..1a0edd418 100644 --- a/rag/utils/infinity_conn.py +++ b/rag/utils/infinity_conn.py @@ -183,7 +183,7 @@ class InfinityConnection(DocStoreConnection): logger.info(f"Use Infinity {infinity_uri} as the doc engine.") for _ in range(24): try: - connPool = ConnectionPool(infinity_uri, max_size=32) + connPool = ConnectionPool(infinity_uri, max_size=4) inf_conn = connPool.get_conn() res = inf_conn.show_current_node() if res.error_code == ErrorCode.OK and res.server_status in ["started", "alive"]: