soraapi / src /sora_generator.py
anycallzhf's picture
Initial commit for Hugging Face Space deployment
b064311
import cloudscraper
import time
import json
import random
import string
import os
import mimetypes # To guess file mime type
import argparse # For better command-line arguments
import asyncio
from .utils import localize_image_urls
from .config import Config
class SoraImageGenerator:
def __init__(self, proxy_host=None, proxy_port=None, proxy_user=None, proxy_pass=None, auth_token=None):
# 使用Config.VERBOSE_LOGGING替代直接从环境变量读取SORA_DEBUG
self.DEBUG = Config.VERBOSE_LOGGING
# 设置代理
if proxy_host and proxy_port:
# 如果有认证信息,添加到代理URL中
if proxy_user and proxy_pass:
proxy_auth = f"{proxy_user}:{proxy_pass}@"
self.proxies = {
"http": f"http://{proxy_auth}{proxy_host}:{proxy_port}",
"https": f"http://{proxy_auth}{proxy_host}:{proxy_port}"
}
if self.DEBUG:
print(f"已配置带认证的代理: {proxy_user}:****@{proxy_host}:{proxy_port}")
else:
self.proxies = {
"http": f"http://{proxy_host}:{proxy_port}",
"https": f"http://{proxy_host}:{proxy_port}"
}
if self.DEBUG:
print(f"已配置代理: {proxy_host}:{proxy_port}")
else:
self.proxies = None
if self.DEBUG:
print("代理未配置。请求将直接发送。")
# 创建 cloudscraper 实例
self.scraper = cloudscraper.create_scraper(
browser={
'browser': 'chrome',
'platform': 'windows',
'mobile': False
}
)
# 设置通用请求头 - 移除 Content-Type 和 openai-sentinel-token (会动态添加)
self.base_headers = {
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"cache-control": "no-cache",
"pragma": "no-cache",
"priority": "u=1, i",
"sec-ch-ua": "\"Chromium\";v=\"136\", \"Google Chrome\";v=\"136\", \"Not.A/Brand\";v=\"99\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"Windows\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
# Referer 会根据操作不同设置
}
# 设置认证Token (从外部传入或硬编码)
self.auth_token = auth_token or "Bearer eyJhbGciOiJSUzI1NiIsImtpZCI6IjE5MzQ0ZTY1LWJiYzktNDRkMS1hOWQwLWY5NTdiMDc5YmQwZSIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsiaHR0cHM6Ly9hcGkub3BlbmFpLmNvbS92MSJdLCJjbGllbnRfaWQiOiJhcHBfWDh6WTZ2VzJwUTl0UjNkRTduSzFqTDVnSCIsImV4cCI6MTc0Nzk3MDExMSwiaHR0cHM6Ly9hcGkub3BlbmFpLmNvbS9hdXRoIjp7InVzZXJfaWQiOiJ1c2VyLWdNeGM0QmVoVXhmTW1iTDdpeUtqengxYiJ9LCJodHRwczovL2FwaS5vcGVuYWkuY29tL3Byb2ZpbGUiOnsiZW1haWwiOiIzajVtOTFud3VtckBmcmVlLnViby5lZHUuZ24iLCJlbWFpbF92ZXJpZmllZCI6dHJ1ZX0sImlhdCI6MTc0NzEwNjExMSwiaXNzIjoiaHR0cHM6Ly9hdXRoLm9wZW5haS5jb20iLCJqdGkiOiIzMGM4ZDJhOS0yNzkxLTRhNjQtODI2OS0yMzU3OGFhMmI0MTEiLCJuYmYiOjE3NDcxMDYxMTEsInB3ZF9hdXRoX3RpbWUiOjE3NDcxMDYxMDkxMDksInNjcCI6WyJvcGVuaWQiLCJlbWFpbCIsInByb2ZpbGUiLCJvZmZsaW5lX2FjY2VzcyIsIm1vZGVsLnJlcXVlc3QiLCJtb2RlbC5yZWFkIiwib3JnYW5pemF0aW9uLnJlYWQiLCJvcmdhbml6YXRpb24ud3JpdGUiXSwic2Vzc2lvbl9pZCI6ImF1dGhzZXNzX21yWFRwZlVENU51TDFsV05xNUhSOW9lYiIsInN1YiI6ImF1dGgwfDY4MjFkYWYyNjhiYjgxMzFkMDRkYTAwNCJ9.V4ZqYJuf_f7F_DrMMRrt-ymul5HUrqENVkiFyEwfYmzMFWthEGS6Ryia100QRlprw8jjGscHZXlUFaOcRNIarcBig8fBY6n_AB3J34MlcBv6peS-3_EJlIiH_N7j_mu-8lNpJbxk9lSlFaGpKU1IOO7kBuaAmLH-iErM-wqBfSlnnAq8h4iqBDxi4CMTcAhVm2-qG7u7f0Ho1TCGa7wrdchWtZxyfHIqNWkC88qBlUwTH5g2vRL419_zIKEWKyAtV2WNI68vpyBLrRVhtnpDh0jcrm2WqCj2X2LQqNFkFKoui3wCdG9Vskpl39l9sV54HuV7w6stQIausR1F4Y9NbjsBAyLIimZOllCwYAefTC2BOpIHfOA3_D58G3SEiRADVK7pK7ip6QsEI__GteoeCuRvZA9b5jLmhVS0SUlDYSOoNwlJ_ejfEpPJcmHUchFa7bUkS-XVrEUgr1yP5FxPwWUyn7UWrW_dZ3lVW1EU4Bp6Kp6JuwyOFf2Mj-V3_9tc8qJRClI8WHUf6In0hiO_pGbFCI2opkF3XusAQKmTB12nPBsmSlwewigTPhAj3nf-8Ze3O-etnBrV5pz_woIwQsQ54T-wgEdrLWDE6dSqNDulfpldF6Cok62212kW8w3SY3V7VSq5Tr1KRyWXJEH-haVb6qmAE2ldDjeHvJossWg" # 替换成你的有效Token或从环境变量读取
if not self.auth_token or not self.auth_token.startswith("Bearer "):
raise ValueError("无效或缺失的认证Token (应以 'Bearer ' 开头)")
self.gen_url = "https://sora.chatgpt.com/backend/video_gen"
self.check_url = "https://sora.chatgpt.com/backend/video_gen"
self.upload_url = "https://sora.chatgpt.com/backend/uploads"
# 显示是否启用了图片本地化功能
if Config.IMAGE_LOCALIZATION:
if self.DEBUG:
print(f"图片本地化功能已启用,图片将保存到:{Config.IMAGE_SAVE_DIR}")
os.makedirs(Config.IMAGE_SAVE_DIR, exist_ok=True)
def _get_dynamic_headers(self, content_type="application/json", referer="https://sora.chatgpt.com/explore"):
"""为每个请求生成包含动态sentinel token的headers"""
headers = self.base_headers.copy()
headers["authorization"] = self.auth_token
headers["openai-sentinel-token"] = self._generate_sentinel_token()
headers["referer"] = referer
if content_type: # multipart/form-data 时 Content-Type 由 requests 自动设置
headers["content-type"] = content_type
return headers
def _generate_sentinel_token(self):
"""生成一个随机修改的sentinel-token"""
base_token = {
"p": "gAAAAABWzIxMjksIk1vbiBNYXkgMTIgMjAyNSAxODo1ODowOSBHTVQrMDgwMCAo5Lit5Zu95qCH5YeG5pe26Ze0KSIsMjI0ODE0Njk0NCw5LCJNb3ppbGxhLzUuMCAoV2luZG93cyBOVCAxMC4wOyBXaW42NDsgeDY0KSBBcHBsZVdlYktpdC81MzcuMzYgKEtIVE1MLCBsaWtlIEdlY2tvKSBDaHJvbWUvMTM2LjAuMC4wIFNhZmFyaS81MzcuMzYiLCJodHRwczovL3NvcmEtY2RuLm9haXN0YXRpYy5jb20vX25leHQvc3RhdGljL2NodW5rcy93ZWJwYWNrLWU1MDllMDk0N2RlZWM0Y2YuanMiLG51bGwsInpoLUNOIiwiemgtQ04semgiLDEwLCJwcm9kdWN0U3Vi4oiSMjAwMzAxMDciLCJfcmVhY3RMaXN0ZW5pbmd2cXAzdWtpNmh6IiwiX19zZW50aW5lbF9pbml0X3BlbmRpbmciLDEyMDczMzEuNSwiN2M4OTMxM2EtZTY0Mi00MjI4LTk5ZDQtNTRlZDRjYzQ3MGZiIiwiIiwxNl0=",
"t": "SBYdBBoGFhQRdVtbdxEYEQEaHQAAFhQRYwN2T2IEUH5pRHJyYx51YnJybFBhWHZkYgRyUHgDclZzH2l5a1gXUWVlcRMRGBEBHR0HBBYUEXpnfVt9YFAJDB8WAAIAAgYRDgxpYltjSl55SWJEZV9mXx4KFh8WGAAaBxYUEXVydW9ydXJ1b3J1cnVvcnVydW9ydXJ1b3J1b3J1b3J1b3J1DgkMHxYBAgALAxEOHh0BBwIXCwwEBx8FAwADGwAAHxYcBxoFDQwJFnFDEw4WHxYXBxoCBwwJFmB2dGRxYgBqdHVLf2hUX3VySWpVcVNNa3ZiYW13dgtyb3J5cHxvam12YWB7YgN2THVcYnxsSwR7fH9mfHJlZ1F3VARtcVwLcGpiV2pwaEdmZFhgdGZLbWRxXAdQbHJ5dHNvanlwQ29Xd1QEZnFmXFJoWFBRbEZ2e3JTWVF8YnFvd2ZUd2xiR3Vzb1BxcWV3UXxLbmxrYV9Wf3FxfHNJclVydU1qdXJtaHJmUH9sYnF1fElcdXtMdH5sdnZmZAR+ZmpUcXZzVgN2cUN3UHZyZWhyXAd8bHJ5cnVGdW1lWGRScWIAamBlDgkMHxYKBgADDREODHZjcWx/AnIBZkJLV2FTRWZWZX5Pa2JHVWsDbmRie1xgYXFoUWxfbmlhBQJ4f3FmUGFJBnNkWEphZ3VET2VYcntpA25kYntDVWRYf2Z3ZXZ9Vl9hXXZ1RGZseENzZnJ3Z3YCdn1WX2FddnVEZmx4Q3NmcndkFh8WFwAaBAUMCRZrcnh7fF92T2Z1eXx2dW5pZ1YKXGIFRnFhX3ZAZmJLUXwAfmtgeGlxYQVoeFdYdU1iYgJjfFsBAVJ7YUxrB1ZDVUR+fGthcXN4ZkRrbH9lWmtsG0FiAkdIZHFXZnxffnRiVgZsa3JeUmVlQEhhQ31ybFsNZ2VCS2NmWEZFZlhMWWQEcWJ2ZWZdZx5XdWRYXnllcVxJanMKWnQARElXdgoDUHx4WlZ0UEBWB0tNTQBuQW4dAlNXWXRUV11cXWkHQwx0XW5BUW9QbWVyRnFhZWJIVwR1UHtfRGdSeGl1YQUfcmJ2V0h3Zgd4bFR9cWdCUHFmZU1qd199amVDeQx5X3JRYB51ZWRYXnlSZHJcdAVHdXtmcWlWaXFwV1hGXlB0Q2hlX1dQf2V+fWB7cndqcl5YUF92TXRxV3V3AkBXY0JpbWRfSlJ8AgVLZGFHdnwCRHBnHFdzZlheV2wCYmZmbAZjfF9iUWxoBmJrYkp/UAJiZmVleXB4ZWZhdR9HYmJ2Qn5iX3JKdGFbV3xfdmF1HnVmZWF0f2V1THlmdXlTd3t+VFJ2Q05RfEZbV3tiAVIHAlpLAXICVXZpRmpzbEVXa1wcUgZXBwxO",
"c": "gAAAAABoIdRAZEk6qAnDRqdimKxPhXtA_xBkiDXhKF4LUmlNY6CZmfNxZiabjPHk_DCEUnnyq-y6JPj-D46YPk-6r7zR6qS64hEwGYt9Hh_8vUIod-7PLh9qPKqdYl4TBCVUgtrbhTWfse7s6NHCSy1T0Nzj2C6vAUPhzAx4LAMIrl2YbElkUVPgwELyYF_inh3zliwZL-zp4zR3LOABcrGqlrLoP7_kNrwcIZwlVD1RNlnHy9TEFsRzYOMQo_DbagZAK1h87arrMonZHBi9ukfiGuvCQP-y76j61b4qaQPMA19EoURLwnotVBWUIBpHEEoH9vmPb817sGwQ2R8XHoAVR4dwYs_7EoS8H8kAlUVZDjAKGq5x48nvZrarLBjYJXXsfJLuxhibNYXG1hKNbOdi1w-Xl1NgqSPAb-MuwnyDLPGE5MeLkwM2Dl3jD7G6B2Z2F993cvW7mOOs0OebZ6NMgIrZnTG4mMI7PirPY95JWmztDfeuFLJ_V_kyaSP--BZCIIAB4074RBVitIrJEwceWVW3zXOOHWJoDax7E0nfa5abvLGCjdEeJfNx4Fcp7iYFN_E8iR0f797DLlFh4uFLv1DPhipYtQFpPPUlbxKu9H9W4IDr7Hv_LgfvFo0VwLzV6ANZPGmdza67dAsKXrWtXlCrVNfqFoVO4wI4n-zrE3lcUPzI_ZJebF2HGlzerTvqgU5R0i4fzUGY4-UqpWlVurP-rCJY2ARcSMSyPXnPGetl5Z2m-f9k7K3n7txrfv2293jyyRTVAZLC9aLrBOFWHD4cf0aiwisgJ9IKnhhnoJ-WF8NuvFS7l1Z0d2zTnndjuryb0M36Og4b_Ku3aJKS0_Eqbns8_bUXdEPh5_15T_92_1yf5jy-amrgolgcO_7yJqX5aU9-PUUiaP3WzeyidMSH4Vtls63tQ5evUlDkEHfNKoyCYaSxpzA_FsNmPbMmcv3g1wNKg_W8V0Yh7ZrtW1L8229SpZFWc96Sg3CRPplk-dnVzV93lP7cN5o5ubGWZMkkz5UASjE5XLn8h5dx6neuOemKVHAj29QxOWmdGEehNvmeMec0k8uL9X-7yYBJSTnI066OR38JUUTFqTH3RSXI9M4ggals32P56bwUmawvZ-bu02qc3kCVI3oK9bnP8oTk0xTK5_bMrlevGYKG0qdPXamgdDfVg7hlNA2OTCnw6iRP6DJiPm_zKVdQa4z6SPJsdt_Q",
"id": self._generate_random_id(),
"flow": "sora_create_task" # This might need changing for uploads? Let's try keeping it.
}
# Simple randomization (can be improved if needed)
p_chars = list(base_token["p"])
for _ in range(random.randint(3, 7)):
pos = random.randint(10, len(p_chars) - 10)
p_chars[pos] = random.choice(string.ascii_letters + string.digits + "+/=")
base_token["p"] = "".join(p_chars)
c_chars = list(base_token["c"])
for _ in range(random.randint(3, 7)):
pos = random.randint(10, len(c_chars) - 10)
c_chars[pos] = random.choice(string.ascii_letters + string.digits + "+/=_-")
base_token["c"] = "".join(c_chars)
return json.dumps(base_token)
def _generate_random_id(self):
"""生成一个随机ID,格式类似于UUID"""
return f"{self._random_hex(8)}-{self._random_hex(4)}-{self._random_hex(4)}-{self._random_hex(4)}-{self._random_hex(12)}"
def _random_hex(self, length):
"""生成指定长度的随机十六进制字符串"""
return ''.join(random.choice(string.hexdigits.lower()) for _ in range(length))
def generate_image(self, prompt, num_images=1, width=720, height=480):
"""
生成一张或多张图片并返回图片URL列表
参数:
prompt (str): 图片生成提示词
num_images (int): 要生成的图片数量 (对应 n_variants)
width (int): 图片宽度
height (int): 图片高度
返回:
list[str] or str: 成功时返回包含图片URL的列表,失败时返回错误信息字符串
"""
# 确保提示词是正确的UTF-8格式
if isinstance(prompt, bytes):
prompt = prompt.decode('utf-8')
# 打印提示词,并处理可能的编码问题
try:
if self.DEBUG:
print(f"开始生成 {num_images} 张图片,提示词: '{prompt}'")
except UnicodeEncodeError:
if self.DEBUG:
print(f"开始生成 {num_images} 张图片,提示词: [编码显示问题,但数据正确]")
payload = {
"type": "image_gen",
"operation": "simple_compose",
"prompt": prompt,
"n_variants": num_images, # 使用传入的数量
"width": width,
"height": height,
"n_frames": 1,
"inpaint_items": []
}
try:
task_id = self._submit_task(payload, referer="https://sora.chatgpt.com/explore")
if not task_id:
# 任务提交失败,尝试切换密钥后重试
if self.DEBUG:
print(f"任务提交失败,尝试切换API密钥后重试")
try:
# 导入在这里进行以避免循环导入
from .key_manager import key_manager
# 标记当前密钥为无效并获取新密钥
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试任务")
# 使用新密钥重试提交任务
task_id = self._submit_task(payload, referer="https://sora.chatgpt.com/explore")
if not task_id:
return "任务提交失败(已尝试切换密钥)"
else:
return "任务提交失败(无可用的备用密钥)"
except ImportError:
if self.DEBUG:
print(f"无法导入key_manager,无法自动切换密钥")
return "任务提交失败"
except Exception as e:
if self.DEBUG:
print(f"切换API密钥时发生错误: {str(e)}")
return f"任务提交失败: {str(e)}"
if self.DEBUG:
print(f"任务已提交,ID: {task_id}")
# 轮询检查任务状态,直到完成
image_urls = self._poll_task_status(task_id)
# 如果轮询返回错误信息,也尝试切换密钥重试
if isinstance(image_urls, str) and ("失败" in image_urls or "错误" in image_urls or "error" in image_urls.lower()):
if self.DEBUG:
print(f"任务执行失败,尝试切换API密钥后重试")
try:
from .key_manager import key_manager
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试整个生成过程")
# 使用新密钥重试整个生成过程
return self.generate_image(prompt, num_images, width, height)
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"切换API密钥失败或重试失败: {str(e)}")
# 图片本地化处理
if Config.IMAGE_LOCALIZATION and isinstance(image_urls, list) and image_urls:
if self.DEBUG:
print(f"\n================================")
print(f"开始图片本地化处理")
print(f"图片本地化配置: 启用={Config.IMAGE_LOCALIZATION}, 保存目录={Config.IMAGE_SAVE_DIR}")
print(f"需要本地化的图片数量: {len(image_urls)}")
print(f"原始图片URLs: {image_urls}")
print(f"================================\n")
try:
# 创建事件循环并运行图片本地化
if self.DEBUG:
print(f"创建异步事件循环处理图片下载...")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if self.DEBUG:
print(f"调用localize_image_urls函数...")
localized_urls = loop.run_until_complete(localize_image_urls(image_urls))
loop.close()
if self.DEBUG:
print(f"异步事件循环已关闭")
if self.DEBUG:
print(f"\n================================")
print(f"图片本地化完成")
print(f"原始URLs: {image_urls}")
print(f"本地化后的URLs: {localized_urls}")
print(f"================================\n")
# 检查结果是否有效
if not localized_urls:
if self.DEBUG:
print(f"❌ 警告:本地化后的URL列表为空,将使用原始URL")
return image_urls
# 检查是否所有URL都被正确本地化
local_count = sum(1 for url in localized_urls if url.startswith("/static/") or "/static/" in url)
if local_count == 0:
if self.DEBUG:
print(f"❌ 警告:没有一个URL被成功本地化,将使用原始URL")
return image_urls
elif local_count < len(localized_urls):
if self.DEBUG:
print(f"⚠️ 注意:部分URL未成功本地化 ({local_count}/{len(localized_urls)})")
return localized_urls
except Exception as e:
if self.DEBUG:
print(f"❌ 图片本地化过程中发生错误: {str(e)}")
import traceback
traceback.print_exc()
if self.DEBUG:
print(f"由于错误,将返回原始URL")
return image_urls
elif not Config.IMAGE_LOCALIZATION:
if self.DEBUG:
print(f"图片本地化功能未启用,返回原始URLs")
elif not isinstance(image_urls, list):
if self.DEBUG:
print(f"图片生成返回了非列表结果: {image_urls},无法进行本地化")
elif not image_urls:
if self.DEBUG:
print(f"图片生成返回了空列表,没有可本地化的内容")
return image_urls
except Exception as e:
if self.DEBUG:
print(f"❌ 生成图片时出错: {str(e)}")
import traceback
traceback.print_exc()
return f"生成图片时出错: {str(e)}"
def upload_image(self, file_path):
"""
上传本地图片文件到Sora后端
参数:
file_path (str): 本地图片文件的路径
返回:
dict or str: 成功时返回包含上传信息的字典,失败时返回错误信息字符串
"""
if not os.path.exists(file_path):
return f"错误:文件未找到 '{file_path}'"
file_name = os.path.basename(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type or not mime_type.startswith('image/'):
return f"错误:无法确定文件类型或文件不是图片 '{file_path}' (Mime: {mime_type})"
if self.DEBUG:
print(f"开始上传图片: {file_name} (Type: {mime_type})")
# multipart/form-data 请求不需要手动设置 Content-Type header
# requests 会根据 files 参数自动处理 boundary 和 Content-Type
headers = self._get_dynamic_headers(content_type=None, referer="https://sora.chatgpt.com/library") # Referer from example
# 尝试上传
return self._try_upload_with_retry(file_path, file_name, mime_type, headers)
def _try_upload_with_retry(self, file_path, file_name, mime_type, headers, is_retry=False):
"""尝试上传图片,失败时可能尝试切换密钥重试"""
# 保存当前的API密钥,确保整个上传过程使用相同的密钥
current_auth_token = self.auth_token
files = {
'file': (file_name, open(file_path, 'rb'), mime_type),
'file_name': (None, file_name) # 第二个字段是文件名
}
try:
response = self.scraper.post(
self.upload_url,
headers=headers,
files=files, # 使用 files 参数上传文件
proxies=self.proxies,
timeout=60 # 上传可能需要更长时间
)
if response.status_code == 200:
result = response.json()
if self.DEBUG:
print(f"图片上传成功! Media ID: {result.get('id')}")
# 确保返回的结果中包含上传时使用的API密钥信息
result['used_auth_token'] = current_auth_token
# print(f"上传响应: {json.dumps(result, indent=2)}") # 可选:打印完整响应
return result # 返回包含id, url等信息的字典
else:
error_msg = f"上传图片失败,状态码: {response.status_code}, 响应: {response.text}"
if self.DEBUG:
print(error_msg)
# 如果不是已经在重试,且响应表明可能是API密钥问题,尝试切换密钥
if not is_retry and (response.status_code in [401, 403] or "auth" in response.text.lower() or "token" in response.text.lower()):
if self.DEBUG:
print(f"上传失败可能与API密钥有关,尝试切换密钥后重试")
try:
from .key_manager import key_manager
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试上传")
# 更新头部信息并重试
new_headers = self._get_dynamic_headers(content_type=None, referer="https://sora.chatgpt.com/library")
return self._try_upload_with_retry(file_path, file_name, mime_type, new_headers, is_retry=True)
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"切换API密钥失败: {str(e)}")
return error_msg
except Exception as e:
error_msg = f"上传图片时出错: {str(e)}"
if self.DEBUG:
print(error_msg)
# 如果不是已经在重试,尝试切换密钥后重试
if not is_retry:
if self.DEBUG:
print(f"上传过程中发生异常,尝试切换API密钥后重试")
try:
from .key_manager import key_manager
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试上传")
# 更新头部信息并重试
new_headers = self._get_dynamic_headers(content_type=None, referer="https://sora.chatgpt.com/library")
return self._try_upload_with_retry(file_path, file_name, mime_type, new_headers, is_retry=True)
except (ImportError, Exception) as err:
if self.DEBUG:
print(f"切换API密钥失败: {str(err)}")
return error_msg
finally:
# 确保文件句柄被关闭
if 'file' in files and files['file'][1]:
files['file'][1].close()
def generate_image_remix(self, prompt, uploaded_media_id, num_images=1, width=None, height=None):
"""
基于已上传的图片进行重混(Remix)生成新图片
参数:
prompt (str): 图片生成提示词
uploaded_media_id (str): 通过 upload_image 获取的媒体ID (例如 "media_...")
num_images (int): 要生成的图片数量
width (int, optional): 输出图片宽度。如果为None,可能由API决定。
height (int, optional): 输出图片高度。如果为None,可能由API决定。
返回:
list[str] or str: 成功时返回包含图片URL的列表,失败时返回错误信息字符串
"""
if self.DEBUG:
print(f"开始 Remix 图片 (ID: {uploaded_media_id}),提示词: '{prompt}'")
# 如果上传图片时使用了特定API密钥,则确保使用同一个密钥进行remix
# 在upload_image的返回结果中可能包含used_auth_token
if isinstance(uploaded_media_id, dict) and 'id' in uploaded_media_id:
if 'used_auth_token' in uploaded_media_id and uploaded_media_id['used_auth_token'] != self.auth_token:
if self.DEBUG:
print(f"检测到上传图片时使用了不同的API密钥,切换到匹配的密钥进行Remix操作")
self.auth_token = uploaded_media_id['used_auth_token']
uploaded_media_id = uploaded_media_id['id']
# 获取上传图片的信息,特别是原始尺寸,如果未指定输出尺寸则可能需要
# (这里简化,假设API能处理尺寸或我们强制指定)
# 实际应用中可能需要先查询 media_id 的详情
payload = {
"prompt": prompt,
"n_variants": num_images,
"inpaint_items": [
{
"type": "image",
"frame_index": 0,
"preset_id": None,
"generation_id": None,
"upload_media_id": uploaded_media_id, # 关键:引用上传的图片
"source_start_frame": 0,
"source_end_frame": 0,
"crop_bounds": None
}
],
"operation": "remix", # 关键:操作类型
"type": "image_gen",
"n_frames": 1,
"width": 720, # 可选,如果为None,API可能会基于输入调整
"height": 480 # 可选
}
# 只有当width和height都提供了值时才添加到payload中
if width is not None:
payload["width"] = width
if height is not None:
payload["height"] = height
try:
# 提交任务时使用 library 作为 referer,与示例一致
task_id = self._submit_task(payload, referer="https://sora.chatgpt.com/library")
if not task_id:
# 任务提交失败,尝试切换密钥后重试
if self.DEBUG:
print(f"Remix任务提交失败,尝试切换API密钥后重试")
try:
# 导入在这里进行以避免循环导入
from .key_manager import key_manager
# 标记当前密钥为无效并获取新密钥
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试Remix任务")
# 使用新密钥重试提交任务
task_id = self._submit_task(payload, referer="https://sora.chatgpt.com/library")
if not task_id:
return "Remix任务提交失败(已尝试切换密钥)"
else:
return "Remix任务提交失败(无可用的备用密钥)"
except ImportError:
if self.DEBUG:
print(f"无法导入key_manager,无法自动切换密钥")
return "Remix任务提交失败"
except Exception as e:
if self.DEBUG:
print(f"切换API密钥时发生错误: {str(e)}")
return f"Remix任务提交失败: {str(e)}"
if self.DEBUG:
print(f"Remix 任务已提交,ID: {task_id}")
# 轮询检查任务状态
image_urls = self._poll_task_status(task_id)
# 如果轮询返回错误信息,也尝试切换密钥重试
if isinstance(image_urls, str) and ("失败" in image_urls or "错误" in image_urls or "error" in image_urls.lower()):
if self.DEBUG:
print(f"Remix任务执行失败,尝试切换API密钥后重试")
try:
from .key_manager import key_manager
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试整个Remix过程")
# 使用新密钥重试整个生成过程
return self.generate_image_remix(prompt, uploaded_media_id, num_images, width, height)
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"切换API密钥失败或重试失败: {str(e)}")
# 增加图片本地化支持
if image_urls and isinstance(image_urls, list):
if Config.IMAGE_LOCALIZATION:
if self.DEBUG:
print(f"正在本地化 Remix 生成的 {len(image_urls)} 张图片...")
# 创建事件循环并运行图片本地化
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
localized_urls = loop.run_until_complete(localize_image_urls(image_urls))
loop.close()
if self.DEBUG:
print(f"Remix 图片本地化完成")
return localized_urls
return image_urls
except Exception as e:
# 如果发生异常,也尝试切换密钥重试
if self.DEBUG:
print(f"Remix生成过程中发生异常: {str(e)},尝试切换API密钥")
try:
from .key_manager import key_manager
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试整个Remix过程")
# 使用新密钥重试整个生成过程
return self.generate_image_remix(prompt, uploaded_media_id, num_images, width, height)
except (ImportError, Exception) as err:
if self.DEBUG:
print(f"切换API密钥失败或重试失败: {str(err)}")
return f"Remix 生成图片时出错: {str(e)}"
def _submit_task(self, payload, referer="https://sora.chatgpt.com/explore"):
"""提交生成任务 (通用,接受payload字典)"""
headers = self._get_dynamic_headers(content_type="application/json", referer=referer)
# 获取当前的auth_token用于最后可能的释放
current_auth_token = self.auth_token
try:
# 检查是否可以导入key_manager并标记密钥为工作中
try:
from .key_manager import key_manager
# 生成一个临时任务ID,用于标记密钥工作状态
temp_task_id = f"pending_task_{self._generate_random_id()}"
key_manager.mark_key_as_working(self.auth_token, temp_task_id)
except ImportError:
if self.DEBUG:
print(f"无法导入key_manager,无法标记密钥工作状态")
except Exception as e:
if self.DEBUG:
print(f"标记密钥工作状态时发生错误: {str(e)}")
response = self.scraper.post(
self.gen_url,
headers=headers,
json=payload,
proxies=self.proxies,
timeout=20 # 增加超时时间
)
if response.status_code == 200:
try:
result = response.json()
task_id = result.get("id")
if task_id:
# 更新任务ID为实际分配的ID
try:
from .key_manager import key_manager
# 更新工作中状态的任务ID
key_manager.release_key(self.auth_token) # 先释放临时ID
key_manager.mark_key_as_working(self.auth_token, task_id) # 用真实ID重新标记
if self.DEBUG:
print(f"已将密钥标记为工作中,任务ID: {task_id}")
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"更新密钥工作状态时发生错误: {str(e)}")
return task_id
else:
# 任务提交成功但未返回ID,释放密钥
try:
from .key_manager import key_manager
key_manager.release_key(self.auth_token)
if self.DEBUG:
print(f"任务提交未返回ID,已释放密钥")
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"释放密钥时发生错误: {str(e)}")
# 检查响应中是否有可能表明API密钥问题的信息
response_text = response.text.lower()
is_auth_issue = False
auth_keywords = ["authorization", "auth", "token", "permission", "unauthorized", "credentials", "login"]
for keyword in auth_keywords:
if keyword in response_text:
is_auth_issue = True
break
if is_auth_issue:
if self.DEBUG:
print(f"API响应内容表明可能存在认证问题,尝试切换密钥")
try:
from .key_manager import key_manager
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试请求")
# 使用新密钥重试请求
return self._submit_task(payload, referer)
except ImportError:
if self.DEBUG:
print(f"无法导入key_manager,无法自动切换密钥")
except Exception as e:
if self.DEBUG:
print(f"切换API密钥时发生错误: {str(e)}")
if self.DEBUG:
print(f"提交任务成功,但响应中未找到任务ID。响应: {response.text}")
return None
except json.JSONDecodeError:
# 释放密钥
try:
from .key_manager import key_manager
key_manager.release_key(self.auth_token)
if self.DEBUG:
print(f"JSON解析失败,已释放密钥")
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"释放密钥时发生错误: {str(e)}")
if self.DEBUG:
print(f"提交任务成功,但无法解析响应JSON。状态码: {response.status_code}, 响应: {response.text}")
return None
else:
# 释放密钥
try:
from .key_manager import key_manager
key_manager.release_key(self.auth_token)
if self.DEBUG:
print(f"请求失败,已释放密钥")
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"释放密钥时发生错误: {str(e)}")
if self.DEBUG:
print(f"提交任务失败,状态码: {response.status_code}")
if self.DEBUG:
print(f"请求Payload: {json.dumps(payload)}")
if self.DEBUG:
print(f"响应内容: {response.text}")
# 特殊处理429错误(太多并发任务)
if response.status_code == 429:
response_text = response.text.lower()
is_concurrent_issue = (
"concurrent" in response_text or
"too many" in response_text or
"wait" in response_text or
"progress" in response_text
)
if is_concurrent_issue:
if self.DEBUG:
print(f"检测到并发限制错误,当前密钥正在处理其他任务,获取新密钥但不禁用当前密钥")
try:
from .key_manager import key_manager
# 不标记为无效,但获取一个新的可用密钥
new_key = key_manager.get_key()
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已获取新的API密钥,重试请求")
# 使用新密钥重试请求
return self._submit_task(payload, referer)
else:
if self.DEBUG:
print(f"没有可用的备用密钥")
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"获取新密钥时发生错误: {str(e)}")
return None
# 检查是否是认证失败(401/403)或其他可能表明API密钥失效的情况
response_text = response.text.lower()
is_auth_issue = (
response.status_code in [401, 403] or
"authorization" in response_text or
"auth" in response_text or
"token" in response_text or
"permission" in response_text or
"unauthorized" in response_text or
"credentials" in response_text or
"login" in response_text or
"invalid" in response_text
) and not (
# 排除并发限制导致的错误
"concurrent" in response_text or
"too many" in response_text or
"wait" in response_text or
"progress" in response_text
)
if is_auth_issue:
if self.DEBUG:
print(f"API密钥可能已失效,尝试切换密钥")
try:
# 导入在这里进行以避免循环导入
from .key_manager import key_manager
# 标记当前密钥为无效并获取新密钥
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试请求")
# 使用新密钥重试请求
return self._submit_task(payload, referer)
except ImportError:
if self.DEBUG:
print(f"无法导入key_manager,无法自动切换密钥")
except Exception as e:
if self.DEBUG:
print(f"切换API密钥时发生错误: {str(e)}")
return None
except Exception as e:
# 确保释放密钥
try:
from .key_manager import key_manager
key_manager.release_key(current_auth_token)
if self.DEBUG:
print(f"发生异常,已释放密钥")
except (ImportError, Exception) as release_err:
if self.DEBUG:
print(f"释放密钥时发生错误: {str(release_err)}")
if self.DEBUG:
print(f"提交任务时出错: {str(e)}")
# 检查异常信息中是否包含可能的认证问题
error_str = str(e).lower()
is_auth_issue = (
"authorization" in error_str or
"auth" in error_str or
"token" in error_str or
"permission" in error_str or
"unauthorized" in error_str or
"credentials" in error_str or
"login" in error_str
)
if is_auth_issue:
if self.DEBUG:
print(f"异常信息表明可能存在API密钥问题,尝试切换密钥")
try:
from .key_manager import key_manager
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试请求")
# 使用新密钥重试请求
return self._submit_task(payload, referer)
except (ImportError, Exception) as err:
if self.DEBUG:
print(f"切换API密钥失败: {str(err)}")
return None
def _poll_task_status(self, task_id, max_attempts=40, interval=5):
"""
轮询检查任务状态,直到完成,返回所有生成的图片URL列表
"""
# 保存当前使用的密钥,确保最后可以正确释放
current_auth_token = self.auth_token
if self.DEBUG:
print(f"开始轮询任务 {task_id} 的状态...")
try:
for attempt in range(max_attempts):
try:
headers = self._get_dynamic_headers(referer="https://sora.chatgpt.com/library") # Polling often happens from library view
query_url = f"{self.check_url}?limit=10" # 获取最近的任务,减少数据量
response = self.scraper.get(
query_url,
headers=headers,
proxies=self.proxies,
timeout=15
)
if response.status_code == 200:
try:
result = response.json()
task_responses = result.get("task_responses", [])
# 查找对应的任务
for task in task_responses:
if task.get("id") == task_id:
status = task.get("status")
if self.DEBUG:
print(f" 任务 {task_id} 状态: {status} (尝试 {attempt+1}/{max_attempts})")
if status == "succeeded":
# 任务成功完成,释放密钥
try:
from .key_manager import key_manager
key_manager.release_key(current_auth_token)
if self.DEBUG:
print(f"任务成功完成,已释放密钥")
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"释放密钥时发生错误: {str(e)}")
generations = task.get("generations", [])
image_urls = []
if generations:
for gen in generations:
url = gen.get("url")
if url:
image_urls.append(url)
if image_urls:
if self.DEBUG:
print(f"任务 {task_id} 成功完成!找到 {len(image_urls)} 张图片。")
return image_urls
else:
if self.DEBUG:
print(f"任务 {task_id} 状态为 succeeded,但在响应中未找到有效的图片URL。")
if self.DEBUG:
print(f"任务详情: {json.dumps(task, indent=2)}")
return "任务成功但未找到图片URL"
elif status == "failed":
# 任务失败,释放密钥
try:
from .key_manager import key_manager
key_manager.release_key(current_auth_token)
if self.DEBUG:
print(f"任务失败,已释放密钥")
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"释放密钥时发生错误: {str(e)}")
failure_reason = task.get("failure_reason", "未知原因")
if self.DEBUG:
print(f"任务 {task_id} 失败: {failure_reason}")
# 检查是否是因为API密钥问题导致的失败
failure_reason_lower = failure_reason.lower()
auth_keywords = [
"authorization", "auth", "token", "permission",
"unauthorized", "credentials", "login", "invalid"
]
is_auth_issue = any(keyword in failure_reason_lower for keyword in auth_keywords)
if is_auth_issue:
if self.DEBUG:
print(f"检测到API密钥可能失效,尝试切换密钥")
try:
# 导入在这里进行以避免循环导入
from .key_manager import key_manager
# 标记当前密钥为无效并获取新密钥
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥")
except ImportError:
if self.DEBUG:
print(f"无法导入key_manager,无法自动切换密钥")
except Exception as e:
if self.DEBUG:
print(f"切换API密钥时发生错误: {str(e)}")
return f"任务失败: {failure_reason}"
elif status in ["rejected", "needs_user_review"]:
# 任务被拒绝,释放密钥
try:
from .key_manager import key_manager
key_manager.release_key(current_auth_token)
if self.DEBUG:
print(f"任务被拒绝,已释放密钥")
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"释放密钥时发生错误: {str(e)}")
if self.DEBUG:
print(f"任务 {task_id} 被拒绝或需要审查: {status}")
return f"任务被拒绝或需审查: {status}"
# else status is pending, processing, etc. - continue polling
break # Found the task, no need to check others in this response
else:
# Task ID not found in the recent list, maybe it's older or just submitted
if self.DEBUG:
print(f" 未在最近任务列表中找到 {task_id},继续等待... (尝试 {attempt+1}/{max_attempts})")
except json.JSONDecodeError:
if self.DEBUG:
print(f"检查任务状态时无法解析响应JSON。状态码: {response.status_code}, 响应: {response.text}")
else:
if self.DEBUG:
print(f"检查任务状态失败,状态码: {response.status_code}, 响应: {response.text}")
# 检查是否是认证失败或其他可能表明API密钥失效的情况
response_text = response.text.lower()
is_auth_issue = (
response.status_code in [401, 403, 429] or
"authorization" in response_text or
"auth" in response_text or
"token" in response_text or
"permission" in response_text or
"unauthorized" in response_text or
"credentials" in response_text or
"login" in response_text or
"invalid" in response_text
)
if is_auth_issue:
if self.DEBUG:
print(f"API密钥可能已失效,尝试切换密钥")
try:
# 导入在这里进行以避免循环导入
from .key_manager import key_manager
# 标记当前密钥为无效并获取新密钥
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试请求")
# 使用新密钥继续轮询
continue
except ImportError:
if self.DEBUG:
print(f"无法导入key_manager,无法自动切换密钥")
except Exception as e:
if self.DEBUG:
print(f"切换API密钥时发生错误: {str(e)}")
time.sleep(interval) # 等待一段时间后再次检查
except Exception as e:
if self.DEBUG:
print(f"检查任务状态时出错: {str(e)}")
# 检查异常信息中是否包含可能的认证问题
error_str = str(e).lower()
is_auth_issue = (
"authorization" in error_str or
"auth" in error_str or
"token" in error_str or
"permission" in error_str or
"unauthorized" in error_str or
"credentials" in error_str or
"login" in error_str
)
if is_auth_issue:
if self.DEBUG:
print(f"异常信息表明可能存在API密钥问题,尝试切换密钥")
try:
from .key_manager import key_manager
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试请求")
# 重置当前尝试次数,继续轮询
continue
except (ImportError, Exception) as err:
if self.DEBUG:
print(f"切换API密钥失败: {str(err)}")
# Add a slightly longer delay on error to avoid hammering the server
time.sleep(interval * 1.5)
# 如果达到最大尝试次数,释放密钥
try:
from .key_manager import key_manager
key_manager.release_key(current_auth_token)
if self.DEBUG:
print(f"轮询超时,已释放密钥")
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"释放密钥时发生错误: {str(e)}")
return f"任务 {task_id} 超时 ({max_attempts * interval}秒),未能获取最终状态"
except Exception as e:
# 确保在异常情况下也释放密钥
try:
from .key_manager import key_manager
key_manager.release_key(current_auth_token)
if self.DEBUG:
print(f"轮询过程发生异常,已释放密钥")
except (ImportError, Exception) as release_err:
if self.DEBUG:
print(f"释放密钥时发生错误: {str(release_err)}")
if self.DEBUG:
print(f"轮询任务状态时发生未处理的异常: {str(e)}")
import traceback
traceback.print_exc()
return f"轮询任务状态时出错: {str(e)}"
def test_connection(self):
"""
测试API连接是否有效,仅发送一个轻量级请求
返回:
dict: 包含连接状态信息的字典
"""
start_time = time.time() # 记录开始时间,用于计算响应时间
success = False # 初始化请求结果标识
try:
# 使用简单的GET请求来验证连接和认证
headers = self._get_dynamic_headers(referer="https://sora.chatgpt.com/explore")
response = self.scraper.get(
"https://sora.chatgpt.com/backend/parameters",
headers=headers,
proxies=self.proxies,
timeout=10
)
if response.status_code == 200:
result = response.json()
# 检查返回的数据中是否含有关键字段,确认API确实有效
api_valid = result.get("can_create_images") is not None or "limits_for_images" in result
if api_valid:
success = True # 标记请求成功
# 记录请求结果(成功)
try:
from .key_manager import key_manager
response_time = time.time() - start_time
key_manager.record_request_result(self.auth_token, success, response_time)
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"记录请求结果失败: {str(e)}")
return {
"status": "success",
"message": "API连接测试成功",
"data": result
}
else:
# API返回200但数据不符合预期
success = False # 标记请求失败
# 记录请求结果(失败)
try:
from .key_manager import key_manager
response_time = time.time() - start_time
key_manager.record_request_result(self.auth_token, success, response_time)
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"记录请求结果失败: {str(e)}")
return {
"status": "error",
"message": "API连接测试失败:返回数据格式不符合预期",
"response": result
}
else:
# 检查是否是认证失败或其他可能表明API密钥失效的情况
response_text = response.text.lower()
is_auth_issue = (
response.status_code in [401, 403, 429] or
"authorization" in response_text or
"auth" in response_text or
"token" in response_text or
"permission" in response_text or
"unauthorized" in response_text or
"credentials" in response_text or
"login" in response_text or
"invalid" in response_text
)
success = False # 标记请求失败
# 记录请求结果(失败)
try:
from .key_manager import key_manager
response_time = time.time() - start_time
key_manager.record_request_result(self.auth_token, success, response_time)
except (ImportError, Exception) as e:
if self.DEBUG:
print(f"记录请求结果失败: {str(e)}")
if is_auth_issue:
if self.DEBUG:
print(f"API密钥可能已失效,尝试切换密钥")
try:
# 导入在这里进行以避免循环导入
from .key_manager import key_manager
# 标记当前密钥为无效并获取新密钥
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试连接测试")
# 使用新密钥重试
return self.test_connection()
except ImportError:
if self.DEBUG:
print(f"无法导入key_manager,无法自动切换密钥")
except Exception as e:
if self.DEBUG:
print(f"切换API密钥时发生错误: {str(e)}")
return {
"status": "error",
"message": f"API连接测试失败,状态码: {response.status_code}",
"response": response.text
}
except Exception as e:
success = False # 标记请求失败
# 记录请求结果(异常失败)
try:
from .key_manager import key_manager
response_time = time.time() - start_time
key_manager.record_request_result(self.auth_token, success, response_time)
except (ImportError, Exception) as err:
if self.DEBUG:
print(f"记录请求结果失败: {str(err)}")
# 检查异常信息中是否包含可能的认证问题
error_str = str(e).lower()
is_auth_issue = (
"authorization" in error_str or
"auth" in error_str or
"token" in error_str or
"permission" in error_str or
"unauthorized" in error_str or
"credentials" in error_str or
"login" in error_str
)
if is_auth_issue:
if self.DEBUG:
print(f"异常信息表明可能存在API密钥问题,尝试切换密钥")
try:
from .key_manager import key_manager
new_key = key_manager.mark_key_invalid(self.auth_token)
if new_key:
self.auth_token = new_key
if self.DEBUG:
print(f"已切换到新的API密钥,重试连接测试")
# 使用新密钥重试
return self.test_connection()
except (ImportError, Exception) as err:
if self.DEBUG:
print(f"切换API密钥失败: {str(err)}")
raise Exception(f"API连接测试失败: {str(e)}")