File size: 7,254 Bytes
19adbbb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# api_client.py
# API客户端模块,处理与目标API的通信
import httpx
import json
import asyncio
import random
from fastapi import HTTPException
from fastapi.responses import StreamingResponse
from random_user_agent.user_agent import UserAgent
def generate_random_user_agent():
"""生成随机的User-Agent字符串
Returns:
随机生成的User-Agent字符串
"""
# 使用random-user-agent库生成随机User-Agent
user_agent_rotator = UserAgent()
return user_agent_rotator.get_random_user_agent()
async def call_api(payload, is_stream=False):
"""调用目标API并处理响应
Args:
payload: 请求负载
is_stream: 是否为流式请求
Returns:
流式响应或完整内容
"""
try:
async with httpx.AsyncClient() as client:
# 生成随机User-Agent
user_agent = generate_random_user_agent()
# 发送请求到目标API
response = await client.post(
"https://deepseek.rkui.cn/api/chat",
json=payload,
timeout=60.0,
headers={
"Accept": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"User-Agent": user_agent
}
)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
# 检查响应内容是否为空
if not response.content:
raise HTTPException(status_code=502, detail="Empty response from API")
# 处理流式响应
if is_stream:
return handle_stream_response(response)
# 处理非流式响应
return await handle_non_stream_response(response)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
def handle_stream_response(response):
"""处理流式响应
Args:
response: API响应对象
Returns:
StreamingResponse对象
"""
async def generate():
buffer = ""
chunk_count = 0 # 用于跟踪接收到的数据块数量
print("\n===== 开始接收API响应数据 =====\n")
# 使用aiter_text处理文本流,确保实时处理
async for chunk in response.aiter_text():
chunk_count += 1
print(f"\n[接收数据块 #{chunk_count}] 原始数据: {chunk!r}\n")
# 立即处理接收到的数据块
buffer += chunk
# 处理缓冲区中的每一行
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
# 处理完整的SSE行
if line.startswith("data: "):
print(f"\n[SSE行] {line}")
if line == "data: [DONE]":
print("\n===== 接收到完成标记 [DONE] =====\n")
# 确保立即发送完成标记
yield "data: [DONE]\n\n"
# 强制刷新缓冲区
await asyncio.sleep(0)
continue
try:
# 直接转发SSE行,不尝试解析JSON
# 确保行以data:开头并以\n\n结尾
formatted_response = f"{line}\n\n"
print(f"[直接转发SSE行] {formatted_response!r}")
# 立即发送数据,不等待整个响应完成
yield formatted_response
# 强制刷新缓冲区,确保数据立即发送
await asyncio.sleep(0)
# 添加额外的刷新,确保数据立即发送到客户端
await asyncio.sleep(0.01)
except Exception as e:
print(f"\n[处理行时出错] 错误类型: {type(e).__name__}, 错误信息: {e}\n[问题数据] {line!r}")
continue
print("\n===== API响应数据接收完毕 =====\n")
if buffer:
print(f"[剩余未处理的缓冲区数据] {buffer!r}")
# 使用headers参数明确设置Content-Type,确保不包含charset=utf-8
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
async def handle_non_stream_response(response):
"""处理非流式响应
Args:
response: API响应对象
Returns:
提取的完整内容
"""
full_content = ""
buffer = ""
print("\n===== 开始接收非流式API响应数据 =====\n")
# 立即处理每个数据块,不等待整个响应完成
async for chunk in response.aiter_text():
print(f"\n[接收非流式数据块] 原始数据: {chunk!r}\n")
# 立即处理接收到的数据块
buffer += chunk
# 处理缓冲区中的每一行
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
# 处理完整的SSE行
if line.startswith("data: "):
print(f"\n[非流式SSE行] {line}")
if line == "data: [DONE]":
print("\n===== 接收到非流式完成标记 [DONE] =====\n")
# 强制刷新缓冲区
await asyncio.sleep(0)
continue
try:
# 提取SSE行中的JSON数据
json_str = line[6:] # 去掉 "data: " 前缀
data = json.loads(json_str)
if "choices" in data and len(data["choices"]) > 0:
delta = data["choices"][0].get("delta", {})
content = delta.get("content", "")
full_content += content
# 强制刷新缓冲区
await asyncio.sleep(0)
except Exception as e:
print(f"\n[处理非流式行时出错] 错误类型: {type(e).__name__}, 错误信息: {e}\n[问题数据] {line!r}")
continue
print("\n===== 非流式API响应数据接收完毕 =====\n")
if buffer:
print(f"[剩余未处理的缓冲区数据] {buffer!r}")
# 如果没有成功提取内容,尝试直接解析响应
if not full_content:
try:
data = response.json()
full_content = data.get("content", "")
except json.JSONDecodeError:
raise HTTPException(
status_code=502,
detail=f"Invalid JSON response from API: {response.text[:200]}"
)
return full_content
|