rename web_server to api (#29)

* add front end code

* change licence

* rename web_server to API

* change name to web_server
This commit is contained in:
KevinHuSh
2024-01-17 09:43:27 +08:00
committed by GitHub
parent c372afe40a
commit 6be3dd56fa
41 changed files with 284 additions and 262 deletions

57
api/hook/__init__.py Normal file
View File

@ -0,0 +1,57 @@
import importlib
from web_server.hook.common.parameters import SignatureParameters, AuthenticationParameters, \
SignatureReturn, AuthenticationReturn, PermissionReturn, ClientAuthenticationReturn, ClientAuthenticationParameters
from web_server.settings import HOOK_MODULE, stat_logger,RetCode
class HookManager:
SITE_SIGNATURE = []
SITE_AUTHENTICATION = []
CLIENT_AUTHENTICATION = []
PERMISSION_CHECK = []
@staticmethod
def init():
if HOOK_MODULE is not None:
for modules in HOOK_MODULE.values():
for module in modules.split(";"):
try:
importlib.import_module(module)
except Exception as e:
stat_logger.exception(e)
@staticmethod
def register_site_signature_hook(func):
HookManager.SITE_SIGNATURE.append(func)
@staticmethod
def register_site_authentication_hook(func):
HookManager.SITE_AUTHENTICATION.append(func)
@staticmethod
def register_client_authentication_hook(func):
HookManager.CLIENT_AUTHENTICATION.append(func)
@staticmethod
def register_permission_check_hook(func):
HookManager.PERMISSION_CHECK.append(func)
@staticmethod
def client_authentication(parm: ClientAuthenticationParameters) -> ClientAuthenticationReturn:
if HookManager.CLIENT_AUTHENTICATION:
return HookManager.CLIENT_AUTHENTICATION[0](parm)
return ClientAuthenticationReturn()
@staticmethod
def site_signature(parm: SignatureParameters) -> SignatureReturn:
if HookManager.SITE_SIGNATURE:
return HookManager.SITE_SIGNATURE[0](parm)
return SignatureReturn()
@staticmethod
def site_authentication(parm: AuthenticationParameters) -> AuthenticationReturn:
if HookManager.SITE_AUTHENTICATION:
return HookManager.SITE_AUTHENTICATION[0](parm)
return AuthenticationReturn()

View File

@ -0,0 +1,29 @@
import requests
from web_server.db.service_registry import ServiceRegistry
from web_server.settings import RegistryServiceName
from web_server.hook import HookManager
from web_server.hook.common.parameters import ClientAuthenticationParameters, ClientAuthenticationReturn
from web_server.settings import HOOK_SERVER_NAME
@HookManager.register_client_authentication_hook
def authentication(parm: ClientAuthenticationParameters) -> ClientAuthenticationReturn:
service_list = ServiceRegistry.load_service(
server_name=HOOK_SERVER_NAME,
service_name=RegistryServiceName.CLIENT_AUTHENTICATION.value
)
if not service_list:
raise Exception(f"client authentication error: no found server"
f" {HOOK_SERVER_NAME} service client_authentication")
service = service_list[0]
response = getattr(requests, service.f_method.lower(), None)(
url=service.f_url,
json=parm.to_dict()
)
if response.status_code != 200:
raise Exception(
f"client authentication error: request authentication url failed, status code {response.status_code}")
elif response.json().get("code") != 0:
return ClientAuthenticationReturn(code=response.json().get("code"), message=response.json().get("msg"))
return ClientAuthenticationReturn()

View File

@ -0,0 +1,25 @@
import requests
from web_server.db.service_registry import ServiceRegistry
from web_server.settings import RegistryServiceName
from web_server.hook import HookManager
from web_server.hook.common.parameters import PermissionCheckParameters, PermissionReturn
from web_server.settings import HOOK_SERVER_NAME
@HookManager.register_permission_check_hook
def permission(parm: PermissionCheckParameters) -> PermissionReturn:
service_list = ServiceRegistry.load_service(server_name=HOOK_SERVER_NAME, service_name=RegistryServiceName.PERMISSION_CHECK.value)
if not service_list:
raise Exception(f"permission check error: no found server {HOOK_SERVER_NAME} service permission")
service = service_list[0]
response = getattr(requests, service.f_method.lower(), None)(
url=service.f_url,
json=parm.to_dict()
)
if response.status_code != 200:
raise Exception(
f"permission check error: request permission url failed, status code {response.status_code}")
elif response.json().get("code") != 0:
return PermissionReturn(code=response.json().get("code"), message=response.json().get("msg"))
return PermissionReturn()

View File

@ -0,0 +1,49 @@
import requests
from web_server.db.service_registry import ServiceRegistry
from web_server.settings import RegistryServiceName
from web_server.hook import HookManager
from web_server.hook.common.parameters import SignatureParameters, AuthenticationParameters, AuthenticationReturn,\
SignatureReturn
from web_server.settings import HOOK_SERVER_NAME, PARTY_ID
@HookManager.register_site_signature_hook
def signature(parm: SignatureParameters) -> SignatureReturn:
service_list = ServiceRegistry.load_service(server_name=HOOK_SERVER_NAME, service_name=RegistryServiceName.SIGNATURE.value)
if not service_list:
raise Exception(f"signature error: no found server {HOOK_SERVER_NAME} service signature")
service = service_list[0]
response = getattr(requests, service.f_method.lower(), None)(
url=service.f_url,
json=parm.to_dict()
)
if response.status_code == 200:
if response.json().get("code") == 0:
return SignatureReturn(site_signature=response.json().get("data"))
else:
raise Exception(f"signature error: request signature url failed, result: {response.json()}")
else:
raise Exception(f"signature error: request signature url failed, status code {response.status_code}")
@HookManager.register_site_authentication_hook
def authentication(parm: AuthenticationParameters) -> AuthenticationReturn:
if not parm.src_party_id or str(parm.src_party_id) == "0":
parm.src_party_id = PARTY_ID
service_list = ServiceRegistry.load_service(server_name=HOOK_SERVER_NAME,
service_name=RegistryServiceName.SITE_AUTHENTICATION.value)
if not service_list:
raise Exception(
f"site authentication error: no found server {HOOK_SERVER_NAME} service site_authentication")
service = service_list[0]
response = getattr(requests, service.f_method.lower(), None)(
url=service.f_url,
json=parm.to_dict()
)
if response.status_code != 200:
raise Exception(
f"site authentication error: request site_authentication url failed, status code {response.status_code}")
elif response.json().get("code") != 0:
return AuthenticationReturn(code=response.json().get("code"), message=response.json().get("msg"))
return AuthenticationReturn()

View File

@ -0,0 +1,56 @@
from web_server.settings import RetCode
class ParametersBase:
def to_dict(self):
d = {}
for k, v in self.__dict__.items():
d[k] = v
return d
class ClientAuthenticationParameters(ParametersBase):
def __init__(self, full_path, headers, form, data, json):
self.full_path = full_path
self.headers = headers
self.form = form
self.data = data
self.json = json
class ClientAuthenticationReturn(ParametersBase):
def __init__(self, code=RetCode.SUCCESS, message="success"):
self.code = code
self.message = message
class SignatureParameters(ParametersBase):
def __init__(self, party_id, body):
self.party_id = party_id
self.body = body
class SignatureReturn(ParametersBase):
def __init__(self, code=RetCode.SUCCESS, site_signature=None):
self.code = code
self.site_signature = site_signature
class AuthenticationParameters(ParametersBase):
def __init__(self, site_signature, body):
self.site_signature = site_signature
self.body = body
class AuthenticationReturn(ParametersBase):
def __init__(self, code=RetCode.SUCCESS, message="success"):
self.code = code
self.message = message
class PermissionReturn(ParametersBase):
def __init__(self, code=RetCode.SUCCESS, message="success"):
self.code = code
self.message = message