File size: 7,612 Bytes
99fc92f 8b166bc 99fc92f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
"""
API管理器 - 管理API请求的节流控制和队列调度
提供API调用的桥接功能和请求限流控制
"""
import time
import threading
import importlib
from typing import Any, Callable, Dict, List, Tuple, Union
import config
class ApiManager:
"""
API管理器类 - 负责处理API请求的节流控制和队列管理
为每个平台实现两个队列:
1. 完成队列: 记录发起请求的时间戳
2. 等待队列: 记录等待执行的请求
定时任务每秒执行一次,清理完成队列中的过期请求,
并处理等待队列中的请求(如果有空位)
"""
def __init__(self):
# 从config导入节流控制配置
self.platform_limits = config.PLATFORM_LIMITS
self.time_window = config.TIME_WINDOW
# 初始化平台队列
self.completed_queues = {platform: [] for platform in self.platform_limits}
self.waiting_queues = {platform: [] for platform in self.platform_limits}
# 线程安全锁
self.locks = {platform: threading.Lock() for platform in self.platform_limits}
# 启动调度器
self._scheduler_active = True
self._start_scheduler()
def _start_scheduler(self):
"""启动定时调度器,每秒执行一次队列管理"""
self._process_queues()
if self._scheduler_active:
# 每秒执行一次
threading.Timer(1.0, self._start_scheduler).start()
def _process_queues(self):
"""处理所有平台的队列"""
current_time = time.time()
for platform in self.platform_limits:
with self.locks[platform]:
# 1. 移除已超过时间窗口的完成请求
self._clean_completed_queue(platform, current_time)
# 2. 处理等待队列中的请求(如果有空位)
self._process_waiting_queue(platform)
def _clean_completed_queue(self, platform: str, current_time: float):
"""清理完成队列中超时的请求"""
# 移除已经超过时间窗口的请求
self.completed_queues[platform] = [
timestamp for timestamp in self.completed_queues[platform]
if current_time - timestamp < self.time_window
]
def _process_waiting_queue(self, platform: str):
"""处理等待队列中的请求"""
# 获取当前可用的请求数量
available_slots = self._get_available_slots(platform)
# 处理等待队列中的请求
while available_slots > 0 and self.waiting_queues[platform]:
# 获取等待队列头部的请求
request_data = self.waiting_queues[platform].pop(0)
request_func, args, kwargs, result_event, result_container = request_data
# 在新线程中执行请求
threading.Thread(
target=self._execute_request,
args=(platform, request_func, args, kwargs, result_event, result_container)
).start()
# 减少可用槽位
available_slots -= 1
def _get_available_slots(self, platform: str) -> int:
"""计算平台当前可用的请求槽位数"""
limit = self.platform_limits.get(platform, self.platform_limits['default'])
return max(0, limit - len(self.completed_queues[platform]))
def _execute_request(self, platform: str, request_func: Callable,
args: Tuple, kwargs: Dict,
result_event: threading.Event,
result_container: List):
"""执行请求并存储结果"""
try:
# 立即记录请求时间(添加到完成队列)
with self.locks[platform]:
self.completed_queues[platform].append(time.time())
# 执行请求
result = request_func(*args, **kwargs)
# 存储结果
result_container.append(result)
except Exception as e:
# 存储异常
result_container.append(e)
finally:
# 通知等待线程结果已准备好
result_event.set()
def execute(self, platform: str, method_name: str, *args, **kwargs):
"""
执行API请求,处理节流控制
Args:
platform: API平台名称 (如 'openai', 'anthropic' 等)
method_name: 要调用的方法名称 (如 'validate_api_key')
*args, **kwargs: 传递给方法的参数
Returns:
方法的返回值
"""
# 确保平台支持
if platform not in self.platform_limits and platform != 'default':
# 如果不是已知平台,则使用默认限制
platform = 'default'
# 导入相应平台的模块
try:
module = importlib.import_module(f"core.api.{platform}")
except ImportError:
raise ValueError(f"不支持的平台: {platform}")
# 获取方法
if not hasattr(module, method_name):
raise ValueError(f"平台 {platform} 没有方法 {method_name}")
method = getattr(module, method_name)
# 创建结果容器和事件
result_container = []
result_event = threading.Event()
# 请求函数
request_func = lambda *a, **kw: method(*a, **kw)
with self.locks[platform]:
# 检查是否有可用槽位
if len(self.completed_queues[platform]) < self.platform_limits.get(platform, self.platform_limits['default']):
# 有槽位,立即执行
threading.Thread(
target=self._execute_request,
args=(platform, request_func, args, kwargs, result_event, result_container)
).start()
else:
# 没有槽位,添加到等待队列
self.waiting_queues[platform].append((request_func, args, kwargs, result_event, result_container))
# 等待结果(同步阻塞)
result_event.wait()
# 获取结果
if not result_container:
raise RuntimeError("请求执行失败,没有结果")
result = result_container[0]
if isinstance(result, Exception):
raise result
return result
def shutdown(self):
"""关闭调度器"""
self._scheduler_active = False
# 全局API管理器实例
_api_manager = None
def get_api_manager():
"""
获取全局API管理器实例
Returns:
ApiManager: API管理器实例
"""
global _api_manager
if _api_manager is None:
_api_manager = ApiManager()
return _api_manager
def start_service():
"""
启动API管理服务
Returns:
list: 服务相关的线程列表
"""
# 初始化API管理器
api_manager = get_api_manager()
# 此处可以添加其他需要启动的服务线程
# 例如: 监控线程、日志线程等
# 返回服务线程列表(目前API管理器的调度线程是内部管理的,所以返回空列表)
return []
|