pre / core /api_manager.py
yangtb24's picture
Upload 62 files
8b166bc verified
"""
API管理器 - 管理API请求的节流控制和队列调度
提供API调用的桥接功能和请求限流控制
"""
import time
import threading
import importlib
from typing import Any, Callable, Dict, List, Tuple, Union
import config
class ApiManager:
"""
API管理器类 - 负责处理API请求的节流控制和队列管理
为每个平台实现两个队列:
1. 完成队列: 记录发起请求的时间戳
2. 等待队列: 记录等待执行的请求
定时任务每秒执行一次,清理完成队列中的过期请求,
并处理等待队列中的请求(如果有空位)
"""
def __init__(self):
# 从config导入节流控制配置
self.platform_limits = config.PLATFORM_LIMITS
self.time_window = config.TIME_WINDOW
# 初始化平台队列
self.completed_queues = {platform: [] for platform in self.platform_limits}
self.waiting_queues = {platform: [] for platform in self.platform_limits}
# 线程安全锁
self.locks = {platform: threading.Lock() for platform in self.platform_limits}
# 启动调度器
self._scheduler_active = True
self._start_scheduler()
def _start_scheduler(self):
"""启动定时调度器,每秒执行一次队列管理"""
self._process_queues()
if self._scheduler_active:
# 每秒执行一次
threading.Timer(1.0, self._start_scheduler).start()
def _process_queues(self):
"""处理所有平台的队列"""
current_time = time.time()
for platform in self.platform_limits:
with self.locks[platform]:
# 1. 移除已超过时间窗口的完成请求
self._clean_completed_queue(platform, current_time)
# 2. 处理等待队列中的请求(如果有空位)
self._process_waiting_queue(platform)
def _clean_completed_queue(self, platform: str, current_time: float):
"""清理完成队列中超时的请求"""
# 移除已经超过时间窗口的请求
self.completed_queues[platform] = [
timestamp for timestamp in self.completed_queues[platform]
if current_time - timestamp < self.time_window
]
def _process_waiting_queue(self, platform: str):
"""处理等待队列中的请求"""
# 获取当前可用的请求数量
available_slots = self._get_available_slots(platform)
# 处理等待队列中的请求
while available_slots > 0 and self.waiting_queues[platform]:
# 获取等待队列头部的请求
request_data = self.waiting_queues[platform].pop(0)
request_func, args, kwargs, result_event, result_container = request_data
# 在新线程中执行请求
threading.Thread(
target=self._execute_request,
args=(platform, request_func, args, kwargs, result_event, result_container)
).start()
# 减少可用槽位
available_slots -= 1
def _get_available_slots(self, platform: str) -> int:
"""计算平台当前可用的请求槽位数"""
limit = self.platform_limits.get(platform, self.platform_limits['default'])
return max(0, limit - len(self.completed_queues[platform]))
def _execute_request(self, platform: str, request_func: Callable,
args: Tuple, kwargs: Dict,
result_event: threading.Event,
result_container: List):
"""执行请求并存储结果"""
try:
# 立即记录请求时间(添加到完成队列)
with self.locks[platform]:
self.completed_queues[platform].append(time.time())
# 执行请求
result = request_func(*args, **kwargs)
# 存储结果
result_container.append(result)
except Exception as e:
# 存储异常
result_container.append(e)
finally:
# 通知等待线程结果已准备好
result_event.set()
def execute(self, platform: str, method_name: str, *args, **kwargs):
"""
执行API请求,处理节流控制
Args:
platform: API平台名称 (如 'openai', 'anthropic' 等)
method_name: 要调用的方法名称 (如 'validate_api_key')
*args, **kwargs: 传递给方法的参数
Returns:
方法的返回值
"""
# 确保平台支持
if platform not in self.platform_limits and platform != 'default':
# 如果不是已知平台,则使用默认限制
platform = 'default'
# 导入相应平台的模块
try:
module = importlib.import_module(f"core.api.{platform}")
except ImportError:
raise ValueError(f"不支持的平台: {platform}")
# 获取方法
if not hasattr(module, method_name):
raise ValueError(f"平台 {platform} 没有方法 {method_name}")
method = getattr(module, method_name)
# 创建结果容器和事件
result_container = []
result_event = threading.Event()
# 请求函数
request_func = lambda *a, **kw: method(*a, **kw)
with self.locks[platform]:
# 检查是否有可用槽位
if len(self.completed_queues[platform]) < self.platform_limits.get(platform, self.platform_limits['default']):
# 有槽位,立即执行
threading.Thread(
target=self._execute_request,
args=(platform, request_func, args, kwargs, result_event, result_container)
).start()
else:
# 没有槽位,添加到等待队列
self.waiting_queues[platform].append((request_func, args, kwargs, result_event, result_container))
# 等待结果(同步阻塞)
result_event.wait()
# 获取结果
if not result_container:
raise RuntimeError("请求执行失败,没有结果")
result = result_container[0]
if isinstance(result, Exception):
raise result
return result
def shutdown(self):
"""关闭调度器"""
self._scheduler_active = False
# 全局API管理器实例
_api_manager = None
def get_api_manager():
"""
获取全局API管理器实例
Returns:
ApiManager: API管理器实例
"""
global _api_manager
if _api_manager is None:
_api_manager = ApiManager()
return _api_manager
def start_service():
"""
启动API管理服务
Returns:
list: 服务相关的线程列表
"""
# 初始化API管理器
api_manager = get_api_manager()
# 此处可以添加其他需要启动的服务线程
# 例如: 监控线程、日志线程等
# 返回服务线程列表(目前API管理器的调度线程是内部管理的,所以返回空列表)
return []