Refactor user register & login (#1971)

### What problem does this PR solve?

1. Rename the variable
2. Refactor error message
3. Format the code

### Type of change

- [x] Refactoring

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai
2024-08-16 08:43:26 +08:00
committed by GitHub
parent c9551b7f68
commit 5b5e3677b6

View File

@ -37,23 +37,25 @@ from api.utils.api_utils import get_json_result, construct_response
@manager.route('/login', methods=['POST', 'GET']) @manager.route('/login', methods=['POST', 'GET'])
def login(): def login():
login_channel = "password"
if not request.json: if not request.json:
return get_json_result(data=False, retcode=RetCode.AUTHENTICATION_ERROR, return get_json_result(data=False,
retmsg='Unautherized!') retcode=RetCode.AUTHENTICATION_ERROR,
retmsg='Unauthorized!')
email = request.json.get('email', "") email = request.json.get('email', "")
users = UserService.query(email=email) users = UserService.query(email=email)
if not users: if not users:
return get_json_result( return get_json_result(data=False,
data=False, retcode=RetCode.AUTHENTICATION_ERROR, retmsg=f'This Email is not registered!') retcode=RetCode.AUTHENTICATION_ERROR,
retmsg=f'Email: {email} is not registered!')
password = request.json.get('password') password = request.json.get('password')
try: try:
password = decrypt(password) password = decrypt(password)
except BaseException: except BaseException:
return get_json_result( return get_json_result(data=False,
data=False, retcode=RetCode.SERVER_ERROR, retmsg='Fail to crypt password') retcode=RetCode.SERVER_ERROR,
retmsg='Fail to crypt password')
user = UserService.query_user(email, password) user = UserService.query_user(email, password)
if user: if user:
@ -66,18 +68,20 @@ def login():
msg = "Welcome back!" msg = "Welcome back!"
return construct_response(data=response_data, auth=user.get_id(), retmsg=msg) return construct_response(data=response_data, auth=user.get_id(), retmsg=msg)
else: else:
return get_json_result(data=False, retcode=RetCode.AUTHENTICATION_ERROR, return get_json_result(data=False,
retmsg='Email and Password do not match!') retcode=RetCode.AUTHENTICATION_ERROR,
retmsg='Email and password do not match!')
@manager.route('/github_callback', methods=['GET']) @manager.route('/github_callback', methods=['GET'])
def github_callback(): def github_callback():
import requests import requests
res = requests.post(GITHUB_OAUTH.get("url"), data={ res = requests.post(GITHUB_OAUTH.get("url"),
data={
"client_id": GITHUB_OAUTH.get("client_id"), "client_id": GITHUB_OAUTH.get("client_id"),
"client_secret": GITHUB_OAUTH.get("secret_key"), "client_secret": GITHUB_OAUTH.get("secret_key"),
"code": request.args.get('code') "code": request.args.get('code')},
}, headers={"Accept": "application/json"}) headers={"Accept": "application/json"})
res = res.json() res = res.json()
if "error" in res: if "error" in res:
return redirect("/?error=%s" % res["error_description"]) return redirect("/?error=%s" % res["error_description"])
@ -87,29 +91,33 @@ def github_callback():
session["access_token"] = res["access_token"] session["access_token"] = res["access_token"]
session["access_token_from"] = "github" session["access_token_from"] = "github"
userinfo = user_info_from_github(session["access_token"]) user_info = user_info_from_github(session["access_token"])
users = UserService.query(email=userinfo["email"]) email_address = user_info["email"]
users = UserService.query(email=email_address)
user_id = get_uuid() user_id = get_uuid()
if not users: if not users:
# User isn't try to register
try: try:
try: try:
avatar = download_img(userinfo["avatar_url"]) avatar = download_img(user_info["avatar_url"])
except Exception as e: except Exception as e:
stat_logger.exception(e) stat_logger.exception(e)
avatar = "" avatar = ""
users = user_register(user_id, { users = user_register(user_id, {
"access_token": session["access_token"], "access_token": session["access_token"],
"email": userinfo["email"], "email": email_address,
"avatar": avatar, "avatar": avatar,
"nickname": userinfo["login"], "nickname": user_info["login"],
"login_channel": "github", "login_channel": "github",
"last_login_time": get_format_time(), "last_login_time": get_format_time(),
"is_superuser": False, "is_superuser": False,
}) })
if not users: if not users:
raise Exception('Register user failure.') raise Exception(f'Fail to register {email_address}.')
if len(users) > 1: if len(users) > 1:
raise Exception('Same E-mail exist!') raise Exception(f'Same email: {email_address} exists!')
# Try to log in
user = users[0] user = users[0]
login_user(user) login_user(user)
return redirect("/?auth=%s" % user.get_id()) return redirect("/?auth=%s" % user.get_id())
@ -117,6 +125,8 @@ def github_callback():
rollback_user_registration(user_id) rollback_user_registration(user_id)
stat_logger.exception(e) stat_logger.exception(e)
return redirect("/?error=%s" % str(e)) return redirect("/?error=%s" % str(e))
# User has already registered, try to log in
user = users[0] user = users[0]
user.access_token = get_uuid() user.access_token = get_uuid()
login_user(user) login_user(user)
@ -127,19 +137,25 @@ def github_callback():
@manager.route('/feishu_callback', methods=['GET']) @manager.route('/feishu_callback', methods=['GET'])
def feishu_callback(): def feishu_callback():
import requests import requests
app_access_token_res = requests.post(FEISHU_OAUTH.get("app_access_token_url"), data=json.dumps({ app_access_token_res = requests.post(FEISHU_OAUTH.get("app_access_token_url"),
data=json.dumps({
"app_id": FEISHU_OAUTH.get("app_id"), "app_id": FEISHU_OAUTH.get("app_id"),
"app_secret": FEISHU_OAUTH.get("app_secret") "app_secret": FEISHU_OAUTH.get("app_secret")
}), headers={"Content-Type": "application/json; charset=utf-8"}) }),
headers={"Content-Type": "application/json; charset=utf-8"})
app_access_token_res = app_access_token_res.json() app_access_token_res = app_access_token_res.json()
if app_access_token_res['code'] != 0: if app_access_token_res['code'] != 0:
return redirect("/?error=%s" % app_access_token_res) return redirect("/?error=%s" % app_access_token_res)
res = requests.post(FEISHU_OAUTH.get("user_access_token_url"), data=json.dumps({ res = requests.post(FEISHU_OAUTH.get("user_access_token_url"),
data=json.dumps({
"grant_type": FEISHU_OAUTH.get("grant_type"), "grant_type": FEISHU_OAUTH.get("grant_type"),
"code": request.args.get('code') "code": request.args.get('code')
}), headers={"Content-Type": "application/json; charset=utf-8", }),
'Authorization': f"Bearer {app_access_token_res['app_access_token']}"}) headers={
"Content-Type": "application/json; charset=utf-8",
'Authorization': f"Bearer {app_access_token_res['app_access_token']}"
})
res = res.json() res = res.json()
if res['code'] != 0: if res['code'] != 0:
return redirect("/?error=%s" % res["message"]) return redirect("/?error=%s" % res["message"])
@ -148,29 +164,33 @@ def feishu_callback():
return redirect("/?error=contact:user.email:readonly not in scope") return redirect("/?error=contact:user.email:readonly not in scope")
session["access_token"] = res["data"]["access_token"] session["access_token"] = res["data"]["access_token"]
session["access_token_from"] = "feishu" session["access_token_from"] = "feishu"
userinfo = user_info_from_feishu(session["access_token"]) user_info = user_info_from_feishu(session["access_token"])
users = UserService.query(email=userinfo["email"]) email_address = user_info["email"]
users = UserService.query(email=email_address)
user_id = get_uuid() user_id = get_uuid()
if not users: if not users:
# User isn't try to register
try: try:
try: try:
avatar = download_img(userinfo["avatar_url"]) avatar = download_img(user_info["avatar_url"])
except Exception as e: except Exception as e:
stat_logger.exception(e) stat_logger.exception(e)
avatar = "" avatar = ""
users = user_register(user_id, { users = user_register(user_id, {
"access_token": session["access_token"], "access_token": session["access_token"],
"email": userinfo["email"], "email": email_address,
"avatar": avatar, "avatar": avatar,
"nickname": userinfo["en_name"], "nickname": user_info["en_name"],
"login_channel": "feishu", "login_channel": "feishu",
"last_login_time": get_format_time(), "last_login_time": get_format_time(),
"is_superuser": False, "is_superuser": False,
}) })
if not users: if not users:
raise Exception('Register user failure.') raise Exception(f'Fail to register {email_address}.')
if len(users) > 1: if len(users) > 1:
raise Exception('Same E-mail exist!') raise Exception(f'Same email: {email_address} exists!')
# Try to log in
user = users[0] user = users[0]
login_user(user) login_user(user)
return redirect("/?auth=%s" % user.get_id()) return redirect("/?auth=%s" % user.get_id())
@ -178,6 +198,8 @@ def feishu_callback():
rollback_user_registration(user_id) rollback_user_registration(user_id)
stat_logger.exception(e) stat_logger.exception(e)
return redirect("/?error=%s" % str(e)) return redirect("/?error=%s" % str(e))
# User has already registered, try to log in
user = users[0] user = users[0]
user.access_token = get_uuid() user.access_token = get_uuid()
login_user(user) login_user(user)
@ -232,12 +254,10 @@ def setting_user():
new_password = request_data.get("new_password") new_password = request_data.get("new_password")
if not check_password_hash( if not check_password_hash(
current_user.password, decrypt(request_data["password"])): current_user.password, decrypt(request_data["password"])):
return get_json_result( return get_json_result(data=False, retcode=RetCode.AUTHENTICATION_ERROR, retmsg='Password error!')
data=False, retcode=RetCode.AUTHENTICATION_ERROR, retmsg='Password error!')
if new_password: if new_password:
update_dict["password"] = generate_password_hash( update_dict["password"] = generate_password_hash(decrypt(new_password))
decrypt(new_password))
for k in request_data.keys(): for k in request_data.keys():
if k in ["password", "new_password"]: if k in ["password", "new_password"]:
@ -249,13 +269,12 @@ def setting_user():
return get_json_result(data=True) return get_json_result(data=True)
except Exception as e: except Exception as e:
stat_logger.exception(e) stat_logger.exception(e)
return get_json_result( return get_json_result(data=False, retmsg='Update failure!', retcode=RetCode.EXCEPTION_ERROR)
data=False, retmsg='Update failure!', retcode=RetCode.EXCEPTION_ERROR)
@manager.route("/info", methods=["GET"]) @manager.route("/info", methods=["GET"])
@login_required @login_required
def user_info(): def user_profile():
return get_json_result(data=current_user.to_dict()) return get_json_result(data=current_user.to_dict())
@ -337,7 +356,7 @@ def user_add():
# Validate the email address # Validate the email address
if not re.match(r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,4}$", email_address): if not re.match(r"^[\w\._-]+@([\w_-]+\.)+[\w-]{2,4}$", email_address):
return get_json_result(data=False, return get_json_result(data=False,
retmsg=f'Invalid Email address: {email_address}!', retmsg=f'Invalid email address: {email_address}!',
retcode=RetCode.OPERATING_ERROR) retcode=RetCode.OPERATING_ERROR)
# Check if the email address is already used # Check if the email address is already used
@ -365,7 +384,7 @@ def user_add():
if not users: if not users:
raise Exception(f'Fail to register {email_address}.') raise Exception(f'Fail to register {email_address}.')
if len(users) > 1: if len(users) > 1:
raise Exception(f'Same E-mail: {email_address} exists!') raise Exception(f'Same email: {email_address} exists!')
user = users[0] user = users[0]
login_user(user) login_user(user)
return construct_response(data=user.to_json(), return construct_response(data=user.to_json(),