# Spaces: Runtime error
from fastapi.responses import JSONResponse, StreamingResponse | |
from fastapi.middleware.cors import CORSMiddleware | |
from playwright.sync_api import sync_playwright | |
from bs4 import BeautifulSoup | |
from fastapi import FastAPI | |
from random import choice | |
from io import BytesIO | |
from time import sleep | |
from PIL import Image | |
import requests | |
#-------------------------------------------------- << RUN >> --------------------------------------------------# | |
# uvicorn app:app | |
#-------------------------------------------------- << VARIABLES >> --------------------------------------------------# | |
#-------------------------------------------------- << FUNCTIONS >> --------------------------------------------------# | |
def get_email():
    """Create a disposable mailbox through the temp-mail107 RapidAPI service.

    Requests the list of available domains, picks one at random and asks the
    service to create an account on that domain.

    Returns:
        The ``data['data']['address']`` value of the create-account response,
        or ``None`` when any step fails (the failure is printed, not raised).
    """
    import os  # local import: only needed for the optional key override below

    # NOTE(review): the API key is committed in source. When the RAPIDAPI_KEY
    # environment variable is set it takes precedence; the literal is kept
    # only as a fallback so existing deployments keep working.
    api_key = os.environ.get(
        'RAPIDAPI_KEY',
        'd3349ed2b8msh2a16ceeb6b2a192p173237jsn4ff2c7c6467e',
    )
    base_url = 'https://temp-mail107.p.rapidapi.com'
    headers = {
        'x-rapidapi-key': api_key,
        'x-rapidapi-host': 'temp-mail107.p.rapidapi.com',
    }
    # Without a timeout a stalled connection would block the caller forever.
    request_timeout = 15

    try:
        response = requests.get(
            f'{base_url}/domains',
            headers=headers,
            timeout=request_timeout,
        )
        response.raise_for_status()
        domain = choice(response.json()['data'])

        response = requests.post(
            f'{base_url}/create-account',
            json={'domain': domain},
            headers={**headers, 'Content-Type': 'application/json'},
            timeout=request_timeout,
        )
        response.raise_for_status()
        return response.json()['data']['address']
    except Exception as error:
        print(f'error : {error}')
        return None
def get_verification_url(email, max_attempts=60, poll_interval=2):
    """Poll the temp-mail107 inbox of *email* and return the first link found.

    The check-message endpoint is queried until its ``data`` field is no
    longer ``None``; the ``html`` of that message is then parsed and the
    ``href`` of its first ``<a>`` element is returned.

    Args:
        email: Mailbox address to poll.
        max_attempts: Maximum number of inbox checks before giving up, so a
            message that never arrives cannot block the caller forever.
        poll_interval: Seconds to sleep between two checks.

    Returns:
        The link target as a string, or ``None`` when polling times out, the
        message contains no link, or any request fails (the failure is
        printed, not raised).
    """
    import os  # local import: only needed for the optional key override below

    # NOTE(review): the API key is committed in source. When the RAPIDAPI_KEY
    # environment variable is set it takes precedence; the literal is kept
    # only as a fallback so existing deployments keep working.
    api_key = os.environ.get(
        'RAPIDAPI_KEY',
        'd3349ed2b8msh2a16ceeb6b2a192p173237jsn4ff2c7c6467e',
    )
    url = "https://temp-mail107.p.rapidapi.com/check-message"
    querystring = {"address": email}
    headers = {
        "x-rapidapi-key": api_key,
        "x-rapidapi-host": "temp-mail107.p.rapidapi.com",
    }

    try:
        message = None
        for attempt in range(max_attempts):
            if attempt:
                sleep(poll_interval)
            response = requests.get(
                url, headers=headers, params=querystring, timeout=15
            )
            response.raise_for_status()
            message = response.json()['data']
            if message is not None:
                break
        if message is None:
            raise TimeoutError(
                f'no message received for {email} after {max_attempts} checks'
            )

        soup = BeautifulSoup(message['html'], 'html.parser')
        link = soup.find('a')
        if link is None:
            raise ValueError('verification email does not contain a link')
        return link.get('href')
    except Exception as error:
        print(f'error : {error}')
        return None
class Bot:
    """Drives a headless Chromium session through the ThinkDiffusion signup.

    The flow is split over two calls that share one browser session:
    ``get_captcha`` opens the signup page and returns the captcha image, and
    ``generate_account`` submits the form with the solved captcha, follows the
    emailed verification link and requests a machine with the new session.

    NOTE(review): the Playwright *sync* API objects are kept on the instance
    between calls; confirm that both calls are served from a thread where
    those objects may be used.
    """

    SIGNUP_URL = 'https://www.thinkdiffusion.com/api/auth/signup'
    MACHINE_URL = 'https://www.thinkdiffusion.com/api/machines/create'
    # Generated class names: these selectors break whenever the page is rebuilt.
    CAPTCHA_SELECTOR = 'body > div.c526f8a4f.c8a792eff > main > section > div > div > div > form > div.c8b219657.c9a020846 > div.c8b219657.c1f2d1695 > div > div.cc9af1055.cddcd8014 > div.c4026b06e.cb1295f34 > img'
    SUBMIT_SELECTOR = 'body > div.c526f8a4f.c8a792eff > main > section > div > div > div > form > div.c2f62a829 > button'

    def __init__(self):
        # All three stay None until get_captcha() starts a session.
        self.playwright = None
        self.browser = None
        self.page = None

    def _shutdown(self):
        """Release page, browser and Playwright driver; safe to call repeatedly."""
        for attribute, closer in (
            ('page', 'close'),
            ('browser', 'close'),
            ('playwright', 'stop'),
        ):
            handle = getattr(self, attribute)
            setattr(self, attribute, None)
            if handle is None:
                continue
            try:
                getattr(handle, closer)()
            except Exception as error:
                # A handle that is already dead must not block the cleanup
                # of the remaining ones.
                print(f'error : {error}')

    def get_captcha(self):
        """Start a fresh browser session and return the signup captcha.

        Any previous session is discarded first. On failure the new session
        is torn down again before the exception is re-raised, so no browser
        process is left behind.

        Returns:
            A ``BytesIO`` positioned at 0 holding a PNG screenshot of the
            captcha image element.

        Raises:
            RuntimeError: the captcha element is not present on the page.
            Exception: whatever Playwright raises while launching/navigating.
        """
        self._shutdown()
        try:
            self.playwright = sync_playwright().start()
            self.browser = self.playwright.chromium.launch(headless=True, args=['--start-maximized'])
            self.page = self.browser.new_page(no_viewport=True)
            self.page.goto(self.SIGNUP_URL)
            self.page.wait_for_timeout(1000)
            captcha_element = self.page.query_selector(self.CAPTCHA_SELECTOR)
            if captcha_element is None:
                raise RuntimeError('captcha image not found on the signup page')
            png_bytes = captcha_element.screenshot(type='png')
        except Exception:
            self._shutdown()
            raise
        # The screenshot is already PNG-encoded, so it is returned as-is
        # instead of being decoded and re-encoded.
        return BytesIO(png_bytes)

    def generate_account(self, captcha_solution):
        """Finish the signup started by ``get_captcha`` and create a machine.

        Args:
            captcha_solution: Text the user read from the captcha image.

        Returns:
            ``{'status': 'success', 'url': ...}`` with the machine URL, or
            ``{'status': 'error', 'message': ...}``. Once the signup is
            attempted the browser session is always released, whether the
            attempt succeeds or fails.
        """
        if not (self.playwright and self.browser and self.page):
            return {'status' : 'error', 'message' : 'no browser available'}
        if captcha_solution.strip() == '':
            return {'status' : 'error', 'message' : 'captcha solution cannot be empty.'}

        try:
            email = get_email()
            if not email:
                raise RuntimeError('could not create a temporary email address')
            password = 'Qwerty12321*'
            self.page.locator('#email').press_sequentially(email)
            self.page.locator('#password').press_sequentially(password)
            self.page.locator('#captcha').press_sequentially(captcha_solution)
            self.page.locator(self.SUBMIT_SELECTOR).click()

            verification_url = get_verification_url(email)
            if not verification_url:
                raise RuntimeError('no verification link was received')
            self.page.goto(verification_url)
            self.page.wait_for_timeout(3000)

            browser_cookies = self.page.context.cookies()
            session_cookies = {}
            for name in ('csrf_token', 'appSession'):
                value = next(
                    (cookie['value'] for cookie in browser_cookies if cookie['name'] == name),
                    None,
                )
                if value is None:
                    raise RuntimeError(f'cookie {name!r} was not set after verification')
                session_cookies[name] = value

            # The browser is no longer needed; free it before the HTTP call.
            self._shutdown()

            headers = {
                'user-agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36'
            }
            payload = {
                'civitaiModelVersionId' : None,
                'presetSlug' : 'turbo-comfy-exp',
                'runningTime' : 1200,
                'teamId' : None
            }
            response = requests.post(
                url=self.MACHINE_URL,
                headers=headers,
                cookies=session_cookies,
                data=payload,
                timeout=60,
            )
            response.raise_for_status()
            subdomain = response.json()['subdomain']
            return {'status' : 'success', 'url' : f'https://{subdomain}.thinkdiffusion.xyz'}
        except Exception as exception:
            return {'status' : 'error', 'message' : str(exception)}
        finally:
            # Idempotent: covers every failure path, including failures that
            # happen after the early shutdown above.
            self._shutdown()
#-------------------------------------------------- << APP >> --------------------------------------------------# | |
# ASGI application served by `uvicorn app:app`.
app = FastAPI()
# NOTE(review): every origin, method and header is allowed together with
# credentials - confirm that this wide-open CORS policy is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)
#-------------------------------------------------- << BOT >> --------------------------------------------------#
# Single module-level Bot instance used by the route handlers.
bot = Bot()
#-------------------------------------------------- << ROUTES >> --------------------------------------------------# | |
def main_route():
    """Liveness handler: reply with ``{'status': 'success'}``.

    NOTE(review): no route decorator is visible on this function; confirm it
    is registered on ``app``.

    Returns:
        A 200 ``JSONResponse`` on success, or a 500 ``JSONResponse`` carrying
        the error text.
    """
    try:
        return JSONResponse(status_code=200, content={'status': 'success'})
    except Exception as exception:
        # Exception objects are not JSON-serializable; send their text.
        return JSONResponse(
            status_code=500,
            content={'status': 'error', 'message': str(exception)},
        )
def get_captcha_route():
    """Start a signup session and stream the captcha image back as PNG.

    NOTE(review): no route decorator is visible on this function; confirm it
    is registered on ``app``.

    Returns:
        A 200 ``StreamingResponse`` with media type ``image/png`` built from
        ``bot.get_captcha()``, or a 500 ``JSONResponse`` carrying the error
        text when that call fails.
    """
    try:
        captcha_buffer = bot.get_captcha()
        return StreamingResponse(
            status_code=200,
            content=captcha_buffer,
            media_type='image/png',
        )
    except Exception as exception:
        # Exception objects are not JSON-serializable; send their text.
        return JSONResponse(
            status_code=500,
            content={'status': 'error', 'message': str(exception)},
        )
def generate_account_route(captcha_solution: str):
    """Complete the signup with the solved captcha and report the machine URL.

    NOTE(review): no route decorator is visible on this function; confirm it
    is registered on ``app``.

    Args:
        captcha_solution: Text the user read from the captcha image.

    Returns:
        A 200 ``JSONResponse`` with ``status`` and ``url`` when
        ``bot.generate_account`` reports success; otherwise a 500
        ``JSONResponse`` with ``status`` and ``message``.
    """
    try:
        result = bot.generate_account(captcha_solution)
        if result['status'] == 'success':
            return JSONResponse(
                status_code=200,
                content={'status': result['status'], 'url': result['url']},
            )
        # Anything that is not a success is reported as an error, so the
        # handler never falls through and returns None.
        return JSONResponse(
            status_code=500,
            content={
                'status': result['status'],
                'message': result.get('message', 'unknown error'),
            },
        )
    except Exception as exception:
        # Exception objects are not JSON-serializable; send their text.
        return JSONResponse(
            status_code=500,
            content={'status': 'error', 'message': str(exception)},
        )