File size: 2,914 Bytes
b064311
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import jwt
import time
import uuid
import logging
from typing import Dict, Any
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel

from ..config import Config

# 设置日志
logger = logging.getLogger("sora-api.auth")

# 创建路由
router = APIRouter(prefix="/api/auth")

# JWT配置
JWT_SECRET = Config.ADMIN_KEY  # 使用管理员密钥作为JWT秘钥
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION = 3600  # 令牌有效期1小时

# 认证请求模型
class LoginRequest(BaseModel):
    admin_key: str

# 令牌响应模型
class TokenResponse(BaseModel):
    token: str
    expires_in: int
    token_type: str = "bearer"

# 创建JWT令牌
def create_jwt_token(data: Dict[str, Any], expires_delta: int = JWT_EXPIRATION) -> str:
    payload = data.copy()
    issued_at = int(time.time())
    expiration = issued_at + expires_delta
    payload.update({
        "iat": issued_at,
        "exp": expiration,
        "jti": str(uuid.uuid4())
    })
    
    token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
    return token

# 验证JWT令牌
def verify_jwt_token(token: str) -> Dict[str, Any]:
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        # 检查token是否已过期
        if payload.get("exp", 0) < time.time():
            raise HTTPException(status_code=401, detail="令牌已过期")
        return payload
    except jwt.PyJWTError as e:
        logger.warning(f"JWT验证失败: {str(e)}")
        raise HTTPException(status_code=401, detail="无效的令牌")

# 登录API
@router.post("/login", response_model=TokenResponse)
async def login(request: LoginRequest):
    """管理员登录,返回JWT令牌"""
    # 验证管理员密钥
    if request.admin_key != Config.ADMIN_KEY:
        logger.warning(f"尝试使用无效的管理员密钥登录")
        # 使用固定延迟以防止计时攻击
        time.sleep(1)
        raise HTTPException(status_code=401, detail="管理员密钥错误")
    
    # 创建令牌
    token_data = {
        "sub": "admin",
        "role": "admin",
        "name": "管理员"
    }
    
    token = create_jwt_token(token_data)
    logger.info(f"管理员登录成功,生成新令牌")
    
    return TokenResponse(token=token, expires_in=JWT_EXPIRATION)

# 刷新令牌API
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(token: str = Depends(verify_jwt_token)):
    """刷新JWT令牌"""
    # 创建新令牌,保持相同的sub和role
    token_data = {
        "sub": token.get("sub"),
        "role": token.get("role", "admin"),
        "name": token.get("name", "管理员"),
        "refresh_count": token.get("refresh_count", 0) + 1
    }
    
    new_token = create_jwt_token(token_data)
    logger.info(f"令牌刷新成功,生成新令牌")
    
    return TokenResponse(token=new_token, expires_in=JWT_EXPIRATION)