From bb869aca33fcb1454048017deb712c8439e09ec1 Mon Sep 17 00:00:00 2001 From: Zhichang Yu Date: Wed, 19 Mar 2025 17:46:58 +0800 Subject: [PATCH] Fix get_unacked_iterator (#6280) ### What problem does this PR solve? Fix get_unacked_iterator. Close #6132 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- rag/svr/task_executor.py | 1 + rag/utils/redis_conn.py | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index f353c44a4..e8ccb5be9 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -592,6 +592,7 @@ async def handle_task(): global DONE_TASKS, FAILED_TASKS redis_msg, task = await collect() if not task: + await trio.sleep(5) return try: logging.info(f"handle_task begin for task {json.dumps(task)}") diff --git a/rag/utils/redis_conn.py b/rag/utils/redis_conn.py index 98a152c89..a83febbed 100644 --- a/rag/utils/redis_conn.py +++ b/rag/utils/redis_conn.py @@ -209,7 +209,7 @@ class RedisDB: """https://redis.io/docs/latest/commands/xreadgroup/""" try: group_info = self.REDIS.xinfo_groups(queue_name) - if not any(e["name"] == group_name for e in group_info): + if not any(gi["name"] == group_name for gi in group_info): self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True) args = { "groupname": group_name, @@ -228,7 +228,7 @@ class RedisDB: res = RedisMsg(self.REDIS, queue_name, group_name, msg_id, payload) return res except Exception as e: - if "key" in str(e): + if str(e) == 'no such key': pass else: logging.exception( @@ -242,8 +242,14 @@ class RedisDB: def get_unacked_iterator(self, queue_names: list[str], group_name, consumer_name): try: for queue_name in queue_names: - group_info = self.REDIS.xinfo_groups(queue_name) - if not any(e["name"] == group_name for e in group_info): + try: + group_info = self.REDIS.xinfo_groups(queue_name) + except Exception as e: + if str(e) == 'no such key': + logging.warning(f"RedisDB.get_unacked_iterator queue {queue_name} doesn't exist") + continue + if not any(gi["name"] == group_name for gi in group_info): + logging.warning(f"RedisDB.get_unacked_iterator queue {queue_name} group {group_name} doesn't exist") continue current_min = 0 while True: @@ -251,13 +257,11 @@ class RedisDB: if not payload: break current_min = payload.get_msg_id() - logging.info(f"RedisDB.get_unacked_iterator {consumer_name} msg_id {current_min}") + logging.info(f"RedisDB.get_unacked_iterator {queue_name} {consumer_name} {current_min}") yield payload - except Exception as e: - if "key" in str(e): - return + except Exception: logging.exception( - "RedisDB.get_unacked_iterator " + consumer_name + " got exception: " + "RedisDB.get_unacked_iterator got exception: " ) self.__open__()