From 814a210f5d643047b53ad9d31936290605bb8806 Mon Sep 17 00:00:00 2001 From: liwenju0 Date: Tue, 25 Mar 2025 15:09:56 +0800 Subject: [PATCH] Fix: failed to acquire lock exception with retry mechanism for postgres and mysql (#6483) Added the with_retry decorator in db_models.py to add a retry mechanism for database operations. Applied the retry mechanism to the lock and unlock methods of the PostgresDatabaseLock and MysqlDatabaseLock classes to enhance the reliability of lock operations. ### What problem does this PR solve? resolve failed to acquire lock exception with retry mechanism ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --------- Co-authored-by: wenju.li --- api/db/db_models.py | 53 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/api/db/db_models.py b/api/db/db_models.py index abd0efb7f..490eecc47 100644 --- a/api/db/db_models.py +++ b/api/db/db_models.py @@ -19,8 +19,10 @@ import operator import os import sys import typing +import time from enum import Enum from functools import wraps +import hashlib from flask_login import UserMixin from itsdangerous.url_safe import URLSafeTimedSerializer as Serializer @@ -260,14 +262,54 @@ class BaseDataBase: logging.info("init database on cluster mode successfully") +def with_retry(max_retries=3, retry_delay=1.0): + """Decorator: Add retry mechanism to database operations + + Args: + max_retries (int): maximum number of retries + retry_delay (float): initial retry delay (seconds), will increase exponentially + + Returns: + decorated function + """ + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + last_exception = None + for retry in range(max_retries): + try: + return func(*args, **kwargs) + except Exception as e: + last_exception = e + # get self and method name for logging + self_obj = args[0] if args else None + func_name = func.__name__ + lock_name = getattr(self_obj, 'lock_name', 'unknown') if self_obj else 'unknown' + + if retry < max_retries - 1: + current_delay = retry_delay * (2 ** retry) + logging.warning(f"{func_name} {lock_name} failed: {str(e)}, retrying ({retry+1}/{max_retries})") + time.sleep(current_delay) + else: + logging.error(f"{func_name} {lock_name} failed after all attempts: {str(e)}") + + if last_exception: + raise last_exception + return False + return wrapper + return decorator + + class PostgresDatabaseLock: def __init__(self, lock_name, timeout=10, db=None): self.lock_name = lock_name + self.lock_id = int(hashlib.md5(lock_name.encode()).hexdigest(), 16) % (2**31-1) self.timeout = int(timeout) self.db = db if db else DB + @with_retry(max_retries=3, retry_delay=1.0) def lock(self): - cursor = self.db.execute_sql("SELECT pg_try_advisory_lock(%s)", self.timeout) + cursor = self.db.execute_sql("SELECT pg_try_advisory_lock(%s)", (self.lock_id,)) ret = cursor.fetchone() if ret[0] == 0: raise Exception(f"acquire postgres lock {self.lock_name} timeout") @@ -276,8 +318,9 @@ class PostgresDatabaseLock: else: raise Exception(f"failed to acquire lock {self.lock_name}") + @with_retry(max_retries=3, retry_delay=1.0) def unlock(self): - cursor = self.db.execute_sql("SELECT pg_advisory_unlock(%s)", self.timeout) + cursor = self.db.execute_sql("SELECT pg_advisory_unlock(%s)", (self.lock_id,)) ret = cursor.fetchone() if ret[0] == 0: raise Exception(f"postgres lock {self.lock_name} was not established by this thread") @@ -287,12 +330,12 @@ class PostgresDatabaseLock: raise Exception(f"postgres lock {self.lock_name} does not exist") def __enter__(self): - if isinstance(self.db, PostgresDatabaseLock): + if isinstance(self.db, PooledPostgresqlDatabase): self.lock() return self def __exit__(self, exc_type, exc_val, exc_tb): - if isinstance(self.db, PostgresDatabaseLock): + if isinstance(self.db, PooledPostgresqlDatabase): self.unlock() def __call__(self, func): @@ -310,6 +353,7 @@ class MysqlDatabaseLock: self.timeout = int(timeout) self.db = db if db else DB + @with_retry(max_retries=3, retry_delay=1.0) def lock(self): # SQL parameters only support %s format placeholders cursor = self.db.execute_sql("SELECT GET_LOCK(%s, %s)", (self.lock_name, self.timeout)) @@ -321,6 +365,7 @@ class MysqlDatabaseLock: else: raise Exception(f"failed to acquire lock {self.lock_name}") + @with_retry(max_retries=3, retry_delay=1.0) def unlock(self): cursor = self.db.execute_sql("SELECT RELEASE_LOCK(%s)", (self.lock_name,)) ret = cursor.fetchone()