import os import sys import cv2 import json import random import time import datetime import requests import func_timeout import numpy as np import gradio as gr import boto3 from botocore.client import Config OssUrl = "https://selfit-deploy-1256039085.cos.accelerate.myqcloud.com/" Regions = "IndiaPakistanBengal" # TOKEN = os.environ['TOKEN'] # APIKEY = os.environ['APIKEY'] # UKAPIURL = os.environ['UKAPIURL'] OneKey = os.environ['OneKey'].strip() OneKey = OneKey.split("#") TOKEN = OneKey[0] APIKEY = OneKey[1] UKAPIURL = OneKey[2] LLMKEY = OneKey[3] R2_ACCESS_KEY = OneKey[4] R2_SECRET_KEY = OneKey[5] R2_ENDPOINT = OneKey[6] tmpFolder = "tmp" os.makedirs(tmpFolder, exist_ok=True) def upload_user_img(clientIp, timeId, img): fileName = clientIp.replace(".", "")+str(timeId)+".jpg" local_path = os.path.join(tmpFolder, fileName) img = cv2.imread(img) cv2.imwrite(os.path.join(tmpFolder, fileName), img) json_data = { "token": TOKEN, "input1": fileName, "input2": "", "protocol": "", "cloud": "ali" } session = requests.session() ret = requests.post( f"{UKAPIURL}/upload", headers={'Content-Type': 'application/json'}, json=json_data ) res = "" if ret.status_code==200: if 'upload1' in ret.json(): upload_url = ret.json()['upload1'] headers = {'Content-Type': 'image/jpeg'} response = session.put(upload_url, data=open(local_path, 'rb').read(), headers=headers) # print(response.status_code) if response.status_code == 200: res = upload_url if os.path.exists(local_path): os.remove(local_path) return res class R2Api: def __init__(self, session=None): super().__init__() self.R2_BUCKET = "trump-ai-voice" self.domain = "https://www.trumpaivoice.net/" self.R2_ACCESS_KEY = "9ad101e6879661bbe5b9e36e49a988d3" self.R2_SECRET_KEY = "cea5f1c5cbbae6473c9a144c219dad61b9629befb9648e04734b615c95913f85" self.R2_ENDPOINT = f"https://a0f50aa07e4b6bd6d2b7431feb6b68ce.r2.cloudflarestorage.com" self.client = boto3.client( "s3", endpoint_url=self.R2_ENDPOINT, aws_access_key_id=self.R2_ACCESS_KEY, aws_secret_access_key=self.R2_SECRET_KEY, config=Config(signature_version="s3v4") ) self.session = requests.Session() if session is None else session def upload_file(self, local_path, cloud_path): t1 = time.time() head_dict = { 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg', 'png': 'image/png', 'gif': 'image/gif', 'bmp': 'image/bmp', 'webp': 'image/webp', 'ico': 'image/x-icon' } ftype = os.path.basename(local_path).split(".")[-1].lower() ctype = head_dict.get(ftype, 'application/octet-stream') headers = {"Content-Type": ctype} cloud_path = f"QwenImageEdit/Uploads/{str(datetime.date.today())}/{os.path.basename(local_path)}" url = self.client.generate_presigned_url( "put_object", Params={"Bucket": self.R2_BUCKET, "Key": cloud_path, "ContentType": ctype}, ExpiresIn=604800 ) retry_count = 0 while retry_count < 3: try: with open(local_path, 'rb') as f: self.session.put(url, data=f.read(), headers=headers, timeout=8) break except (requests.exceptions.Timeout, requests.exceptions.RequestException): retry_count += 1 if retry_count == 3: raise Exception('Failed to upload file to R2 after 3 retries!') continue print("upload_file time is ====>", time.time() - t1) return f"{self.domain}{cloud_path}" def upload_user_img_r2(clientIp, timeId, img): fileName = clientIp.replace(".", "")+str(timeId)+".jpg" local_path = os.path.join(tmpFolder, fileName) img = cv2.imread(img) cv2.imwrite(os.path.join(tmpFolder, fileName), img) res = R2Api().upload_file(local_path, fileName) if os.path.exists(local_path): os.remove(local_path) return res @func_timeout.func_set_timeout(10) def check_region(ip): session = requests.session() # ret = requests.get(f"https://api.ip2location.io/?ip={ip}") ret = requests.get(f"https://realip.cc/?ip={ip}") # print(ret) nat = ret.json()['country'].lower() if nat in Regions.lower(): print(nat, 'invalid', ip) return False else: print(nat, 'valid', ip) return True def check_region_warp(ip): try: return check_region(ip) except Exception as e: print(e) return True @func_timeout.func_set_timeout(10) def get_country_info(ip): """获取IP对应的国家信息""" try: session = requests.session() ret = requests.get(f"https://realip.cc/?ip={ip}") country = ret.json()['country'] return country except Exception as e: print(f"获取IP属地失败: {e}") return "Unknown" def get_country_info_safe(ip): """安全获取IP属地信息,出错时返回Unknown""" try: return get_country_info(ip) except Exception as e: print(f"获取IP属地失败: {e}") return "Unknown" def check_nsfw(prompt): """ 检查prompt是否包含NSFW内容,包含返回1,否则返回0 """ try: response = requests.post( url="https://openrouter.ai/api/v1/chat/completions", headers={ "Authorization": f"Bearer {LLMKEY}", "Content-Type": "application/json", }, data=json.dumps({ "model": "google/gemini-2.5-flash", "messages": [ { "role": "system", "content": "你是一个nsfw指令判断助手,请判断用户输入的prompt指令是否会导致nsfw内容? 你只需要回答 是 或者 否" }, { "role": "user", "content": prompt } ], }) ) res_json = response.json() # 兼容不同模型返回格式 if "choices" in res_json and len(res_json["choices"]) > 0: content = res_json["choices"][0].get("message", {}).get("content", "") if "是" in content: return 1 else: return 0 else: return 0 except Exception as e: # 出错时默认返回0 return 0 def submit_image_edit_task(user_image_url, prompt): """ 提交图片编辑任务 """ headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {APIKEY}' } data = { "user_image": user_image_url, "task_type": "80", "prompt": prompt, "secret_key": "219ngu", "is_private": "0" } try: response = requests.post( f'{UKAPIURL}/public_image_edit', headers=headers, json=data ) if response.status_code == 200: result = response.json() if result.get('code') == 0: return result['data']['task_id'], None else: return None, f"API 错误: {result.get('message', '未知错误')}" else: return None, f"HTTP 错误: {response.status_code}" except Exception as e: return None, f"请求异常: {str(e)}" def check_task_status(task_id): """ 查询任务状态 """ headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {APIKEY}' } data = { "task_id": task_id } try: response = requests.post( f'{UKAPIURL}/status_image_edit', headers=headers, json=data ) if response.status_code == 200: result = response.json() if result.get('code') == 0: task_data = result['data'] return task_data['status'], task_data.get('output1'), task_data else: return 'error', None, result.get('message', '未知错误') else: return 'error', None, f"HTTP 错误: {response.status_code}" except Exception as e: return 'error', None, f"请求异常: {str(e)}" def process_image_edit(img_path, prompt, progress_callback=None): """ 处理图片编辑的完整流程 """ try: # 生成客户端 IP 和时间戳 client_ip = "127.0.0.1" # 默认IP time_id = int(time.time()) if progress_callback: progress_callback("uploading image...") # 上传用户图片 uploaded_url = upload_user_img_r2(client_ip, time_id, img_path) if not uploaded_url: return None, "image upload failed" # 从上传 URL 中提取实际的图片 URL if "?" in uploaded_url: uploaded_url = uploaded_url.split("?")[0] if progress_callback: progress_callback("submitting edit task...") # 提交图片编辑任务 task_id, error = submit_image_edit_task(uploaded_url, prompt) if error: return None, error if progress_callback: progress_callback(f"task submitted, ID: {task_id}, processing...") # 等待任务完成 max_attempts = 60 # 最多等待10分钟 for attempt in range(max_attempts): status, output_url, task_data = check_task_status(task_id) if status == 'completed': if output_url: return output_url, "image edit completed" else: return None, "任务完成但未返回结果图片" elif status == 'error' or status == 'failed': return None, f"task processing failed: {task_data}" elif status in ['queued', 'processing', 'running', 'created', 'working']: if progress_callback: progress_callback(f"task processing... (status: {status})") time.sleep(1) # 等待10秒后重试 else: if progress_callback: progress_callback(f"unknown status: {status}") time.sleep(1) return None, "task processing timeout" except Exception as e: return None, f"error occurred during processing: {str(e)}" if __name__ == "__main__": pass