vkjlwnvbioWBV / server.py
devme's picture
Upload 90 files
9314c03 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Warp Protobuf 桥接服务器启动文件
提供 Warp API 桥接端点(/api/warp/send_stream、/api/warp/send_stream_sse)、/healthz 与 /v1/models。已移除 GUI、静态文件与 WebSocket 功能。
"""
import os
import asyncio
import json
from pathlib import Path
import uvicorn
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
# 新增:类型导入
from typing import Any, Dict, List
from warp2protobuf.api.protobuf_routes import app as protobuf_app
from warp2protobuf.core.logging import logger
from warp2protobuf.core.auth import acquire_anonymous_access_token
from warp2protobuf.config.models import get_all_unique_models
# 导入OpenAI兼容路由
from protobuf2openai.router import router as openai_router
# ============= JSON Schema 清理函数已移至 warp2protobuf.core.schema_sanitizer 模块 =============
# ============= 应用创建 =============
def create_app() -> FastAPI:
    """Return the protobuf application with the OpenAI-compatible router attached.

    No new FastAPI instance is created: the imported ``protobuf_app`` itself
    acts as the main application and ``openai_router`` is registered on it.

    Returns:
        The (mutated) ``protobuf_app`` object.
    """
    # The OpenAI-compatible endpoints (the original note mentions the model
    # list) are contributed by the router rather than being defined here.
    protobuf_app.include_router(openai_router)
    # NOTE: the original comments mention a schema-sanitising middleware, but
    # no middleware is installed by this function.
    return protobuf_app
def create_app_with_lifespan() -> FastAPI:
    """Build a fresh FastAPI application that runs the module's ``lifespan`` hook.

    The new application gets a permissive CORS middleware, has ``protobuf_app``
    mounted under ``/api``, includes ``openai_router``, and exposes two small
    informational routes (``/`` and ``/healthz``).

    Returns:
        The configured FastAPI application.
    """
    from fastapi.middleware.cors import CORSMiddleware

    service_name = "Warp Protobuf Bridge Server"

    server = FastAPI(
        title=service_name,
        description="Warp API 桥接服务器,提供 Protobuf 编解码与 OpenAI 兼容接口",
        version="1.0.0",
        lifespan=lifespan,
    )

    # CORS: every origin, method and header is accepted.
    # NOTE(review): wildcard origins together with allow_credentials=True is
    # discouraged by the FastAPI CORS documentation - confirm this is intended.
    cors_options = {
        "allow_origins": ["*"],
        "allow_credentials": True,
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    }
    server.add_middleware(CORSMiddleware, **cors_options)

    # The protobuf application is served as a sub-application under /api.
    server.mount("/api", protobuf_app)
    # OpenAI-compatible routes live directly on the main application.
    server.include_router(openai_router)

    # ----- root path and health check -----
    @server.get("/")
    async def root():
        """Describe the service and list its main endpoints."""
        endpoints = {
            "health": "/healthz",
            "models": "/v1/models",
            "protobuf_bridge": "/api/warp/send_stream",
            "sse_bridge": "/api/warp/send_stream_sse",
            "auth_refresh": "/api/auth/refresh",
        }
        return {
            "service": "Warp Protobuf Bridge Server",
            "status": "running",
            "endpoints": endpoints,
        }

    @server.get("/healthz")
    async def health_check():
        """Report that the process is up."""
        return {"status": "ok", "service": "Warp Protobuf Bridge Server"}

    return server
############################################################
# server_message_data 深度编解码工具
############################################################
# 说明:
# 根据抓包与分析,server_message_data 是 Base64URL 编码的 proto3 消息:
# - 字段 1:string(通常为 36 字节 UUID)
# - 字段 3:google.protobuf.Timestamp(字段1=seconds,字段2=nanos)
# 可能出现:仅 Timestamp、仅 UUID、或 UUID + Timestamp。
from typing import Dict, Optional, Tuple
import base64
from datetime import datetime, timezone
try:
from zoneinfo import ZoneInfo # Python 3.9+
except Exception:
ZoneInfo = None # type: ignore
def _b64url_decode_padded(s: str) -> bytes:
t = s.replace("-", "+").replace("_", "/")
pad = (-len(t)) % 4
if pad:
t += "=" * pad
return base64.b64decode(t)
def _b64url_encode_nopad(b: bytes) -> str:
return base64.urlsafe_b64encode(b).decode("ascii").rstrip("=")
def _read_varint(buf: bytes, i: int) -> Tuple[int, int]:
shift = 0
val = 0
while i < len(buf):
b = buf[i]
i += 1
val |= (b & 0x7F) << shift
if not (b & 0x80):
return val, i
shift += 7
if shift > 63:
break
raise ValueError("invalid varint")
def _write_varint(v: int) -> bytes:
out = bytearray()
vv = int(v)
while True:
to_write = vv & 0x7F
vv >>= 7
if vv:
out.append(to_write | 0x80)
else:
out.append(to_write)
break
return bytes(out)
def _make_key(field_no: int, wire_type: int) -> bytes:
    """Build the varint-encoded key that precedes a protobuf field.

    Args:
        field_no: Field number.
        wire_type: Wire type, stored in the low three bits of the key.

    Returns:
        The key bytes.
    """
    tag = (field_no << 3) | wire_type
    return _write_varint(tag)
def _decode_timestamp(buf: bytes) -> Tuple[Optional[int], Optional[int]]:
    """Decode a serialized ``google.protobuf.Timestamp`` message.

    Field 1 is ``seconds`` (int64 varint) and field 2 is ``nanos`` (int32
    varint). Varint values are reinterpreted as signed 64-bit integers, so a
    negative value (sent on the wire as a sign-extended ten-byte varint) is
    returned as a negative number instead of a huge positive one.

    Args:
        buf: The raw bytes of the Timestamp message.

    Returns:
        ``(seconds, nanos)``; either element is ``None`` when the field is
        absent.

    Raises:
        ValueError: Propagated from ``_read_varint`` on a malformed varint.
    """
    i = 0
    seconds: Optional[int] = None
    nanos: Optional[int] = None
    while i < len(buf):
        key, i = _read_varint(buf, i)
        field_no = key >> 3
        wt = key & 0x07
        if wt == 0:  # varint
            val, i = _read_varint(buf, i)
            # Keep the low 64 bits and convert from two's complement.
            val &= 0xFFFFFFFFFFFFFFFF
            if val >= 1 << 63:
                val -= 1 << 64
            if field_no == 1:
                seconds = val
            elif field_no == 2:
                nanos = val
        elif wt == 2:  # length-delimited (not expected inside Timestamp): skip
            ln, i2 = _read_varint(buf, i)
            i = i2 + ln
        elif wt == 1:  # 64-bit fixed: skip
            i += 8
        elif wt == 5:  # 32-bit fixed: skip
            i += 4
        else:
            # Any other wire type: stop parsing and return what was found.
            break
    return seconds, nanos
def _encode_timestamp(seconds: Optional[int], nanos: Optional[int]) -> bytes:
    """Serialize ``seconds``/``nanos`` as a ``google.protobuf.Timestamp`` body.

    Args:
        seconds: Value for field 1, or ``None`` to omit the field.
        nanos: Value for field 2, or ``None`` to omit the field.

    Returns:
        The encoded message bytes (empty when both arguments are ``None``).
    """
    encoded = bytearray()
    # Both fields use wire type 0 (varint); a field is written whenever its
    # value is not None, including zero.
    for field_no, value in ((1, seconds), (2, nanos)):
        if value is None:
            continue
        encoded += _make_key(field_no, 0)
        encoded += _write_varint(int(value))
    return bytes(encoded)
def decode_server_message_data(b64url: str) -> Dict:
    """Decode a Base64URL ``server_message_data`` value into a plain dict.

    Args:
        b64url: The Base64URL-encoded protobuf payload.

    Returns:
        A dict holding whichever of ``uuid``, ``seconds`` and ``nanos`` were
        found. If Base64 decoding fails, a dict with ``error`` and
        ``raw_b64url`` keys is returned instead.

    Raises:
        ValueError: Propagated from ``_read_varint`` when the payload contains
            a malformed varint.
    """
    try:
        payload = _b64url_decode_padded(b64url)
    except Exception as e:
        return {"error": f"base64url decode failed: {e}", "raw_b64url": b64url}

    # Insertion order here fixes the key order of the result.
    found: Dict[str, Any] = {"uuid": None, "seconds": None, "nanos": None}
    pos = 0
    while pos < len(payload):
        tag, pos = _read_varint(payload, pos)
        field_no = tag >> 3
        wire_type = tag & 0x07
        if wire_type == 2:  # length-delimited
            size, pos = _read_varint(payload, pos)
            chunk = payload[pos:pos + size]
            pos += size
            if field_no == 1:  # uuid string
                try:
                    found["uuid"] = chunk.decode("utf-8")
                except Exception:
                    # Undecodable text is treated as "no uuid".
                    found["uuid"] = None
            elif field_no == 3:  # google.protobuf.Timestamp
                found["seconds"], found["nanos"] = _decode_timestamp(chunk)
        elif wire_type == 0:  # varint: not expected, skip over it
            _, pos = _read_varint(payload, pos)
        elif wire_type == 1:  # 64-bit fixed: skip
            pos += 8
        elif wire_type == 5:  # 32-bit fixed: skip
            pos += 4
        else:
            # Any other wire type ends parsing.
            break

    return {name: value for name, value in found.items() if value is not None}
def encode_server_message_data(
    uuid: Optional[str] = None,
    seconds: Optional[int] = None,
    nanos: Optional[int] = None,
) -> str:
    """Encode uuid/seconds/nanos into a Base64URL ``server_message_data`` string.

    The parameters default to ``None`` and are therefore annotated as
    ``Optional`` explicitly; implicit Optional is deprecated by PEP 484.

    Args:
        uuid: Written as field 1 (length-delimited string). Skipped when
            ``None`` or empty.
        seconds: Timestamp seconds, or ``None``.
        nanos: Timestamp nanoseconds, or ``None``.

    Returns:
        The unpadded Base64URL encoding of the message. The Timestamp (field 3)
        is only written when ``seconds`` or ``nanos`` is not ``None``.
    """
    parts = bytearray()
    if uuid:
        uuid_bytes = uuid.encode("utf-8")
        parts += _make_key(1, 2)  # field 1, length-delimited
        parts += _write_varint(len(uuid_bytes))
        parts += uuid_bytes
    if seconds is not None or nanos is not None:
        ts = _encode_timestamp(seconds, nanos)
        parts += _make_key(3, 2)  # field 3, length-delimited
        parts += _write_varint(len(ts))
        parts += ts
    return _b64url_encode_nopad(bytes(parts))
async def startup_tasks():
    """Run the start-up checks and log the available endpoints.

    Steps:
      1. Initialise the protobuf runtime; a failure is logged and re-raised.
      2. Inspect the JWT token: request an anonymous token when none exists,
         or try a refresh when the existing one is reported as expired. Any
         failure in this step is only logged as a warning.
      3. Log the list of API endpoints.
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("Warp Protobuf编解码服务器启动")
    logger.info(banner)

    # --- protobuf runtime check (fatal on failure) ---
    try:
        from warp2protobuf.core.protobuf import ensure_proto_runtime
        ensure_proto_runtime()
        logger.info("✅ Protobuf运行时初始化成功")
    except Exception as e:
        logger.error(f"❌ Protobuf运行时初始化失败: {e}")
        raise

    # --- JWT token check (best effort) ---
    try:
        from warp2protobuf.core.auth import get_jwt_token, is_token_expired, refresh_jwt_if_needed
        token = get_jwt_token()
        if not token:
            logger.warning("⚠️ 未找到JWT token,尝试申请匿名访问token用于额度初始化…")
            try:
                new_token = await acquire_anonymous_access_token()
                if new_token:
                    logger.info("✅ 匿名访问token申请成功")
                else:
                    logger.warning("⚠️ 匿名访问token申请失败")
            except Exception as e2:
                logger.warning(f"⚠️ 匿名访问token申请异常: {e2}")
        elif is_token_expired(token):
            logger.warning("⚠️ JWT token无效或已过期,尝试自动刷新…")
            try:
                refresh_success = await refresh_jwt_if_needed()
                if refresh_success:
                    logger.info("✅ JWT token自动刷新成功")
                else:
                    logger.warning("⚠️ JWT token自动刷新失败,建议手动运行: uv run refresh_jwt.py")
            except Exception as e3:
                logger.warning(f"⚠️ JWT token自动刷新异常: {e3},建议手动运行: uv run refresh_jwt.py")
        else:
            logger.info("✅ JWT token有效")
    except Exception as e:
        logger.warning(f"⚠️ JWT检查失败: {e}")

    # The OpenAI compatibility layer is part of this server.
    # --- endpoint overview ---
    logger.info("-" * 40)
    logger.info("可用的API端点:")
    endpoint_lines = (
        " GET / - 服务信息",
        " GET /healthz - 健康检查",
        " GET /v1/models - 模型列表(OpenAI兼容)",
        " POST /api/warp/send_stream - Warp API 转发(返回解析事件)",
        " POST /api/warp/send_stream_sse - Warp API 转发(实时SSE)",
        " POST /api/auth/refresh - 刷新JWT token",
    )
    for line in endpoint_lines:
        logger.info(line)
    logger.info(banner)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: run ``startup_tasks`` before serving.

    Args:
        app: The application being started (not used by this hook).
    """
    # Start-up phase.
    await startup_tasks()
    # Hand control back while the application is running.
    yield None
    # Shutdown phase: nothing to clean up at the moment.
def main():
    """Create the lifespan-enabled application and serve it with uvicorn.

    The server listens on 0.0.0.0:8000. A keyboard interrupt is logged and
    swallowed; any other exception is logged and re-raised.
    """
    # Build the application (with the lifespan hook) before entering the
    # guarded section, so construction errors are not logged as start failures.
    application = create_app_with_lifespan()

    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "log_level": "info",
        "access_log": True,
    }
    try:
        uvicorn.run(application, **server_options)
    except KeyboardInterrupt:
        logger.info("服务器被用户停止")
    except Exception as e:
        logger.error(f"服务器启动失败: {e}")
        raise


if __name__ == "__main__":
    main()