From fb950079eff953f29a17d079f1ae94a61f515b60 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 30 Sep 2025 16:23:09 +0800 Subject: [PATCH] Feat/service manage (#10381) ### What problem does this PR solve? - Admin service support SHOW SERVICE . ### Type of change - [x] New Feature (non-breaking change which adds functionality) issue: #10241 --- admin/admin_client.py | 16 ++++ admin/config.py | 15 ++-- admin/services.py | 18 ++++- api/apps/system_app.py | 5 ++ api/utils/common.py | 23 ++++++ api/utils/health_utils.py | 97 +++++++++++++++++++++++- docs/guides/manage_users_and_services.md | 0 rag/utils/es_conn.py | 50 ++++++++++++ rag/utils/redis_conn.py | 14 ++++ 9 files changed, 229 insertions(+), 9 deletions(-) create mode 100644 docs/guides/manage_users_and_services.md diff --git a/admin/admin_client.py b/admin/admin_client.py index 59e0893a1..007b73e29 100644 --- a/admin/admin_client.py +++ b/admin/admin_client.py @@ -390,6 +390,22 @@ class AdminCLI: service_id: int = command['number'] print(f"Showing service: {service_id}") + url = f'http://{self.host}:{self.port}/api/v1/admin/services/{service_id}' + response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) + res_json = response.json() + if response.status_code == 200: + res_data = res_json['data'] + if res_data['alive']: + print(f"Service {res_data['service_name']} is alive. Detail:") + if isinstance(res_data['message'], str): + print(res_data['message']) + else: + self._print_table_simple(res_data['message']) + else: + print(f"Service {res_data['service_name']} is down. Detail: {res_data['message']}") + else: + print(f"Fail to show service, code: {res_json['code']}, message: {res_json['message']}") + def _handle_restart_service(self, command): service_id: int = command['number'] print(f"Restart service {service_id}") diff --git a/admin/config.py b/admin/config.py index 570807737..94147de8e 100644 --- a/admin/config.py +++ b/admin/config.py @@ -32,6 +32,7 @@ class BaseConfig(BaseModel): host: str port: int service_type: str + detail_func_name: str def to_dict(self) -> dict[str, Any]: return {'id': self.id, 'name': self.name, 'host': self.host, 'port': self.port, 'service_type': self.service_type} @@ -209,7 +210,8 @@ def load_configurations(config_path: str) -> list[BaseConfig]: name: str = f'ragflow_{ragflow_count}' host: str = v['host'] http_port: int = v['http_port'] - config = RAGFlowServerConfig(id=id_count, name=name, host=host, port=http_port, service_type="ragflow_server") + config = RAGFlowServerConfig(id=id_count, name=name, host=host, port=http_port, + service_type="ragflow_server", detail_func_name="check_ragflow_server_alive") configurations.append(config) id_count += 1 case "es": @@ -222,7 +224,8 @@ def load_configurations(config_path: str) -> list[BaseConfig]: password: str = v.get('password') config = ElasticsearchConfig(id=id_count, name=name, host=host, port=port, service_type="retrieval", retrieval_type="elasticsearch", - username=username, password=password) + username=username, password=password, + detail_func_name="get_es_cluster_stats") configurations.append(config) id_count += 1 @@ -234,7 +237,7 @@ def load_configurations(config_path: str) -> list[BaseConfig]: port = int(parts[1]) database: str = v.get('db_name', 'default_db') config = InfinityConfig(id=id_count, name=name, host=host, port=port, service_type="retrieval", retrieval_type="infinity", - db_name=database) + db_name=database, detail_func_name="get_infinity_status") configurations.append(config) id_count += 1 case "minio": @@ -246,7 +249,7 @@ def load_configurations(config_path: str) -> list[BaseConfig]: user = v.get('user') password = v.get('password') config = MinioConfig(id=id_count, name=name, host=host, port=port, user=user, password=password, service_type="file_store", - store_type="minio") + store_type="minio", detail_func_name="check_minio_alive") configurations.append(config) id_count += 1 case "redis": @@ -258,7 +261,7 @@ def load_configurations(config_path: str) -> list[BaseConfig]: password = v.get('password') db: int = v.get('db') config = RedisConfig(id=id_count, name=name, host=host, port=port, password=password, database=db, - service_type="message_queue", mq_type="redis") + service_type="message_queue", mq_type="redis", detail_func_name="get_redis_info") configurations.append(config) id_count += 1 case "mysql": @@ -268,7 +271,7 @@ def load_configurations(config_path: str) -> list[BaseConfig]: username = v.get('user') password = v.get('password') config = MySQLConfig(id=id_count, name=name, host=host, port=port, username=username, password=password, - service_type="meta_data", meta_type="mysql") + service_type="meta_data", meta_type="mysql", detail_func_name="get_mysql_status") configurations.append(config) id_count += 1 case "admin": diff --git a/admin/services.py b/admin/services.py index 24151d46c..ec98b10c5 100644 --- a/admin/services.py +++ b/admin/services.py @@ -7,6 +7,7 @@ from api.db.services.canvas_service import UserCanvasService from api.db.services.user_service import TenantService from api.db.services.knowledgebase_service import KnowledgebaseService from api.utils.crypt import decrypt +from api.utils import health_utils from api.common.exceptions import AdminException, UserAlreadyExistsError, UserNotFoundError from config import SERVICE_CONFIGS @@ -164,7 +165,22 @@ class ServiceMgr: @staticmethod def get_service_details(service_id: int): - raise AdminException("get_service_details: not implemented") + service_id = int(service_id) + configs = SERVICE_CONFIGS.configs + service_config_mapping = { + c.id: { + 'name': c.name, + 'detail_func_name': c.detail_func_name + } for c in configs + } + service_info = service_config_mapping.get(service_id, {}) + if not service_info: + raise AdminException(f"Invalid service_id: {service_id}") + + detail_func = getattr(health_utils, service_info.get('detail_func_name')) + res = detail_func() + res.update({'service_name': service_info.get('name')}) + return res @staticmethod def shutdown_service(service_id: int): diff --git a/api/apps/system_app.py b/api/apps/system_app.py index 0ec808fb9..fa2b5f116 100644 --- a/api/apps/system_app.py +++ b/api/apps/system_app.py @@ -177,6 +177,11 @@ def healthz(): return jsonify(result), (200 if all_ok else 500) +@manager.route("/ping", methods=["GET"]) # noqa: F821 +def ping(): + return "pong", 200 + + @manager.route("/new_token", methods=["POST"]) # noqa: F821 @login_required def new_token(): diff --git a/api/utils/common.py b/api/utils/common.py index ce7428507..ae1d7b44d 100644 --- a/api/utils/common.py +++ b/api/utils/common.py @@ -21,3 +21,26 @@ def string_to_bytes(string): def bytes_to_string(byte): return byte.decode(encoding="utf-8") + + +def convert_bytes(size_in_bytes: int) -> str: + """ + Format size in bytes. + """ + if size_in_bytes == 0: + return "0 B" + + units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB'] + i = 0 + size = float(size_in_bytes) + + while size >= 1024 and i < len(units) - 1: + size /= 1024 + i += 1 + + if i == 0 or size >= 100: + return f"{size:.0f} {units[i]}" + elif size >= 10: + return f"{size:.1f} {units[i]}" + else: + return f"{size:.2f} {units[i]}" diff --git a/api/utils/health_utils.py b/api/utils/health_utils.py index 967fa71b7..1396d7595 100644 --- a/api/utils/health_utils.py +++ b/api/utils/health_utils.py @@ -13,14 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. # - - +import os +import requests from timeit import default_timer as timer from api import settings from api.db.db_models import DB +from rag import settings as rag_settings from rag.utils.redis_conn import REDIS_CONN from rag.utils.storage_factory import STORAGE_IMPL +from rag.utils.es_conn import ESConnection +from rag.utils.infinity_conn import InfinityConnection def _ok_nok(ok: bool) -> str: @@ -65,6 +68,96 @@ def check_storage() -> tuple[bool, dict]: return False, {"elapsed": f"{(timer() - st) * 1000.0:.1f}", "error": str(e)} +def get_es_cluster_stats() -> dict: + doc_engine = os.getenv('DOC_ENGINE', 'elasticsearch') + if doc_engine != 'elasticsearch': + raise Exception("Elasticsearch is not in use.") + try: + return { + "alive": True, + "message": ESConnection().get_cluster_stats() + } + except Exception as e: + return { + "alive": False, + "message": f"error: {str(e)}", + } + + +def get_infinity_status(): + doc_engine = os.getenv('DOC_ENGINE', 'elasticsearch') + if doc_engine != 'infinity': + raise Exception("Infinity is not in use.") + try: + return { + "alive": True, + "message": InfinityConnection().health() + } + except Exception as e: + return { + "alive": False, + "message": f"error: {str(e)}", + } + + +def get_mysql_status(): + try: + cursor = DB.execute_sql("SHOW PROCESSLIST;") + res_rows = cursor.fetchall() + headers = ['id', 'user', 'host', 'db', 'command', 'time', 'state', 'info'] + cursor.close() + return { + "alive": True, + "message": [dict(zip(headers, r)) for r in res_rows] + } + except Exception as e: + return { + "alive": False, + "message": f"error: {str(e)}", + } + + +def check_minio_alive(): + start_time = timer() + try: + response = requests.get(f'http://{rag_settings.MINIO["host"]}/minio/health/live') + if response.status_code == 200: + return {'alive': True, "message": f"Confirm elapsed: {(timer() - start_time) * 1000.0:.1f} ms."} + else: + return {'alive': False, "message": f"Confirm elapsed: {(timer() - start_time) * 1000.0:.1f} ms."} + except Exception as e: + return { + "alive": False, + "message": f"error: {str(e)}", + } + + +def get_redis_info(): + try: + return { + "alive": True, + "message": REDIS_CONN.info() + } + except Exception as e: + return { + "alive": False, + "message": f"error: {str(e)}", + } + + +def check_ragflow_server_alive(): + start_time = timer() + try: + response = requests.get(f'http://{settings.HOST_IP}:{settings.HOST_PORT}/v1/system/ping') + if response.status_code == 200: + return {'alive': True, "message": f"Confirm elapsed: {(timer() - start_time) * 1000.0:.1f} ms."} + else: + return {'alive': False, "message": f"Confirm elapsed: {(timer() - start_time) * 1000.0:.1f} ms."} + except Exception as e: + return { + "alive": False, + "message": f"error: {str(e)}", + } def run_health_checks() -> tuple[dict, bool]: diff --git a/docs/guides/manage_users_and_services.md b/docs/guides/manage_users_and_services.md new file mode 100644 index 000000000..e69de29bb diff --git a/rag/utils/es_conn.py b/rag/utils/es_conn.py index 47d4f1a4f..81ec1533e 100644 --- a/rag/utils/es_conn.py +++ b/rag/utils/es_conn.py @@ -28,6 +28,7 @@ from rag import settings from rag.settings import TAG_FLD, PAGERANK_FLD from rag.utils import singleton, get_float from api.utils.file_utils import get_project_base_directory +from api.utils.common import convert_bytes from rag.utils.doc_store_conn import DocStoreConnection, MatchExpr, OrderByExpr, MatchTextExpr, MatchDenseExpr, \ FusionExpr from rag.nlp import is_english, rag_tokenizer @@ -579,3 +580,52 @@ class ESConnection(DocStoreConnection): break logger.error(f"ESConnection.sql timeout for {ATTEMPT_TIME} times!") return None + + def get_cluster_stats(self): + """ + curl -XGET "http://{es_host}/_cluster/stats" -H "kbn-xsrf: reporting" to view raw stats. + """ + raw_stats = self.es.cluster.stats() + logger.debug(f"ESConnection.get_cluster_stats: {raw_stats}") + try: + res = { + 'cluster_name': raw_stats['cluster_name'], + 'status': raw_stats['status'] + } + indices_status = raw_stats['indices'] + res.update({ + 'indices': indices_status['count'], + 'indices_shards': indices_status['shards']['total'] + }) + doc_info = indices_status['docs'] + res.update({ + 'docs': doc_info['count'], + 'docs_deleted': doc_info['deleted'] + }) + store_info = indices_status['store'] + res.update({ + 'store_size': convert_bytes(store_info['size_in_bytes']), + 'total_dataset_size': convert_bytes(store_info['total_data_set_size_in_bytes']) + }) + mappings_info = indices_status['mappings'] + res.update({ + 'mappings_fields': mappings_info['total_field_count'], + 'mappings_deduplicated_fields': mappings_info['total_deduplicated_field_count'], + 'mappings_deduplicated_size': convert_bytes(mappings_info['total_deduplicated_mapping_size_in_bytes']) + }) + node_info = raw_stats['nodes'] + res.update({ + 'nodes': node_info['count']['total'], + 'nodes_version': node_info['versions'], + 'os_mem': convert_bytes(node_info['os']['mem']['total_in_bytes']), + 'os_mem_used': convert_bytes(node_info['os']['mem']['used_in_bytes']), + 'os_mem_used_percent': node_info['os']['mem']['used_percent'], + 'jvm_versions': node_info['jvm']['versions'][0]['vm_version'], + 'jvm_heap_used': convert_bytes(node_info['jvm']['mem']['heap_used_in_bytes']), + 'jvm_heap_max': convert_bytes(node_info['jvm']['mem']['heap_max_in_bytes']) + }) + return res + + except Exception as e: + logger.exception(f"ESConnection.get_cluster_stats: {e}") + return None diff --git a/rag/utils/redis_conn.py b/rag/utils/redis_conn.py index 3d0d10b17..17248db6d 100644 --- a/rag/utils/redis_conn.py +++ b/rag/utils/redis_conn.py @@ -91,6 +91,20 @@ class RedisDB: if self.REDIS.get(a) == b: return True + def info(self): + info = self.REDIS.info() + return { + 'redis_version': info["redis_version"], + 'server_mode': info["server_mode"], + 'used_memory': info["used_memory_human"], + 'total_system_memory': info["total_system_memory_human"], + 'mem_fragmentation_ratio': info["mem_fragmentation_ratio"], + 'connected_clients': info["connected_clients"], + 'blocked_clients': info["blocked_clients"], + 'instantaneous_ops_per_sec': info["instantaneous_ops_per_sec"], + 'total_commands_processed': info["total_commands_processed"] + } + def is_alive(self): return self.REDIS is not None