From 89f438fe456ec0c0eae6194eb278085e03d45948 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Thu, 22 Jan 2026 00:18:29 +0800 Subject: [PATCH] Add ping command to test ping API (#12757) ### What problem does this PR solve? As title. ### Type of change - [x] New Feature (non-breaking change which adds functionality) --------- Signed-off-by: Jin Hai --- admin/client/http_client.py | 59 ++++++++++++++------------ admin/client/parser.py | 18 +++++++- admin/client/ragflow_cli.py | 11 ++--- admin/client/ragflow_client.py | 75 ++++++++++++++++++++++++++-------- api/apps/system_app.py | 2 +- 5 files changed, 110 insertions(+), 55 deletions(-) diff --git a/admin/client/http_client.py b/admin/client/http_client.py index 97012c297..ec94a1df0 100644 --- a/admin/client/http_client.py +++ b/admin/client/http_client.py @@ -16,9 +16,10 @@ import time import json -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, Optional import requests +# from requests.sessions import HTTPAdapter class HttpClient: @@ -85,42 +86,46 @@ class HttpClient: ) -> requests.Response | dict: url = self.build_url(path, use_api_base=use_api_base) merged_headers = self._headers(auth_kind, headers) - timeout: Tuple[float, float] = (self.connect_timeout, self.read_timeout) + # timeout: Tuple[float, float] = (self.connect_timeout, self.read_timeout) + session = requests.Session() + # adapter = HTTPAdapter(pool_connections=100, pool_maxsize=100) + # session.mount("http://", adapter) if iterations > 1: response_list = [] total_duration = 0.0 for _ in range(iterations): start_time = time.perf_counter() - response = requests.request( - method=method, - url=url, - headers=merged_headers, - json=json_body, - data=data, - files=files, - params=params, - timeout=timeout, - stream=stream, - verify=self.verify_ssl, - ) + response = session.get(url, headers=merged_headers, json=json_body, data=data, stream=stream) + # response = requests.request( + # method=method, + # url=url, + # headers=merged_headers, + # json=json_body, + # data=data, + # files=files, + # params=params, + # timeout=timeout, + # stream=stream, + # verify=self.verify_ssl, + # ) end_time = time.perf_counter() total_duration += end_time - start_time response_list.append(response) return {"duration": total_duration, "response_list": response_list} else: - return requests.request( - method=method, - url=url, - headers=merged_headers, - json=json_body, - data=data, - files=files, - params=params, - timeout=timeout, - stream=stream, - verify=self.verify_ssl, - ) - + return session.get(url, headers=merged_headers, json=json_body, data=data, stream=stream) + # return requests.request( + # method=method, + # url=url, + # headers=merged_headers, + # json=json_body, + # data=data, + # files=files, + # params=params, + # timeout=timeout, + # stream=stream, + # verify=self.verify_ssl, + # ) def request_json( self, diff --git a/admin/client/parser.py b/admin/client/parser.py index cf6bf31f9..d1d5c6262 100644 --- a/admin/client/parser.py +++ b/admin/client/parser.py @@ -21,7 +21,9 @@ start: command command: sql_command | meta_command -sql_command: list_services +sql_command: login_user + | ping_server + | list_services | show_service | startup_service | shutdown_service @@ -98,6 +100,7 @@ meta_arg: /[^\\s"']+/ | quoted_string // command definition +LOGIN: "LOGIN"i REGISTER: "REGISTER"i LIST: "LIST"i SERVICES: "SERVICES"i @@ -166,7 +169,9 @@ TTS: "TTS"i ASYNC: "ASYNC"i SYNC: "SYNC"i BENCHMARK: "BENCHMARK"i +PING: "PING"i +login_user: LOGIN USER quoted_string ";" list_services: LIST SERVICES ";" show_service: SHOW SERVICE NUMBER ";" startup_service: STARTUP SERVICE NUMBER ";" @@ -212,7 +217,8 @@ list_environments: LIST ENVS ";" benchmark: BENCHMARK NUMBER NUMBER user_statement -user_statement: show_current_user +user_statement: ping_server + | show_current_user | create_model_provider | drop_model_provider | set_default_llm @@ -241,6 +247,7 @@ user_statement: show_current_user | import_docs_into_dataset | search_on_datasets +ping_server: PING ";" show_current_user: SHOW CURRENT USER ";" create_model_provider: CREATE MODEL PROVIDER quoted_string quoted_string ";" drop_model_provider: DROP MODEL PROVIDER quoted_string ";" @@ -298,6 +305,13 @@ class RAGFlowCLITransformer(Transformer): def command(self, items): return items[0] + def login_user(self, items): + email = items[2].children[0].strip("'\"") + return {"type": "login_user", "email": email} + + def ping_server(self, items): + return {"type": "ping_server"} + def list_services(self, items): result = {"type": "list_services"} return result diff --git a/admin/client/ragflow_cli.py b/admin/client/ragflow_cli.py index ac3d49b30..38c32ddff 100644 --- a/admin/client/ragflow_cli.py +++ b/admin/client/ragflow_cli.py @@ -54,7 +54,6 @@ class RAGFlowCLI(Cmd): super().__init__() self.parser = Lark(GRAMMAR, start="start", parser="lalr", transformer=RAGFlowCLITransformer()) self.command_history = [] - self.is_interactive = False self.account = "admin@ragflow.io" self.account_password: str = "admin" self.session = requests.Session() @@ -212,7 +211,6 @@ class RAGFlowCLI(Cmd): print(separator) def run_interactive(self, args): - self.is_interactive = True if self.verify_auth(args, single_command=False, auth=args["auth"]): print(r""" ____ ___ ______________ ________ ____ @@ -226,7 +224,6 @@ class RAGFlowCLI(Cmd): print("RAGFlow command line interface - Type '\\?' for help, '\\q' to quit") def run_single_command(self, args): - self.is_interactive = False if self.verify_auth(args, single_command=True, auth=args["auth"]): command = args["command"] result = self.parse_command(command) @@ -272,15 +269,15 @@ class RAGFlowCLI(Cmd): else: return {"error": "Invalid command"} else: + auth = True if username is None: - print("Error: username (-u) is required in user mode") - return {"error": "Username required"} + auth = False return { "host": parsed_args.host, "port": parsed_args.port, "type": parsed_args.type, "username": username, - "auth": True + "auth": auth } except SystemExit: return {"error": "Invalid connection arguments"} @@ -297,7 +294,7 @@ class RAGFlowCLI(Cmd): command_dict = parsed_command # print(f"Parsed command: {command_dict}") - run_command(self.ragflow_client, command_dict, self.is_interactive) + run_command(self.ragflow_client, command_dict) def main(): diff --git a/admin/client/ragflow_client.py b/admin/client/ragflow_client.py index 6830b279a..8f4f604e8 100644 --- a/admin/client/ragflow_client.py +++ b/admin/client/ragflow_client.py @@ -22,8 +22,9 @@ import urllib.parse from pathlib import Path from http_client import HttpClient from lark import Tree -from user import encrypt_password +from user import encrypt_password, login_user +import getpass import base64 from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 from Cryptodome.PublicKey import RSA @@ -48,6 +49,31 @@ class RAGFlowClient: self.http_client = http_client self.server_type = server_type + def login_user(self, command): + email : str = command["email"] + user_password = getpass.getpass(f"password for {email}: ").strip() + try: + token = login_user(self.http_client, self.server_type, email, user_password) + self.http_client.login_token = token + print(f"Login user {email} successfully") + except Exception as e: + print(str(e)) + print("Can't access server for login (connection failed)") + + def ping_server(self, command): + iterations = command.get("iterations", 1) + if iterations > 1: + response = self.http_client.request("GET", "/system/ping", use_api_base=False, auth_kind="web", + iterations=iterations) + return response + else: + response = self.http_client.request("GET", "/system/ping", use_api_base=False, auth_kind="web") + if response.status_code == 200 and response.content == b"pong": + print("Server is alive") + else: + print("Server is down") + return None + def register_user(self, command): if self.server_type != "user": print("This command is only allowed in USER mode") @@ -1222,16 +1248,17 @@ class RAGFlowClient: print(separator) -def run_command(client: RAGFlowClient, command_dict: dict, is_interactive: bool): +def run_command(client: RAGFlowClient, command_dict: dict): command_type = command_dict["type"] match command_type: case "benchmark": - run_benchmark(client, command_dict, is_interactive) + run_benchmark(client, command_dict) + case "login_user": + client.login_user(command_dict) + case "ping_server": + return client.ping_server(command_dict) case "register_user": - if is_interactive: - print("Register user command is not supported in interactive mode") - return client.register_user(command_dict) case "list_services": client.list_services() @@ -1395,23 +1422,31 @@ Meta Commands: print(help_text) -def run_benchmark(client: RAGFlowClient, command_dict: dict, is_interactive: bool): +def run_benchmark(client: RAGFlowClient, command_dict: dict): concurrency = command_dict.get("concurrency", 1) iterations = command_dict.get("iterations", 1) command: dict = command_dict["command"] command.update({"iterations": iterations}) + + command_type = command["type"] if concurrency < 1: print("Concurrency must be greater than 0") return elif concurrency == 1: - result = run_command(client, command, is_interactive) + result = run_command(client, command) + success_count: int = 0 response_list = result["response_list"] - total_duration = result["duration"] - success_count = 0 for response in response_list: - res_json = response.json() - if response.status_code == 200 and res_json["code"] == 0: - success_count += 1 + match command_type: + case "ping_server": + if response.status_code == 200: + success_count += 1 + case _: + res_json = response.json() + if response.status_code == 200 and res_json["code"] == 0: + success_count += 1 + + total_duration = result["duration"] qps = iterations / total_duration if total_duration > 0 else None print(f"command: {command}, Concurrency: {concurrency}, iterations: {iterations}") print( @@ -1426,8 +1461,7 @@ def run_benchmark(client: RAGFlowClient, command_dict: dict, is_interactive: boo executor.submit( run_command, client, - command, - is_interactive + command ): idx for idx in range(concurrency) } @@ -1439,9 +1473,14 @@ def run_benchmark(client: RAGFlowClient, command_dict: dict, is_interactive: boo for result in results: response_list = result["response_list"] for response in response_list: - res_json = response.json() - if response.status_code == 200 and res_json["code"] == 0: - success_count += 1 + match command_type: + case "ping_server": + if response.status_code == 200: + success_count += 1 + case _: + res_json = response.json() + if response.status_code == 200 and res_json["code"] == 0: + success_count += 1 total_duration = end_time - start_time total_command_count = iterations * concurrency diff --git a/api/apps/system_app.py b/api/apps/system_app.py index 379b597de..8b8cee0b1 100644 --- a/api/apps/system_app.py +++ b/api/apps/system_app.py @@ -178,7 +178,7 @@ def healthz(): @manager.route("/ping", methods=["GET"]) # noqa: F821 -def ping(): +async def ping(): return "pong", 200