mpt2026 / app /controllers /manager /base_manager.py
chaowenguo's picture
Upload 108 files
e58b6a6 verified
import threading
from typing import Any, Callable, Dict
class TaskManager:
def __init__(self, max_concurrent_tasks: int):
self.max_concurrent_tasks = max_concurrent_tasks
self.current_tasks = 0
self.lock = threading.Lock()
self.queue = self.create_queue()
def create_queue(self):
raise NotImplementedError()
def add_task(self, func: Callable, *args: Any, **kwargs: Any):
with self.lock:
if self.current_tasks < self.max_concurrent_tasks:
print(f"add task: {func.__name__}, current_tasks: {self.current_tasks}")
self.execute_task(func, *args, **kwargs)
else:
print(
f"enqueue task: {func.__name__}, current_tasks: {self.current_tasks}"
)
self.enqueue({"func": func, "args": args, "kwargs": kwargs})
def execute_task(self, func: Callable, *args: Any, **kwargs: Any):
thread = threading.Thread(
target=self.run_task, args=(func, *args), kwargs=kwargs
)
thread.start()
def run_task(self, func: Callable, *args: Any, **kwargs: Any):
try:
with self.lock:
self.current_tasks += 1
func(*args, **kwargs) # call the function here, passing *args and **kwargs.
finally:
self.task_done()
def check_queue(self):
with self.lock:
if (
self.current_tasks < self.max_concurrent_tasks
and not self.is_queue_empty()
):
task_info = self.dequeue()
func = task_info["func"]
args = task_info.get("args", ())
kwargs = task_info.get("kwargs", {})
self.execute_task(func, *args, **kwargs)
def task_done(self):
with self.lock:
self.current_tasks -= 1
self.check_queue()
def enqueue(self, task: Dict):
raise NotImplementedError()
def dequeue(self):
raise NotImplementedError()
def is_queue_empty(self):
raise NotImplementedError()