Feat: add forgot password reset, solve #8547 (#10586)

### What problem does this PR solve?

Feat: add forgot password reset, solve #8547

### Type of change

- [X] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Billy Bao
2025-10-16 15:07:49 +08:00
committed by GitHub
parent f0375c4acd
commit 447041d265
5 changed files with 276 additions and 21 deletions

View File

@ -15,11 +15,14 @@
#
import json
import logging
import string
import os
import re
import secrets
import time
from datetime import datetime
from flask import redirect, request, session
from flask import redirect, request, session, Response
from flask_login import current_user, login_required, login_user, logout_user
from werkzeug.security import check_password_hash, generate_password_hash
@ -46,6 +49,19 @@ from api.utils.api_utils import (
validate_request,
)
from api.utils.crypt import decrypt
from rag.utils.redis_conn import REDIS_CONN
from api.apps import smtp_mail_server
from api.utils.web_utils import (
send_email_html,
OTP_LENGTH,
OTP_TTL_SECONDS,
ATTEMPT_LIMIT,
ATTEMPT_LOCK_SECONDS,
RESEND_COOLDOWN_SECONDS,
otp_keys,
hash_code,
captcha_key,
)
@manager.route("/login", methods=["POST", "GET"]) # noqa: F821
@ -825,3 +841,170 @@ def set_tenant_info():
return get_json_result(data=True)
except Exception as e:
return server_error_response(e)
@manager.route("/forget/get-captcha", methods=["GET"]) # noqa: F821
def forget_get_otp():
"""
GET /forget/get-captcha?email=<email>
- Generate an image captcha and cache it in Redis under key captcha:{email} with TTL = OTP_TTL_SECONDS.
- Returns the captcha as a PNG image.
"""
email = (request.args.get("email") or "")
if not email:
return get_json_result(data=False, code=settings.RetCode.ARGUMENT_ERROR, message="email is required")
users = UserService.query(email=email)
if not users:
return get_json_result(data=False, code=settings.RetCode.DATA_ERROR, message="invalid email")
# Generate captcha text
allowed = string.ascii_uppercase + string.digits
captcha_text = "".join(secrets.choice(allowed) for _ in range(OTP_LENGTH))
REDIS_CONN.set(captcha_key(email), captcha_text, 60) # Valid for 60 seconds
from captcha.image import ImageCaptcha
image = ImageCaptcha(width=300, height=120, font_sizes=[50, 60, 70])
img_bytes = image.generate(captcha_text).read()
return Response(img_bytes, mimetype="image/png")
@manager.route("/forget/send-otp", methods=["POST"]) # noqa: F821
def forget_send_otp():
"""
POST /forget/send-otp
- Verify the image captcha stored at captcha:{email} (case-insensitive).
- On success, generate an email OTP (AZ with length = OTP_LENGTH), store hash + salt (and timestamp) in Redis with TTL, reset attempts and cooldown, and send the OTP via email.
"""
req = request.get_json()
email = req.get("email") or ""
captcha = (req.get("captcha") or "").strip()
if not email or not captcha:
return get_json_result(data=False, code=settings.RetCode.ARGUMENT_ERROR, message="email and captcha required")
users = UserService.query(email=email)
if not users:
return get_json_result(data=False, code=settings.RetCode.DATA_ERROR, message="invalid email")
stored_captcha = REDIS_CONN.get(captcha_key(email))
if not stored_captcha:
return get_json_result(data=False, code=settings.RetCode.NOT_EFFECTIVE, message="invalid or expired captcha")
if (stored_captcha or "").strip().lower() != captcha.lower():
return get_json_result(data=False, code=settings.RetCode.AUTHENTICATION_ERROR, message="invalid or expired captcha")
# Delete captcha to prevent reuse
REDIS_CONN.delete(captcha_key(email))
k_code, k_attempts, k_last, k_lock = otp_keys(email)
now = int(time.time())
last_ts = REDIS_CONN.get(k_last)
if last_ts:
try:
elapsed = now - int(last_ts)
except Exception:
elapsed = RESEND_COOLDOWN_SECONDS
remaining = RESEND_COOLDOWN_SECONDS - elapsed
if remaining > 0:
return get_json_result(data=False, code=settings.RetCode.NOT_EFFECTIVE, message=f"you still have to wait {remaining} seconds")
# Generate OTP (uppercase letters only) and store hashed
otp = "".join(secrets.choice(string.ascii_uppercase) for _ in range(OTP_LENGTH))
salt = os.urandom(16)
code_hash = hash_code(otp, salt)
REDIS_CONN.set(k_code, f"{code_hash}:{salt.hex()}", OTP_TTL_SECONDS)
REDIS_CONN.set(k_attempts, 0, OTP_TTL_SECONDS)
REDIS_CONN.set(k_last, now, OTP_TTL_SECONDS)
REDIS_CONN.delete(k_lock)
ttl_min = OTP_TTL_SECONDS // 60
if not smtp_mail_server:
logging.warning("SMTP mail server not initialized; skip sending email.")
else:
try:
send_email_html(
subject="Your Password Reset Code",
to_email=email,
template_key="reset_code",
code=otp,
ttl_min=ttl_min,
)
except Exception:
return get_json_result(data=False, code=settings.RetCode.SERVER_ERROR, message="failed to send email")
return get_json_result(data=True, code=settings.RetCode.SUCCESS, message="verification passed, email sent")
@manager.route("/forget", methods=["POST"]) # noqa: F821
def forget():
"""
POST: Verify email + OTP and reset password, then log the user in.
Request JSON: { email, otp, new_password, confirm_new_password }
"""
req = request.get_json()
email = req.get("email") or ""
otp = (req.get("otp") or "").strip()
new_pwd = req.get("new_password")
new_pwd2 = req.get("confirm_new_password")
if not all([email, otp, new_pwd, new_pwd2]):
return get_json_result(data=False, code=settings.RetCode.ARGUMENT_ERROR, message="email, otp and passwords are required")
# For reset, passwords are provided as-is (no decrypt needed)
if new_pwd != new_pwd2:
return get_json_result(data=False, code=settings.RetCode.ARGUMENT_ERROR, message="passwords do not match")
users = UserService.query(email=email)
if not users:
return get_json_result(data=False, code=settings.RetCode.DATA_ERROR, message="invalid email")
user = users[0]
# Verify OTP from Redis
k_code, k_attempts, k_last, k_lock = otp_keys(email)
if REDIS_CONN.get(k_lock):
return get_json_result(data=False, code=settings.RetCode.NOT_EFFECTIVE, message="too many attempts, try later")
stored = REDIS_CONN.get(k_code)
if not stored:
return get_json_result(data=False, code=settings.RetCode.NOT_EFFECTIVE, message="expired otp")
try:
stored_hash, salt_hex = str(stored).split(":", 1)
salt = bytes.fromhex(salt_hex)
except Exception:
return get_json_result(data=False, code=settings.RetCode.EXCEPTION_ERROR, message="otp storage corrupted")
# Case-insensitive verification: OTP generated uppercase
calc = hash_code(otp.upper(), salt)
if calc != stored_hash:
# bump attempts
try:
attempts = int(REDIS_CONN.get(k_attempts) or 0) + 1
except Exception:
attempts = 1
REDIS_CONN.set(k_attempts, attempts, OTP_TTL_SECONDS)
if attempts >= ATTEMPT_LIMIT:
REDIS_CONN.set(k_lock, int(time.time()), ATTEMPT_LOCK_SECONDS)
return get_json_result(data=False, code=settings.RetCode.AUTHENTICATION_ERROR, message="expired otp")
# Success: consume OTP and reset password
REDIS_CONN.delete(k_code)
REDIS_CONN.delete(k_attempts)
REDIS_CONN.delete(k_last)
REDIS_CONN.delete(k_lock)
try:
UserService.update_user_password(user.id, new_pwd)
except Exception as e:
logging.exception(e)
return get_json_result(data=False, code=settings.RetCode.EXCEPTION_ERROR, message="failed to reset password")
# Auto login (reuse login flow)
user.access_token = get_uuid()
login_user(user)
user.update_time = (current_timestamp(),)
user.update_date = (datetime_format(datetime.now()),)
user.save()
msg = "Password reset successful. Logged in."
return construct_response(data=user.to_json(), auth=user.get_id(), message=msg)