""" API密钥模型 - 处理API密钥的CRUD操作 """ import json import uuid from datetime import datetime import os import pytz import sqlite3 from utils.db import get_db_connection from config import API_KEYS_FILE, DATABASE_PATH class ApiKeyManager: """管理API密钥的类""" @staticmethod def load_keys(): """加载所有API密钥 (兼容旧的JSON方式)""" if not os.path.exists(API_KEYS_FILE): with open(API_KEYS_FILE, 'w', encoding='utf-8') as f: json.dump({"api_keys": []}, f, ensure_ascii=False, indent=2) return {"api_keys": []} try: with open(API_KEYS_FILE, 'r', encoding='utf-8') as f: return json.load(f) except json.JSONDecodeError: return {"api_keys": []} @staticmethod def save_keys(data): """保存API密钥数据 (兼容旧的JSON方式)""" with open(API_KEYS_FILE, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) @staticmethod def get_all_keys(): """获取所有密钥""" conn = get_db_connection() try: cursor = conn.cursor() cursor.execute('SELECT * FROM api_keys') rows = cursor.fetchall() # 转换为字典列表 api_keys = [] for row in rows: key_dict = dict(row) # 转换success字段从整数为布尔值 key_dict['success'] = bool(key_dict['success']) api_keys.append(key_dict) return {"api_keys": api_keys} except sqlite3.Error as e: print(f"获取所有密钥时出错: {str(e)}") # 如果数据库出错,尝试从JSON文件加载 return ApiKeyManager.load_keys() finally: conn.close() @staticmethod def add_key(platform, name, key): """添加新的API密钥""" # 过滤掉key中的单引号、双引号、小括号、方括号和空格,防止存储时出错 if key: key = key.replace("'", "").replace('"', "").replace('(', "").replace(')', "").replace('[', "").replace(']', "").replace(' ', "") current_time = datetime.now(pytz.timezone('Asia/Shanghai')).isoformat() new_key_id = str(uuid.uuid4()) new_key = { "id": new_key_id, "platform": platform, "name": name, "key": key, "created_at": current_time, "updated_at": current_time, "success": False, "return_message": "等待测试", "states": "", "balance": 0 } conn = get_db_connection() try: cursor = conn.cursor() cursor.execute(''' INSERT INTO api_keys (id, platform, name, key, created_at, updated_at, success, return_message, states, balance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( new_key_id, platform, name, key, current_time, current_time, 0, # success是整数: 0表示False "等待测试", "", 0 )) conn.commit() return new_key except sqlite3.Error as e: print(f"添加密钥时出错: {str(e)}") conn.rollback() # 如果数据库出错,尝试使用JSON方式保存 api_keys_data = ApiKeyManager.load_keys() api_keys_data["api_keys"].append(new_key) ApiKeyManager.save_keys(api_keys_data) return new_key finally: conn.close() @staticmethod def delete_key(key_id): """删除指定的API密钥""" conn = get_db_connection() try: cursor = conn.cursor() cursor.execute('DELETE FROM api_keys WHERE id = ?', (key_id,)) deleted = cursor.rowcount > 0 conn.commit() return deleted except sqlite3.Error as e: print(f"删除密钥时出错: {str(e)}") conn.rollback() # 如果数据库出错,尝试从JSON文件删除 api_keys_data = ApiKeyManager.load_keys() original_count = len(api_keys_data["api_keys"]) api_keys_data["api_keys"] = [k for k in api_keys_data["api_keys"] if k.get("id") != key_id] if len(api_keys_data["api_keys"]) < original_count: ApiKeyManager.save_keys(api_keys_data) return True return False finally: conn.close() @staticmethod def bulk_delete(key_ids): """批量删除多个API密钥""" if not key_ids: return 0 conn = get_db_connection() try: cursor = conn.cursor() # 获取当前的密钥数量 cursor.execute('SELECT COUNT(*) FROM api_keys') original_count = cursor.fetchone()[0] # 使用参数化查询构建占位符 placeholders = ','.join(['?'] * len(key_ids)) cursor.execute(f'DELETE FROM api_keys WHERE id IN ({placeholders})', key_ids) # 获取删除后的密钥数量 cursor.execute('SELECT COUNT(*) FROM api_keys') new_count = cursor.fetchone()[0] conn.commit() return original_count - new_count except sqlite3.Error as e: print(f"批量删除密钥时出错: {str(e)}") conn.rollback() # 如果数据库出错,尝试从JSON文件删除 api_keys_data = ApiKeyManager.load_keys() original_count = len(api_keys_data["api_keys"]) api_keys_data["api_keys"] = [k for k in api_keys_data["api_keys"] if k.get("id") not in key_ids] deleted_count = original_count - len(api_keys_data["api_keys"]) if deleted_count > 0: ApiKeyManager.save_keys(api_keys_data) return deleted_count finally: conn.close() @staticmethod def bulk_add_keys(keys_data): """批量添加多个API密钥 Args: keys_data: 包含多个密钥信息的列表,每个元素包含platform、name、key Returns: 添加的密钥列表 """ if not keys_data: return [] added_keys = [] now = datetime.now(pytz.timezone('Asia/Shanghai')).isoformat() conn = get_db_connection() try: cursor = conn.cursor() for key_info in keys_data: platform = key_info.get("platform") name = key_info.get("name") key = key_info.get("key") # 过滤掉key中的单引号、双引号、小括号、方括号和空格,防止存储时出错 if key: key = key.replace("'", "").replace('"', "").replace('(', "").replace(')', "").replace('[', "").replace(']', "").replace(' ', "") new_key_id = str(uuid.uuid4()) new_key = { "id": new_key_id, "platform": platform, "name": name, "key": key, "created_at": now, "updated_at": now, "success": False, "return_message": "等待测试", "states": "", "balance": 0 } cursor.execute(''' INSERT INTO api_keys (id, platform, name, key, created_at, updated_at, success, return_message, states, balance) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( new_key_id, platform, name, key, now, now, 0, # success是整数: 0表示False "等待测试", "", 0 )) added_keys.append(new_key) conn.commit() return added_keys except sqlite3.Error as e: print(f"批量添加密钥时出错: {str(e)}") conn.rollback() # 如果数据库出错,尝试使用JSON方式保存 api_keys_data = ApiKeyManager.load_keys() for key in added_keys: api_keys_data["api_keys"].append(key) ApiKeyManager.save_keys(api_keys_data) return added_keys finally: conn.close() @staticmethod def update_key(key_id, name, key): """更新API密钥信息""" # 过滤掉key中的单引号、双引号、小括号、方括号和空格,防止存储时出错 if key: key = key.replace("'", "").replace('"', "").replace('(', "").replace(')', "").replace('[', "").replace(']', "").replace(' ', "") updated_at = datetime.now(pytz.timezone('Asia/Shanghai')).isoformat() conn = get_db_connection() try: cursor = conn.cursor() # 更新密钥 cursor.execute(''' UPDATE api_keys SET name = ?, key = ?, updated_at = ?, success = ?, return_message = ? WHERE id = ? ''', (name, key, updated_at, 0, "等待测试", key_id)) if cursor.rowcount > 0: conn.commit() # 获取更新后的密钥信息 cursor.execute('SELECT * FROM api_keys WHERE id = ?', (key_id,)) row = cursor.fetchone() if row: updated_key = dict(row) # 转换success字段从整数为布尔值 updated_key['success'] = bool(updated_key['success']) return updated_key return None except sqlite3.Error as e: print(f"更新密钥时出错: {str(e)}") conn.rollback() # 如果数据库出错,尝试使用JSON方式更新 api_keys_data = ApiKeyManager.load_keys() updated_key = None for k in api_keys_data["api_keys"]: if k.get("id") == key_id: k["name"] = name k["key"] = key k["updated_at"] = updated_at # 重置验证状态 k["success"] = False k["return_message"] = "等待测试" updated_key = k break if updated_key: ApiKeyManager.save_keys(api_keys_data) return updated_key finally: conn.close()