diff --git a/admin/client/admin_client.py b/admin/client/admin_client.py index fdf502b7f..7fc8780f6 100644 --- a/admin/client/admin_client.py +++ b/admin/client/admin_client.py @@ -21,7 +21,7 @@ from cmd import Cmd from Cryptodome.PublicKey import RSA from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 from typing import Dict, List, Any -from lark import Lark, Transformer, Tree, Token +from lark import Lark, Transformer, Tree import requests GRAMMAR = r""" @@ -42,6 +42,15 @@ sql_command: list_services | activate_user | list_datasets | list_agents + | create_role + | drop_role + | alter_role + | list_roles + | show_role + | grant_permission + | revoke_permission + | alter_user_role + | show_user_permission // meta command definition meta_command: "\\" meta_command_name [meta_args] @@ -70,6 +79,19 @@ PASSWORD: "PASSWORD"i DATASETS: "DATASETS"i OF: "OF"i AGENTS: "AGENTS"i +ROLE: "ROLE"i +ROLES: "ROLES"i +DESCRIPTION: "DESCRIPTION"i +GRANT: "GRANT"i +REVOKE: "REVOKE"i +ALL: "ALL"i +PERMISSION: "PERMISSION"i +TO: "TO"i +FROM: "FROM"i +FOR: "FOR"i +RESOURCES: "RESOURCES"i +ON: "ON"i +SET: "SET"i list_services: LIST SERVICES ";" show_service: SHOW SERVICE NUMBER ";" @@ -87,6 +109,19 @@ activate_user: ALTER USER ACTIVE quoted_string status ";" list_datasets: LIST DATASETS OF quoted_string ";" list_agents: LIST AGENTS OF quoted_string ";" +create_role: CREATE ROLE identifier [DESCRIPTION quoted_string] ";" +drop_role: DROP ROLE identifier ";" +alter_role: ALTER ROLE identifier SET DESCRIPTION quoted_string ";" +list_roles: LIST ROLES ";" +show_role: SHOW ROLE identifier ";" + +grant_permission: GRANT action_list ON identifier TO ROLE identifier ";" +revoke_permission: REVOKE action_list ON identifier FROM ROLE identifier ";" +alter_user_role: ALTER USER quoted_string SET ROLE identifier ";" +show_user_permission: SHOW USER PERMISSION quoted_string ";" + +action_list: identifier ("," identifier)* + identifier: WORD quoted_string: QUOTED_STRING status: WORD @@ -133,34 +168,86 @@ class AdminTransformer(Transformer): def show_user(self, items): user_name = items[2] - return {"type": "show_user", "username": user_name} + return {"type": "show_user", "user_name": user_name} def drop_user(self, items): user_name = items[2] - return {"type": "drop_user", "username": user_name} + return {"type": "drop_user", "user_name": user_name} def alter_user(self, items): user_name = items[3] new_password = items[4] - return {"type": "alter_user", "username": user_name, "password": new_password} + return {"type": "alter_user", "user_name": user_name, "password": new_password} def create_user(self, items): user_name = items[2] password = items[3] - return {"type": "create_user", "username": user_name, "password": password, "role": "user"} + return {"type": "create_user", "user_name": user_name, "password": password, "role": "user"} def activate_user(self, items): user_name = items[3] activate_status = items[4] - return {"type": "activate_user", "activate_status": activate_status, "username": user_name} + return {"type": "activate_user", "activate_status": activate_status, "user_name": user_name} def list_datasets(self, items): user_name = items[3] - return {"type": "list_datasets", "username": user_name} + return {"type": "list_datasets", "user_name": user_name} def list_agents(self, items): user_name = items[3] - return {"type": "list_agents", "username": user_name} + return {"type": "list_agents", "user_name": user_name} + + def create_role(self, items): + role_name = items[2] + if len(items) > 4: + description = items[4] + return {"type": "create_role", "role_name": role_name, "description": description} + else: + return {"type": "create_role", "role_name": role_name} + + def drop_role(self, items): + role_name = items[2] + return {"type": "drop_role", "role_name": role_name} + + def alter_role(self, items): + role_name = items[2] + description = items[5] + return {"type": "alter_role", "role_name": role_name, "description": description} + + def list_roles(self, items): + return {"type": "list_roles"} + + def show_role(self, items): + role_name = items[2] + return {"type": "show_role", "role_name": role_name} + + def grant_permission(self, items): + action_list = items[1] + resource = items[3] + role_name = items[6] + return {"type": "grant_permission", "role_name": role_name, "resource": resource, "actions": action_list} + + def revoke_permission(self, items): + action_list = items[1] + resource = items[3] + role_name = items[6] + return { + "type": "revoke_permission", + "role_name": role_name, + "resource": resource, "actions": action_list + } + + def alter_user_role(self, items): + user_name = items[2] + role_name = items[5] + return {"type": "alter_user_role", "user_name": user_name, "role_name": role_name} + + def show_user_permission(self, items): + user_name = items[3] + return {"type": "show_user_permission", "user_name": user_name} + + def action_list(self, items): + return items def meta_command(self, items): command_name = str(items[0]).lower() @@ -241,7 +328,7 @@ class AdminCLI(Cmd): def default(self, line: str) -> bool: return self.onecmd(line) - def parse_command(self, command_str: str) -> dict[str, str] | Tree[Token]: + def parse_command(self, command_str: str) -> dict[str, str]: if not command_str.strip(): return {'type': 'empty'} @@ -441,6 +528,24 @@ class AdminCLI(Cmd): self._handle_list_datasets(command_dict) case 'list_agents': self._handle_list_agents(command_dict) + case 'create_role': + self._create_role(command_dict) + case 'drop_role': + self._drop_role(command_dict) + case 'alter_role': + self._alter_role(command_dict) + case 'list_roles': + self._list_roles(command_dict) + case 'show_role': + self._show_role(command_dict) + case 'grant_permission': + self._grant_permission(command_dict) + case 'revoke_permission': + self._revoke_permission(command_dict) + case 'alter_user_role': + self._alter_user_role(command_dict) + case 'show_user_permission': + self._show_user_permission(command_dict) case 'meta': self._handle_meta_command(command_dict) case _: @@ -501,22 +606,22 @@ class AdminCLI(Cmd): print(f"Fail to get all users, code: {res_json['code']}, message: {res_json['message']}") def _handle_show_user(self, command): - username_tree: Tree = command['username'] - username: str = username_tree.children[0].strip("'\"") - print(f"Showing user: {username}") - url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}' + username_tree: Tree = command['user_name'] + user_name: str = username_tree.children[0].strip("'\"") + print(f"Showing user: {user_name}") + url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}' response = self.session.get(url) res_json = response.json() if response.status_code == 200: self._print_table_simple(res_json['data']) else: - print(f"Fail to get user {username}, code: {res_json['code']}, message: {res_json['message']}") + print(f"Fail to get user {user_name}, code: {res_json['code']}, message: {res_json['message']}") def _handle_drop_user(self, command): - username_tree: Tree = command['username'] - username: str = username_tree.children[0].strip("'\"") - print(f"Drop user: {username}") - url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}' + username_tree: Tree = command['user_name'] + user_name: str = username_tree.children[0].strip("'\"") + print(f"Drop user: {user_name}") + url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}' response = self.session.delete(url) res_json = response.json() if response.status_code == 200: @@ -525,12 +630,12 @@ class AdminCLI(Cmd): print(f"Fail to drop user, code: {res_json['code']}, message: {res_json['message']}") def _handle_alter_user(self, command): - username_tree: Tree = command['username'] - username: str = username_tree.children[0].strip("'\"") + user_name_tree: Tree = command['user_name'] + user_name: str = user_name_tree.children[0].strip("'\"") password_tree: Tree = command['password'] password: str = password_tree.children[0].strip("'\"") - print(f"Alter user: {username}, password: {password}") - url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/password' + print(f"Alter user: {user_name}, password: {password}") + url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/password' response = self.session.put(url, json={'new_password': encrypt(password)}) res_json = response.json() if response.status_code == 200: @@ -539,31 +644,31 @@ class AdminCLI(Cmd): print(f"Fail to alter password, code: {res_json['code']}, message: {res_json['message']}") def _handle_create_user(self, command): - username_tree: Tree = command['username'] - username: str = username_tree.children[0].strip("'\"") + user_name_tree: Tree = command['user_name'] + user_name: str = user_name_tree.children[0].strip("'\"") password_tree: Tree = command['password'] password: str = password_tree.children[0].strip("'\"") role: str = command['role'] - print(f"Create user: {username}, password: {password}, role: {role}") + print(f"Create user: {user_name}, password: {password}, role: {role}") url = f'http://{self.host}:{self.port}/api/v1/admin/users' response = self.session.post( url, - json={'username': username, 'password': encrypt(password), 'role': role} + json={'user_name': user_name, 'password': encrypt(password), 'role': role} ) res_json = response.json() if response.status_code == 200: self._print_table_simple(res_json['data']) else: - print(f"Fail to create user {username}, code: {res_json['code']}, message: {res_json['message']}") + print(f"Fail to create user {user_name}, code: {res_json['code']}, message: {res_json['message']}") def _handle_activate_user(self, command): - username_tree: Tree = command['username'] - username: str = username_tree.children[0].strip("'\"") + user_name_tree: Tree = command['user_name'] + user_name: str = user_name_tree.children[0].strip("'\"") activate_tree: Tree = command['activate_status'] activate_status: str = activate_tree.children[0].strip("'\"") if activate_status.lower() in ['on', 'off']: - print(f"Alter user {username} activate status, turn {activate_status.lower()}.") - url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/activate' + print(f"Alter user {user_name} activate status, turn {activate_status.lower()}.") + url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/activate' response = self.session.put(url, json={'activate_status': activate_status}) res_json = response.json() if response.status_code == 200: @@ -574,28 +679,178 @@ class AdminCLI(Cmd): print(f"Unknown activate status: {activate_status}.") def _handle_list_datasets(self, command): - username_tree: Tree = command['username'] - username: str = username_tree.children[0].strip("'\"") - print(f"Listing all datasets of user: {username}") - url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/datasets' + username_tree: Tree = command['user_name'] + user_name: str = username_tree.children[0].strip("'\"") + print(f"Listing all datasets of user: {user_name}") + url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/datasets' response = self.session.get(url) res_json = response.json() if response.status_code == 200: self._print_table_simple(res_json['data']) else: - print(f"Fail to get all datasets of {username}, code: {res_json['code']}, message: {res_json['message']}") + print(f"Fail to get all datasets of {user_name}, code: {res_json['code']}, message: {res_json['message']}") def _handle_list_agents(self, command): - username_tree: Tree = command['username'] - username: str = username_tree.children[0].strip("'\"") - print(f"Listing all agents of user: {username}") - url = f'http://{self.host}:{self.port}/api/v1/admin/users/{username}/agents' + username_tree: Tree = command['user_name'] + user_name: str = username_tree.children[0].strip("'\"") + print(f"Listing all agents of user: {user_name}") + url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/agents' response = self.session.get(url) res_json = response.json() if response.status_code == 200: self._print_table_simple(res_json['data']) else: - print(f"Fail to get all agents of {username}, code: {res_json['code']}, message: {res_json['message']}") + print(f"Fail to get all agents of {user_name}, code: {res_json['code']}, message: {res_json['message']}") + + def _create_role(self, command): + role_name_tree: Tree = command['role_name'] + role_name: str = role_name_tree.children[0].strip("'\"") + desc_str: str = '' + if 'description' in command: + desc_tree: Tree = command['description'] + desc_str = desc_tree.children[0].strip("'\"") + + print(f"create role name: {role_name}, description: {desc_str}") + url = f'http://{self.host}:{self.port}/api/v1/admin/roles' + response = self.session.post( + url, + json={'role_name': role_name, 'description': desc_str} + ) + res_json = response.json() + if response.status_code == 200: + self._print_table_simple(res_json['data']) + else: + print(f"Fail to create role {role_name}, code: {res_json['code']}, message: {res_json['message']}") + + def _drop_role(self, command): + role_name_tree: Tree = command['role_name'] + role_name: str = role_name_tree.children[0].strip("'\"") + print(f"drop role name: {role_name}") + url = f'http://{self.host}:{self.port}/api/v1/admin/roles/{role_name}' + response = self.session.delete(url) + res_json = response.json() + if response.status_code == 200: + self._print_table_simple(res_json['data']) + else: + print(f"Fail to drop role {role_name}, code: {res_json['code']}, message: {res_json['message']}") + + def _alter_role(self, command): + role_name_tree: Tree = command['role_name'] + role_name: str = role_name_tree.children[0].strip("'\"") + desc_tree: Tree = command['description'] + desc_str: str = desc_tree.children[0].strip("'\"") + + print(f"alter role name: {role_name}, description: {desc_str}") + url = f'http://{self.host}:{self.port}/api/v1/admin/roles/{role_name}' + response = self.session.put( + url, + json={'description': desc_str} + ) + res_json = response.json() + if response.status_code == 200: + self._print_table_simple(res_json['data']) + else: + print( + f"Fail to update role {role_name} with description: {desc_str}, code: {res_json['code']}, message: {res_json['message']}") + + def _list_roles(self, command): + print("Listing all roles") + url = f'http://{self.host}:{self.port}/api/v1/admin/roles' + response = self.session.get(url) + res_json = response.json() + if response.status_code == 200: + self._print_table_simple(res_json['data']) + else: + print(f"Fail to list roles, code: {res_json['code']}, message: {res_json['message']}") + + def _show_role(self, command): + role_name_tree: Tree = command['role_name'] + role_name: str = role_name_tree.children[0].strip("'\"") + print(f"show role: {role_name}") + url = f'http://{self.host}:{self.port}/api/v1/admin/roles/{role_name}/permission' + response = self.session.get(url) + res_json = response.json() + if response.status_code == 200: + self._print_table_simple(res_json['data']) + else: + print(f"Fail to list roles, code: {res_json['code']}, message: {res_json['message']}") + + def _grant_permission(self, command): + role_name_tree: Tree = command['role_name'] + role_name_str: str = role_name_tree.children[0].strip("'\"") + resource_tree: Tree = command['resource'] + resource_str: str = resource_tree.children[0].strip("'\"") + action_tree_list: list = command['actions'] + actions: list = [] + for action_tree in action_tree_list: + action_str: str = action_tree.children[0].strip("'\"") + actions.append(action_str) + print(f"grant role_name: {role_name_str}, resource: {resource_str}, actions: {actions}") + url = f'http://{self.host}:{self.port}/api/v1/admin/roles/{role_name_str}/permission' + response = self.session.post( + url, + json={'actions': actions, 'resource': resource_str} + ) + res_json = response.json() + if response.status_code == 200: + self._print_table_simple(res_json['data']) + else: + print( + f"Fail to grant role {role_name_str} with {actions} on {resource_str}, code: {res_json['code']}, message: {res_json['message']}") + + def _revoke_permission(self, command): + role_name_tree: Tree = command['role_name'] + role_name_str: str = role_name_tree.children[0].strip("'\"") + resource_tree: Tree = command['resource'] + resource_str: str = resource_tree.children[0].strip("'\"") + action_tree_list: list = command['actions'] + actions: list = [] + for action_tree in action_tree_list: + action_str: str = action_tree.children[0].strip("'\"") + actions.append(action_str) + print(f"revoke role_name: {role_name_str}, resource: {resource_str}, actions: {actions}") + url = f'http://{self.host}:{self.port}/api/v1/admin/roles/{role_name_str}/permission' + response = self.session.delete( + url, + json={'actions': actions, 'resource': resource_str} + ) + res_json = response.json() + if response.status_code == 200: + self._print_table_simple(res_json['data']) + else: + print( + f"Fail to revoke role {role_name_str} with {actions} on {resource_str}, code: {res_json['code']}, message: {res_json['message']}") + + def _alter_user_role(self, command): + role_name_tree: Tree = command['role_name'] + role_name_str: str = role_name_tree.children[0].strip("'\"") + user_name_tree: Tree = command['user_name'] + user_name_str: str = user_name_tree.children[0].strip("'\"") + print(f"alter_user_role user_name: {user_name_str}, role_name: {role_name_str}") + url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name_str}/role' + response = self.session.put( + url, + json={'role_name': role_name_str} + ) + res_json = response.json() + if response.status_code == 200: + self._print_table_simple(res_json['data']) + else: + print( + f"Fail to alter user: {user_name_str} to role {role_name_str}, code: {res_json['code']}, message: {res_json['message']}") + + def _show_user_permission(self, command): + user_name_tree: Tree = command['user_name'] + user_name_str: str = user_name_tree.children[0].strip("'\"") + print(f"show_user_permission user_name: {user_name_str}") + url = f'http://{self.host}:{self.port}/api/v1/admin/users/{user_name_str}/permission' + response = self.session.get(url) + res_json = response.json() + if response.status_code == 200: + self._print_table_simple(res_json['data']) + else: + print( + f"Fail to show user: {user_name_str} permission, code: {res_json['code']}, message: {res_json['message']}") def _handle_meta_command(self, command): meta_command = command['command'] diff --git a/admin/server/auth.py b/admin/server/auth.py index e7c97577a..dd6c71ef9 100644 --- a/admin/server/auth.py +++ b/admin/server/auth.py @@ -38,8 +38,8 @@ from api.utils.api_utils import ( construct_response, ) -def setup_auth(login_manager): +def setup_auth(login_manager): @login_manager.request_loader def load_user(web_request): jwt = Serializer(secret_key=settings.SECRET_KEY) @@ -172,11 +172,18 @@ def login_verify(f): username = auth.parameters['username'] password = auth.parameters['password'] - if check_admin(username, password) is False: + try: + if check_admin(username, password) is False: + return jsonify({ + "code": 500, + "message": "Access denied", + "data": None + }), 200 + except Exception as e: + error_msg = str(e) return jsonify({ - "code": 403, - "message": "Access denied", - "data": None + "code": 500, + "message": error_msg }), 200 return f(*args, **kwargs) diff --git a/admin/server/roles.py b/admin/server/roles.py new file mode 100644 index 000000000..ac04179f0 --- /dev/null +++ b/admin/server/roles.py @@ -0,0 +1,76 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging + +from typing import Dict, Any + +from api.common.exceptions import AdminException + + +class RoleMgr: + @staticmethod + def create_role(role_name: str, description: str): + error_msg = f"not implement: create role: {role_name}, description: {description}" + logging.error(error_msg) + raise AdminException(error_msg) + + @staticmethod + def update_role_description(role_name: str, description: str) -> Dict[str, Any]: + error_msg = f"not implement: update role: {role_name} with description: {description}" + logging.error(error_msg) + raise AdminException(error_msg) + + @staticmethod + def delete_role(role_name: str) -> Dict[str, Any]: + error_msg = f"not implement: drop role: {role_name}" + logging.error(error_msg) + raise AdminException(error_msg) + + @staticmethod + def list_roles() -> Dict[str, Any]: + error_msg = "not implement: list roles" + logging.error(error_msg) + raise AdminException(error_msg) + + @staticmethod + def get_role_permission(role_name: str) -> Dict[str, Any]: + error_msg = f"not implement: show role {role_name}" + logging.error(error_msg) + raise AdminException(error_msg) + + @staticmethod + def grant_role_permission(role_name: str, actions: list, resource: str) -> Dict[str, Any]: + error_msg = f"not implement: grant role {role_name} actions: {actions} on {resource}" + logging.error(error_msg) + raise AdminException(error_msg) + + @staticmethod + def revoke_role_permission(role_name: str, actions: list, resource: str) -> Dict[str, Any]: + error_msg = f"not implement: revoke role {role_name} actions: {actions} on {resource}" + logging.error(error_msg) + raise AdminException(error_msg) + + @staticmethod + def update_user_role(user_name: str, role_name: str) -> Dict[str, Any]: + error_msg = f"not implement: update user role: {user_name} to role {role_name}" + logging.error(error_msg) + raise AdminException(error_msg) + + @staticmethod + def get_user_permission(user_name: str) -> Dict[str, Any]: + error_msg = f"not implement: get user permission: {user_name}" + logging.error(error_msg) + raise AdminException(error_msg) diff --git a/admin/server/routes.py b/admin/server/routes.py index 9322a8244..5c82fe849 100644 --- a/admin/server/routes.py +++ b/admin/server/routes.py @@ -22,6 +22,7 @@ from flask_login import current_user, logout_user, login_required from auth import login_verify, login_admin, check_admin_auth from responses import success_response, error_response from services import UserMgr, ServiceMgr, UserServiceMgr +from roles import RoleMgr from api.common.exceptions import AdminException admin_bp = Blueprint('admin', __name__, url_prefix='/api/v1/admin') @@ -240,3 +241,125 @@ def restart_service(service_id): return success_response(services) except Exception as e: return error_response(str(e), 500) + + +@admin_bp.route('/roles', methods=['POST']) +@login_required +@check_admin_auth +def create_role(): + try: + data = request.get_json() + if not data or 'role_name' not in data: + return error_response("Role name is required", 400) + role_name: str = data['role_name'] + description: str = data['description'] + res = RoleMgr.create_role(role_name, description) + return success_response(res) + except Exception as e: + return error_response(str(e), 500) + + +@admin_bp.route('/roles/', methods=['PUT']) +@login_required +@check_admin_auth +def update_role(role_name: str): + try: + data = request.get_json() + if not data or 'description' not in data: + return error_response("Role description is required", 400) + description: str = data['description'] + res = RoleMgr.update_role_description(role_name, description) + return success_response(res) + except Exception as e: + return error_response(str(e), 500) + + +@admin_bp.route('/roles/', methods=['DELETE']) +@login_required +@check_admin_auth +def delete_role(role_name: str): + try: + res = RoleMgr.delete_role(role_name) + return success_response(res) + except Exception as e: + return error_response(str(e), 500) + + +@admin_bp.route('/roles', methods=['GET']) +@login_required +@check_admin_auth +def list_roles(): + try: + res = RoleMgr.list_roles() + return success_response(res) + except Exception as e: + return error_response(str(e), 500) + + +@admin_bp.route('/roles//permission', methods=['GET']) +@login_required +@check_admin_auth +def get_role_permission(role_name: str): + try: + res = RoleMgr.get_role_permission(role_name) + return success_response(res) + except Exception as e: + return error_response(str(e), 500) + + +@admin_bp.route('/roles//permission', methods=['POST']) +@login_required +@check_admin_auth +def grant_role_permission(role_name: str): + try: + data = request.get_json() + if not data or 'actions' not in data or 'resource' not in data: + return error_response("Permission is required", 400) + actions: list = data['actions'] + resource: str = data['resource'] + res = RoleMgr.grant_role_permission(role_name, actions, resource) + return success_response(res) + except Exception as e: + return error_response(str(e), 500) + + +@admin_bp.route('/roles//permission', methods=['DELETE']) +@login_required +@check_admin_auth +def revoke_role_permission(role_name: str): + try: + data = request.get_json() + if not data or 'actions' not in data or 'resource' not in data: + return error_response("Permission is required", 400) + actions: list = data['actions'] + resource: str = data['resource'] + res = RoleMgr.revoke_role_permission(role_name, actions, resource) + return success_response(res) + except Exception as e: + return error_response(str(e), 500) + + +@admin_bp.route('/users//role', methods=['PUT']) +@login_required +@check_admin_auth +def update_user_role(user_name: str): + try: + data = request.get_json() + if not data or 'role_name' not in data: + return error_response("Role name is required", 400) + role_name: str = data['role_name'] + res = RoleMgr.update_user_role(user_name, role_name) + return success_response(res) + except Exception as e: + return error_response(str(e), 500) + + +@admin_bp.route('/users//permission', methods=['GET']) +@login_required +@check_admin_auth +def get_user_permission(user_name: str): + try: + res = RoleMgr.get_user_permission(user_name) + return success_response(res) + except Exception as e: + return error_response(str(e), 500)