"""OpenAI-compatible Flask proxy in front of the text.pollinations.ai API.

Exposes ``/v1/models``, ``/v1/chat/completions`` and
``/v1/audio/transcriptions``. Every incoming request must carry
``Authorization: Bearer <API_KEY>``; the same key is forwarded upstream.
"""

import hmac
import json
import logging
import os
import re
import time
import uuid

import requests
from flask import Flask, Response, jsonify, request, stream_with_context

# NOTE(review): the fallback value is a publicly visible default key; set the
# API_KEY environment variable for any real deployment.
API_KEY = os.getenv("API_KEY", "linux.do")

UPSTREAM_BASE_URL = "https://text.pollinations.ai"

# (connect, read) timeouts in seconds. Without a timeout a stalled upstream
# would block a worker forever. The read timeout bounds the wait between
# received bytes, so it also works for long-running streamed responses.
UPSTREAM_TIMEOUT = (10, 300)

# Upstream model names that are exposed to clients under a different id.
_MODEL_NAME_MAPPING = {
    "claude-hybridspace": "claude hybridspace",
    "gemini-thinking": "gemini thinking",
    "gemini": "openai/gemini",
}

logger = logging.getLogger(__name__)

app = Flask(__name__)


def _upstream_headers(json_content_type=True):
    """Build the headers sent to the upstream API.

    Args:
        json_content_type: When true, also declare a JSON request body. Must
            be false for multipart uploads so ``requests`` can set the
            multipart boundary itself.
    """
    headers = {"Authorization": "Bearer " + API_KEY}
    if json_content_type:
        headers["Content-Type"] = "application/json"
    return headers


def _relay(upstream, stream):
    """Turn an upstream ``requests`` response into a Flask response.

    The body is forwarded as raw bytes (never decoded and re-encoded) so
    non-ASCII text survives even when the upstream omits a charset, and the
    upstream status code is preserved instead of always answering 200.

    Args:
        upstream: The ``requests.Response`` obtained from the upstream API.
        stream: Whether to relay the body incrementally as server-sent events.
    """
    if stream:

        def generate():
            # Forward chunks as they arrive; if the upstream body already ends
            # with "data: [DONE]" nothing needs to be appended here.
            try:
                for chunk in upstream.iter_content(chunk_size=1024):
                    if chunk:
                        yield chunk
            finally:
                # Release the pooled connection even if the client disconnects.
                upstream.close()

        return Response(
            stream_with_context(generate()),
            status=upstream.status_code,
            content_type="text/event-stream",
        )

    return Response(
        upstream.content,
        status=upstream.status_code,
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Content-Type": "application/json",
        },
    )


@app.before_request
def check_api_key():
    """Reject any request whose Authorization header is not the expected key.

    Returns:
        ``None`` to let the request proceed, or a 403 JSON response.
    """
    provided = request.headers.get("Authorization") or ""
    expected = "Bearer " + API_KEY
    # Constant-time comparison avoids leaking key contents through timing.
    # Bytes are compared because compare_digest rejects non-ASCII str input.
    if not hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8")):
        return jsonify({"success": False, "message": "Unauthorized: Invalid API key"}), 403
    return None


@app.route('/v1/models', methods=['GET'])
def get_models():
    """List upstream models converted to the OpenAI ``/v1/models`` format.

    Returns:
        A JSON ``{"object": "list", "data": [...]}`` payload, or a 502 JSON
        error when the upstream is unreachable, answers with an error status,
        or returns something that is not a JSON list.
    """
    try:
        upstream = requests.get(
            UPSTREAM_BASE_URL + "/models",
            headers=_upstream_headers(),
            timeout=UPSTREAM_TIMEOUT,
        )
        upstream.raise_for_status()
        models_data = upstream.json()
    except (requests.RequestException, ValueError) as exc:
        logger.exception("Failed to fetch the upstream model list")
        return jsonify({"error": str(exc)}), 502

    if not isinstance(models_data, list):
        return jsonify({"error": "Unexpected upstream model list format"}), 502

    current_timestamp = int(time.time())
    converted = []
    for model in models_data:
        # Skip malformed entries rather than failing the whole listing.
        if not isinstance(model, dict) or "name" not in model:
            continue
        name = model["name"]
        model_id = _MODEL_NAME_MAPPING.get(name, name)
        converted.append(
            {
                "id": model_id,
                "object": "model",
                "created": current_timestamp,
                "owned_by": "openai" if "openai" in name else "third_party",
                "permissions": [],
                "root": model_id,
                "parent": None,
                "capabilities": {
                    "vision": model.get("vision", False),
                    "audio": model.get("audio", False),
                    "reasoning": model.get("reasoning", True),
                },
                "description": model.get("description", ""),
            }
        )

    return jsonify({"object": "list", "data": converted})


@app.route('/v1/chat/completions', methods=['POST'])
def chat_completions():
    """Forward an OpenAI-format chat completion request to the upstream API.

    The JSON body is passed through unchanged. When it contains
    ``"stream": true`` the upstream reply is relayed incrementally as
    ``text/event-stream``; otherwise it is returned as a single JSON body.

    Returns:
        The relayed upstream response, 400 for a missing or non-object JSON
        body, 502 when the upstream request fails, 500 for other errors.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    stream = data.get("stream") is True

    try:
        upstream = requests.post(
            UPSTREAM_BASE_URL + "/openai/",
            json=data,
            headers=_upstream_headers(),
            # Without stream=True requests buffers the entire body before
            # returning, which defeats incremental delivery to the client.
            stream=stream,
            timeout=UPSTREAM_TIMEOUT,
        )
        return _relay(upstream, stream)
    except requests.RequestException as exc:
        logger.exception("Upstream chat completion request failed")
        return jsonify({"error": str(exc)}), 502
    except Exception as exc:  # top-level boundary: report instead of crashing
        logger.exception("Unexpected error in chat_completions")
        return jsonify({"error": str(exc)}), 500


@app.route('/v1/audio/transcriptions', methods=['POST'])
def audio_transcriptions():
    """Forward a multipart audio transcription request to the upstream API.

    Form fields:
        file: The audio file to transcribe (required).
        model: Upstream model name; defaults to ``openai-audio``.
        stream: ``"true"`` to relay the reply as ``text/event-stream``.

    Returns:
        The relayed upstream response, 400 when no file was uploaded, 502
        when the upstream request fails, 500 for other errors.
    """
    if 'file' not in request.files:
        return jsonify({"error": "Missing audio file"}), 400

    audio_file = request.files['file']
    model = request.form.get("model", "openai-audio")
    stream = request.form.get("stream", "false").lower() == "true"

    files = {"file": (audio_file.filename, audio_file.stream, audio_file.content_type)}

    try:
        upstream = requests.post(
            UPSTREAM_BASE_URL + "/openai/audio/transcriptions",
            # No JSON Content-Type here: requests must set the multipart one.
            headers=_upstream_headers(json_content_type=False),
            files=files,
            data={"model": model},
            stream=stream,
            timeout=UPSTREAM_TIMEOUT,
        )
        return _relay(upstream, stream)
    except requests.RequestException as exc:
        logger.exception("Upstream transcription request failed")
        return jsonify({"error": str(exc)}), 502
    except Exception as exc:  # top-level boundary: report instead of crashing
        logger.exception("Unexpected error in audio_transcriptions")
        return jsonify({"error": str(exc)}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5200)