File size: 11,350 Bytes
fb3db59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
"""
API密钥模型 - 处理API密钥的CRUD操作
"""
import json
import uuid
from datetime import datetime
import os
import pytz
import sqlite3
from utils.db import get_db_connection
from config import API_KEYS_FILE, DATABASE_PATH

class ApiKeyManager:
    """管理API密钥的类"""
    
    @staticmethod
    def load_keys():
        """加载所有API密钥 (兼容旧的JSON方式)"""
        if not os.path.exists(API_KEYS_FILE):
            with open(API_KEYS_FILE, 'w', encoding='utf-8') as f:
                json.dump({"api_keys": []}, f, ensure_ascii=False, indent=2)
            return {"api_keys": []}
        
        try:
            with open(API_KEYS_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        except json.JSONDecodeError:
            return {"api_keys": []}
    
    @staticmethod
    def save_keys(data):
        """保存API密钥数据 (兼容旧的JSON方式)"""
        with open(API_KEYS_FILE, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
    
    @staticmethod
    def get_all_keys():
        """获取所有密钥"""
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM api_keys')
            rows = cursor.fetchall()
            
            # 转换为字典列表
            api_keys = []
            for row in rows:
                key_dict = dict(row)
                # 转换success字段从整数为布尔值
                key_dict['success'] = bool(key_dict['success'])
                api_keys.append(key_dict)
            
            return {"api_keys": api_keys}
        except sqlite3.Error as e:
            print(f"获取所有密钥时出错: {str(e)}")
            # 如果数据库出错,尝试从JSON文件加载
            return ApiKeyManager.load_keys()
        finally:
            conn.close()
    
    @staticmethod
    def add_key(platform, name, key):
        """添加新的API密钥"""
        # 过滤掉key中的单引号、双引号、小括号、方括号和空格,防止存储时出错
        if key:
            key = key.replace("'", "").replace('"', "").replace('(', "").replace(')', "").replace('[', "").replace(']', "").replace(' ', "")
        
        current_time = datetime.now(pytz.timezone('Asia/Shanghai')).isoformat()
        new_key_id = str(uuid.uuid4())
        
        new_key = {
            "id": new_key_id,
            "platform": platform,
            "name": name,
            "key": key,
            "created_at": current_time,
            "updated_at": current_time,
            "success": False,
            "return_message": "等待测试",
            "states": "",
            "balance": 0
        }
        
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute('''
            INSERT INTO api_keys 
            (id, platform, name, key, created_at, updated_at, success, return_message, states, balance)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                new_key_id,
                platform,
                name,
                key,
                current_time,
                current_time,
                0,  # success是整数: 0表示False
                "等待测试",
                "",
                0
            ))
            conn.commit()
                
            return new_key
        except sqlite3.Error as e:
            print(f"添加密钥时出错: {str(e)}")
            conn.rollback()
            # 如果数据库出错,尝试使用JSON方式保存
            api_keys_data = ApiKeyManager.load_keys()
            api_keys_data["api_keys"].append(new_key)
            ApiKeyManager.save_keys(api_keys_data)
            return new_key
        finally:
            conn.close()
    
    @staticmethod
    def delete_key(key_id):
        """删除指定的API密钥"""
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            cursor.execute('DELETE FROM api_keys WHERE id = ?', (key_id,))
            deleted = cursor.rowcount > 0
            conn.commit()
            return deleted
        except sqlite3.Error as e:
            print(f"删除密钥时出错: {str(e)}")
            conn.rollback()
            # 如果数据库出错,尝试从JSON文件删除
            api_keys_data = ApiKeyManager.load_keys()
            original_count = len(api_keys_data["api_keys"])
            api_keys_data["api_keys"] = [k for k in api_keys_data["api_keys"] if k.get("id") != key_id]
            
            if len(api_keys_data["api_keys"]) < original_count:
                ApiKeyManager.save_keys(api_keys_data)
                return True
            return False
        finally:
            conn.close()
    
    @staticmethod
    def bulk_delete(key_ids):
        """批量删除多个API密钥"""
        if not key_ids:
            return 0
        
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            # 获取当前的密钥数量
            cursor.execute('SELECT COUNT(*) FROM api_keys')
            original_count = cursor.fetchone()[0]
            
            # 使用参数化查询构建占位符
            placeholders = ','.join(['?'] * len(key_ids))
            cursor.execute(f'DELETE FROM api_keys WHERE id IN ({placeholders})', key_ids)
            
            # 获取删除后的密钥数量
            cursor.execute('SELECT COUNT(*) FROM api_keys')
            new_count = cursor.fetchone()[0]
            
            conn.commit()
            return original_count - new_count
        except sqlite3.Error as e:
            print(f"批量删除密钥时出错: {str(e)}")
            conn.rollback()
            # 如果数据库出错,尝试从JSON文件删除
            api_keys_data = ApiKeyManager.load_keys()
            original_count = len(api_keys_data["api_keys"])
            api_keys_data["api_keys"] = [k for k in api_keys_data["api_keys"] if k.get("id") not in key_ids]
            
            deleted_count = original_count - len(api_keys_data["api_keys"])
            if deleted_count > 0:
                ApiKeyManager.save_keys(api_keys_data)
            
            return deleted_count
        finally:
            conn.close()
    
    @staticmethod
    def bulk_add_keys(keys_data):
        """批量添加多个API密钥
        
        Args:
            keys_data: 包含多个密钥信息的列表,每个元素包含platform、name、key
            
        Returns:
            添加的密钥列表
        """
        if not keys_data:
            return []
        
        added_keys = []
        now = datetime.now(pytz.timezone('Asia/Shanghai')).isoformat()
        
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            
            for key_info in keys_data:
                platform = key_info.get("platform")
                name = key_info.get("name")
                key = key_info.get("key")
                
                # 过滤掉key中的单引号、双引号、小括号、方括号和空格,防止存储时出错
                if key:
                    key = key.replace("'", "").replace('"', "").replace('(', "").replace(')', "").replace('[', "").replace(']', "").replace(' ', "")
                
                new_key_id = str(uuid.uuid4())
                
                new_key = {
                    "id": new_key_id,
                    "platform": platform,
                    "name": name,
                    "key": key,
                    "created_at": now,
                    "updated_at": now,
                    "success": False,
                    "return_message": "等待测试",
                    "states": "",
                    "balance": 0
                }
                
                cursor.execute('''
                INSERT INTO api_keys 
                (id, platform, name, key, created_at, updated_at, success, return_message, states, balance)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    new_key_id,
                    platform,
                    name,
                    key,
                    now,
                    now,
                    0,  # success是整数: 0表示False
                    "等待测试",
                    "",
                    0
                ))
                
                added_keys.append(new_key)
            
            conn.commit()
            
            return added_keys
        except sqlite3.Error as e:
            print(f"批量添加密钥时出错: {str(e)}")
            conn.rollback()
            # 如果数据库出错,尝试使用JSON方式保存
            api_keys_data = ApiKeyManager.load_keys()
            for key in added_keys:
                api_keys_data["api_keys"].append(key)
            ApiKeyManager.save_keys(api_keys_data)
            return added_keys
        finally:
            conn.close()
    
    @staticmethod
    def update_key(key_id, name, key):
        """更新API密钥信息"""
        # 过滤掉key中的单引号、双引号、小括号、方括号和空格,防止存储时出错
        if key:
            key = key.replace("'", "").replace('"', "").replace('(', "").replace(')', "").replace('[', "").replace(']', "").replace(' ', "")
        
        updated_at = datetime.now(pytz.timezone('Asia/Shanghai')).isoformat()
        
        conn = get_db_connection()
        try:
            cursor = conn.cursor()
            
            # 更新密钥
            cursor.execute('''
            UPDATE api_keys 
            SET name = ?, key = ?, updated_at = ?, success = ?, return_message = ?
            WHERE id = ?
            ''', (name, key, updated_at, 0, "等待测试", key_id))
            
            if cursor.rowcount > 0:
                conn.commit()
                
                # 获取更新后的密钥信息
                cursor.execute('SELECT * FROM api_keys WHERE id = ?', (key_id,))
                row = cursor.fetchone()
                
                if row:
                    updated_key = dict(row)
                    # 转换success字段从整数为布尔值
                    updated_key['success'] = bool(updated_key['success'])
                    return updated_key
            
            return None
        except sqlite3.Error as e:
            print(f"更新密钥时出错: {str(e)}")
            conn.rollback()
            
            # 如果数据库出错,尝试使用JSON方式更新
            api_keys_data = ApiKeyManager.load_keys()
            updated_key = None
            for k in api_keys_data["api_keys"]:
                if k.get("id") == key_id:
                    k["name"] = name
                    k["key"] = key
                    k["updated_at"] = updated_at
                    # 重置验证状态
                    k["success"] = False
                    k["return_message"] = "等待测试"
                    updated_key = k
                    break
            
            if updated_key:
                ApiKeyManager.save_keys(api_keys_data)
                
            return updated_key
        finally:
            conn.close()