Spaces:
1v1
/
Running

gp2 / tests /test_stream.py
1v1's picture
Upload 11 files
dfee0ed verified
import os
import requests
import json
from logger import get_logger, get_stream_logger
from dotenv import load_dotenv
from utils.api_utils import APIUtils
# 获取日志器
logger = get_logger()
stream_logger = get_stream_logger()
def _truncate_json_for_logging(json_obj, max_length=500):
"""截断JSON对象用于日志记录,避免日志过大
Args:
json_obj: 要截断的JSON对象
max_length: 最大字符长度,默认500
Returns:
str: 截断后的JSON字符串
"""
if isinstance(json_obj, str):
json_str = json_obj
else:
json_str = json.dumps(json_obj, ensure_ascii=False)
if len(json_str) <= max_length:
return json_str
return json_str[:max_length] + f"... [截断,总长度: {len(json_str)}字符]"
def test_api_stream():
"""
测试API流式响应功能
"""
# 加载环境变量
load_dotenv()
# 获取API配置
api_url = os.getenv('API_URL')
api_key = os.getenv('API_KEY')
api_model = os.getenv('API_MODEL', 'gpt-3.5-turbo')
logger.info(f"开始测试API流式响应,API URL: {api_url}, MODEL: {api_model}")
# 检查API配置
if not api_url:
logger.error("API URL未配置,无法进行测试")
return
if not api_key:
logger.error("API Key未配置,无法进行测试")
return
# 标准化API URL
api_url = APIUtils.format_api_url(api_url)
logger.debug(f"标准化后的API URL: {api_url}")
# 构建简单的测试提示
prompt = "这是一个API流式响应测试。请给出一个简短的股票分析样例。"
# 构建请求头和请求体
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"model": api_model,
"messages": [{"role": "user", "content": prompt}],
"stream": True # 明确设置stream参数为True
}
logger.debug(f"请求载荷: {_truncate_json_for_logging(payload)}")
try:
logger.info(f"发起流式API请求: {api_url}")
response = requests.post(
api_url,
headers=headers,
json=payload,
timeout=int(os.getenv('API_TIMEOUT', 60)),
stream=True
)
logger.info(f"API流式响应状态码: {response.status_code}")
logger.debug(f"响应头: {response.headers}")
if response.status_code == 200:
logger.info("成功获取API流式响应,开始处理")
buffer = ""
chunk_count = 0
for line in response.iter_lines():
if line:
line_str = line.decode('utf-8')
logger.info(f"原始流式行: {line_str}")
# 跳过保持连接的空行
if line_str.strip() == '':
logger.debug("跳过空行")
continue
# 数据行通常以"data: "开头
if line_str.startswith('data: '):
data_content = line_str[6:].strip() # 移除 "data: " 前缀并去除前后空格
logger.info(f"数据内容: {data_content}")
# 检查是否为流的结束
if data_content == '[DONE]':
logger.info("收到流结束标记 [DONE]")
break
try:
# 解析JSON数据
json_data = json.loads(data_content)
logger.debug(f"JSON结构: {_truncate_json_for_logging(json_data)}")
if 'choices' in json_data:
delta = json_data['choices'][0].get('delta', {})
content = delta.get('content', '')
if content:
chunk_count += 1
buffer += content
logger.info(f"内容片段 #{chunk_count}: {content}")
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {e}, 内容: {data_content}")
else:
logger.warning(f"收到非'data:'开头的行: {line_str}")
logger.info(f"流式处理完成,共收到 {chunk_count} 个内容片段")
logger.info(f"完整内容:\n{buffer}")
else:
try:
error_response = response.json()
error_text = json.dumps(error_response, indent=2)
except:
error_text = response.text[:500] if response.text else "无响应内容"
logger.error(f"API请求失败: 状态码 {response.status_code}, 响应: {error_text}")
except Exception as e:
logger.error(f"测试过程中发生异常: {str(e)}")
logger.exception(e)
if __name__ == "__main__":
test_api_stream()