Refa: ensure Redis stream queue could be created properly (#9223)

### What problem does this PR solve?

Ensure Redis queue could be created properly.

### Type of change

- [x] Refactoring
This commit is contained in:
Yongteng Lei
2025-08-05 09:54:31 +08:00
committed by GitHub
parent 6ec3f18e22
commit a249803961

View File

@ -226,10 +226,21 @@ class RedisDB:
def queue_consumer(self, queue_name, group_name, consumer_name, msg_id=b">") -> RedisMsg:
"""https://redis.io/docs/latest/commands/xreadgroup/"""
for _ in range(3):
try:
try:
group_info = self.REDIS.xinfo_groups(queue_name)
if not any(gi["name"] == group_name for gi in group_info):
self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True)
except redis.exceptions.ResponseError as e:
if "no such key" in str(e).lower():
self.REDIS.xgroup_create(queue_name, group_name, id="0", mkstream=True)
elif "busygroup" in str(e).lower():
logging.warning("Group already exists, continue.")
pass
else:
raise
args = {
"groupname": group_name,
"consumername": consumer_name,