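# NOTE: the text "Spaces: Sleeping / Sleeping" that preceded the imports was
# page-status residue captured together with this file; it is not program code.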
import asyncio
import hmac
import json
import logging
import os
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import requests
import uvicorn
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Header, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
# Load environment variables (e.g. from a local .env file).
load_dotenv()

# Configure logging at INFO level with a timestamped record format.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger("openai-proxy")

# Create the FastAPI application.
_APP_METADATA = {
    "title": "OpenAI API Proxy",
    "description": "将OpenAI API请求代理到DeepSider API",
    "version": "1.0.0",
}
app = FastAPI(**_APP_METADATA)

# Add the CORS middleware.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended for the deployment.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
# Configuration.
DEEPSIDER_API_BASE = "https://api.chargpt.ai/api/v2"
# Upstream tokens, read from the comma-separated DEEPSIDER_TOKEN environment
# variable. Surrounding whitespace is trimmed and empty entries are dropped, so
# an unset variable yields [] rather than [""] and "a, b" does not produce a
# token with a leading space.
DEEPSIDER_TOKEN = [
    token.strip()
    for token in os.getenv("DEEPSIDER_TOKEN", "").split(',')
    if token.strip()
]
# Round-robin cursor: index of the next token to hand out.
TOKEN_INDEX = 0
# Upstream model identifiers, named once so the table below stays readable.
_CLAUDE_3_5_SONNET = "anthropic/claude-3.5-sonnet"
_CLAUDE_3_7_SONNET = "anthropic/claude-3.7-sonnet"
_GPT_4O = "openai/gpt-4o"
_GPT_4O_MINI = "openai/gpt-4o-mini"

# Model mapping table: client-facing model name -> upstream model identifier.
MODEL_MAPPING = {
    "gpt-3.5-turbo": _CLAUDE_3_5_SONNET,
    "gpt-4": _CLAUDE_3_7_SONNET,
    "gpt-4o": _GPT_4O,
    "gpt-4-turbo": _GPT_4O,
    "gpt-4o-mini": _GPT_4O_MINI,
    "claude-3-sonnet-20240229": _CLAUDE_3_5_SONNET,
    "claude-3-opus-20240229": _CLAUDE_3_7_SONNET,
    "claude-3.5-sonnet": _CLAUDE_3_5_SONNET,
    "claude-3.7-sonnet": _CLAUDE_3_7_SONNET,
}

# Token load-balancing state, keyed by token string.
token_status = {}
# Request headers.
def get_headers():
    """Build the headers for one upstream request, picking a token round-robin.

    The token at the current cursor is selected and the cursor advanced. If
    that token is recorded as inactive, the following tokens are scanned for
    one that is unknown or active; when none is found the inactive token is
    still used. With no configured tokens the bearer value is empty.

    Returns:
        dict: browser-like headers plus an ``authorization`` bearer header.
    """
    global TOKEN_INDEX

    def is_usable(candidate):
        # A token never checked before is assumed usable.
        return candidate not in token_status or token_status[candidate]["active"]

    pool_size = len(DEEPSIDER_TOKEN)
    if pool_size == 0:
        selected = ""
    else:
        # Load balancing: take the token under the cursor, then advance it.
        selected = DEEPSIDER_TOKEN[TOKEN_INDEX % pool_size]
        TOKEN_INDEX = (TOKEN_INDEX + 1) % pool_size
        if not is_usable(selected):
            # The chosen token is unavailable; try the ones after it.
            for offset in range(pool_size):
                candidate = DEEPSIDER_TOKEN[(TOKEN_INDEX + offset) % pool_size]
                if is_usable(candidate):
                    selected = candidate
                    TOKEN_INDEX = (TOKEN_INDEX + offset + 1) % pool_size
                    break

    headers = {
        "accept": "*/*",
        "accept-encoding": "gzip, deflate, br, zstd",
        "accept-language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
        "content-type": "application/json",
        "origin": "chrome-extension://client",
        "i-lang": "zh-CN",
        "i-version": "1.1.64",
        "sec-ch-ua": '"Chromium";v="134", "Not:A-Brand";v="24"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": "Windows",
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "cross-site",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
    }
    headers["authorization"] = f"Bearer {selected}"
    return headers
# OpenAI API request models.
class ChatMessage(BaseModel):
    # Role of the message author (e.g. "system", "user", "assistant").
    role: str
    # Plain-text body of the message.
    content: str
    # Optional author name; defaults to None.
    name: Optional[str] = Field(default=None)
class ChatCompletionRequest(BaseModel):
    # Requested model name as sent by the client.
    model: str
    # Conversation history, in order.
    messages: List[ChatMessage]
    # Sampling and generation options, each with the default shown.
    temperature: Optional[float] = Field(default=1.0)
    top_p: Optional[float] = Field(default=1.0)
    n: Optional[int] = Field(default=1)
    # When true the reply is delivered as a stream of chunks.
    stream: Optional[bool] = Field(default=False)
    stop: Optional[Union[List[str], str]] = Field(default=None)
    max_tokens: Optional[int] = Field(default=None)
    presence_penalty: Optional[float] = Field(default=0)
    frequency_penalty: Optional[float] = Field(default=0)
    user: Optional[str] = Field(default=None)
# Initialise token status.
def _fetch_token_quota(token: str, timeout: float = 10.0):
    """Query the quota endpoint for one token (blocking).

    Args:
        token: upstream bearer token to check.
        timeout: seconds to wait for the upstream before giving up.

    Returns:
        tuple: ``(active, quota_info)`` where ``active`` is True when any
        quota item reports ``available > 0`` and ``quota_info`` maps each
        item's type to its total / available / title.

    Raises:
        Exception: whatever ``requests`` or the parsing raises; the caller
        treats any failure as "token inactive".
    """
    headers = {
        "accept": "*/*",
        "content-type": "application/json",
        "authorization": f"Bearer {token}"
    }
    # Fetch the account balance information.
    response = requests.get(
        f"{DEEPSIDER_API_BASE.replace('/v2', '')}/quota/retrieve",
        headers=headers,
        timeout=timeout,
    )
    active = False
    quota_info = {}
    if response.status_code == 200:
        data = response.json()
        if data.get('code') == 0:
            quota_list = data.get('data', {}).get('list', [])
            # Parse the balance entries.
            for item in quota_list:
                item_type = item.get('type', '')
                available = item.get('available', 0)
                if available > 0:
                    active = True
                quota_info[item_type] = {
                    "total": item.get('total', 0),
                    "available": available,
                    "title": item.get('title', '')
                }
    return active, quota_info


async def initialize_token_status():
    """Check every configured token's status and balance, refreshing token_status.

    Each token's entry in ``token_status`` is replaced with a fresh record
    (``active``, ``quota``, ``last_checked``, ``failed_count`` reset to 0).
    A token whose check fails for any reason is recorded as inactive.
    """
    loop = asyncio.get_running_loop()
    for token in DEEPSIDER_TOKEN:
        try:
            # The HTTP call is blocking, so run it on the default executor
            # instead of stalling the event loop.
            active, quota_info = await loop.run_in_executor(
                None, _fetch_token_quota, token
            )
        except Exception as e:
            logger.warning(f"检查Token {token[:8]}... 出错:{str(e)}")
            token_status[token] = {
                "active": False,
                "quota": {},
                "last_checked": datetime.now(),
                "failed_count": 0
            }
        else:
            token_status[token] = {
                "active": active,
                "quota": quota_info,
                "last_checked": datetime.now(),
                "failed_count": 0
            }
            logger.info(f"Token {token[:8]}... 状态:{'活跃' if active else '无效'}")
# Utility functions.
def verify_api_key(api_key: str = Header(..., alias="Authorization")):
    """Check the Authorization header format and return the bare key.

    Only the ``Bearer <key>`` shape is checked; the key value itself is not
    compared against anything here.

    Args:
        api_key: raw value of the ``Authorization`` request header.

    Returns:
        str: the header value with the leading ``Bearer `` removed.

    Raises:
        HTTPException: 401 when the header does not start with ``Bearer ``.
    """
    prefix = "Bearer "
    if not api_key.startswith(prefix):
        raise HTTPException(status_code=401, detail="Invalid API key format")
    # Drop only the leading scheme; str.replace would also delete any later
    # "Bearer " occurrences inside the key itself.
    return api_key[len(prefix):]
def map_openai_to_deepsider_model(model: str) -> str:
    """Translate an OpenAI model name into the DeepSider model name.

    Names missing from MODEL_MAPPING fall back to
    "anthropic/claude-3.7-sonnet".
    """
    fallback = "anthropic/claude-3.7-sonnet"
    if model in MODEL_MAPPING:
        return MODEL_MAPPING[model]
    return fallback
def format_messages_for_deepsider(messages: List[ChatMessage]) -> str:
    """Flatten a chat history into the single prompt string DeepSider expects.

    System messages are hoisted to the top of the prompt, keeping their
    original relative order. User and assistant turns are prefixed with
    ``Human:`` / ``Assistant:``; any other role is rendered as
    ``Human (<role>):``. When the last message is not from the user, a
    trailing ``Human:`` cue is appended.

    Args:
        messages: conversation messages exposing ``role`` and ``content``.

    Returns:
        str: the assembled prompt with surrounding whitespace stripped.
    """
    system_parts = []
    dialogue_parts = []
    for msg in messages:
        role = msg.role
        # Map OpenAI roles onto the labels used in the prompt.
        if role == "system":
            # Collected separately so several system messages keep their
            # order instead of being reversed by repeated prepending.
            system_parts.append(f"{msg.content}\n\n")
        elif role == "user":
            dialogue_parts.append(f"Human: {msg.content}\n\n")
        elif role == "assistant":
            dialogue_parts.append(f"Assistant: {msg.content}\n\n")
        else:
            # Other roles are treated as user input.
            dialogue_parts.append(f"Human ({role}): {msg.content}\n\n")
    # If the last message is not the user's, add a Human cue for the model.
    if messages and messages[-1].role != "user":
        dialogue_parts.append("Human: ")
    return "".join(system_parts + dialogue_parts).strip()
def update_token_status(token: str, success: bool, error_message: str = None):
    """Record the outcome of one upstream call for ``token``.

    A success resets the failure counter. A failure increments it and marks
    the token inactive when the error text mentions quota exhaustion or when
    the counter reaches 5. Unknown tokens get a fresh, active record first.
    """
    global token_status
    entry = token_status.setdefault(token, {
        "active": True,
        "quota": {},
        "last_checked": datetime.now(),
        "failed_count": 0
    })

    if success:
        # A successful call clears the failure streak.
        entry["failed_count"] = 0
        return

    entry["failed_count"] += 1

    # An error mentioning insufficient quota deactivates the token at once.
    quota_exhausted = bool(error_message) and (
        "配额不足" in error_message or "quota" in error_message.lower()
    )
    if quota_exhausted:
        entry["active"] = False
        logger.warning(f"Token {token[:8]}... 余额不足,已标记为不活跃")

    # Five consecutive failures also deactivate the token.
    if entry["failed_count"] >= 5:
        entry["active"] = False
        logger.warning(f"Token {token[:8]}... 连续失败{entry['failed_count']}次,已标记为不活跃")
async def generate_openai_response(full_response: str, request_id: str, model: str) -> Dict:
    """Wrap a finished reply in an OpenAI ``chat.completion`` response body.

    Token usage is reported as zeros because it cannot be computed here.
    """
    assistant_message = {"role": "assistant", "content": full_response}
    only_choice = {
        "index": 0,
        "message": assistant_message,
        "finish_reason": "stop",
    }
    # Usage cannot be measured accurately, so every counter is zero.
    usage = dict.fromkeys(
        ("prompt_tokens", "completion_tokens", "total_tokens"), 0
    )
    return {
        "id": f"chatcmpl-{request_id}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [only_choice],
        "usage": usage,
    }
async def stream_openai_response(response, request_id: str, model: str, token: str):
    """Re-emit a DeepSider event stream as OpenAI ``chat.completion.chunk`` events.

    Args:
        response: upstream streaming response exposing ``iter_lines()`` and
            ``close()``.
        request_id: identifier used to build the ``chatcmpl-...`` id.
        model: model name echoed back in every chunk.
        token: upstream token used for the call; its status is updated with
            the outcome.

    Yields:
        str: server-sent-event lines (``data: {...}\\n\\n``), always ending with
        a ``finish_reason: "stop"`` chunk followed by ``data: [DONE]``.
    """
    timestamp = int(time.time())

    def make_event(delta, finish_reason):
        # Serialise one OpenAI-style chunk as an SSE data line.
        chunk = {
            "id": f"chatcmpl-{request_id}",
            "object": "chat.completion.chunk",
            "created": timestamp,
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "delta": delta,
                    "finish_reason": finish_reason
                }
            ]
        }
        return f"data: {json.dumps(chunk)}\n\n"

    loop = asyncio.get_running_loop()
    line_iter = response.iter_lines()
    end_of_stream = object()
    finished = False
    try:
        # Convert the DeepSider stream into the OpenAI stream format.
        while not finished:
            # next() blocks on the upstream socket; running it on the default
            # executor keeps the event loop free for other requests.
            line = await loop.run_in_executor(None, next, line_iter, end_of_stream)
            if line is end_of_stream:
                break
            if not line or not line.startswith(b'data: '):
                continue
            try:
                data = json.loads(line[6:].decode('utf-8'))
            except json.JSONDecodeError:
                logger.warning(f"无法解析响应: {line}")
                continue
            code = data.get('code')
            if code == 202 and data.get('data', {}).get('type') == "chat":
                # Body text of this event.
                content = data.get('data', {}).get('content', '')
                if content:
                    yield make_event({"content": content}, None)
            elif code == 203:
                # Upstream signalled completion; stop reading.
                finished = True
        # Update token status (success).
        update_token_status(token, True)
        # Always close the stream for the client, even when the upstream
        # ended without sending its completion code.
        yield make_event({}, "stop")
        yield "data: [DONE]\n\n"
    except Exception as e:
        logger.error(f"流式响应处理出错: {str(e)}")
        # Update token status (failure).
        update_token_status(token, False, str(e))
        # Surface the error to the client as a final chunk.
        yield make_event({"content": f"\n\n[处理响应时出错: {str(e)}]"}, "stop")
        yield "data: [DONE]\n\n"
    finally:
        # Release the upstream connection whatever happened above.
        response.close()
# Route definitions.
# NOTE(review): no route decorator is visible on this handler in this view —
# confirm how it is registered with the app.
async def root():
    """Return a short message saying the proxy service is up."""
    banner = "OpenAI API Proxy服务已启动 连接至DeepSider API"
    return {"message": banner}
async def list_models(api_key: str = Depends(verify_api_key)):
    """List the available models, one entry per key of MODEL_MAPPING."""
    return {
        "object": "list",
        "data": [
            {
                "id": model_name,
                "object": "model",
                "created": int(time.time()),
                "owned_by": "openai-proxy",
            }
            for model_name in MODEL_MAPPING
        ],
    }
def _post_to_deepsider(headers, payload):
    """Send the conversation request upstream (blocking) and return the response."""
    return requests.post(
        f"{DEEPSIDER_API_BASE}/chat/conversation",
        headers=headers,
        json=payload,
        stream=True,
        # (connect, read) seconds, so a dead upstream cannot hang a request forever.
        timeout=(10, 300),
    )


def _collect_deepsider_text(response) -> str:
    """Drain an upstream event stream (blocking) and return the joined chat text.

    The response is closed once the stream has been consumed or has failed.
    """
    parts = []
    try:
        for line in response.iter_lines():
            if not line or not line.startswith(b'data: '):
                continue
            try:
                data = json.loads(line[6:].decode('utf-8'))
            except json.JSONDecodeError:
                # Best effort: skip unparsable events, but leave a trace.
                logger.warning(f"无法解析响应: {line}")
                continue
            if data.get('code') == 202 and data.get('data', {}).get('type') == "chat":
                content = data.get('data', {}).get('content', '')
                if content:
                    parts.append(content)
    finally:
        response.close()
    return "".join(parts)


async def create_chat_completion(
    request: Request,
    api_key: str = Depends(verify_api_key)
):
    """Chat completion endpoint supporting both plain and streaming replies.

    The request body is validated, flattened into a DeepSider prompt and sent
    upstream with the next token from the pool. The reply is returned either
    as an event stream or as one OpenAI-style completion object.

    Raises:
        HTTPException: 400 for a malformed or invalid body, the upstream
            status code when DeepSider rejects the call, 500 for any other
            failure.
    """
    # Parse and validate the request body. Malformed JSON and validation
    # failures are ValueError subclasses; a non-object body raises TypeError.
    try:
        body = await request.json()
        chat_request = ChatCompletionRequest(**body)
    except (TypeError, ValueError) as e:
        raise HTTPException(status_code=400, detail=f"Invalid request body: {e}") from e

    # Generate a unique request id.
    request_id = datetime.now().strftime("%Y%m%d%H%M%S") + str(time.time_ns())[-6:]
    # Map the model name.
    deepsider_model = map_openai_to_deepsider_model(chat_request.model)
    # Build the prompt DeepSider expects.
    prompt = format_messages_for_deepsider(chat_request.messages)
    # Build the upstream request body.
    payload = {
        "model": deepsider_model,
        "prompt": prompt,
        "webAccess": "close",  # web access is off by default
        "timezone": "Asia/Shanghai"
    }
    # Pick the token for this call.
    headers = get_headers()
    current_token = headers["authorization"].replace("Bearer ", "")

    loop = asyncio.get_running_loop()
    try:
        # The HTTP client is blocking, so the call runs on the default executor.
        response = await loop.run_in_executor(
            None, _post_to_deepsider, headers, payload
        )
        # Check the upstream status.
        if response.status_code != 200:
            error_msg = f"DeepSider API请求失败: {response.status_code}"
            try:
                error_data = response.json()
                error_msg += f" - {error_data.get('message', '')}"
            except (ValueError, AttributeError):
                # Body was not JSON, or not a JSON object.
                error_msg += f" - {response.text}"
            response.close()
            logger.error(error_msg)
            # Update token status.
            update_token_status(current_token, False, error_msg)
            raise HTTPException(status_code=response.status_code, detail="API请求失败")

        # Streaming reply.
        if chat_request.stream:
            return StreamingResponse(
                stream_openai_response(response, request_id, chat_request.model, current_token),
                media_type="text/event-stream"
            )

        # Non-streaming reply: collect the whole text off the event loop.
        full_response = await loop.run_in_executor(
            None, _collect_deepsider_text, response
        )
        # Update token status (success).
        update_token_status(current_token, True)
        # Return the complete OpenAI-style response.
        return await generate_openai_response(full_response, request_id, chat_request.model)
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("处理请求时出错")
        # Update token status (failure).
        update_token_status(current_token, False, str(e))
        raise HTTPException(status_code=500, detail=f"内部服务器错误: {str(e)}")
# Endpoint for inspecting token status.
async def get_token_status(admin_key: str = Header(None, alias="X-Admin-Key")):
    """Return the recorded status of every token, with token values masked.

    Args:
        admin_key: value of the ``X-Admin-Key`` request header.

    Returns:
        dict: ``tokens`` (masked token -> status record) and ``active_tokens``
        (number of records currently marked active).

    Raises:
        HTTPException: 403 when the admin key is missing or wrong.
    """
    # Simple admin-key check.
    # NOTE(review): falls back to the well-known key "admin" when ADMIN_KEY is
    # unset — make sure ADMIN_KEY is configured in any exposed deployment.
    expected_admin_key = os.getenv("ADMIN_KEY", "admin")
    # compare_digest avoids leaking how much of the key matched through
    # timing; both sides are encoded because it rejects non-ASCII str.
    if not admin_key or not hmac.compare_digest(
        admin_key.encode("utf-8"), expected_admin_key.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Unauthorized")
    # Mask tokens: only the first 8 characters are shown.
    safe_status = {}
    for token, status in token_status.items():
        token_display = token[:8] + "..." if len(token) > 8 else token
        safe_status[token_display] = status
    return {"tokens": safe_status, "active_tokens": sum(1 for s in token_status.values() if s["active"])}
# Manually refresh token status.
async def refresh_token_status(admin_key: str = Header(None, alias="X-Admin-Key")):
    """Re-check every token upstream and report how many are active.

    Args:
        admin_key: value of the ``X-Admin-Key`` request header.

    Raises:
        HTTPException: 403 when the admin key is missing or wrong.
    """
    # Simple admin-key check.
    # NOTE(review): falls back to the well-known key "admin" when ADMIN_KEY is
    # unset — make sure ADMIN_KEY is configured in any exposed deployment.
    expected_admin_key = os.getenv("ADMIN_KEY", "admin")
    # compare_digest avoids leaking how much of the key matched through
    # timing; both sides are encoded because it rejects non-ASCII str.
    if not admin_key or not hmac.compare_digest(
        admin_key.encode("utf-8"), expected_admin_key.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Unauthorized")
    await initialize_token_status()
    return {"message": "所有token状态已刷新", "active_tokens": sum(1 for s in token_status.values() if s["active"])}
# Route imitating the model listing of the older API.
async def engines_handler():
    """Compatibility stub for the old engines API; always answers 404."""
    deprecation = HTTPException(
        status_code=404,
        detail="引擎API已被弃用 请使用模型API",
    )
    raise deprecation
# Error handlers.
# NOTE(review): the (request, exc) signature suggests this is meant to be
# registered as the app's 404 exception handler — no registration is visible
# in this view; confirm.
async def not_found_handler(request, exc):
    """Build the JSON body returned for an unknown resource.

    Args:
        request: the incoming request; its URL path is echoed in the message.
        exc: the exception that triggered the handler (unused).

    Returns:
        JSONResponse: a 404 response carrying an OpenAI-style ``error`` object.
    """
    # An exception handler has to return a Response object; a
    # ``(body, status)`` tuple is not a valid return value here.
    return JSONResponse(
        status_code=404,
        content={
            "error": {
                "message": f"未找到资源: {request.url.path}",
                "type": "not_found_error",
                "code": "not_found"
            }
        },
    )
# Startup event.
# NOTE(review): no startup-event registration is visible on this function in
# this view — confirm how it is hooked into the app.
async def startup_event():
    """Initialise token status when the service starts.

    Logs a warning and does nothing else when no token is configured.
    """
    tokens_missing = not DEEPSIDER_TOKEN or list(DEEPSIDER_TOKEN) == [""]
    if tokens_missing:
        logger.warning("未设置DEEPSIDER_TOKEN环境变量 请设置后再重启服务")
        return

    total = len(DEEPSIDER_TOKEN)
    logger.info(f"初始化 {total} 个token状态...")
    await initialize_token_status()
    active_tokens = len([s for s in token_status.values() if s["active"]])
    logger.info(f"初始化完成 活跃token: {active_tokens}/{total}")
# Program entry point.
if __name__ == "__main__":
    # Start the server on the port named by PORT (3000 when unset).
    port = int(os.environ.get("PORT", "3000"))
    logger.info(f"启动OpenAI API代理服务 端口: {port}")
    uvicorn.run(app, host="0.0.0.0", port=port)