|
"""
|
|
API管理器 - 管理API请求的节流控制和队列调度
|
|
提供API调用的桥接功能和请求限流控制
|
|
"""
|
|
import time
|
|
import threading
|
|
import importlib
|
|
from typing import Any, Callable, Dict, List, Tuple, Union
|
|
import config
|
|
|
|
class ApiManager:
|
|
"""
|
|
API管理器类 - 负责处理API请求的节流控制和队列管理
|
|
|
|
为每个平台实现两个队列:
|
|
1. 完成队列: 记录发起请求的时间戳
|
|
2. 等待队列: 记录等待执行的请求
|
|
|
|
定时任务每秒执行一次,清理完成队列中的过期请求,
|
|
并处理等待队列中的请求(如果有空位)
|
|
"""
|
|
def __init__(self):
|
|
|
|
self.platform_limits = config.PLATFORM_LIMITS
|
|
self.time_window = config.TIME_WINDOW
|
|
|
|
|
|
self.completed_queues = {platform: [] for platform in self.platform_limits}
|
|
self.waiting_queues = {platform: [] for platform in self.platform_limits}
|
|
|
|
|
|
self.locks = {platform: threading.Lock() for platform in self.platform_limits}
|
|
|
|
|
|
self._scheduler_active = True
|
|
self._start_scheduler()
|
|
|
|
def _start_scheduler(self):
|
|
"""启动定时调度器,每秒执行一次队列管理"""
|
|
self._process_queues()
|
|
if self._scheduler_active:
|
|
|
|
threading.Timer(1.0, self._start_scheduler).start()
|
|
|
|
def _process_queues(self):
|
|
"""处理所有平台的队列"""
|
|
current_time = time.time()
|
|
|
|
for platform in self.platform_limits:
|
|
with self.locks[platform]:
|
|
|
|
self._clean_completed_queue(platform, current_time)
|
|
|
|
|
|
self._process_waiting_queue(platform)
|
|
|
|
def _clean_completed_queue(self, platform: str, current_time: float):
|
|
"""清理完成队列中超时的请求"""
|
|
|
|
self.completed_queues[platform] = [
|
|
timestamp for timestamp in self.completed_queues[platform]
|
|
if current_time - timestamp < self.time_window
|
|
]
|
|
|
|
def _process_waiting_queue(self, platform: str):
|
|
"""处理等待队列中的请求"""
|
|
|
|
available_slots = self._get_available_slots(platform)
|
|
|
|
|
|
while available_slots > 0 and self.waiting_queues[platform]:
|
|
|
|
request_data = self.waiting_queues[platform].pop(0)
|
|
request_func, args, kwargs, result_event, result_container = request_data
|
|
|
|
|
|
threading.Thread(
|
|
target=self._execute_request,
|
|
args=(platform, request_func, args, kwargs, result_event, result_container)
|
|
).start()
|
|
|
|
|
|
available_slots -= 1
|
|
|
|
def _get_available_slots(self, platform: str) -> int:
|
|
"""计算平台当前可用的请求槽位数"""
|
|
limit = self.platform_limits.get(platform, self.platform_limits['default'])
|
|
return max(0, limit - len(self.completed_queues[platform]))
|
|
|
|
def _execute_request(self, platform: str, request_func: Callable,
|
|
args: Tuple, kwargs: Dict,
|
|
result_event: threading.Event,
|
|
result_container: List):
|
|
"""执行请求并存储结果"""
|
|
try:
|
|
|
|
with self.locks[platform]:
|
|
self.completed_queues[platform].append(time.time())
|
|
|
|
|
|
result = request_func(*args, **kwargs)
|
|
|
|
|
|
result_container.append(result)
|
|
except Exception as e:
|
|
|
|
result_container.append(e)
|
|
finally:
|
|
|
|
result_event.set()
|
|
|
|
def execute(self, platform: str, method_name: str, *args, **kwargs):
|
|
"""
|
|
执行API请求,处理节流控制
|
|
|
|
Args:
|
|
platform: API平台名称 (如 'openai', 'anthropic' 等)
|
|
method_name: 要调用的方法名称 (如 'validate_api_key')
|
|
*args, **kwargs: 传递给方法的参数
|
|
|
|
Returns:
|
|
方法的返回值
|
|
"""
|
|
|
|
if platform not in self.platform_limits and platform != 'default':
|
|
|
|
platform = 'default'
|
|
|
|
|
|
try:
|
|
module = importlib.import_module(f"core.api.{platform}")
|
|
except ImportError:
|
|
raise ValueError(f"不支持的平台: {platform}")
|
|
|
|
|
|
if not hasattr(module, method_name):
|
|
raise ValueError(f"平台 {platform} 没有方法 {method_name}")
|
|
|
|
method = getattr(module, method_name)
|
|
|
|
|
|
result_container = []
|
|
result_event = threading.Event()
|
|
|
|
|
|
request_func = lambda *a, **kw: method(*a, **kw)
|
|
|
|
with self.locks[platform]:
|
|
|
|
if len(self.completed_queues[platform]) < self.platform_limits.get(platform, self.platform_limits['default']):
|
|
|
|
threading.Thread(
|
|
target=self._execute_request,
|
|
args=(platform, request_func, args, kwargs, result_event, result_container)
|
|
).start()
|
|
else:
|
|
|
|
self.waiting_queues[platform].append((request_func, args, kwargs, result_event, result_container))
|
|
|
|
|
|
result_event.wait()
|
|
|
|
|
|
if not result_container:
|
|
raise RuntimeError("请求执行失败,没有结果")
|
|
|
|
result = result_container[0]
|
|
if isinstance(result, Exception):
|
|
raise result
|
|
|
|
return result
|
|
|
|
def shutdown(self):
|
|
"""关闭调度器"""
|
|
self._scheduler_active = False
|
|
|
|
|
|
_api_manager = None
|
|
|
|
def get_api_manager():
|
|
"""
|
|
获取全局API管理器实例
|
|
|
|
Returns:
|
|
ApiManager: API管理器实例
|
|
"""
|
|
global _api_manager
|
|
if _api_manager is None:
|
|
_api_manager = ApiManager()
|
|
return _api_manager
|
|
|
|
def start_service():
|
|
"""
|
|
启动API管理服务
|
|
|
|
Returns:
|
|
list: 服务相关的线程列表
|
|
"""
|
|
|
|
api_manager = get_api_manager()
|
|
|
|
|
|
|
|
|
|
|
|
return []
|
|
|