fix: single task executor getting all tasks from Redis queue (#7330)

### What problem does this PR solve?

Currently, as long as there are tasks in Redis, the loop in `main()`
keeps fetching them. As a result, a single task executor ends up holding
many tasks in the pending state, and we then have to wait for those
pending tasks to be returned to the queue.

In the first place, if `MAX_CONCURRENT_TASKS` is set to X, then only X
tasks should be picked from the queue; the others should be left in the
queue for other `task_executors`, or be picked up once one of the slots
in the current executor becomes free. This PR ensures this behavior.

The additional changes are due to Ruff linting in pre-commit, but I
believe they are expected in order to keep the coding style consistent.

### Type of change

- [X] Bug Fix (non-breaking change which fixes an issue)
- [ ] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):

Co-authored-by: Zhichang Yu <yuzhichang@gmail.com>
This commit is contained in:
Wanderson Pinto dos Santos
2025-06-06 03:32:35 -03:00
committed by GitHub
parent 2e44c3b743
commit 0e03542db5

View File

@ -100,7 +100,7 @@ CURRENT_TASKS = {}
MAX_CONCURRENT_TASKS = int(os.environ.get('MAX_CONCURRENT_TASKS', "5")) MAX_CONCURRENT_TASKS = int(os.environ.get('MAX_CONCURRENT_TASKS', "5"))
MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get('MAX_CONCURRENT_CHUNK_BUILDERS', "1")) MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get('MAX_CONCURRENT_CHUNK_BUILDERS', "1"))
MAX_CONCURRENT_MINIO = int(os.environ.get('MAX_CONCURRENT_MINIO', '10')) MAX_CONCURRENT_MINIO = int(os.environ.get('MAX_CONCURRENT_MINIO', '10'))
task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS) task_limiter = trio.Semaphore(MAX_CONCURRENT_TASKS)
chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS) chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS)
minio_limiter = trio.CapacityLimiter(MAX_CONCURRENT_MINIO) minio_limiter = trio.CapacityLimiter(MAX_CONCURRENT_MINIO)
kg_limiter = trio.CapacityLimiter(2) kg_limiter = trio.CapacityLimiter(2)
@ -736,9 +736,10 @@ def recover_pending_tasks():
stop_event.wait(60) stop_event.wait(60)
async def task_manager(): async def task_manager():
global task_limiter try:
async with task_limiter:
await handle_task() await handle_task()
finally:
task_limiter.release()
async def main(): async def main():
@ -767,8 +768,8 @@ async def main():
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
nursery.start_soon(report_status) nursery.start_soon(report_status)
while not stop_event.is_set(): while not stop_event.is_set():
await task_limiter.acquire()
nursery.start_soon(task_manager) nursery.start_soon(task_manager)
await trio.sleep(0.1)
logging.error("BUG!!! You should not reach here!!!") logging.error("BUG!!! You should not reach here!!!")
if __name__ == "__main__": if __name__ == "__main__":