Compare commits

...

3 Commits

Author SHA1 Message Date
1d955507e9 Supports running single command (#10651)
### What problem does this PR solve?

```
$ python admin_client.py -h 0.0.0.0 -p 9381 'list users;'
Attempt to access ip: 0.0.0.0, port: 9381
Authentication successful.
Run single command: list users;
Listing all users
+-------------------------------+------------------+-----------+----------+
| create_date                   | email            | is_active | nickname |
+-------------------------------+------------------+-----------+----------+
| Thu, 15 Aug 2024 15:35:53 GMT | abc@abc.com      | 1         | aaa      |
| Sat, 08 Jun 2024 16:43:21 GMT | aaaa@aaaa.com    | 1         | aaa      |
| Thu, 15 Aug 2024 15:38:10 GMT | cbde@ccc.com     | 1         | ccc      |
| Tue, 23 Sep 2025 14:07:27 GMT | aaa@aaa.aaa      | 1         | aaa      |
| Thu, 15 Aug 2024 19:44:19 GMT | aa@aa.com        | 1         | aa       |
| Tue, 23 Sep 2025 15:41:36 GMT | admin@ragflow.io | 1         | admin    |
+-------------------------------+------------------+-----------+----------+
```

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
2025-10-18 21:03:22 +08:00
cf09c2260a feat: implement CLI of role-based access control system (#10650)
### What problem does this PR solve?

- Add comprehensive RBAC support with role and permission management
- Implement CREATE/ALTER/DROP ROLE commands for role lifecycle
management
- Add GRANT/REVOKE commands for fine-grained permission control
- Support user role assignment via ALTER USER SET ROLE command
- Add SHOW ROLE and SHOW USER PERMISSION for permission inspection
- Implement corresponding RESTful API endpoints for role management
- Integrate role commands into existing command execution framework


### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
2025-10-18 17:53:34 +08:00
c9b18cbe18 Feat:admin api (#10642)
### What problem does this PR solve?

Support frontend auth.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
2025-10-18 16:09:48 +08:00
6 changed files with 758 additions and 121 deletions

View File

@ -21,9 +21,8 @@ from cmd import Cmd
from Cryptodome.PublicKey import RSA from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
from typing import Dict, List, Any from typing import Dict, List, Any
from lark import Lark, Transformer, Tree, Token from lark import Lark, Transformer, Tree
import requests import requests
from requests.auth import HTTPBasicAuth
GRAMMAR = r""" GRAMMAR = r"""
start: command start: command
@ -43,6 +42,15 @@ sql_command: list_services
| activate_user | activate_user
| list_datasets | list_datasets
| list_agents | list_agents
| create_role
| drop_role
| alter_role
| list_roles
| show_role
| grant_permission
| revoke_permission
| alter_user_role
| show_user_permission
// meta command definition // meta command definition
meta_command: "\\" meta_command_name [meta_args] meta_command: "\\" meta_command_name [meta_args]
@ -71,6 +79,19 @@ PASSWORD: "PASSWORD"i
DATASETS: "DATASETS"i DATASETS: "DATASETS"i
OF: "OF"i OF: "OF"i
AGENTS: "AGENTS"i AGENTS: "AGENTS"i
ROLE: "ROLE"i
ROLES: "ROLES"i
DESCRIPTION: "DESCRIPTION"i
GRANT: "GRANT"i
REVOKE: "REVOKE"i
ALL: "ALL"i
PERMISSION: "PERMISSION"i
TO: "TO"i
FROM: "FROM"i
FOR: "FOR"i
RESOURCES: "RESOURCES"i
ON: "ON"i
SET: "SET"i
list_services: LIST SERVICES ";" list_services: LIST SERVICES ";"
show_service: SHOW SERVICE NUMBER ";" show_service: SHOW SERVICE NUMBER ";"
@ -88,6 +109,19 @@ activate_user: ALTER USER ACTIVE quoted_string status ";"
list_datasets: LIST DATASETS OF quoted_string ";" list_datasets: LIST DATASETS OF quoted_string ";"
list_agents: LIST AGENTS OF quoted_string ";" list_agents: LIST AGENTS OF quoted_string ";"
create_role: CREATE ROLE identifier [DESCRIPTION quoted_string] ";"
drop_role: DROP ROLE identifier ";"
alter_role: ALTER ROLE identifier SET DESCRIPTION quoted_string ";"
list_roles: LIST ROLES ";"
show_role: SHOW ROLE identifier ";"
grant_permission: GRANT action_list ON identifier TO ROLE identifier ";"
revoke_permission: REVOKE action_list ON identifier FROM ROLE identifier ";"
alter_user_role: ALTER USER quoted_string SET ROLE identifier ";"
show_user_permission: SHOW USER PERMISSION quoted_string ";"
action_list: identifier ("," identifier)*
identifier: WORD identifier: WORD
quoted_string: QUOTED_STRING quoted_string: QUOTED_STRING
status: WORD status: WORD
@ -134,34 +168,86 @@ class AdminTransformer(Transformer):
def show_user(self, items): def show_user(self, items):
user_name = items[2] user_name = items[2]
return {"type": "show_user", "username": user_name} return {"type": "show_user", "user_name": user_name}
def drop_user(self, items): def drop_user(self, items):
user_name = items[2] user_name = items[2]
return {"type": "drop_user", "username": user_name} return {"type": "drop_user", "user_name": user_name}
def alter_user(self, items): def alter_user(self, items):
user_name = items[3] user_name = items[3]
new_password = items[4] new_password = items[4]
return {"type": "alter_user", "username": user_name, "password": new_password} return {"type": "alter_user", "user_name": user_name, "password": new_password}
def create_user(self, items): def create_user(self, items):
user_name = items[2] user_name = items[2]
password = items[3] password = items[3]
return {"type": "create_user", "username": user_name, "password": password, "role": "user"} return {"type": "create_user", "user_name": user_name, "password": password, "role": "user"}
def activate_user(self, items): def activate_user(self, items):
user_name = items[3] user_name = items[3]
activate_status = items[4] activate_status = items[4]
return {"type": "activate_user", "activate_status": activate_status, "username": user_name} return {"type": "activate_user", "activate_status": activate_status, "user_name": user_name}
def list_datasets(self, items): def list_datasets(self, items):
user_name = items[3] user_name = items[3]
return {"type": "list_datasets", "username": user_name} return {"type": "list_datasets", "user_name": user_name}
def list_agents(self, items): def list_agents(self, items):
user_name = items[3] user_name = items[3]
return {"type": "list_agents", "username": user_name} return {"type": "list_agents", "user_name": user_name}
def create_role(self, items):
role_name = items[2]
if len(items) > 4:
description = items[4]
return {"type": "create_role", "role_name": role_name, "description": description}
else:
return {"type": "create_role", "role_name": role_name}
def drop_role(self, items):
role_name = items[2]
return {"type": "drop_role", "role_name": role_name}
def alter_role(self, items):
role_name = items[2]
description = items[5]
return {"type": "alter_role", "role_name": role_name, "description": description}
def list_roles(self, items):
return {"type": "list_roles"}
def show_role(self, items):
role_name = items[2]
return {"type": "show_role", "role_name": role_name}
def grant_permission(self, items):
action_list = items[1]
resource = items[3]
role_name = items[6]
return {"type": "grant_permission", "role_name": role_name, "resource": resource, "actions": action_list}
def revoke_permission(self, items):
action_list = items[1]
resource = items[3]
role_name = items[6]
return {
"type": "revoke_permission",
"role_name": role_name,
"resource": resource, "actions": action_list
}
def alter_user_role(self, items):
user_name = items[2]
role_name = items[5]
return {"type": "alter_user_role", "user_name": user_name, "role_name": role_name}
def show_user_permission(self, items):
user_name = items[3]
return {"type": "show_user_permission", "user_name": user_name}
def action_list(self, items):
return items
def meta_command(self, items): def meta_command(self, items):
command_name = str(items[0]).lower() command_name = str(items[0]).lower()
@ -205,6 +291,8 @@ class AdminCLI(Cmd):
self.is_interactive = False self.is_interactive = False
self.admin_account = "admin@ragflow.io" self.admin_account = "admin@ragflow.io"
self.admin_password: str = "admin" self.admin_password: str = "admin"
self.session = requests.Session()
self.access_token: str = ""
self.host: str = "" self.host: str = ""
self.port: int = 0 self.port: int = 0
@ -240,7 +328,7 @@ class AdminCLI(Cmd):
def default(self, line: str) -> bool: def default(self, line: str) -> bool:
return self.onecmd(line) return self.onecmd(line)
def parse_command(self, command_str: str) -> dict[str, str] | Tree[Token]: def parse_command(self, command_str: str) -> dict[str, str]:
if not command_str.strip(): if not command_str.strip():
return {'type': 'empty'} return {'type': 'empty'}
@ -252,32 +340,38 @@ class AdminCLI(Cmd):
except Exception as e: except Exception as e:
return {'type': 'error', 'message': f'Parse error: {str(e)}'} return {'type': 'error', 'message': f'Parse error: {str(e)}'}
def verify_admin(self, args): def verify_admin(self, arguments: dict, single_command: bool):
self.host = arguments['host']
conn_info = self._parse_connection_args(args) self.port = arguments['port']
if 'error' in conn_info:
print(f"Error: {conn_info['error']}")
return
self.host = conn_info['host']
self.port = conn_info['port']
print(f"Attempt to access ip: {self.host}, port: {self.port}") print(f"Attempt to access ip: {self.host}, port: {self.port}")
url = f'http://{self.host}:{self.port}/api/v1/admin/auth' url = f"http://{self.host}:{self.port}/api/v1/admin/login"
attempt_count = 3
if single_command:
attempt_count = 1
try_count = 0 try_count = 0
while True: while True:
try_count += 1 try_count += 1
if try_count > 3: if try_count > attempt_count:
return False return False
admin_passwd = input(f"password for {self.admin_account}: ").strip() if single_command:
admin_passwd = arguments['password']
else:
admin_passwd = input(f"password for {self.admin_account}: ").strip()
try: try:
self.admin_password = encode_to_base64(admin_passwd) self.admin_password = encrypt(admin_passwd)
response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) response = self.session.post(url, json={'email': self.admin_account, 'password': self.admin_password})
if response.status_code == 200: if response.status_code == 200:
res_json = response.json() res_json = response.json()
error_code = res_json.get('code', -1) error_code = res_json.get('code', -1)
if error_code == 0: if error_code == 0:
self.session.headers.update({
'Content-Type': 'application/json',
'Authorization': response.headers['Authorization'],
'User-Agent': 'RAGFlow-CLI/0.21.0'
})
print("Authentication successful.") print("Authentication successful.")
return True return True
else: else:
@ -285,8 +379,9 @@ class AdminCLI(Cmd):
print(f"Authentication failed: {error_message}, try again") print(f"Authentication failed: {error_message}, try again")
continue continue
else: else:
print(f"Bad responsestatus: {response.status_code}, try again") print(f"Bad responsestatus: {response.status_code}, password is wrong")
except Exception: except Exception as e:
print(str(e))
print(f"Can't access {self.host}, port: {self.port}") print(f"Can't access {self.host}, port: {self.port}")
def _print_table_simple(self, data): def _print_table_simple(self, data):
@ -371,23 +466,31 @@ class AdminCLI(Cmd):
print("\nGoodbye!") print("\nGoodbye!")
break break
def run_single_command(self, args): def run_single_command(self, command: str):
conn_info = self._parse_connection_args(args) result = self.parse_command(command)
if 'error' in conn_info: self.execute_command(result)
print(f"Error: {conn_info['error']}")
return
def _parse_connection_args(self, args: List[str]) -> Dict[str, Any]: def parse_connection_args(self, args: List[str]) -> Dict[str, Any]:
parser = argparse.ArgumentParser(description='Admin CLI Client', add_help=False) parser = argparse.ArgumentParser(description='Admin CLI Client', add_help=False)
parser.add_argument('-h', '--host', default='localhost', help='Admin service host') parser.add_argument('-h', '--host', default='localhost', help='Admin service host')
parser.add_argument('-p', '--port', type=int, default=8080, help='Admin service port') parser.add_argument('-p', '--port', type=int, default=8080, help='Admin service port')
parser.add_argument('-w', '--password', default='admin', type=str, help='Superuser password')
parser.add_argument('command', nargs='?', help='Single command')
try: try:
parsed_args, remaining_args = parser.parse_known_args(args) parsed_args, remaining_args = parser.parse_known_args(args)
return { if remaining_args:
'host': parsed_args.host, command = remaining_args[0]
'port': parsed_args.port, return {
} 'host': parsed_args.host,
'port': parsed_args.port,
'password': parsed_args.password,
'command': command
}
else:
return {
'host': parsed_args.host,
'port': parsed_args.port,
}
except SystemExit: except SystemExit:
return {'error': 'Invalid connection arguments'} return {'error': 'Invalid connection arguments'}
@ -434,6 +537,24 @@ class AdminCLI(Cmd):
self._handle_list_datasets(command_dict) self._handle_list_datasets(command_dict)
case 'list_agents': case 'list_agents':
self._handle_list_agents(command_dict) self._handle_list_agents(command_dict)
case 'create_role':
self._create_role(command_dict)
case 'drop_role':
self._drop_role(command_dict)
case 'alter_role':
self._alter_role(command_dict)
case 'list_roles':
self._list_roles(command_dict)
case 'show_role':
self._show_role(command_dict)
case 'grant_permission':
self._grant_permission(command_dict)
case 'revoke_permission':
self._revoke_permission(command_dict)
case 'alter_user_role':
self._alter_user_role(command_dict)
case 'show_user_permission':
self._show_user_permission(command_dict)
case 'meta': case 'meta':
self._handle_meta_command(command_dict) self._handle_meta_command(command_dict)
case _: case _:
@ -443,7 +564,7 @@ class AdminCLI(Cmd):
print("Listing all services") print("Listing all services")
url = f'http://{self.host}:{self.port}/api/v1/admin/services' url = f'http://{self.host}:{self.port}/api/v1/admin/services'
response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) response = self.session.get(url)
res_json = response.json() res_json = response.json()
if response.status_code == 200: if response.status_code == 200:
self._print_table_simple(res_json['data']) self._print_table_simple(res_json['data'])
@ -455,7 +576,7 @@ class AdminCLI(Cmd):
print(f"Showing service: {service_id}") print(f"Showing service: {service_id}")
url = f'http://{self.host}:{self.port}/api/v1/admin/services/{service_id}' url = f'http://{self.host}:{self.port}/api/v1/admin/services/{service_id}'
response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) response = self.session.get(url)
res_json = response.json() res_json = response.json()
if response.status_code == 200: if response.status_code == 200:
res_data = res_json['data'] res_data = res_json['data']
@ -486,7 +607,7 @@ class AdminCLI(Cmd):
print("Listing all users") print("Listing all users")
url = f'http://{self.host}:{self.port}/api/v1/admin/users' url = f'http://{self.host}:{self.port}/api/v1/admin/users'
response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) response = self.session.get(url)
res_json = response.json() res_json = response.json()
if response.status_code == 200: if response.status_code == 200:
self._print_table_simple(res_json['data']) self._print_table_simple(res_json['data'])
@ -494,23 +615,23 @@ class AdminCLI(Cmd):
print(f"Fail to get all users, code: {res_json['code']}, message: {res_json['message']}") print(f"Fail to get all users, code: {res_json['code']}, message: {res_json['message']}")
def _handle_show_user(self, command): def _handle_show_user(self, command):
username_tree: Tree = command['username'] username_tree: Tree = command['user_name']
username: str = username_tree.children[0].strip("'\"") user_name: str = username_tree.children[0].strip("'\"")
print(f"Showing user: {username}") print(f"Showing user: {user_name}")
url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}' url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}'
response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) response = self.session.get(url)
res_json = response.json() res_json = response.json()
if response.status_code == 200: if response.status_code == 200:
self._print_table_simple(res_json['data']) self._print_table_simple(res_json['data'])
else: else:
print(f"Fail to get user {username}, code: {res_json['code']}, message: {res_json['message']}") print(f"Fail to get user {user_name}, code: {res_json['code']}, message: {res_json['message']}")
def _handle_drop_user(self, command): def _handle_drop_user(self, command):
username_tree: Tree = command['username'] username_tree: Tree = command['user_name']
username: str = username_tree.children[0].strip("'\"") user_name: str = username_tree.children[0].strip("'\"")
print(f"Drop user: {username}") print(f"Drop user: {user_name}")
url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}' url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}'
response = requests.delete(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) response = self.session.delete(url)
res_json = response.json() res_json = response.json()
if response.status_code == 200: if response.status_code == 200:
print(res_json["message"]) print(res_json["message"])
@ -518,14 +639,13 @@ class AdminCLI(Cmd):
print(f"Fail to drop user, code: {res_json['code']}, message: {res_json['message']}") print(f"Fail to drop user, code: {res_json['code']}, message: {res_json['message']}")
def _handle_alter_user(self, command): def _handle_alter_user(self, command):
username_tree: Tree = command['username'] user_name_tree: Tree = command['user_name']
username: str = username_tree.children[0].strip("'\"") user_name: str = user_name_tree.children[0].strip("'\"")
password_tree: Tree = command['password'] password_tree: Tree = command['password']
password: str = password_tree.children[0].strip("'\"") password: str = password_tree.children[0].strip("'\"")
print(f"Alter user: {username}, password: {password}") print(f"Alter user: {user_name}, password: {password}")
url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/password' url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/password'
response = requests.put(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password), response = self.session.put(url, json={'new_password': encrypt(password)})
json={'new_password': encrypt(password)})
res_json = response.json() res_json = response.json()
if response.status_code == 200: if response.status_code == 200:
print(res_json["message"]) print(res_json["message"])
@ -533,34 +653,32 @@ class AdminCLI(Cmd):
print(f"Fail to alter password, code: {res_json['code']}, message: {res_json['message']}") print(f"Fail to alter password, code: {res_json['code']}, message: {res_json['message']}")
def _handle_create_user(self, command): def _handle_create_user(self, command):
username_tree: Tree = command['username'] user_name_tree: Tree = command['user_name']
username: str = username_tree.children[0].strip("'\"") user_name: str = user_name_tree.children[0].strip("'\"")
password_tree: Tree = command['password'] password_tree: Tree = command['password']
password: str = password_tree.children[0].strip("'\"") password: str = password_tree.children[0].strip("'\"")
role: str = command['role'] role: str = command['role']
print(f"Create user: {username}, password: {password}, role: {role}") print(f"Create user: {user_name}, password: {password}, role: {role}")
url = f'http://{self.host}:{self.port}/api/v1/admin/users' url = f'http://{self.host}:{self.port}/api/v1/admin/users'
response = requests.post( response = self.session.post(
url, url,
auth=HTTPBasicAuth(self.admin_account, self.admin_password), json={'user_name': user_name, 'password': encrypt(password), 'role': role}
json={'username': username, 'password': encrypt(password), 'role': role}
) )
res_json = response.json() res_json = response.json()
if response.status_code == 200: if response.status_code == 200:
self._print_table_simple(res_json['data']) self._print_table_simple(res_json['data'])
else: else:
print(f"Fail to create user {username}, code: {res_json['code']}, message: {res_json['message']}") print(f"Fail to create user {user_name}, code: {res_json['code']}, message: {res_json['message']}")
def _handle_activate_user(self, command): def _handle_activate_user(self, command):
username_tree: Tree = command['username'] user_name_tree: Tree = command['user_name']
username: str = username_tree.children[0].strip("'\"") user_name: str = user_name_tree.children[0].strip("'\"")
activate_tree: Tree = command['activate_status'] activate_tree: Tree = command['activate_status']
activate_status: str = activate_tree.children[0].strip("'\"") activate_status: str = activate_tree.children[0].strip("'\"")
if activate_status.lower() in ['on', 'off']: if activate_status.lower() in ['on', 'off']:
print(f"Alter user {username} activate status, turn {activate_status.lower()}.") print(f"Alter user {user_name} activate status, turn {activate_status.lower()}.")
url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/activate' url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/activate'
response = requests.put(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password), response = self.session.put(url, json={'activate_status': activate_status})
json={'activate_status': activate_status})
res_json = response.json() res_json = response.json()
if response.status_code == 200: if response.status_code == 200:
print(res_json["message"]) print(res_json["message"])
@ -570,28 +688,178 @@ class AdminCLI(Cmd):
print(f"Unknown activate status: {activate_status}.") print(f"Unknown activate status: {activate_status}.")
def _handle_list_datasets(self, command): def _handle_list_datasets(self, command):
username_tree: Tree = command['username'] username_tree: Tree = command['user_name']
username: str = username_tree.children[0].strip("'\"") user_name: str = username_tree.children[0].strip("'\"")
print(f"Listing all datasets of user: {username}") print(f"Listing all datasets of user: {user_name}")
url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/datasets' url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/datasets'
response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) response = self.session.get(url)
res_json = response.json() res_json = response.json()
if response.status_code == 200: if response.status_code == 200:
self._print_table_simple(res_json['data']) self._print_table_simple(res_json['data'])
else: else:
print(f"Fail to get all datasets of {username}, code: {res_json['code']}, message: {res_json['message']}") print(f"Fail to get all datasets of {user_name}, code: {res_json['code']}, message: {res_json['message']}")
def _handle_list_agents(self, command): def _handle_list_agents(self, command):
username_tree: Tree = command['username'] username_tree: Tree = command['user_name']
username: str = username_tree.children[0].strip("'\"") user_name: str = username_tree.children[0].strip("'\"")
print(f"Listing all agents of user: {username}") print(f"Listing all agents of user: {user_name}")
url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/agents' url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/agents'
response = requests.get(url, auth=HTTPBasicAuth(self.admin_account, self.admin_password)) response = self.session.get(url)
res_json = response.json() res_json = response.json()
if response.status_code == 200: if response.status_code == 200:
self._print_table_simple(res_json['data']) self._print_table_simple(res_json['data'])
else: else:
print(f"Fail to get all agents of {username}, code: {res_json['code']}, message: {res_json['message']}") print(f"Fail to get all agents of {user_name}, code: {res_json['code']}, message: {res_json['message']}")
def _create_role(self, command):
role_name_tree: Tree = command['role_name']
role_name: str = role_name_tree.children[0].strip("'\"")
desc_str: str = ''
if 'description' in command:
desc_tree: Tree = command['description']
desc_str = desc_tree.children[0].strip("'\"")
print(f"create role name: {role_name}, description: {desc_str}")
url = f'http://{self.host}:{self.port}/api/v1/admin/roles'
response = self.session.post(
url,
json={'role_name': role_name, 'description': desc_str}
)
res_json = response.json()
if response.status_code == 200:
self._print_table_simple(res_json['data'])
else:
print(f"Fail to create role {role_name}, code: {res_json['code']}, message: {res_json['message']}")
def _drop_role(self, command):
role_name_tree: Tree = command['role_name']
role_name: str = role_name_tree.children[0].strip("'\"")
print(f"drop role name: {role_name}")
url = f'http://{self.host}:{self.port}/api/v1/admin/roles/{role_name}'
response = self.session.delete(url)
res_json = response.json()
if response.status_code == 200:
self._print_table_simple(res_json['data'])
else:
print(f"Fail to drop role {role_name}, code: {res_json['code']}, message: {res_json['message']}")
def _alter_role(self, command):
role_name_tree: Tree = command['role_name']
role_name: str = role_name_tree.children[0].strip("'\"")
desc_tree: Tree = command['description']
desc_str: str = desc_tree.children[0].strip("'\"")
print(f"alter role name: {role_name}, description: {desc_str}")
url = f'http://{self.host}:{self.port}/api/v1/admin/roles/{role_name}'
response = self.session.put(
url,
json={'description': desc_str}
)
res_json = response.json()
if response.status_code == 200:
self._print_table_simple(res_json['data'])
else:
print(
f"Fail to update role {role_name} with description: {desc_str}, code: {res_json['code']}, message: {res_json['message']}")
def _list_roles(self, command):
print("Listing all roles")
url = f'http://{self.host}:{self.port}/api/v1/admin/roles'
response = self.session.get(url)
res_json = response.json()
if response.status_code == 200:
self._print_table_simple(res_json['data'])
else:
print(f"Fail to list roles, code: {res_json['code']}, message: {res_json['message']}")
def _show_role(self, command):
role_name_tree: Tree = command['role_name']
role_name: str = role_name_tree.children[0].strip("'\"")
print(f"show role: {role_name}")
url = f'http://{self.host}:{self.port}/api/v1/admin/roles/{role_name}/permission'
response = self.session.get(url)
res_json = response.json()
if response.status_code == 200:
self._print_table_simple(res_json['data'])
else:
print(f"Fail to list roles, code: {res_json['code']}, message: {res_json['message']}")
def _grant_permission(self, command):
role_name_tree: Tree = command['role_name']
role_name_str: str = role_name_tree.children[0].strip("'\"")
resource_tree: Tree = command['resource']
resource_str: str = resource_tree.children[0].strip("'\"")
action_tree_list: list = command['actions']
actions: list = []
for action_tree in action_tree_list:
action_str: str = action_tree.children[0].strip("'\"")
actions.append(action_str)
print(f"grant role_name: {role_name_str}, resource: {resource_str}, actions: {actions}")
url = f'http://{self.host}:{self.port}/api/v1/admin/roles/{role_name_str}/permission'
response = self.session.post(
url,
json={'actions': actions, 'resource': resource_str}
)
res_json = response.json()
if response.status_code == 200:
self._print_table_simple(res_json['data'])
else:
print(
f"Fail to grant role {role_name_str} with {actions} on {resource_str}, code: {res_json['code']}, message: {res_json['message']}")
def _revoke_permission(self, command):
role_name_tree: Tree = command['role_name']
role_name_str: str = role_name_tree.children[0].strip("'\"")
resource_tree: Tree = command['resource']
resource_str: str = resource_tree.children[0].strip("'\"")
action_tree_list: list = command['actions']
actions: list = []
for action_tree in action_tree_list:
action_str: str = action_tree.children[0].strip("'\"")
actions.append(action_str)
print(f"revoke role_name: {role_name_str}, resource: {resource_str}, actions: {actions}")
url = f'http://{self.host}:{self.port}/api/v1/admin/roles/{role_name_str}/permission'
response = self.session.delete(
url,
json={'actions': actions, 'resource': resource_str}
)
res_json = response.json()
if response.status_code == 200:
self._print_table_simple(res_json['data'])
else:
print(
f"Fail to revoke role {role_name_str} with {actions} on {resource_str}, code: {res_json['code']}, message: {res_json['message']}")
def _alter_user_role(self, command):
role_name_tree: Tree = command['role_name']
role_name_str: str = role_name_tree.children[0].strip("'\"")
user_name_tree: Tree = command['user_name']
user_name_str: str = user_name_tree.children[0].strip("'\"")
print(f"alter_user_role user_name: {user_name_str}, role_name: {role_name_str}")
url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name_str}/role'
response = self.session.put(
url,
json={'role_name': role_name_str}
)
res_json = response.json()
if response.status_code == 200:
self._print_table_simple(res_json['data'])
else:
print(
f"Fail to alter user: {user_name_str} to role {role_name_str}, code: {res_json['code']}, message: {res_json['message']}")
def _show_user_permission(self, command):
user_name_tree: Tree = command['user_name']
user_name_str: str = user_name_tree.children[0].strip("'\"")
print(f"show_user_permission user_name: {user_name_str}")
url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name_str}/permission'
response = self.session.get(url)
res_json = response.json()
if response.status_code == 200:
self._print_table_simple(res_json['data'])
else:
print(
f"Fail to show user: {user_name_str} permission, code: {res_json['code']}, message: {res_json['message']}")
def _handle_meta_command(self, command): def _handle_meta_command(self, command):
meta_command = command['command'] meta_command = command['command']
@ -634,27 +902,29 @@ def main():
cli = AdminCLI() cli = AdminCLI()
if len(sys.argv) == 1 or (len(sys.argv) > 1 and sys.argv[1] == '-'): args = cli.parse_connection_args(sys.argv)
print(r""" if 'error' in args:
____ ___ ______________ ___ __ _ print(f"Error: {args['error']}")
/ __ \/ | / ____/ ____/ /___ _ __ / | ____/ /___ ___ (_)___ return
/ /_/ / /| |/ / __/ /_ / / __ \ | /| / / / /| |/ __ / __ `__ \/ / __ \
/ _, _/ ___ / /_/ / __/ / / /_/ / |/ |/ / / ___ / /_/ / / / / / / / / / / if 'command' in args:
/_/ |_/_/ |_\____/_/ /_/\____/|__/|__/ /_/ |_\__,_/_/ /_/ /_/_/_/ /_/ if 'password' not in args:
""") print("Error: password is missing")
if cli.verify_admin(sys.argv): return
cli.cmdloop() if cli.verify_admin(args, single_command=True):
command: str = args['command']
print(f"Run single command: {command}")
cli.run_single_command(command)
else: else:
print(r""" if cli.verify_admin(args, single_command=False):
____ ___ ______________ ___ __ _ print(r"""
/ __ \/ | / ____/ ____/ /___ _ __ / | ____/ /___ ___ (_)___ ____ ___ ______________ ___ __ _
/ /_/ / /| |/ / __/ /_ / / __ \ | /| / / / /| |/ __ / __ `__ \/ / __ \ / __ \/ | / ____/ ____/ /___ _ __ / | ____/ /___ ___ (_)___
/ _, _/ ___ / /_/ / __/ / / /_/ / |/ |/ / / ___ / /_/ / / / / / / / / / / / /_/ / /| |/ / __/ /_ / / __ \ | /| / / / /| |/ __ / __ `__ \/ / __ \
/_/ |_/_/ |_\____/_/ /_/\____/|__/|__/ /_/ |_\__,_/_/ /_/ /_/_/_/ /_/ / _, _/ ___ / /_/ / __/ / / /_/ / |/ |/ / / ___ / /_/ / / / / / / / / / /
""") /_/ |_/_/ |_\____/_/ /_/\____/|__/|__/ /_/ |_\__,_/_/ /_/ /_/_/_/ /_/
if cli.verify_admin(sys.argv): """)
cli.cmdloop() cli.cmdloop()
# cli.run_single_command(sys.argv[1:])
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -27,6 +27,9 @@ from api.utils.log_utils import init_root_logger
from api.constants import SERVICE_CONF from api.constants import SERVICE_CONF
from api import settings from api import settings
from config import load_configurations, SERVICE_CONFIGS from config import load_configurations, SERVICE_CONFIGS
from auth import init_default_admin, setup_auth
from flask_session import Session
from flask_login import LoginManager
stop_event = threading.Event() stop_event = threading.Event()
@ -42,7 +45,17 @@ if __name__ == '__main__':
app = Flask(__name__) app = Flask(__name__)
app.register_blueprint(admin_bp) app.register_blueprint(admin_bp)
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
app.config["MAX_CONTENT_LENGTH"] = int(
os.environ.get("MAX_CONTENT_LENGTH", 1024 * 1024 * 1024)
)
Session(app)
login_manager = LoginManager()
login_manager.init_app(app)
settings.init_settings() settings.init_settings()
setup_auth(login_manager)
init_default_admin()
SERVICE_CONFIGS.configs = load_configurations(SERVICE_CONF) SERVICE_CONFIGS.configs = load_configurations(SERVICE_CONF)
try: try:

View File

@ -18,11 +18,122 @@
import logging import logging
import uuid import uuid
from functools import wraps from functools import wraps
from datetime import datetime
from flask import request, jsonify from flask import request, jsonify
from flask_login import current_user, login_user
from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer
from api.common.exceptions import AdminException from api import settings
from api.common.exceptions import AdminException, UserNotFoundError
from api.db.init_data import encode_to_base64 from api.db.init_data import encode_to_base64
from api.db.services import UserService from api.db.services import UserService
from api.db import ActiveEnum, StatusEnum
from api.utils.crypt import decrypt
from api.utils import (
current_timestamp,
datetime_format,
get_uuid,
)
from api.utils.api_utils import (
construct_response,
)
def setup_auth(login_manager):
@login_manager.request_loader
def load_user(web_request):
jwt = Serializer(secret_key=settings.SECRET_KEY)
authorization = web_request.headers.get("Authorization")
if authorization:
try:
access_token = str(jwt.loads(authorization))
if not access_token or not access_token.strip():
logging.warning("Authentication attempt with empty access token")
return None
# Access tokens should be UUIDs (32 hex characters)
if len(access_token.strip()) < 32:
logging.warning(f"Authentication attempt with invalid token format: {len(access_token)} chars")
return None
user = UserService.query(
access_token=access_token, status=StatusEnum.VALID.value
)
if user:
if not user[0].access_token or not user[0].access_token.strip():
logging.warning(f"User {user[0].email} has empty access_token in database")
return None
return user[0]
else:
return None
except Exception as e:
logging.warning(f"load_user got exception {e}")
return None
else:
return None
def init_default_admin():
# Verify that at least one active admin user exists. If not, create a default one.
users = UserService.query(is_superuser=True)
if not users:
default_admin = {
"id": uuid.uuid1().hex,
"password": encode_to_base64("admin"),
"nickname": "admin",
"is_superuser": True,
"email": "admin@ragflow.io",
"creator": "system",
"status": "1",
}
if not UserService.save(**default_admin):
raise AdminException("Can't init admin.", 500)
elif not any([u.is_active == ActiveEnum.ACTIVE.value for u in users]):
raise AdminException("No active admin. Please update 'is_active' in db manually.", 500)
def check_admin_auth(func):
@wraps(func)
def wrapper(*args, **kwargs):
user = UserService.filter_by_id(current_user.id)
if not user:
raise UserNotFoundError(current_user.email)
if not user.is_superuser:
raise AdminException("Not admin", 403)
if user.is_active == ActiveEnum.INACTIVE.value:
raise AdminException(f"User {current_user.email} inactive", 403)
return func(*args, **kwargs)
return wrapper
def login_admin(email: str, password: str):
"""
:param email: admin email
:param password: string before decrypt
"""
users = UserService.query(email=email)
if not users:
raise UserNotFoundError(email)
psw = decrypt(password)
user = UserService.query_user(email, psw)
if not user:
raise AdminException("Email and password do not match!")
if not user.is_superuser:
raise AdminException("Not admin", 403)
if user.is_active == ActiveEnum.INACTIVE.value:
raise AdminException(f"User {email} inactive", 403)
resp = user.to_json()
user.access_token = get_uuid()
login_user(user)
user.update_time = (current_timestamp(),)
user.update_date = (datetime_format(datetime.now()),)
user.save()
msg = "Welcome back!"
return construct_response(data=resp, auth=user.get_id(), message=msg)
def check_admin(username: str, password: str): def check_admin(username: str, password: str):
@ -61,12 +172,18 @@ def login_verify(f):
username = auth.parameters['username'] username = auth.parameters['username']
password = auth.parameters['password'] password = auth.parameters['password']
# TODO: to check the username and password from DB try:
if check_admin(username, password) is False: if check_admin(username, password) is False:
return jsonify({
"code": 500,
"message": "Access denied",
"data": None
}), 200
except Exception as e:
error_msg = str(e)
return jsonify({ return jsonify({
"code": 403, "code": 500,
"message": "Access denied", "message": error_msg
"data": None
}), 200 }), 200
return f(*args, **kwargs) return f(*args, **kwargs)

76
admin/server/roles.py Normal file
View File

@ -0,0 +1,76 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from typing import Dict, Any
from api.common.exceptions import AdminException
class RoleMgr:
@staticmethod
def create_role(role_name: str, description: str):
error_msg = f"not implement: create role: {role_name}, description: {description}"
logging.error(error_msg)
raise AdminException(error_msg)
@staticmethod
def update_role_description(role_name: str, description: str) -> Dict[str, Any]:
error_msg = f"not implement: update role: {role_name} with description: {description}"
logging.error(error_msg)
raise AdminException(error_msg)
@staticmethod
def delete_role(role_name: str) -> Dict[str, Any]:
error_msg = f"not implement: drop role: {role_name}"
logging.error(error_msg)
raise AdminException(error_msg)
@staticmethod
def list_roles() -> Dict[str, Any]:
error_msg = "not implement: list roles"
logging.error(error_msg)
raise AdminException(error_msg)
@staticmethod
def get_role_permission(role_name: str) -> Dict[str, Any]:
error_msg = f"not implement: show role {role_name}"
logging.error(error_msg)
raise AdminException(error_msg)
@staticmethod
def grant_role_permission(role_name: str, actions: list, resource: str) -> Dict[str, Any]:
error_msg = f"not implement: grant role {role_name} actions: {actions} on {resource}"
logging.error(error_msg)
raise AdminException(error_msg)
@staticmethod
def revoke_role_permission(role_name: str, actions: list, resource: str) -> Dict[str, Any]:
error_msg = f"not implement: revoke role {role_name} actions: {actions} on {resource}"
logging.error(error_msg)
raise AdminException(error_msg)
@staticmethod
def update_user_role(user_name: str, role_name: str) -> Dict[str, Any]:
error_msg = f"not implement: update user role: {user_name} to role {role_name}"
logging.error(error_msg)
raise AdminException(error_msg)
@staticmethod
def get_user_permission(user_name: str) -> Dict[str, Any]:
error_msg = f"not implement: get user permission: {user_name}"
logging.error(error_msg)
raise AdminException(error_msg)

View File

@ -14,17 +14,38 @@
# limitations under the License. # limitations under the License.
# #
import secrets
from flask import Blueprint, request from flask import Blueprint, request
from flask_login import current_user, logout_user, login_required
from auth import login_verify from auth import login_verify, login_admin, check_admin_auth
from responses import success_response, error_response from responses import success_response, error_response
from services import UserMgr, ServiceMgr, UserServiceMgr from services import UserMgr, ServiceMgr, UserServiceMgr
from roles import RoleMgr
from api.common.exceptions import AdminException from api.common.exceptions import AdminException
admin_bp = Blueprint('admin', __name__, url_prefix='/api/v1/admin') admin_bp = Blueprint('admin', __name__, url_prefix='/api/v1/admin')
@admin_bp.route('/login', methods=['POST'])
def login():
if not request.json:
return error_response('Authorize admin failed.' ,400)
email = request.json.get("email", "")
password = request.json.get("password", "")
return login_admin(email, password)
@admin_bp.route('/logout', methods=['GET'])
@login_required
def logout():
current_user.access_token = f"INVALID_{secrets.token_hex(16)}"
current_user.save()
logout_user()
return success_response(True)
@admin_bp.route('/auth', methods=['GET']) @admin_bp.route('/auth', methods=['GET'])
@login_verify @login_verify
def auth_admin(): def auth_admin():
@ -35,7 +56,8 @@ def auth_admin():
@admin_bp.route('/users', methods=['GET']) @admin_bp.route('/users', methods=['GET'])
@login_verify @login_required
@check_admin_auth
def list_users(): def list_users():
try: try:
users = UserMgr.get_all_users() users = UserMgr.get_all_users()
@ -45,7 +67,8 @@ def list_users():
@admin_bp.route('/users', methods=['POST']) @admin_bp.route('/users', methods=['POST'])
@login_verify @login_required
@check_admin_auth
def create_user(): def create_user():
try: try:
data = request.get_json() data = request.get_json()
@ -71,7 +94,8 @@ def create_user():
@admin_bp.route('/users/<username>', methods=['DELETE']) @admin_bp.route('/users/<username>', methods=['DELETE'])
@login_verify @login_required
@check_admin_auth
def delete_user(username): def delete_user(username):
try: try:
res = UserMgr.delete_user(username) res = UserMgr.delete_user(username)
@ -87,7 +111,8 @@ def delete_user(username):
@admin_bp.route('/users/<username>/password', methods=['PUT']) @admin_bp.route('/users/<username>/password', methods=['PUT'])
@login_verify @login_required
@check_admin_auth
def change_password(username): def change_password(username):
try: try:
data = request.get_json() data = request.get_json()
@ -105,7 +130,8 @@ def change_password(username):
@admin_bp.route('/users/<username>/activate', methods=['PUT']) @admin_bp.route('/users/<username>/activate', methods=['PUT'])
@login_verify @login_required
@check_admin_auth
def alter_user_activate_status(username): def alter_user_activate_status(username):
try: try:
data = request.get_json() data = request.get_json()
@ -121,7 +147,8 @@ def alter_user_activate_status(username):
@admin_bp.route('/users/<username>', methods=['GET']) @admin_bp.route('/users/<username>', methods=['GET'])
@login_verify @login_required
@check_admin_auth
def get_user_details(username): def get_user_details(username):
try: try:
user_details = UserMgr.get_user_details(username) user_details = UserMgr.get_user_details(username)
@ -134,7 +161,8 @@ def get_user_details(username):
@admin_bp.route('/users/<username>/datasets', methods=['GET']) @admin_bp.route('/users/<username>/datasets', methods=['GET'])
@login_verify @login_required
@check_admin_auth
def get_user_datasets(username): def get_user_datasets(username):
try: try:
datasets_list = UserServiceMgr.get_user_datasets(username) datasets_list = UserServiceMgr.get_user_datasets(username)
@ -147,7 +175,8 @@ def get_user_datasets(username):
@admin_bp.route('/users/<username>/agents', methods=['GET']) @admin_bp.route('/users/<username>/agents', methods=['GET'])
@login_verify @login_required
@check_admin_auth
def get_user_agents(username): def get_user_agents(username):
try: try:
agents_list = UserServiceMgr.get_user_agents(username) agents_list = UserServiceMgr.get_user_agents(username)
@ -160,7 +189,8 @@ def get_user_agents(username):
@admin_bp.route('/services', methods=['GET']) @admin_bp.route('/services', methods=['GET'])
@login_verify @login_required
@check_admin_auth
def get_services(): def get_services():
try: try:
services = ServiceMgr.get_all_services() services = ServiceMgr.get_all_services()
@ -170,7 +200,8 @@ def get_services():
@admin_bp.route('/service_types/<service_type>', methods=['GET']) @admin_bp.route('/service_types/<service_type>', methods=['GET'])
@login_verify @login_required
@check_admin_auth
def get_services_by_type(service_type_str): def get_services_by_type(service_type_str):
try: try:
services = ServiceMgr.get_services_by_type(service_type_str) services = ServiceMgr.get_services_by_type(service_type_str)
@ -180,7 +211,8 @@ def get_services_by_type(service_type_str):
@admin_bp.route('/services/<service_id>', methods=['GET']) @admin_bp.route('/services/<service_id>', methods=['GET'])
@login_verify @login_required
@check_admin_auth
def get_service(service_id): def get_service(service_id):
try: try:
services = ServiceMgr.get_service_details(service_id) services = ServiceMgr.get_service_details(service_id)
@ -190,7 +222,8 @@ def get_service(service_id):
@admin_bp.route('/services/<service_id>', methods=['DELETE']) @admin_bp.route('/services/<service_id>', methods=['DELETE'])
@login_verify @login_required
@check_admin_auth
def shutdown_service(service_id): def shutdown_service(service_id):
try: try:
services = ServiceMgr.shutdown_service(service_id) services = ServiceMgr.shutdown_service(service_id)
@ -200,10 +233,133 @@ def shutdown_service(service_id):
@admin_bp.route('/services/<service_id>', methods=['PUT']) @admin_bp.route('/services/<service_id>', methods=['PUT'])
@login_verify @login_required
@check_admin_auth
def restart_service(service_id): def restart_service(service_id):
try: try:
services = ServiceMgr.restart_service(service_id) services = ServiceMgr.restart_service(service_id)
return success_response(services) return success_response(services)
except Exception as e: except Exception as e:
return error_response(str(e), 500) return error_response(str(e), 500)
@admin_bp.route('/roles', methods=['POST'])
@login_required
@check_admin_auth
def create_role():
try:
data = request.get_json()
if not data or 'role_name' not in data:
return error_response("Role name is required", 400)
role_name: str = data['role_name']
description: str = data['description']
res = RoleMgr.create_role(role_name, description)
return success_response(res)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/roles/<role_name>', methods=['PUT'])
@login_required
@check_admin_auth
def update_role(role_name: str):
try:
data = request.get_json()
if not data or 'description' not in data:
return error_response("Role description is required", 400)
description: str = data['description']
res = RoleMgr.update_role_description(role_name, description)
return success_response(res)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/roles/<role_name>', methods=['DELETE'])
@login_required
@check_admin_auth
def delete_role(role_name: str):
try:
res = RoleMgr.delete_role(role_name)
return success_response(res)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/roles', methods=['GET'])
@login_required
@check_admin_auth
def list_roles():
try:
res = RoleMgr.list_roles()
return success_response(res)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/roles/<role_name>/permission', methods=['GET'])
@login_required
@check_admin_auth
def get_role_permission(role_name: str):
try:
res = RoleMgr.get_role_permission(role_name)
return success_response(res)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/roles/<role_name>/permission', methods=['POST'])
@login_required
@check_admin_auth
def grant_role_permission(role_name: str):
try:
data = request.get_json()
if not data or 'actions' not in data or 'resource' not in data:
return error_response("Permission is required", 400)
actions: list = data['actions']
resource: str = data['resource']
res = RoleMgr.grant_role_permission(role_name, actions, resource)
return success_response(res)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/roles/<role_name>/permission', methods=['DELETE'])
@login_required
@check_admin_auth
def revoke_role_permission(role_name: str):
try:
data = request.get_json()
if not data or 'actions' not in data or 'resource' not in data:
return error_response("Permission is required", 400)
actions: list = data['actions']
resource: str = data['resource']
res = RoleMgr.revoke_role_permission(role_name, actions, resource)
return success_response(res)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/users/<user_name>/role', methods=['PUT'])
@login_required
@check_admin_auth
def update_user_role(user_name: str):
try:
data = request.get_json()
if not data or 'role_name' not in data:
return error_response("Role name is required", 400)
role_name: str = data['role_name']
res = RoleMgr.update_user_role(user_name, role_name)
return success_response(res)
except Exception as e:
return error_response(str(e), 500)
@admin_bp.route('/users/<user_name>/permission', methods=['GET'])
@login_required
@check_admin_auth
def get_user_permission(user_name: str):
try:
res = RoleMgr.get_user_permission(user_name)
return success_response(res)
except Exception as e:
return error_response(str(e), 500)

View File

@ -36,3 +36,8 @@ class UserAlreadyExistsError(AdminException):
class CannotDeleteAdminError(AdminException): class CannotDeleteAdminError(AdminException):
def __init__(self): def __init__(self):
super().__init__("Cannot delete admin account", 403) super().__init__("Cannot delete admin account", 403)
class NotAdminError(AdminException):
def __init__(self, username):
super().__init__(f"User '{username}' is not admin", 403)