diff --git a/rag/svr/task_executor.py b/rag/svr/task_executor.py index bb6303367..75a087db9 100644 --- a/rag/svr/task_executor.py +++ b/rag/svr/task_executor.py @@ -100,7 +100,7 @@ CURRENT_TASKS = {} MAX_CONCURRENT_TASKS = int(os.environ.get('MAX_CONCURRENT_TASKS', "5")) MAX_CONCURRENT_CHUNK_BUILDERS = int(os.environ.get('MAX_CONCURRENT_CHUNK_BUILDERS', "1")) MAX_CONCURRENT_MINIO = int(os.environ.get('MAX_CONCURRENT_MINIO', '10')) -task_limiter = trio.CapacityLimiter(MAX_CONCURRENT_TASKS) +task_limiter = trio.Semaphore(MAX_CONCURRENT_TASKS) chunk_limiter = trio.CapacityLimiter(MAX_CONCURRENT_CHUNK_BUILDERS) minio_limiter = trio.CapacityLimiter(MAX_CONCURRENT_MINIO) kg_limiter = trio.CapacityLimiter(2) @@ -736,9 +736,10 @@ def recover_pending_tasks(): stop_event.wait(60) async def task_manager(): - global task_limiter - async with task_limiter: + try: await handle_task() + finally: + task_limiter.release() async def main(): @@ -767,8 +768,8 @@ async def main(): async with trio.open_nursery() as nursery: nursery.start_soon(report_status) while not stop_event.is_set(): + await task_limiter.acquire() nursery.start_soon(task_manager) - await trio.sleep(0.1) logging.error("BUG!!! You should not reach here!!!") if __name__ == "__main__":