Feat/service manage (#10381)

### What problem does this PR solve?

- Admin service support SHOW SERVICE <id>.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

issue: #10241
This commit is contained in:
Lynn
2025-09-30 16:23:09 +08:00
committed by GitHub
parent aec8c15e7e
commit fb950079ef
9 changed files with 229 additions and 9 deletions

View File

@ -390,6 +390,22 @@ class AdminCLI:
service_id: int = command['number']
print(f"Showing service: {service_id}")
url = f'http://{self.host}:{self.port}/api/v1/admin/services/{service_id}'
response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password))
res_json = response.json()
if response.status_code == 200:
res_data = res_json['data']
if res_data['alive']:
print(f"Service {res_data['service_name']} is alive. Detail:")
if isinstance(res_data['message'], str):
print(res_data['message'])
else:
self._print_table_simple(res_data['message'])
else:
print(f"Service {res_data['service_name']} is down. Detail: {res_data['message']}")
else:
print(f"Fail to show service, code: {res_json['code']}, message: {res_json['message']}")
def _handle_restart_service(self, command):
service_id: int = command['number']
print(f"Restart service {service_id}")

View File

@ -32,6 +32,7 @@ class BaseConfig(BaseModel):
host: str
port: int
service_type: str
detail_func_name: str
def to_dict(self) -> dict[str, Any]:
return {'id': self.id, 'name': self.name, 'host': self.host, 'port': self.port, 'service_type': self.service_type}
@ -209,7 +210,8 @@ def load_configurations(config_path: str) -> list[BaseConfig]:
name: str = f'ragflow_{ragflow_count}'
host: str = v['host']
http_port: int = v['http_port']
config = RAGFlowServerConfig(id=id_count, name=name, host=host, port=http_port, service_type="ragflow_server")
config = RAGFlowServerConfig(id=id_count, name=name, host=host, port=http_port,
service_type="ragflow_server", detail_func_name="check_ragflow_server_alive")
configurations.append(config)
id_count += 1
case "es":
@ -222,7 +224,8 @@ def load_configurations(config_path: str) -> list[BaseConfig]:
password: str = v.get('password')
config = ElasticsearchConfig(id=id_count, name=name, host=host, port=port, service_type="retrieval",
retrieval_type="elasticsearch",
username=username, password=password)
username=username, password=password,
detail_func_name="get_es_cluster_stats")
configurations.append(config)
id_count += 1
@ -234,7 +237,7 @@ def load_configurations(config_path: str) -> list[BaseConfig]:
port = int(parts[1])
database: str = v.get('db_name', 'default_db')
config = InfinityConfig(id=id_count, name=name, host=host, port=port, service_type="retrieval", retrieval_type="infinity",
db_name=database)
db_name=database, detail_func_name="get_infinity_status")
configurations.append(config)
id_count += 1
case "minio":
@ -246,7 +249,7 @@ def load_configurations(config_path: str) -> list[BaseConfig]:
user = v.get('user')
password = v.get('password')
config = MinioConfig(id=id_count, name=name, host=host, port=port, user=user, password=password, service_type="file_store",
store_type="minio")
store_type="minio", detail_func_name="check_minio_alive")
configurations.append(config)
id_count += 1
case "redis":
@ -258,7 +261,7 @@ def load_configurations(config_path: str) -> list[BaseConfig]:
password = v.get('password')
db: int = v.get('db')
config = RedisConfig(id=id_count, name=name, host=host, port=port, password=password, database=db,
service_type="message_queue", mq_type="redis")
service_type="message_queue", mq_type="redis", detail_func_name="get_redis_info")
configurations.append(config)
id_count += 1
case "mysql":
@ -268,7 +271,7 @@ def load_configurations(config_path: str) -> list[BaseConfig]:
username = v.get('user')
password = v.get('password')
config = MySQLConfig(id=id_count, name=name, host=host, port=port, username=username, password=password,
service_type="meta_data", meta_type="mysql")
service_type="meta_data", meta_type="mysql", detail_func_name="get_mysql_status")
configurations.append(config)
id_count += 1
case "admin":

View File

@ -7,6 +7,7 @@ from api.db.services.canvas_service import UserCanvasService
from api.db.services.user_service import TenantService
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.utils.crypt import decrypt
from api.utils import health_utils
from api.common.exceptions import AdminException, UserAlreadyExistsError, UserNotFoundError
from config import SERVICE_CONFIGS
@ -164,7 +165,22 @@ class ServiceMgr:
@staticmethod
def get_service_details(service_id: int):
raise AdminException("get_service_details: not implemented")
service_id = int(service_id)
configs = SERVICE_CONFIGS.configs
service_config_mapping = {
c.id: {
'name': c.name,
'detail_func_name': c.detail_func_name
} for c in configs
}
service_info = service_config_mapping.get(service_id, {})
if not service_info:
raise AdminException(f"Invalid service_id: {service_id}")
detail_func = getattr(health_utils, service_info.get('detail_func_name'))
res = detail_func()
res.update({'service_name': service_info.get('name')})
return res
@staticmethod
def shutdown_service(service_id: int):

View File

@ -177,6 +177,11 @@ def healthz():
return jsonify(result), (200 if all_ok else 500)
@manager.route("/ping", methods=["GET"]) # noqa: F821
def ping():
return "pong", 200
@manager.route("/new_token", methods=["POST"]) # noqa: F821
@login_required
def new_token():

View File

@ -21,3 +21,26 @@ def string_to_bytes(string):
def bytes_to_string(byte):
return byte.decode(encoding="utf-8")
def convert_bytes(size_in_bytes: int) -> str:
"""
Format size in bytes.
"""
if size_in_bytes == 0:
return "0 B"
units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
i = 0
size = float(size_in_bytes)
while size >= 1024 and i < len(units) - 1:
size /= 1024
i += 1
if i == 0 or size >= 100:
return f"{size:.0f} {units[i]}"
elif size >= 10:
return f"{size:.1f} {units[i]}"
else:
return f"{size:.2f} {units[i]}"

View File

@ -13,14 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import requests
from timeit import default_timer as timer
from api import settings
from api.db.db_models import DB
from rag import settings as rag_settings
from rag.utils.redis_conn import REDIS_CONN
from rag.utils.storage_factory import STORAGE_IMPL
from rag.utils.es_conn import ESConnection
from rag.utils.infinity_conn import InfinityConnection
def _ok_nok(ok: bool) -> str:
@ -65,6 +68,96 @@ def check_storage() -> tuple[bool, dict]:
return False, {"elapsed": f"{(timer() - st) * 1000.0:.1f}", "error": str(e)}
def get_es_cluster_stats() -> dict:
doc_engine = os.getenv('DOC_ENGINE', 'elasticsearch')
if doc_engine != 'elasticsearch':
raise Exception("Elasticsearch is not in use.")
try:
return {
"alive": True,
"message": ESConnection().get_cluster_stats()
}
except Exception as e:
return {
"alive": False,
"message": f"error: {str(e)}",
}
def get_infinity_status():
doc_engine = os.getenv('DOC_ENGINE', 'elasticsearch')
if doc_engine != 'infinity':
raise Exception("Infinity is not in use.")
try:
return {
"alive": True,
"message": InfinityConnection().health()
}
except Exception as e:
return {
"alive": False,
"message": f"error: {str(e)}",
}
def get_mysql_status():
try:
cursor = DB.execute_sql("SHOW PROCESSLIST;")
res_rows = cursor.fetchall()
headers = ['id', 'user', 'host', 'db', 'command', 'time', 'state', 'info']
cursor.close()
return {
"alive": True,
"message": [dict(zip(headers, r)) for r in res_rows]
}
except Exception as e:
return {
"alive": False,
"message": f"error: {str(e)}",
}
def check_minio_alive():
start_time = timer()
try:
response = requests.get(f'http://{rag_settings.MINIO["host"]}/minio/health/live')
if response.status_code == 200:
return {'alive': True, "message": f"Confirm elapsed: {(timer() - start_time) * 1000.0:.1f} ms."}
else:
return {'alive': False, "message": f"Confirm elapsed: {(timer() - start_time) * 1000.0:.1f} ms."}
except Exception as e:
return {
"alive": False,
"message": f"error: {str(e)}",
}
def get_redis_info():
try:
return {
"alive": True,
"message": REDIS_CONN.info()
}
except Exception as e:
return {
"alive": False,
"message": f"error: {str(e)}",
}
def check_ragflow_server_alive():
start_time = timer()
try:
response = requests.get(f'http://{settings.HOST_IP}:{settings.HOST_PORT}/v1/system/ping')
if response.status_code == 200:
return {'alive': True, "message": f"Confirm elapsed: {(timer() - start_time) * 1000.0:.1f} ms."}
else:
return {'alive': False, "message": f"Confirm elapsed: {(timer() - start_time) * 1000.0:.1f} ms."}
except Exception as e:
return {
"alive": False,
"message": f"error: {str(e)}",
}
def run_health_checks() -> tuple[dict, bool]:

View File

View File

@ -28,6 +28,7 @@ from rag import settings
from rag.settings import TAG_FLD, PAGERANK_FLD
from rag.utils import singleton, get_float
from api.utils.file_utils import get_project_base_directory
from api.utils.common import convert_bytes
from rag.utils.doc_store_conn import DocStoreConnection, MatchExpr, OrderByExpr, MatchTextExpr, MatchDenseExpr, \
FusionExpr
from rag.nlp import is_english, rag_tokenizer
@ -579,3 +580,52 @@ class ESConnection(DocStoreConnection):
break
logger.error(f"ESConnection.sql timeout for {ATTEMPT_TIME} times!")
return None
def get_cluster_stats(self):
"""
curl -XGET "http://{es_host}/_cluster/stats" -H "kbn-xsrf: reporting" to view raw stats.
"""
raw_stats = self.es.cluster.stats()
logger.debug(f"ESConnection.get_cluster_stats: {raw_stats}")
try:
res = {
'cluster_name': raw_stats['cluster_name'],
'status': raw_stats['status']
}
indices_status = raw_stats['indices']
res.update({
'indices': indices_status['count'],
'indices_shards': indices_status['shards']['total']
})
doc_info = indices_status['docs']
res.update({
'docs': doc_info['count'],
'docs_deleted': doc_info['deleted']
})
store_info = indices_status['store']
res.update({
'store_size': convert_bytes(store_info['size_in_bytes']),
'total_dataset_size': convert_bytes(store_info['total_data_set_size_in_bytes'])
})
mappings_info = indices_status['mappings']
res.update({
'mappings_fields': mappings_info['total_field_count'],
'mappings_deduplicated_fields': mappings_info['total_deduplicated_field_count'],
'mappings_deduplicated_size': convert_bytes(mappings_info['total_deduplicated_mapping_size_in_bytes'])
})
node_info = raw_stats['nodes']
res.update({
'nodes': node_info['count']['total'],
'nodes_version': node_info['versions'],
'os_mem': convert_bytes(node_info['os']['mem']['total_in_bytes']),
'os_mem_used': convert_bytes(node_info['os']['mem']['used_in_bytes']),
'os_mem_used_percent': node_info['os']['mem']['used_percent'],
'jvm_versions': node_info['jvm']['versions'][0]['vm_version'],
'jvm_heap_used': convert_bytes(node_info['jvm']['mem']['heap_used_in_bytes']),
'jvm_heap_max': convert_bytes(node_info['jvm']['mem']['heap_max_in_bytes'])
})
return res
except Exception as e:
logger.exception(f"ESConnection.get_cluster_stats: {e}")
return None

View File

@ -91,6 +91,20 @@ class RedisDB:
if self.REDIS.get(a) == b:
return True
def info(self):
info = self.REDIS.info()
return {
'redis_version': info["redis_version"],
'server_mode': info["server_mode"],
'used_memory': info["used_memory_human"],
'total_system_memory': info["total_system_memory_human"],
'mem_fragmentation_ratio': info["mem_fragmentation_ratio"],
'connected_clients': info["connected_clients"],
'blocked_clients': info["blocked_clients"],
'instantaneous_ops_per_sec': info["instantaneous_ops_per_sec"],
'total_commands_processed': info["total_commands_processed"]
}
def is_alive(self):
return self.REDIS is not None