import threading
from typing import Any, Callable, Dict


class TaskManager:
    """Run callables on worker threads, capping how many run at once.

    Tasks submitted while fewer than ``max_concurrent_tasks`` are running
    are started immediately on a new thread; the rest are handed to
    ``enqueue`` and started later, one per finished task.

    This base class does not provide queue storage. Subclasses must
    implement ``create_queue``, ``enqueue``, ``dequeue`` and
    ``is_queue_empty``; the base versions raise ``NotImplementedError``
    (so instantiating the base class itself fails in ``__init__``).
    """

    def __init__(self, max_concurrent_tasks: int):
        """Initialise the counters, the lock and the subclass-provided queue.

        Args:
            max_concurrent_tasks: Upper bound on tasks running at once.
        """
        self.max_concurrent_tasks = max_concurrent_tasks
        self.current_tasks = 0
        # Re-entrant on purpose: add_task/check_queue call execute_task and
        # task_done calls check_queue while already holding this lock. A
        # plain Lock would block forever on the nested acquisition.
        self.lock = threading.RLock()
        self.queue = self.create_queue()

    def create_queue(self):
        """Return the container stored in ``self.queue`` (subclass hook)."""
        raise NotImplementedError()

    @staticmethod
    def _task_name(func: Callable) -> str:
        """Return a printable name for *func*.

        Callables such as ``functools.partial`` objects have no
        ``__name__``; fall back to ``repr`` instead of raising.
        """
        return getattr(func, "__name__", repr(func))

    def add_task(self, func: Callable, *args: Any, **kwargs: Any) -> None:
        """Start *func* now if a slot is free, otherwise enqueue it.

        Args:
            func: The callable to run.
            *args: Positional arguments forwarded to *func*.
            **kwargs: Keyword arguments forwarded to *func*.
        """
        with self.lock:
            if self.current_tasks < self.max_concurrent_tasks:
                print(
                    f"add task: {self._task_name(func)}, "
                    f"current_tasks: {self.current_tasks}"
                )
                self.execute_task(func, *args, **kwargs)
            else:
                print(
                    f"enqueue task: {self._task_name(func)}, "
                    f"current_tasks: {self.current_tasks}"
                )
                self.enqueue({"func": func, "args": args, "kwargs": kwargs})

    def execute_task(self, func: Callable, *args: Any, **kwargs: Any) -> None:
        """Reserve a slot and start *func* on a new thread.

        The running-task counter is incremented here, before the thread
        starts, so that a caller which checked the limit under the lock
        cannot be overtaken by another submission before the worker has
        had a chance to count itself.

        Raises:
            RuntimeError: Propagated from ``Thread.start`` if the thread
                cannot be started; the reserved slot is released first.
        """
        worker = threading.Thread(
            target=self._run_reserved, args=(func, args, kwargs)
        )
        with self.lock:
            self.current_tasks += 1
            try:
                worker.start()
            except BaseException:
                # The task never ran, so give the slot back before re-raising.
                self.current_tasks -= 1
                raise

    def run_task(self, func: Callable, *args: Any, **kwargs: Any) -> None:
        """Run *func* in the calling thread, counting it as a running task.

        The counter is incremented before the call and ``task_done`` is
        invoked afterwards, even if *func* raises (the exception then
        propagates to the caller).
        """
        with self.lock:
            self.current_tasks += 1
        self._run_reserved(func, args, kwargs)

    def _run_reserved(self, func: Callable, args: tuple, kwargs: dict) -> None:
        """Run *func* for a slot that has already been counted.

        Always reports completion through ``task_done`` so the slot is
        released and the next queued task can start.
        """
        try:
            func(*args, **kwargs)
        finally:
            self.task_done()

    def check_queue(self) -> None:
        """Start one queued task if a slot is free and the queue is non-empty."""
        with self.lock:
            if self.current_tasks >= self.max_concurrent_tasks:
                return
            if self.is_queue_empty():
                return
            task_info = self.dequeue()
            func = task_info["func"]
            args = task_info.get("args", ())
            kwargs = task_info.get("kwargs", {})
            self.execute_task(func, *args, **kwargs)

    def task_done(self) -> None:
        """Release one slot and try to start the next queued task."""
        with self.lock:
            self.current_tasks -= 1
            # Done under the same lock hold so the freed slot is handed to a
            # queued task before a concurrent add_task can claim it.
            self.check_queue()

    def enqueue(self, task: Dict):
        """Store *task* (a dict with "func", "args", "kwargs") for later."""
        raise NotImplementedError()

    def dequeue(self):
        """Remove and return the next stored task dict (subclass hook)."""
        raise NotImplementedError()

    def is_queue_empty(self):
        """Return whether no tasks are waiting (subclass hook)."""
        raise NotImplementedError()