File size: 7,254 Bytes
19adbbb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# api_client.py
# API客户端模块,处理与目标API的通信

import httpx
import json
import asyncio
import random
from fastapi import HTTPException
from fastapi.responses import StreamingResponse
from random_user_agent.user_agent import UserAgent


def generate_random_user_agent():
    """生成随机的User-Agent字符串
    
    Returns:
        随机生成的User-Agent字符串
    """
    # 使用random-user-agent库生成随机User-Agent
    user_agent_rotator = UserAgent()
    return user_agent_rotator.get_random_user_agent()


async def call_api(payload, is_stream=False):
    """调用目标API并处理响应
    
    Args:
        payload: 请求负载
        is_stream: 是否为流式请求
        
    Returns:
        流式响应或完整内容
    """
    try:
        async with httpx.AsyncClient() as client:
            # 生成随机User-Agent
            user_agent = generate_random_user_agent()
            
            # 发送请求到目标API
            response = await client.post(
                "https://deepseek.rkui.cn/api/chat",
                json=payload,
                timeout=60.0,
                headers={
                    "Accept": "text/event-stream",
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "User-Agent": user_agent
                }
            )
            
            if response.status_code != 200:
                raise HTTPException(status_code=response.status_code, detail=response.text)
            
            # 检查响应内容是否为空
            if not response.content:
                raise HTTPException(status_code=502, detail="Empty response from API")
                
            # 处理流式响应
            if is_stream:
                return handle_stream_response(response)
            
            # 处理非流式响应
            return await handle_non_stream_response(response)
            
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


def handle_stream_response(response):
    """处理流式响应
    
    Args:
        response: API响应对象
        
    Returns:
        StreamingResponse对象
    """
    async def generate():
        buffer = ""
        chunk_count = 0  # 用于跟踪接收到的数据块数量
        
        print("\n===== 开始接收API响应数据 =====\n")
        
        # 使用aiter_text处理文本流,确保实时处理
        async for chunk in response.aiter_text():
            chunk_count += 1
            print(f"\n[接收数据块 #{chunk_count}] 原始数据: {chunk!r}\n")
            
            # 立即处理接收到的数据块
            buffer += chunk
            
            # 处理缓冲区中的每一行
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                
                # 处理完整的SSE行
                if line.startswith("data: "):
                    print(f"\n[SSE行] {line}")
                    
                    if line == "data: [DONE]":
                        print("\n===== 接收到完成标记 [DONE] =====\n")
                        # 确保立即发送完成标记
                        yield "data: [DONE]\n\n"
                        # 强制刷新缓冲区
                        await asyncio.sleep(0)
                        continue
                    
                    try:
                        # 直接转发SSE行,不尝试解析JSON
                        # 确保行以data:开头并以\n\n结尾
                        formatted_response = f"{line}\n\n"
                        print(f"[直接转发SSE行] {formatted_response!r}")
                        # 立即发送数据,不等待整个响应完成
                        yield formatted_response
                        # 强制刷新缓冲区,确保数据立即发送
                        await asyncio.sleep(0)
                        # 添加额外的刷新,确保数据立即发送到客户端
                        await asyncio.sleep(0.01)
                    except Exception as e:
                        print(f"\n[处理行时出错] 错误类型: {type(e).__name__}, 错误信息: {e}\n[问题数据] {line!r}")
                        continue
        
        print("\n===== API响应数据接收完毕 =====\n")
        if buffer:
            print(f"[剩余未处理的缓冲区数据] {buffer!r}")
    
    # 使用headers参数明确设置Content-Type,确保不包含charset=utf-8
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )


async def handle_non_stream_response(response):
    """处理非流式响应
    
    Args:
        response: API响应对象
        
    Returns:
        提取的完整内容
    """
    full_content = ""
    buffer = ""
    print("\n===== 开始接收非流式API响应数据 =====\n")
    
    # 立即处理每个数据块,不等待整个响应完成
    async for chunk in response.aiter_text():
        print(f"\n[接收非流式数据块] 原始数据: {chunk!r}\n")
        
        # 立即处理接收到的数据块
        buffer += chunk
        
        # 处理缓冲区中的每一行
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            
            # 处理完整的SSE行
            if line.startswith("data: "):
                print(f"\n[非流式SSE行] {line}")
                
                if line == "data: [DONE]":
                    print("\n===== 接收到非流式完成标记 [DONE] =====\n")
                    # 强制刷新缓冲区
                    await asyncio.sleep(0)
                    continue
                
                try:
                    # 提取SSE行中的JSON数据
                    json_str = line[6:]  # 去掉 "data: " 前缀
                    data = json.loads(json_str)
                    if "choices" in data and len(data["choices"]) > 0:
                        delta = data["choices"][0].get("delta", {})
                        content = delta.get("content", "")
                        full_content += content
                        # 强制刷新缓冲区
                        await asyncio.sleep(0)
                except Exception as e:
                    print(f"\n[处理非流式行时出错] 错误类型: {type(e).__name__}, 错误信息: {e}\n[问题数据] {line!r}")
                    continue
    
    print("\n===== 非流式API响应数据接收完毕 =====\n")
    if buffer:
        print(f"[剩余未处理的缓冲区数据] {buffer!r}")        
    # 如果没有成功提取内容,尝试直接解析响应
    if not full_content:
        try:
            data = response.json()
            full_content = data.get("content", "")
        except json.JSONDecodeError:
            raise HTTPException(
                status_code=502,
                detail=f"Invalid JSON response from API: {response.text[:200]}"
            )
            
    return full_content