apply pep8 formalize (#155)

This commit is contained in:
KevinHuSh
2024-03-27 11:33:46 +08:00
committed by GitHub
parent a02e836790
commit fd7fcb5baf
55 changed files with 1568 additions and 753 deletions

View File

@ -40,13 +40,16 @@ def login():
email = request.json.get('email', "")
users = UserService.query(email=email)
if not users: return get_json_result(data=False, retcode=RetCode.AUTHENTICATION_ERROR, retmsg=f'This Email is not registered!')
if not users:
return get_json_result(
data=False, retcode=RetCode.AUTHENTICATION_ERROR, retmsg=f'This Email is not registered!')
password = request.json.get('password')
try:
password = decrypt(password)
except:
return get_json_result(data=False, retcode=RetCode.SERVER_ERROR, retmsg='Fail to crypt password')
except BaseException:
return get_json_result(
data=False, retcode=RetCode.SERVER_ERROR, retmsg='Fail to crypt password')
user = UserService.query_user(email, password)
if user:
@ -57,7 +60,8 @@ def login():
msg = "Welcome back!"
return cors_reponse(data=response_data, auth=user.get_id(), retmsg=msg)
else:
return get_json_result(data=False, retcode=RetCode.AUTHENTICATION_ERROR, retmsg='Email and Password do not match!')
return get_json_result(data=False, retcode=RetCode.AUTHENTICATION_ERROR,
retmsg='Email and Password do not match!')
@manager.route('/github_callback', methods=['GET'])
@ -65,7 +69,7 @@ def github_callback():
import requests
res = requests.post(GITHUB_OAUTH.get("url"), data={
"client_id": GITHUB_OAUTH.get("client_id"),
"client_secret": GITHUB_OAUTH.get("secret_key"),
"client_secret": GITHUB_OAUTH.get("secret_key"),
"code": request.args.get('code')
}, headers={"Accept": "application/json"})
res = res.json()
@ -96,15 +100,17 @@ def github_callback():
"last_login_time": get_format_time(),
"is_superuser": False,
})
if not users: raise Exception('Register user failure.')
if len(users) > 1: raise Exception('Same E-mail exist!')
if not users:
raise Exception('Register user failure.')
if len(users) > 1:
raise Exception('Same E-mail exist!')
user = users[0]
login_user(user)
return redirect("/?auth=%s"%user.get_id())
return redirect("/?auth=%s" % user.get_id())
except Exception as e:
rollback_user_registration(user_id)
stat_logger.exception(e)
return redirect("/?error=%s"%str(e))
return redirect("/?error=%s" % str(e))
user = users[0]
user.access_token = get_uuid()
login_user(user)
@ -114,11 +120,18 @@ def github_callback():
def user_info_from_github(access_token):
import requests
headers = {"Accept": "application/json", 'Authorization': f"token {access_token}"}
res = requests.get(f"https://api.github.com/user?access_token={access_token}", headers=headers)
headers = {"Accept": "application/json",
'Authorization': f"token {access_token}"}
res = requests.get(
f"https://api.github.com/user?access_token={access_token}",
headers=headers)
user_info = res.json()
email_info = requests.get(f"https://api.github.com/user/emails?access_token={access_token}", headers=headers).json()
user_info["email"] = next((email for email in email_info if email['primary'] == True), None)["email"]
email_info = requests.get(
f"https://api.github.com/user/emails?access_token={access_token}",
headers=headers).json()
user_info["email"] = next(
(email for email in email_info if email['primary'] == True),
None)["email"]
return user_info
@ -138,13 +151,18 @@ def setting_user():
request_data = request.json
if request_data.get("password"):
new_password = request_data.get("new_password")
if not check_password_hash(current_user.password, decrypt(request_data["password"])):
return get_json_result(data=False, retcode=RetCode.AUTHENTICATION_ERROR, retmsg='Password error!')
if not check_password_hash(
current_user.password, decrypt(request_data["password"])):
return get_json_result(
data=False, retcode=RetCode.AUTHENTICATION_ERROR, retmsg='Password error!')
if new_password: update_dict["password"] = generate_password_hash(decrypt(new_password))
if new_password:
update_dict["password"] = generate_password_hash(
decrypt(new_password))
for k in request_data.keys():
if k in ["password", "new_password"]:continue
if k in ["password", "new_password"]:
continue
update_dict[k] = request_data[k]
try:
@ -152,7 +170,8 @@ def setting_user():
return get_json_result(data=True)
except Exception as e:
stat_logger.exception(e)
return get_json_result(data=False, retmsg='Update failure!', retcode=RetCode.EXCEPTION_ERROR)
return get_json_result(
data=False, retmsg='Update failure!', retcode=RetCode.EXCEPTION_ERROR)
@manager.route("/info", methods=["GET"])
@ -173,11 +192,11 @@ def rollback_user_registration(user_id):
except Exception as e:
pass
try:
TenantLLM.delete().where(TenantLLM.tenant_id==user_id).excute()
TenantLLM.delete().where(TenantLLM.tenant_id == user_id).excute()
except Exception as e:
pass
def user_register(user_id, user):
user["id"] = user_id
tenant = {
@ -197,9 +216,14 @@ def user_register(user_id, user):
}
tenant_llm = []
for llm in LLMService.query(fid=LLM_FACTORY):
tenant_llm.append({"tenant_id": user_id, "llm_factory": LLM_FACTORY, "llm_name": llm.llm_name, "model_type":llm.model_type, "api_key": API_KEY})
tenant_llm.append({"tenant_id": user_id,
"llm_factory": LLM_FACTORY,
"llm_name": llm.llm_name,
"model_type": llm.model_type,
"api_key": API_KEY})
if not UserService.save(**user):return
if not UserService.save(**user):
return
TenantService.insert(**tenant)
UserTenantService.insert(**usr_tenant)
TenantLLMService.insert_many(tenant_llm)
@ -211,7 +235,8 @@ def user_register(user_id, user):
def user_add():
req = request.json
if UserService.query(email=req["email"]):
return get_json_result(data=False, retmsg=f'Email: {req["email"]} has already registered!', retcode=RetCode.OPERATING_ERROR)
return get_json_result(
data=False, retmsg=f'Email: {req["email"]} has already registered!', retcode=RetCode.OPERATING_ERROR)
if not re.match(r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,4}$", req["email"]):
return get_json_result(data=False, retmsg=f'Invaliad e-mail: {req["email"]}!',
retcode=RetCode.OPERATING_ERROR)
@ -229,16 +254,19 @@ def user_add():
user_id = get_uuid()
try:
users = user_register(user_id, user_dict)
if not users: raise Exception('Register user failure.')
if len(users) > 1: raise Exception('Same E-mail exist!')
if not users:
raise Exception('Register user failure.')
if len(users) > 1:
raise Exception('Same E-mail exist!')
user = users[0]
login_user(user)
return cors_reponse(data=user.to_json(), auth=user.get_id(), retmsg="Welcome aboard!")
return cors_reponse(data=user.to_json(),
auth=user.get_id(), retmsg="Welcome aboard!")
except Exception as e:
rollback_user_registration(user_id)
stat_logger.exception(e)
return get_json_result(data=False, retmsg='User registration failure!', retcode=RetCode.EXCEPTION_ERROR)
return get_json_result(
data=False, retmsg='User registration failure!', retcode=RetCode.EXCEPTION_ERROR)
@manager.route("/tenant_info", methods=["GET"])