diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py
index ed76396a3..fbe6f134f 100644
--- a/rag/svr/task_executor.py
+++ b/rag/svr/task_executor.py
@@ -705,34 +705,7 @@ async def report_status():
         finally:
             redis_lock.release()
         await trio.sleep(30)
-
-
-def recover_pending_tasks():
-    redis_lock = RedisDistributedLock("recover_pending_tasks", lock_value=CONSUMER_NAME, timeout=60)
-    svr_queue_names = get_svr_queue_names()
-    while not stop_event.is_set():
-        try:
-            if redis_lock.acquire():
-                for queue_name in svr_queue_names:
-                    msgs = REDIS_CONN.get_pending_msg(queue=queue_name, group_name=SVR_CONSUMER_GROUP_NAME)
-                    msgs = [msg for msg in msgs if msg['consumer'] != CONSUMER_NAME]
-                    if len(msgs) == 0:
-                        continue
-
-                    task_executors = REDIS_CONN.smembers("TASKEXE")
-                    task_executor_set = {t for t in task_executors}
-                    msgs = [msg for msg in msgs if msg['consumer'] not in task_executor_set]
-                    for msg in msgs:
-                        logging.info(
-                            f"Recover pending task: {msg['message_id']}, consumer: {msg['consumer']}, "
-                            f"time since delivered: {msg['time_since_delivered'] / 1000} s"
-                        )
-                        REDIS_CONN.requeue_msg(queue_name, SVR_CONSUMER_GROUP_NAME, msg['message_id'])
-        except Exception:
-            logging.warning("recover_pending_tasks got exception")
-        finally:
-            redis_lock.release()
-        stop_event.wait(60)
+

 async def task_manager():
     try:
@@ -762,8 +735,6 @@ async def main():
     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
 
-    threading.Thread(name="RecoverPendingTask", target=recover_pending_tasks).start()
-
    async with trio.open_nursery() as nursery:
        nursery.start_soon(report_status)
        while not stop_event.is_set():
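For context, the deleted `recover_pending_tasks()` thread re-queued Redis Stream entries that a dead executor had read but never acknowledged. A minimal sketch of that idea against redis-py directly, assuming placeholder stream/group/consumer names and using `xpending_range`/`xclaim` as a stand-in for the project's `REDIS_CONN.get_pending_msg`/`requeue_msg` wrappers (whose exact behaviour may differ):

```python
import redis

# Placeholder names for illustration; the real values come from RAGFlow's
# queue settings, SVR_CONSUMER_GROUP_NAME and CONSUMER_NAME.
QUEUE_NAME = "rag_flow_svr_queue"
GROUP_NAME = "rag_flow_svr_task_broker"
CONSUMER_NAME = "task_executor_0"

r = redis.Redis(host="localhost", port=6379, decode_responses=True)


def recover_pending_once() -> None:
    # XPENDING: entries delivered to the group but never XACK'ed.
    pending = r.xpending_range(QUEUE_NAME, GROUP_NAME, min="-", max="+", count=100)
    for msg in pending:
        # Leave entries owned by this (live) consumer alone.
        if msg["consumer"] == CONSUMER_NAME:
            continue
        # Take ownership of entries idle for >= 60s so they can be processed
        # again; roughly what requeue_msg() achieved for abandoned tasks.
        r.xclaim(
            QUEUE_NAME,
            GROUP_NAME,
            CONSUMER_NAME,
            min_idle_time=60_000,
            message_ids=[msg["message_id"]],
        )
```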