From d016a06fd504a67e439f45306d02f6c79aeb5bbd Mon Sep 17 00:00:00 2001 From: Lynn Date: Mon, 10 Nov 2025 12:51:39 +0800 Subject: [PATCH] Feat/monitor task (#11116) ### What problem does this PR solve? Show task executor. ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- admin/client/admin_client.py | 18 +++++++++++++++++- admin/client/pyproject.toml | 2 +- admin/server/config.py | 11 +++++++++++ admin/server/services.py | 4 ++++ api/utils/health_utils.py | 24 +++++++++++++++++++++++- conf/service_conf.yaml | 2 ++ rag/svr/task_executor.py | 16 ++++++++++++++++ 7 files changed, 74 insertions(+), 3 deletions(-) diff --git a/admin/client/admin_client.py b/admin/client/admin_client.py index e96d4afbc..098132fd3 100644 --- a/admin/client/admin_client.py +++ b/admin/client/admin_client.py @@ -392,6 +392,21 @@ class AdminCLI(Cmd): print(str(e)) print(f"Can't access {self.host}, port: {self.port}") + def _format_service_detail_table(self, data): + if not any([isinstance(v, list) for v in data.values()]): + # normal table + return data + # handle task_executor heartbeats map, for example {'name': [{'done': 2, 'now': timestamp1}, {'done': 3, 'now': timestamp2}] + task_executor_list = [] + for k, v in data.items(): + # display latest status + heartbeats = sorted(v, key=lambda x: x["now"], reverse=True) + task_executor_list.append({ + "task_executor_name": k, + **heartbeats[0], + }) + return task_executor_list + def _print_table_simple(self, data): if not data: print("No data to print") @@ -595,7 +610,8 @@ class AdminCLI(Cmd): if isinstance(res_data['message'], str): print(res_data['message']) else: - self._print_table_simple(res_data['message']) + data = self._format_service_detail_table(res_data['message']) + self._print_table_simple(data) else: print(f"Service {res_data['service_name']} is down, {res_data['message']}") else: diff --git a/admin/client/pyproject.toml b/admin/client/pyproject.toml index f967cd5b8..6dad77a2b 100644 --- a/admin/client/pyproject.toml +++ b/admin/client/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ragflow-cli" -version = "0.21.1" +version = "0.22.0" description = "Admin Service's client of [RAGFlow](https://github.com/infiniflow/ragflow). The Admin Service provides user management and system monitoring. " authors = [{ name = "Lynn", email = "lynn_inf@hotmail.com" }] license = { text = "Apache License, Version 2.0" } diff --git a/admin/server/config.py b/admin/server/config.py index 5d47f0d66..e2c7d11ef 100644 --- a/admin/server/config.py +++ b/admin/server/config.py @@ -183,11 +183,13 @@ class RAGFlowServerConfig(BaseConfig): class TaskExecutorConfig(BaseConfig): + message_queue_type: str def to_dict(self) -> dict[str, Any]: result = super().to_dict() if 'extra' not in result: result['extra'] = dict() + result['extra']['message_queue_type'] = self.message_queue_type return result @@ -299,6 +301,15 @@ def load_configurations(config_path: str) -> list[BaseConfig]: id_count += 1 case "admin": pass + case "task_executor": + name: str = 'task_executor' + host: str = v.get('host', '') + port: int = v.get('port', 0) + message_queue_type: str = v.get('message_queue_type') + config = TaskExecutorConfig(id=id_count, name=name, host=host, port=port, message_queue_type=message_queue_type, + service_type="task_executor", detail_func_name="check_task_executor_alive") + configurations.append(config) + id_count += 1 case _: logging.warning(f"Unknown configuration key: {k}") continue diff --git a/admin/server/services.py b/admin/server/services.py index 05f408918..e8cf4eb5d 100644 --- a/admin/server/services.py +++ b/admin/server/services.py @@ -192,6 +192,10 @@ class ServiceMgr: config_dict['status'] = 'timeout' except Exception: config_dict['status'] = 'timeout' + if not config_dict['host']: + config_dict['host'] = '-' + if not config_dict['port']: + config_dict['port'] = '-' result.append(config_dict) return result diff --git a/api/utils/health_utils.py b/api/utils/health_utils.py index f9a11323c..88e5aaebb 100644 --- a/api/utils/health_utils.py +++ b/api/utils/health_utils.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from datetime import datetime +import json import os import requests from timeit import default_timer as timer @@ -148,7 +150,7 @@ def check_ragflow_server_alive(): try: url = f'http://{settings.HOST_IP}:{settings.HOST_PORT}/v1/system/ping' if '0.0.0.0' in url: - url.replace('0.0.0.0', '127.0.0.1') + url = url.replace('0.0.0.0', '127.0.0.1') response = requests.get(url) if response.status_code == 200: return {"status": "alive", "message": f"Confirm elapsed: {(timer() - start_time) * 1000.0:.1f} ms."} @@ -161,6 +163,26 @@ def check_ragflow_server_alive(): } +def check_task_executor_alive(): + task_executor_heartbeats = {} + try: + task_executors = REDIS_CONN.smembers("TASKEXE") + now = datetime.now().timestamp() + for task_executor_id in task_executors: + heartbeats = REDIS_CONN.zrangebyscore(task_executor_id, now - 60 * 30, now) + heartbeats = [json.loads(heartbeat) for heartbeat in heartbeats] + task_executor_heartbeats[task_executor_id] = heartbeats + if task_executor_heartbeats: + return {"status": "alive", "message": task_executor_heartbeats} + else: + return {"status": "timeout", "message": "Not found any task executor."} + except Exception as e: + return { + "status": "timeout", + "message": f"error: {str(e)}" + } + + def run_health_checks() -> tuple[dict, bool]: result: dict[str, str | dict] = {} diff --git a/conf/service_conf.yaml b/conf/service_conf.yaml index 15b7c9d06..df6290042 100644 --- a/conf/service_conf.yaml +++ b/conf/service_conf.yaml @@ -32,6 +32,8 @@ redis: db: 1 password: 'infini_rag_flow' host: 'localhost:6379' +task_executor: + message_queue_type: 'redis' user_default_llm: default_models: embedding_model: diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index d0d9e27c5..73b61cd4e 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import socket import concurrent # from beartype import BeartypeConf # from beartype.claw import beartype_all # <-- you didn't sign up for this @@ -963,6 +964,17 @@ async def handle_task(): redis_msg.ack() +async def get_server_ip() -> str: + # get ip by udp + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + s.connect(("8.8.8.8", 80)) + return s.getsockname()[0] + except Exception as e: + logging.error(str(e)) + return 'Unknown' + + async def report_status(): global CONSUMER_NAME, BOOT_AT, PENDING_TASKS, LAG_TASKS, DONE_TASKS, FAILED_TASKS REDIS_CONN.sadd("TASKEXE", CONSUMER_NAME) @@ -975,8 +987,12 @@ async def report_status(): PENDING_TASKS = int(group_info.get("pending", 0)) LAG_TASKS = int(group_info.get("lag", 0)) + pid = os.getpid() + ip_address = await get_server_ip() current = copy.deepcopy(CURRENT_TASKS) heartbeat = json.dumps({ + "ip_address": ip_address, + "pid": pid, "name": CONSUMER_NAME, "now": now.astimezone().isoformat(timespec="milliseconds"), "boot_at": BOOT_AT,