from flask import Flask, request, jsonify, Response, stream_with_context
import requests
import uuid
import time
import os

# Shared secret that clients must send as "Authorization: Bearer <API_KEY>"
API_KEY = os.getenv("API_KEY", "linux.do")

app = Flask(__name__)


@app.before_request
def check_api_key():
    # Reject any request whose Authorization header does not match the configured key
    key = request.headers.get("Authorization")
    if key != "Bearer " + API_KEY:
        return jsonify({"success": False, "message": "Unauthorized: Invalid API key"}), 403


@app.route('/v1/models', methods=['GET'])
def get_models():
    # Build upstream request headers
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + API_KEY
    }
    # Fetch the model list from the Pollinations API
    response = requests.get(
        'https://text.pollinations.ai/models',
        headers=headers
    )
    # Map upstream model names to the ids exposed by this proxy
    name_mapping = {
        "claude-hybridspace": "claude hybridspace",
        "gemini-thinking": "gemini thinking",
        "gemini": "openai/gemini"
    }
    models_data = response.json()
    current_timestamp = int(time.time())
    # Convert the upstream list into the OpenAI /v1/models response format
    converted_data = {
        "object": "list",
        "data": [
            {
                "id": name_mapping.get(model["name"], model["name"]),
                "object": "model",
                "created": current_timestamp,
                "owned_by": "openai" if "openai" in model["name"] else "third_party",
                "permissions": [],
                "root": name_mapping.get(model["name"], model["name"]),
                "parent": None,
                "capabilities": {
                    "vision": model.get("vision", False),
                    "audio": model.get("audio", False),
                    "reasoning": model.get("reasoning", True)
                },
                "description": model["description"]
            }
            for model in models_data
        ]
    }
    return converted_data
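
# Sketch of the shape this endpoint returns, based on the conversion above
# (field values are illustrative, not actual Pollinations output):
#
#   {
#     "object": "list",
#     "data": [
#       {"id": "openai", "object": "model", "created": 1700000000,
#        "owned_by": "openai", "capabilities": {"vision": false, ...}, ...}
#     ]
#   }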


@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    try:
        # Incoming request body in OpenAI chat-completions format
        data = request.json
        stream = bool(data.get('stream', False))
        # Unique request id (currently not forwarded upstream)
        chat_id = str(uuid.uuid4()).replace('-', '')[:16]
        # Build upstream request headers
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer " + API_KEY
        }
        # Forward the request to the Pollinations OpenAI-compatible endpoint;
        # stream=True keeps the upstream connection open so chunks can be relayed as they arrive
        response = requests.post(
            'https://text.pollinations.ai/openai/',
            json=data,
            headers=headers,
            stream=stream
        )

        # Relay the upstream response chunk by chunk
        def generate():
            for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
                if chunk:
                    yield chunk
            # Note: if the upstream stream already ends with "data: [DONE]",
            # there is no need to append another terminator here

        if stream:
            return Response(stream_with_context(generate()), content_type="text/event-stream")
        else:
            return Response(
                response.text,
                headers={
                    'Cache-Control': 'no-cache',
                    'Connection': 'keep-alive',
                    'Content-Type': 'application/json'
                }
            )
    except Exception as e:
        print(e)
        return jsonify({"error": str(e)}), 500


@app.route('/v1/audio/transcriptions', methods=['POST'])
def audio_transcriptions():
    try:
        # The request must be multipart/form-data with an audio file attached
        if 'file' not in request.files:
            return jsonify({"error": "Missing audio file"}), 400

        audio_file = request.files['file']  # uploaded audio file
        model = request.form.get("model", "openai-audio")  # model parameter, defaults to "openai-audio"
        # response_format = request.form.get("response_format", "text")  # output format, defaults to "text"
        stream = request.form.get("stream", "false").lower() == "true"  # whether to stream the response
        # Unique request id (currently not forwarded upstream)
        chat_id = str(uuid.uuid4()).replace('-', '')[:16]
        # Build the upstream multipart request
        files = {"file": (audio_file.filename, audio_file.stream, audio_file.content_type)}
        data = {"model": model}
        headers = {"Authorization": f"Bearer {API_KEY}"}
        response = requests.post(
            "https://text.pollinations.ai/openai/audio/transcriptions",
            headers=headers,
            files=files,
            data=data,
            stream=stream  # if stream=True, keep the upstream connection open
        )

        # Relay the upstream response chunk by chunk
        def generate():
            for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
                if chunk:
                    yield chunk

        if stream:
            return Response(stream_with_context(generate()), content_type="text/event-stream")
        else:
            return Response(
                response.text,
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "Content-Type": "application/json"
                }
            )
    except Exception as e:
        print("Error:", e)
        return jsonify({"error": str(e)}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5200)
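
# For anything beyond local testing, the built-in development server can be swapped
# for a production WSGI server; a common (assumed, not project-mandated) invocation:
#
#   gunicorn -w 2 -b 0.0.0.0:5200 app:app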