File size: 5,374 Bytes
dfee0ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
import os
import requests
import json
from logger import get_logger, get_stream_logger
from dotenv import load_dotenv
from utils.api_utils import APIUtils
# 获取日志器
logger = get_logger()
stream_logger = get_stream_logger()
def _truncate_json_for_logging(json_obj, max_length=500):
"""截断JSON对象用于日志记录,避免日志过大
Args:
json_obj: 要截断的JSON对象
max_length: 最大字符长度,默认500
Returns:
str: 截断后的JSON字符串
"""
if isinstance(json_obj, str):
json_str = json_obj
else:
json_str = json.dumps(json_obj, ensure_ascii=False)
if len(json_str) <= max_length:
return json_str
return json_str[:max_length] + f"... [截断,总长度: {len(json_str)}字符]"
def test_api_stream():
"""
测试API流式响应功能
"""
# 加载环境变量
load_dotenv()
# 获取API配置
api_url = os.getenv('API_URL')
api_key = os.getenv('API_KEY')
api_model = os.getenv('API_MODEL', 'gpt-3.5-turbo')
logger.info(f"开始测试API流式响应,API URL: {api_url}, MODEL: {api_model}")
# 检查API配置
if not api_url:
logger.error("API URL未配置,无法进行测试")
return
if not api_key:
logger.error("API Key未配置,无法进行测试")
return
# 标准化API URL
api_url = APIUtils.format_api_url(api_url)
logger.debug(f"标准化后的API URL: {api_url}")
# 构建简单的测试提示
prompt = "这是一个API流式响应测试。请给出一个简短的股票分析样例。"
# 构建请求头和请求体
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"model": api_model,
"messages": [{"role": "user", "content": prompt}],
"stream": True # 明确设置stream参数为True
}
logger.debug(f"请求载荷: {_truncate_json_for_logging(payload)}")
try:
logger.info(f"发起流式API请求: {api_url}")
response = requests.post(
api_url,
headers=headers,
json=payload,
timeout=int(os.getenv('API_TIMEOUT', 60)),
stream=True
)
logger.info(f"API流式响应状态码: {response.status_code}")
logger.debug(f"响应头: {response.headers}")
if response.status_code == 200:
logger.info("成功获取API流式响应,开始处理")
buffer = ""
chunk_count = 0
for line in response.iter_lines():
if line:
line_str = line.decode('utf-8')
logger.info(f"原始流式行: {line_str}")
# 跳过保持连接的空行
if line_str.strip() == '':
logger.debug("跳过空行")
continue
# 数据行通常以"data: "开头
if line_str.startswith('data: '):
data_content = line_str[6:].strip() # 移除 "data: " 前缀并去除前后空格
logger.info(f"数据内容: {data_content}")
# 检查是否为流的结束
if data_content == '[DONE]':
logger.info("收到流结束标记 [DONE]")
break
try:
# 解析JSON数据
json_data = json.loads(data_content)
logger.debug(f"JSON结构: {_truncate_json_for_logging(json_data)}")
if 'choices' in json_data:
delta = json_data['choices'][0].get('delta', {})
content = delta.get('content', '')
if content:
chunk_count += 1
buffer += content
logger.info(f"内容片段 #{chunk_count}: {content}")
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {e}, 内容: {data_content}")
else:
logger.warning(f"收到非'data:'开头的行: {line_str}")
logger.info(f"流式处理完成,共收到 {chunk_count} 个内容片段")
logger.info(f"完整内容:\n{buffer}")
else:
try:
error_response = response.json()
error_text = json.dumps(error_response, indent=2)
except:
error_text = response.text[:500] if response.text else "无响应内容"
logger.error(f"API请求失败: 状态码 {response.status_code}, 响应: {error_text}")
except Exception as e:
logger.error(f"测试过程中发生异常: {str(e)}")
logger.exception(e)
if __name__ == "__main__":
test_api_stream()
|