Spaces:
Running
Running
File size: 2,914 Bytes
b064311 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
import jwt
import time
import uuid
import logging
from typing import Dict, Any
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
from ..config import Config
# 设置日志
logger = logging.getLogger("sora-api.auth")
# 创建路由
router = APIRouter(prefix="/api/auth")
# JWT配置
JWT_SECRET = Config.ADMIN_KEY # 使用管理员密钥作为JWT秘钥
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION = 3600 # 令牌有效期1小时
# 认证请求模型
class LoginRequest(BaseModel):
admin_key: str
# 令牌响应模型
class TokenResponse(BaseModel):
token: str
expires_in: int
token_type: str = "bearer"
# 创建JWT令牌
def create_jwt_token(data: Dict[str, Any], expires_delta: int = JWT_EXPIRATION) -> str:
payload = data.copy()
issued_at = int(time.time())
expiration = issued_at + expires_delta
payload.update({
"iat": issued_at,
"exp": expiration,
"jti": str(uuid.uuid4())
})
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
return token
# 验证JWT令牌
def verify_jwt_token(token: str) -> Dict[str, Any]:
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
# 检查token是否已过期
if payload.get("exp", 0) < time.time():
raise HTTPException(status_code=401, detail="令牌已过期")
return payload
except jwt.PyJWTError as e:
logger.warning(f"JWT验证失败: {str(e)}")
raise HTTPException(status_code=401, detail="无效的令牌")
# 登录API
@router.post("/login", response_model=TokenResponse)
async def login(request: LoginRequest):
"""管理员登录,返回JWT令牌"""
# 验证管理员密钥
if request.admin_key != Config.ADMIN_KEY:
logger.warning(f"尝试使用无效的管理员密钥登录")
# 使用固定延迟以防止计时攻击
time.sleep(1)
raise HTTPException(status_code=401, detail="管理员密钥错误")
# 创建令牌
token_data = {
"sub": "admin",
"role": "admin",
"name": "管理员"
}
token = create_jwt_token(token_data)
logger.info(f"管理员登录成功,生成新令牌")
return TokenResponse(token=token, expires_in=JWT_EXPIRATION)
# 刷新令牌API
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(token: str = Depends(verify_jwt_token)):
"""刷新JWT令牌"""
# 创建新令牌,保持相同的sub和role
token_data = {
"sub": token.get("sub"),
"role": token.get("role", "admin"),
"name": token.get("name", "管理员"),
"refresh_count": token.get("refresh_count", 0) + 1
}
new_token = create_jwt_token(token_data)
logger.info(f"令牌刷新成功,生成新令牌")
return TokenResponse(token=new_token, expires_in=JWT_EXPIRATION) |