""" API管理器 - 管理API请求的节流控制和队列调度 提供API调用的桥接功能和请求限流控制 """ import time import threading import importlib from typing import Any, Callable, Dict, List, Tuple, Union import config class ApiManager: """ API管理器类 - 负责处理API请求的节流控制和队列管理 为每个平台实现两个队列: 1. 完成队列: 记录发起请求的时间戳 2. 等待队列: 记录等待执行的请求 定时任务每秒执行一次,清理完成队列中的过期请求, 并处理等待队列中的请求(如果有空位) """ def __init__(self): # 从config导入节流控制配置 self.platform_limits = config.PLATFORM_LIMITS self.time_window = config.TIME_WINDOW # 初始化平台队列 self.completed_queues = {platform: [] for platform in self.platform_limits} self.waiting_queues = {platform: [] for platform in self.platform_limits} # 线程安全锁 self.locks = {platform: threading.Lock() for platform in self.platform_limits} # 启动调度器 self._scheduler_active = True self._start_scheduler() def _start_scheduler(self): """启动定时调度器,每秒执行一次队列管理""" self._process_queues() if self._scheduler_active: # 每秒执行一次 threading.Timer(1.0, self._start_scheduler).start() def _process_queues(self): """处理所有平台的队列""" current_time = time.time() for platform in self.platform_limits: with self.locks[platform]: # 1. 移除已超过时间窗口的完成请求 self._clean_completed_queue(platform, current_time) # 2. 处理等待队列中的请求(如果有空位) self._process_waiting_queue(platform) def _clean_completed_queue(self, platform: str, current_time: float): """清理完成队列中超时的请求""" # 移除已经超过时间窗口的请求 self.completed_queues[platform] = [ timestamp for timestamp in self.completed_queues[platform] if current_time - timestamp < self.time_window ] def _process_waiting_queue(self, platform: str): """处理等待队列中的请求""" # 获取当前可用的请求数量 available_slots = self._get_available_slots(platform) # 处理等待队列中的请求 while available_slots > 0 and self.waiting_queues[platform]: # 获取等待队列头部的请求 request_data = self.waiting_queues[platform].pop(0) request_func, args, kwargs, result_event, result_container = request_data # 在新线程中执行请求 threading.Thread( target=self._execute_request, args=(platform, request_func, args, kwargs, result_event, result_container) ).start() # 减少可用槽位 available_slots -= 1 def _get_available_slots(self, platform: str) -> int: """计算平台当前可用的请求槽位数""" limit = self.platform_limits.get(platform, self.platform_limits['default']) return max(0, limit - len(self.completed_queues[platform])) def _execute_request(self, platform: str, request_func: Callable, args: Tuple, kwargs: Dict, result_event: threading.Event, result_container: List): """执行请求并存储结果""" try: # 立即记录请求时间(添加到完成队列) with self.locks[platform]: self.completed_queues[platform].append(time.time()) # 执行请求 result = request_func(*args, **kwargs) # 存储结果 result_container.append(result) except Exception as e: # 存储异常 result_container.append(e) finally: # 通知等待线程结果已准备好 result_event.set() def execute(self, platform: str, method_name: str, *args, **kwargs): """ 执行API请求,处理节流控制 Args: platform: API平台名称 (如 'openai', 'anthropic' 等) method_name: 要调用的方法名称 (如 'validate_api_key') *args, **kwargs: 传递给方法的参数 Returns: 方法的返回值 """ # 确保平台支持 if platform not in self.platform_limits and platform != 'default': # 如果不是已知平台,则使用默认限制 platform = 'default' # 导入相应平台的模块 try: module = importlib.import_module(f"core.api.{platform}") except ImportError: raise ValueError(f"不支持的平台: {platform}") # 获取方法 if not hasattr(module, method_name): raise ValueError(f"平台 {platform} 没有方法 {method_name}") method = getattr(module, method_name) # 创建结果容器和事件 result_container = [] result_event = threading.Event() # 请求函数 request_func = lambda *a, **kw: method(*a, **kw) with self.locks[platform]: # 检查是否有可用槽位 if len(self.completed_queues[platform]) < self.platform_limits.get(platform, self.platform_limits['default']): # 有槽位,立即执行 threading.Thread( target=self._execute_request, args=(platform, request_func, args, kwargs, result_event, result_container) ).start() else: # 没有槽位,添加到等待队列 self.waiting_queues[platform].append((request_func, args, kwargs, result_event, result_container)) # 等待结果(同步阻塞) result_event.wait() # 获取结果 if not result_container: raise RuntimeError("请求执行失败,没有结果") result = result_container[0] if isinstance(result, Exception): raise result return result def shutdown(self): """关闭调度器""" self._scheduler_active = False # 全局API管理器实例 _api_manager = None def get_api_manager(): """ 获取全局API管理器实例 Returns: ApiManager: API管理器实例 """ global _api_manager if _api_manager is None: _api_manager = ApiManager() return _api_manager def start_service(): """ 启动API管理服务 Returns: list: 服务相关的线程列表 """ # 初始化API管理器 api_manager = get_api_manager() # 此处可以添加其他需要启动的服务线程 # 例如: 监控线程、日志线程等 # 返回服务线程列表(目前API管理器的调度线程是内部管理的,所以返回空列表) return []