File size: 4,850 Bytes
bbb6398 2809c18 bbb6398 2809c18 bbb6398 2809c18 bbb6398 2809c18 bbb6398 99fc92f bbb6398 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
"""
Web路由模块 - 处理所有页面请求和身份验证
"""
import re # 导入正则表达式模块
from flask import Blueprint, render_template, request, redirect, url_for, make_response, session
from datetime import datetime
from config import ADMIN_PASSWORD, PLATFORMS, PLATFORM_STYLES, TOKEN_EXPIRY_DAYS
from utils.auth import AuthManager
from models.api_key import ApiKeyManager
# 创建Web蓝图
web_bp = Blueprint('web', __name__)
# 辅助函数:为API密钥生成排序键
def get_sort_key(key):
"""
生成用于排序的元组 (balance, platform_priority)。
- balance: 主要排序键,降序。
- platform_priority: 次要排序键,仅在 balance 相同时生效,降序。
- Google: paid=1, free=0
- OpenAI/Claude: t5=5, t4=4, ..., t1=1, 其他=0
"""
balance = key.get('balance', 0)
platform_priority = 0
platform = key.get('platform', '').lower()
name = key.get('name', '').lower() # 用于 OpenAI/Claude tier 判断
states = key.get('states', '').lower() # 获取 states 字段并转小写,用于 Google 判断
if platform == 'google':
if 'paid' in states: # 检查 states 字段是否包含 'paid' (已转小写)
platform_priority = 1 # paid 优先
elif platform in ['openai', 'anthropic']: # 假设 Claude 的 platform id 是 'anthropic'
match = re.search(r't(\d)', name)
if match:
try:
tier = int(match.group(1))
if 1 <= tier <= 5: # 假设 tier 在 1-5 之间
platform_priority = tier
except ValueError:
pass # 如果转换失败,保持 priority 为 0
# 返回元组,Python 的 sorted 会按元组顺序比较
# 因为 sorted 默认升序,我们需要对 balance 和 priority 都取反来实现降序效果
# 或者直接在 sorted 中用 reverse=True,然后只对 platform_priority 取反(或用正数表示优先级)
# 这里我们用 reverse=True,所以 balance 自然降序,platform_priority 越大越优先
return (balance, platform_priority)
@web_bp.route('/')
def index():
"""首页 - 显示API密钥管理界面"""
api_keys_data = ApiKeyManager.get_all_keys()
all_keys_list = api_keys_data.get("api_keys", [])
# 按 balance 降序排序密钥
# 注意:确保 balance 存在且为数字类型,否则排序可能出错。使用 .get('balance', 0) 提供默认值。
# 使用新的排序逻辑
sorted_keys = sorted(all_keys_list, key=get_sort_key, reverse=True)
# 按平台分组密钥
grouped_keys = {}
for platform in PLATFORMS:
grouped_keys[platform["id"]] = {
"name": platform["name"],
"keys": []
}
# 将密钥加入对应的平台组
for key in sorted_keys: # 使用排序后的列表进行分组
platform_id = key.get("platform", "other").lower()
if platform_id not in grouped_keys:
platform_id = "other"
grouped_keys[platform_id]["keys"].append(key)
is_ajax = request.args.get('ajax', '0') == '1'
return render_template('index.html', platforms=PLATFORMS, grouped_keys=grouped_keys, platform_styles=PLATFORM_STYLES)
@web_bp.route('/update-test')
def update_test():
"""API密钥更新测试页面"""
return render_template('update_test.html')
@web_bp.route('/login', methods=['GET', 'POST'])
def login():
"""用户登录"""
error = None
if request.method == 'POST':
password = request.form.get('password')
if password == ADMIN_PASSWORD:
# 密码正确,生成令牌
token = AuthManager.generate_token()
AuthManager.store_token(token)
# 创建响应并设置Cookie
response = make_response(redirect(url_for('web.index')))
response.set_cookie(
'auth_token',
token,
max_age=TOKEN_EXPIRY_DAYS * 24 * 60 * 60,
httponly=True,
secure=request.is_secure
)
return response
else:
error = "密码错误,请重试"
# 传递当前年份以供页脚使用
current_year = datetime.now().year
return render_template('login.html', error=error, now={'year': current_year})
@web_bp.route('/logout')
def logout():
"""用户登出"""
# 从Cookie中获取令牌
token = request.cookies.get('auth_token')
# 删除令牌
if token:
AuthManager.remove_token(token)
# 返回响应并清除Cookie
response = make_response(redirect(url_for('web.login')))
response.delete_cookie('auth_token')
return response
|