File size: 11,350 Bytes
fb3db59 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 |
"""
API密钥模型 - 处理API密钥的CRUD操作
"""
import json
import uuid
from datetime import datetime
import os
import pytz
import sqlite3
from utils.db import get_db_connection
from config import API_KEYS_FILE, DATABASE_PATH
class ApiKeyManager:
    """Manage API keys stored in the ``api_keys`` SQLite table.

    Every operation talks to the database first.  When the database raises
    ``sqlite3.Error`` the operation is retried against the legacy JSON file
    (``API_KEYS_FILE``) so that the request is not lost.

    All methods are static; the class is only used as a namespace.
    """

    # Characters removed from a raw key before it is stored: single quote,
    # double quote, parentheses, square brackets and the ASCII space.
    _KEY_STRIP_TABLE = str.maketrans("", "", "'\"()[] ")

    # Status message stored for a key that has not been tested yet.
    _PENDING_MESSAGE = "等待测试"

    # Upper bound on "?" placeholders per statement.  Older SQLite builds
    # reject statements with more than 999 bound variables, so bulk
    # operations are split into chunks below that limit.
    _MAX_SQL_VARIABLES = 900

    _INSERT_SQL = '''
        INSERT INTO api_keys
        (id, platform, name, key, created_at, updated_at, success, return_message, states, balance)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    '''

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _sanitize_key(key):
        """Return ``key`` without quotes, brackets, parentheses and spaces.

        Falsy values (``None``, ``""``) are returned unchanged.
        """
        if not key:
            return key
        return key.translate(ApiKeyManager._KEY_STRIP_TABLE)

    @staticmethod
    def _now_iso():
        """Return the current Asia/Shanghai time as an ISO-8601 string."""
        return datetime.now(pytz.timezone('Asia/Shanghai')).isoformat()

    @staticmethod
    def _build_key_record(platform, name, key, timestamp):
        """Build the dict describing a brand-new, not yet tested key.

        Args:
            platform: Platform identifier stored with the key.
            name: Display name of the key.
            key: Raw key string; it is sanitized before being stored.
            timestamp: Value used for both ``created_at`` and ``updated_at``.

        Returns:
            A dict with a freshly generated UUID4 ``id`` and default status
            fields.
        """
        return {
            "id": str(uuid.uuid4()),
            "platform": platform,
            "name": name,
            "key": ApiKeyManager._sanitize_key(key),
            "created_at": timestamp,
            "updated_at": timestamp,
            "success": False,
            "return_message": ApiKeyManager._PENDING_MESSAGE,
            "states": "",
            "balance": 0,
        }

    @staticmethod
    def _record_to_row(record):
        """Convert a key record into the parameter tuple for ``_INSERT_SQL``."""
        return (
            record["id"],
            record["platform"],
            record["name"],
            record["key"],
            record["created_at"],
            record["updated_at"],
            int(record["success"]),  # stored as an integer: 0 means False
            record["return_message"],
            record["states"],
            record["balance"],
        )

    @staticmethod
    def _row_to_dict(row):
        """Convert a database row into a dict with a boolean ``success``.

        NOTE(review): ``dict(row)`` assumes the connection returns
        mapping-like rows (e.g. ``sqlite3.Row``) -- confirm in
        ``get_db_connection``.
        """
        record = dict(row)
        record['success'] = bool(record['success'])
        return record

    # ------------------------------------------------------------------
    # Legacy JSON storage
    # ------------------------------------------------------------------
    @staticmethod
    def load_keys():
        """Load all API keys from the legacy JSON file.

        The file is created with an empty key list when it does not exist.

        Returns:
            A dict that always contains an ``"api_keys"`` list.  Unparsable
            JSON, a non-dict document or a missing/non-list ``"api_keys"``
            entry all yield an empty list instead of a structure callers
            would crash on.
        """
        if not os.path.exists(API_KEYS_FILE):
            ApiKeyManager.save_keys({"api_keys": []})
            return {"api_keys": []}

        try:
            with open(API_KEYS_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except json.JSONDecodeError:
            return {"api_keys": []}

        if not isinstance(data, dict):
            return {"api_keys": []}
        if not isinstance(data.get("api_keys"), list):
            data["api_keys"] = []
        return data

    @staticmethod
    def save_keys(data):
        """Write ``data`` to the legacy JSON file, replacing its content."""
        with open(API_KEYS_FILE, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    # ------------------------------------------------------------------
    # CRUD operations
    # ------------------------------------------------------------------
    @staticmethod
    def get_all_keys():
        """Return every stored key.

        Returns:
            ``{"api_keys": [...]}`` where each entry is a dict of the row's
            columns with ``success`` converted to ``bool``.  On a database
            error the content of the JSON file is returned instead.
        """
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM api_keys')
            rows = cursor.fetchall()
            return {"api_keys": [ApiKeyManager._row_to_dict(row) for row in rows]}
        except sqlite3.Error as e:
            print(f"获取所有密钥时出错: {e}")
            # Database unavailable: fall back to the JSON file.
            return ApiKeyManager.load_keys()
        finally:
            conn.close()

    @staticmethod
    def add_key(platform, name, key):
        """Add a new API key.

        Args:
            platform: Platform identifier stored with the key.
            name: Display name of the key.
            key: Raw key string; quotes, brackets, parentheses and spaces
                are stripped before storing.

        Returns:
            The dict describing the new key (also returned when the key had
            to be written to the JSON file because the database failed).
        """
        new_key = ApiKeyManager._build_key_record(
            platform, name, key, ApiKeyManager._now_iso()
        )

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(
                ApiKeyManager._INSERT_SQL,
                ApiKeyManager._record_to_row(new_key),
            )
            conn.commit()
        except sqlite3.Error as e:
            print(f"添加密钥时出错: {e}")
            conn.rollback()
            # Database failed: persist the key in the JSON file instead.
            api_keys_data = ApiKeyManager.load_keys()
            api_keys_data["api_keys"].append(new_key)
            ApiKeyManager.save_keys(api_keys_data)
        finally:
            conn.close()
        return new_key

    @staticmethod
    def delete_key(key_id):
        """Delete the key with the given id.

        Returns:
            ``True`` when a key was removed, ``False`` when no key matched.
        """
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute('DELETE FROM api_keys WHERE id = ?', (key_id,))
            deleted = cursor.rowcount > 0
            conn.commit()
            return deleted
        except sqlite3.Error as e:
            print(f"删除密钥时出错: {e}")
            conn.rollback()
            # Database failed: remove the key from the JSON file instead.
            api_keys_data = ApiKeyManager.load_keys()
            remaining = [
                k for k in api_keys_data["api_keys"] if k.get("id") != key_id
            ]
            if len(remaining) == len(api_keys_data["api_keys"]):
                return False
            api_keys_data["api_keys"] = remaining
            ApiKeyManager.save_keys(api_keys_data)
            return True
        finally:
            conn.close()

    @staticmethod
    def bulk_delete(key_ids):
        """Delete several keys at once.

        Args:
            key_ids: Iterable of key ids to delete.

        Returns:
            The number of keys that were actually removed.
        """
        if not key_ids:
            return 0

        # Copy into a list: sqlite3 needs a sequence for parameter binding,
        # and the ids are sliced into chunks below.
        ids = list(key_ids)
        chunk_size = ApiKeyManager._MAX_SQL_VARIABLES

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            deleted_count = 0
            for start in range(0, len(ids), chunk_size):
                chunk = ids[start:start + chunk_size]
                placeholders = ','.join(['?'] * len(chunk))
                cursor.execute(
                    f'DELETE FROM api_keys WHERE id IN ({placeholders})',
                    chunk,
                )
                # rowcount reports the rows removed by this statement, so
                # no extra COUNT(*) queries are needed.
                deleted_count += cursor.rowcount
            conn.commit()
            return deleted_count
        except sqlite3.Error as e:
            print(f"批量删除密钥时出错: {e}")
            conn.rollback()
            # Database failed: remove the keys from the JSON file instead.
            api_keys_data = ApiKeyManager.load_keys()
            doomed = set(ids)
            remaining = [
                k for k in api_keys_data["api_keys"] if k.get("id") not in doomed
            ]
            deleted_count = len(api_keys_data["api_keys"]) - len(remaining)
            if deleted_count > 0:
                api_keys_data["api_keys"] = remaining
                ApiKeyManager.save_keys(api_keys_data)
            return deleted_count
        finally:
            conn.close()

    @staticmethod
    def bulk_add_keys(keys_data):
        """Add several API keys in one transaction.

        Args:
            keys_data: List of dicts, each providing ``platform``, ``name``
                and ``key``.

        Returns:
            The list of created key dicts (empty when ``keys_data`` is
            empty).  The same list is returned when the database failed and
            the keys were written to the JSON file instead.
        """
        if not keys_data:
            return []

        now = ApiKeyManager._now_iso()
        # Build every record up front so that a database failure half-way
        # through the batch cannot drop the remaining keys from the fallback.
        added_keys = [
            ApiKeyManager._build_key_record(
                key_info.get("platform"),
                key_info.get("name"),
                key_info.get("key"),
                now,
            )
            for key_info in keys_data
        ]

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.executemany(
                ApiKeyManager._INSERT_SQL,
                [ApiKeyManager._record_to_row(record) for record in added_keys],
            )
            conn.commit()
        except sqlite3.Error as e:
            print(f"批量添加密钥时出错: {e}")
            # The rollback discards any rows inserted before the failure, so
            # the whole batch goes to the JSON file.
            conn.rollback()
            api_keys_data = ApiKeyManager.load_keys()
            api_keys_data["api_keys"].extend(added_keys)
            ApiKeyManager.save_keys(api_keys_data)
        finally:
            conn.close()
        return added_keys

    @staticmethod
    def update_key(key_id, name, key):
        """Update the name and key string of an existing key.

        The verification status is reset (``success`` false, message set to
        the "waiting for test" text) because the key string may have changed.

        Args:
            key_id: Id of the key to update.
            name: New display name.
            key: New raw key string; sanitized like in ``add_key``.

        Returns:
            The updated key dict, or ``None`` when no key has that id.
        """
        key = ApiKeyManager._sanitize_key(key)
        updated_at = ApiKeyManager._now_iso()

        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute('''
                UPDATE api_keys
                SET name = ?, key = ?, updated_at = ?, success = ?, return_message = ?
                WHERE id = ?
            ''', (name, key, updated_at, 0, ApiKeyManager._PENDING_MESSAGE, key_id))

            if cursor.rowcount == 0:
                # Nothing matched, so there is nothing to commit or return.
                return None

            conn.commit()
            # Re-read the row so the caller gets every column, not only the
            # ones that were just written.
            cursor.execute('SELECT * FROM api_keys WHERE id = ?', (key_id,))
            row = cursor.fetchone()
            return ApiKeyManager._row_to_dict(row) if row else None
        except sqlite3.Error as e:
            print(f"更新密钥时出错: {e}")
            conn.rollback()
            # Database failed: apply the update to the JSON file instead.
            api_keys_data = ApiKeyManager.load_keys()
            updated_key = None
            for k in api_keys_data["api_keys"]:
                if k.get("id") == key_id:
                    k["name"] = name
                    k["key"] = key
                    k["updated_at"] = updated_at
                    # Reset the verification status.
                    k["success"] = False
                    k["return_message"] = ApiKeyManager._PENDING_MESSAGE
                    updated_key = k
                    break
            if updated_key:
                ApiKeyManager.save_keys(api_keys_data)
            return updated_key
        finally:
            conn.close()
|