File size: 5,374 Bytes
dfee0ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import requests
import json
from logger import get_logger, get_stream_logger
from dotenv import load_dotenv
from utils.api_utils import APIUtils

# 获取日志器
logger = get_logger()
stream_logger = get_stream_logger()

def _truncate_json_for_logging(json_obj, max_length=500):
    """截断JSON对象用于日志记录,避免日志过大
    
    Args:
        json_obj: 要截断的JSON对象
        max_length: 最大字符长度,默认500
        
    Returns:
        str: 截断后的JSON字符串
    """
    if isinstance(json_obj, str):
        json_str = json_obj
    else:
        json_str = json.dumps(json_obj, ensure_ascii=False)
    
    if len(json_str) <= max_length:
        return json_str
    return json_str[:max_length] + f"... [截断,总长度: {len(json_str)}字符]"

def test_api_stream():
    """
    测试API流式响应功能
    """
    # 加载环境变量
    load_dotenv()
    
    # 获取API配置
    api_url = os.getenv('API_URL')
    api_key = os.getenv('API_KEY')
    api_model = os.getenv('API_MODEL', 'gpt-3.5-turbo')
    
    logger.info(f"开始测试API流式响应,API URL: {api_url}, MODEL: {api_model}")
    
    # 检查API配置
    if not api_url:
        logger.error("API URL未配置,无法进行测试")
        return
        
    if not api_key:
        logger.error("API Key未配置,无法进行测试")
        return
    
    # 标准化API URL
    api_url = APIUtils.format_api_url(api_url)
    logger.debug(f"标准化后的API URL: {api_url}")
    
    # 构建简单的测试提示
    prompt = "这是一个API流式响应测试。请给出一个简短的股票分析样例。"
    
    # 构建请求头和请求体
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "model": api_model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": True  # 明确设置stream参数为True
    }
    
    logger.debug(f"请求载荷: {_truncate_json_for_logging(payload)}")
    
    try:
        logger.info(f"发起流式API请求: {api_url}")
        
        response = requests.post(
            api_url,
            headers=headers,
            json=payload,
            timeout=int(os.getenv('API_TIMEOUT', 60)),
            stream=True
        )
        
        logger.info(f"API流式响应状态码: {response.status_code}")
        logger.debug(f"响应头: {response.headers}")
        
        if response.status_code == 200:
            logger.info("成功获取API流式响应,开始处理")
            
            buffer = ""
            chunk_count = 0
            
            for line in response.iter_lines():
                if line:
                    line_str = line.decode('utf-8')
                    logger.info(f"原始流式行: {line_str}")
                    
                    # 跳过保持连接的空行
                    if line_str.strip() == '':
                        logger.debug("跳过空行")
                        continue
                    
                    # 数据行通常以"data: "开头
                    if line_str.startswith('data: '):
                        data_content = line_str[6:].strip()  # 移除 "data: " 前缀并去除前后空格
                        logger.info(f"数据内容: {data_content}")
                        
                        # 检查是否为流的结束
                        if data_content == '[DONE]':
                            logger.info("收到流结束标记 [DONE]")
                            break
                            
                        try:
                            # 解析JSON数据
                            json_data = json.loads(data_content)
                            logger.debug(f"JSON结构: {_truncate_json_for_logging(json_data)}")
                            
                            if 'choices' in json_data:
                                delta = json_data['choices'][0].get('delta', {})
                                content = delta.get('content', '')
                                
                                if content:
                                    chunk_count += 1
                                    buffer += content
                                    logger.info(f"内容片段 #{chunk_count}: {content}")
                        except json.JSONDecodeError as e:
                            logger.error(f"JSON解析错误: {e}, 内容: {data_content}")
                    else:
                        logger.warning(f"收到非'data:'开头的行: {line_str}")
            
            logger.info(f"流式处理完成,共收到 {chunk_count} 个内容片段")
            logger.info(f"完整内容:\n{buffer}")
            
        else:
            try:
                error_response = response.json()
                error_text = json.dumps(error_response, indent=2)
            except:
                error_text = response.text[:500] if response.text else "无响应内容"
                
            logger.error(f"API请求失败: 状态码 {response.status_code}, 响应: {error_text}")
            
    except Exception as e:
        logger.error(f"测试过程中发生异常: {str(e)}")
        logger.exception(e)

if __name__ == "__main__":
    test_api_stream()