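"""OpenAI-compatible proxy for DuckDuckGo's DuckChat API.

Exposes model listing and chat completion endpoints (with optional streaming)
and translates them into requests against duckduckgo.com/duckchat.
"""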
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Any, Union
import asyncio
import os
import time
import httpx
import json
from dotenv import load_dotenv

load_dotenv()
# Simple configuration
API_PREFIX = os.getenv("API_PREFIX", "/")
MAX_RETRY_COUNT = int(os.getenv("MAX_RETRY_COUNT", "3"))
RETRY_DELAY = int(os.getenv("RETRY_DELAY", "5000"))  # milliseconds
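
def prefixed(path: str) -> str:
    """Join API_PREFIX onto a route path without doubling slashes.

    Note: the original code read API_PREFIX from the environment but never
    applied it; wiring it into the route decorators below is an assumption
    about the intent.
    """
    return "/" + (API_PREFIX.strip("/") + path).lstrip("/")
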
# Default headers for DuckDuckGo requests
FAKE_HEADERS = {
    "Accept": "*/*",
    "Accept-Language": "en-US,en;q=0.9",
    "Origin": "https://duckduckgo.com",  # Origin carries no trailing slash
    "Referer": "https://duckduckgo.com/",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
}
app = FastAPI()

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Models for request validation
class Message(BaseModel):
    role: str
    content: Union[str, List[Dict[str, Any]]]


class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[Message]
    stream: bool = False
# Log method, status code, path, and timing for every request
@app.middleware("http")
async def add_process_time(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    print(f"{request.method} {response.status_code} {request.url.path} {process_time * 1000:.2f} ms")
    return response
@app.get(prefixed(""))
async def root():
    return {"message": "API server running"}


@app.get(prefixed("/ping"))
async def ping():
    return {"message": "pong"}
@app.get(prefixed("/v1/models"))
async def get_models():
    return {
        "object": "list",
        "data": [
            {"id": "gpt-4o-mini", "object": "model", "owned_by": "ddg"},
            {"id": "claude-3-haiku", "object": "model", "owned_by": "ddg"},
            {"id": "llama-3.1-70b", "object": "model", "owned_by": "ddg"},
            {"id": "mixtral-8x7b", "object": "model", "owned_by": "ddg"},
            {"id": "o3-mini", "object": "model", "owned_by": "ddg"},
        ],
    }
@app.post(prefixed("/v1/chat/completions"))
async def chat_completions(request: ChatCompletionRequest):
    try:
        model = convert_model(request.model)
        content = messages_to_text(request.messages)
        return await create_completion(model, content, request.stream)
    except HTTPException:
        raise  # preserve upstream status codes instead of collapsing them to 500
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def convert_model(input_model: str) -> str:
    """Convert public model names to DuckDuckGo internal model names."""
    model_mapping = {
        "claude-3-haiku": "claude-3-haiku-20240307",
        "llama-3.1-70b": "meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
        "mixtral-8x7b": "mistralai/Mixtral-8x7B-Instruct-v0.1",
        "o3-mini": "o3-mini",
    }
    return model_mapping.get(input_model.lower(), "gpt-4o-mini")
def messages_to_text(messages: List[Message]) -> str:
    """Convert a message list to the text format expected by the DuckDuckGo API."""
    result = ""
    for message in messages:
        # DuckDuckGo has no system role, so system messages are sent as user turns
        role = "user" if message.role == "system" else message.role
        if role in ["user", "assistant"]:
            # Handle both plain-string content and structured content parts
            if isinstance(message.content, list):
                content_str = "".join(item.get("text", "") for item in message.content)
            else:
                content_str = message.content
            result += f"{role}:{content_str};\r\n"
    return result
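
# Illustrative example (hypothetical values):
#   messages_to_text([Message(role="system", content="Be brief."),
#                     Message(role="user", content="Hi")])
# returns "user:Be brief.;\r\nuser:Hi;\r\n"
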
async def request_token() -> str:
    """Get an auth token from DuckDuckGo."""
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://duckduckgo.com/duckchat/v1/status",
                headers={**FAKE_HEADERS, "x-vqd-accept": "1"},
            )
            return response.headers.get("x-vqd-4", "")
    except Exception as e:
        print(f"Token request error: {e}")
        return ""
async def create_completion(model: str, content: str, return_stream: bool, retry_count: int = 0):
    """Create a chat completion via the DuckDuckGo API."""
    token = await request_token()
    try:
        # httpx has no `stream=True` kwarg on post(); build the request and
        # send it with stream=True instead. The client must stay open until
        # the stream is fully consumed, so process_stream closes it.
        client = httpx.AsyncClient()
        request = client.build_request(
            "POST",
            "https://duckduckgo.com/duckchat/v1/chat",
            headers={
                **FAKE_HEADERS,
                "Accept": "text/event-stream",
                "Content-Type": "application/json",
                "x-vqd-4": token,
            },
            json={
                "model": model,
                "messages": [{"role": "user", "content": content}],
            },
        )
        response = await client.send(request, stream=True)
        if response.status_code != 200:
            await response.aclose()
            await client.aclose()
            raise HTTPException(status_code=response.status_code, detail="API request failed")
        return await process_stream(model, response, return_stream, client)
    except Exception as e:
        # Failed upstream requests (including non-200 responses) are retried
        if retry_count < MAX_RETRY_COUNT:
            print(f"Retrying... attempt {retry_count + 1}")
            await asyncio.sleep(RETRY_DELAY / 1000)  # RETRY_DELAY is in milliseconds
            return await create_completion(model, content, return_stream, retry_count + 1)
        raise HTTPException(status_code=500, detail=str(e))
async def process_stream(model: str, response: httpx.Response, return_stream: bool, client: httpx.AsyncClient):
    """Process the streaming response from DuckDuckGo."""
    buffer = ""
    full_text = ""

    async def generate_stream():
        nonlocal buffer, full_text
        try:
            # Process chunks as they arrive
            async for chunk in response.aiter_bytes():
                chunk_str = chunk.decode("utf-8").strip()
                # Prepend leftover data buffered from the previous chunk
                if buffer:
                    chunk_str = buffer + chunk_str
                    buffer = ""
                # Heuristic for incomplete chunks: if the data does not end on
                # a complete JSON object, buffer it and wait for the rest
                if not chunk_str.endswith('"}') and "[DONE]" not in chunk_str:
                    buffer = chunk_str
                    continue
                # Process each SSE line in the chunk
                for line in chunk_str.split("\n"):
                    if len(line) < 6:
                        continue
                    # Remove the "data: " prefix
                    line = line[6:] if line.startswith("data: ") else line
                    # Handle the completion signal
                    if line == "[DONE]":
                        if return_stream:
                            yield f"data: {json.dumps(create_stop_chunk(model))}\n\n"
                        return
                    # Parse and accumulate message content
                    try:
                        data = json.loads(line)
                        if data.get("action") == "success" and "message" in data:
                            message = data["message"]
                            full_text += message
                            if return_stream:
                                yield f"data: {json.dumps(create_chunk(message, model))}\n\n"
                    except json.JSONDecodeError:
                        continue
        finally:
            # Release the upstream connection opened in create_completion
            await response.aclose()
            await client.aclose()

    if return_stream:
        return StreamingResponse(generate_stream(), media_type="text/event-stream")
    # For non-streaming requests, drain the generator (it yields nothing when
    # return_stream is False) so full_text accumulates, then return it whole
    async for _ in generate_stream():
        pass
    return JSONResponse(content=create_complete_response(full_text, model))
def create_chunk(text: str, model: str) -> dict:
    """Create a streaming chunk response."""
    return {
        "id": "chatcmpl-123",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {
                "index": 0,
                "delta": {"content": text},
                "finish_reason": None,
            },
        ],
    }
def create_stop_chunk(model: str) -> dict:
    """Create a final streaming chunk with stop reason."""
    return {
        "id": "chatcmpl-123",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {
                "index": 0,
                "delta": {},
                "finish_reason": "stop",
            },
        ],
    }
def create_complete_response(text: str, model: str) -> dict:
    """Create a complete non-streaming response."""
    return {
        "id": "chatcmpl-123",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
        "choices": [
            {
                "message": {"content": text, "role": "assistant"},
                "index": 0,
                "finish_reason": "stop",
            },
        ],
    }
if __name__ == "__main__":
    import uvicorn

    # "app:app" assumes this module is saved as app.py
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)
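
# Example usage, as a sketch (assumes the server is running locally on port
# 7860 with the default API_PREFIX of "/"):
#
#   import httpx
#   r = httpx.post(
#       "http://localhost:7860/v1/chat/completions",
#       json={"model": "gpt-4o-mini",
#             "messages": [{"role": "user", "content": "Hello"}]},
#   )
#   print(r.json()["choices"][0]["message"]["content"])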