From a24980396158d20834fa6b7f8f28347f18dc33da Mon Sep 17 00:00:00 2001 From: Yongteng Lei Date: Tue, 5 Aug 2025 09:54:31 +0800 Subject: [PATCH] Refa: ensure Redis stream queue could be created properly (#9223) ### What problem does this PR solve? Ensure Redis queue could be created properly. ### Type of change - [x] Refactoring --- rag/utils/redis_conn.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/rag/utils/redis_conn.py b/rag/utils/redis_conn.py index 334f438ed..5571d8b2f 100644 --- a/rag/utils/redis_conn.py +++ b/rag/utils/redis_conn.py @@ -227,9 +227,20 @@ class RedisDB: """https://redis.io/docs/latest/commands/xreadgroup/""" for _ in range(3): try: - group_info = self.REDIS.xinfo_groups(queue_name) - if not any(gi["name"] == group_name for gi in group_info): - self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True) + + try: + group_info = self.REDIS.xinfo_groups(queue_name) + if not any(gi["name"] == group_name for gi in group_info): + self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True) + except redis.exceptions.ResponseError as e: + if "no such key" in str(e).lower(): + self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True) + elif "busygroup" in str(e).lower(): + logging.warning("Group already exists, continue.") + pass + else: + raise + args = { "groupname": group_name, "consumername": consumer_name, @@ -338,8 +349,8 @@ class RedisDB: logging.warning("RedisDB.delete " + str(key) + " got exception: " + str(e)) self.__open__() return False - - + + REDIS_CONN = RedisDB()