"""FastAPI service that automates ThinkDiffusion sign-up behind a manual captcha.

Flow: ``GET /get-captcha`` opens the sign-up page in headless Chromium and
returns the captcha image; ``GET /generate-account/{captcha_solution}`` then
fills the form with a disposable e-mail address, follows the verification
link, and asks the site to create a machine, returning its URL.
"""

import os
from io import BytesIO
from random import choice
from time import sleep

import requests
from bs4 import BeautifulSoup
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from PIL import Image
from playwright.sync_api import sync_playwright

#-------------------------------------------------- << RUN >> --------------------------------------------------#

# uvicorn app:app

#-------------------------------------------------- << VARIABLES >> --------------------------------------------------#

RAPIDAPI_HOST = 'temp-mail107.p.rapidapi.com'
# SECURITY NOTE(review): an API key is committed in source. It is kept as the
# fallback so existing deployments keep working, but it should be rotated and
# supplied only through the RAPIDAPI_KEY environment variable.
RAPIDAPI_KEY = os.environ.get(
    'RAPIDAPI_KEY',
    'd3349ed2b8msh2a16ceeb6b2a192p173237jsn4ff2c7c6467e',
)
TEMP_MAIL_BASE_URL = f'https://{RAPIDAPI_HOST}'
TEMP_MAIL_HEADERS = {
    'x-rapidapi-key': RAPIDAPI_KEY,
    'x-rapidapi-host': RAPIDAPI_HOST,
}

# Seconds to wait for any single HTTP request before giving up.
REQUEST_TIMEOUT = 30

SIGNUP_URL = 'https://www.thinkdiffusion.com/api/auth/signup'
CREATE_MACHINE_URL = 'https://www.thinkdiffusion.com/api/machines/create'
ACCOUNT_PASSWORD = 'Qwerty12321*'
USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36'
)
CAPTCHA_SELECTOR = (
    'body > div.c526f8a4f.c8a792eff > main > section > div > div > div > form'
    ' > div.c8b219657.c9a020846 > div.c8b219657.c1f2d1695 > div'
    ' > div.cc9af1055.cddcd8014 > div.c4026b06e.cb1295f34 > img'
)
SUBMIT_SELECTOR = (
    'body > div.c526f8a4f.c8a792eff > main > section > div > div > div > form'
    ' > div.c2f62a829 > button'
)

#-------------------------------------------------- << FUNCTIONS >> --------------------------------------------------#


def get_email():
    """Create a disposable mailbox and return its address.

    Picks a random domain from the temp-mail ``/domains`` endpoint and creates
    an account on it via ``/create-account``.

    Returns:
        The new e-mail address, or ``None`` if any step failed (the error is
        printed, matching the module's existing best-effort behaviour).
    """
    try:
        response = requests.get(
            f'{TEMP_MAIL_BASE_URL}/domains',
            headers=TEMP_MAIL_HEADERS,
            timeout=REQUEST_TIMEOUT,
        )
        domain = choice(response.json()['data'])

        response = requests.post(
            f'{TEMP_MAIL_BASE_URL}/create-account',
            json={'domain': domain},
            headers={**TEMP_MAIL_HEADERS, 'Content-Type': 'application/json'},
            timeout=REQUEST_TIMEOUT,
        )
        return response.json()['data']['address']
    except Exception as error:
        print(f'error : {error}')
        return None


def get_verification_url(email, max_attempts=60, poll_interval=2):
    """Poll the disposable mailbox and return the first link of the message.

    Args:
        email: Address previously returned by ``get_email``.
        max_attempts: Maximum number of mailbox checks before giving up, so a
            mail that never arrives cannot block the caller forever.
        poll_interval: Seconds to sleep between two checks.

    Returns:
        The ``href`` of the first ``<a>`` element in the message HTML, or
        ``None`` if no message arrived in time or any step failed (the error
        is printed).
    """
    try:
        message = None
        for attempt in range(max_attempts):
            if attempt:
                sleep(poll_interval)
            response = requests.get(
                f'{TEMP_MAIL_BASE_URL}/check-message',
                headers=TEMP_MAIL_HEADERS,
                params={'address': email},
                timeout=REQUEST_TIMEOUT,
            )
            message = response.json()['data']
            if message is not None:
                break
        else:
            raise TimeoutError(f'no message received for {email}')

        link = BeautifulSoup(message['html'], 'html.parser').find('a')
        if link is None:
            raise ValueError('message does not contain a link')
        return link.get('href')
    except Exception as error:
        print(f'error : {error}')
        return None


class Bot:
    """Holds one Playwright browser session shared by the captcha and sign-up steps."""

    def __init__(self):
        # All three handles are None whenever no browser session is open.
        self.playwright = None
        self.browser = None
        self.page = None

    def _close_browser(self):
        """Close page, browser and Playwright (whichever exist) and reset the handles.

        Safe to call repeatedly: handles are cleared first and each teardown
        step is best-effort, so one failing step does not skip the others.
        """
        teardown_steps = (
            (self.page, 'close'),
            (self.browser, 'close'),
            (self.playwright, 'stop'),
        )
        self.page = None
        self.browser = None
        self.playwright = None
        for resource, method_name in teardown_steps:
            if resource is None:
                continue
            try:
                getattr(resource, method_name)()
            except Exception as error:
                print(f'error : {error}')

    @staticmethod
    def _cookie_value(cookies, name):
        """Return the value of the first cookie called ``name``; raise LookupError if absent."""
        for cookie in cookies:
            if cookie['name'] == name:
                return cookie['value']
        raise LookupError(f'cookie {name!r} was not set')

    def get_captcha(self):
        """Open a fresh sign-up page and return its captcha as a PNG buffer.

        Any previous browser session is closed first. The new session is left
        open so that ``generate_account`` can submit the form on the same page.

        Returns:
            A ``BytesIO`` positioned at offset 0 containing the PNG image.

        Raises:
            RuntimeError: If the captcha image cannot be found on the page.
        """
        self._close_browser()
        try:
            self.playwright = sync_playwright().start()
            self.browser = self.playwright.chromium.launch(
                headless=True, args=['--start-maximized']
            )
            self.page = self.browser.new_page(no_viewport=True)
            self.page.goto(SIGNUP_URL)
            self.page.wait_for_timeout(1000)

            captcha_element = self.page.query_selector(CAPTCHA_SELECTOR)
            if captcha_element is None:
                raise RuntimeError('captcha image not found on the signup page')
            captcha_png = captcha_element.screenshot(type='png')
        except Exception:
            # Do not leave a half-initialised browser behind.
            self._close_browser()
            raise

        # Round-trip through PIL, as before, so the response is a re-encoded PNG.
        captcha_buffer = BytesIO()
        Image.open(BytesIO(captcha_png)).save(captcha_buffer, format='png')
        captcha_buffer.seek(0)
        return captcha_buffer

    def generate_account(self, captcha_solution):
        """Submit the sign-up form, verify the e-mail and create a machine.

        Requires an open session from ``get_captcha``.

        Args:
            captcha_solution: Text the user read from the captcha image.

        Returns:
            ``{'status': 'success', 'url': ...}`` on success, otherwise
            ``{'status': 'error', 'message': ...}``. The browser session is
            closed in both cases, except when the input is rejected up front
            (no session, or blank captcha solution).
        """
        if not (self.playwright and self.browser and self.page):
            return {'status': 'error', 'message': 'no browser available'}
        if captcha_solution.strip() == '':
            return {'status': 'error', 'message': 'captcha solution cannot be empty.'}

        try:
            email = get_email()
            if not email:
                raise RuntimeError('could not create a temporary email address')

            self.page.locator('#email').press_sequentially(email)
            self.page.locator('#password').press_sequentially(ACCOUNT_PASSWORD)
            self.page.locator('#captcha').press_sequentially(captcha_solution)
            self.page.locator(SUBMIT_SELECTOR).click()

            verification_url = get_verification_url(email)
            if not verification_url:
                raise RuntimeError('could not retrieve the verification url')
            self.page.goto(verification_url)
            self.page.wait_for_timeout(3000)

            browser_cookies = self.page.context.cookies()
            csrf_token = self._cookie_value(browser_cookies, 'csrf_token')
            app_session = self._cookie_value(browser_cookies, 'appSession')

            # The browser is no longer needed once the session cookies are known.
            self._close_browser()

            # NOTE(review): the payload is sent form-encoded (``data=``), where
            # requests omits None-valued fields; confirm the endpoint does not
            # expect a JSON body instead.
            payload = {
                'civitaiModelVersionId': None,
                'presetSlug': 'turbo-comfy-exp',
                'runningTime': 1200,
                'teamId': None,
            }
            response = requests.post(
                url=CREATE_MACHINE_URL,
                headers={'user-agent': USER_AGENT},
                cookies={'csrf_token': csrf_token, 'appSession': app_session},
                data=payload,
                timeout=REQUEST_TIMEOUT,
            )
            subdomain = response.json()['subdomain']
            return {
                'status': 'success',
                'url': f'https://{subdomain}.thinkdiffusion.xyz',
            }
        except Exception as exception:
            # The handles may already be None here (failure after teardown),
            # so use the None-safe helper instead of calling close() directly.
            self._close_browser()
            return {'status': 'error', 'message': str(exception)}


#-------------------------------------------------- << APP >> --------------------------------------------------#

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)

#-------------------------------------------------- << BOT >> --------------------------------------------------#

bot = Bot()

#-------------------------------------------------- << ROUTES >> --------------------------------------------------#


@app.get('/')
def main_route():
    """Health check: always answers ``{'status': 'success'}``."""
    try:
        return JSONResponse(status_code=200, content={'status': 'success'})
    except Exception as exception:
        return JSONResponse(
            status_code=500,
            content={'status': 'error', 'message': str(exception)},
        )


@app.get('/get-captcha')
def get_captcha_route():
    """Start a new sign-up session and stream its captcha image as PNG."""
    try:
        captcha_buffer = bot.get_captcha()
        return StreamingResponse(
            status_code=200, content=captcha_buffer, media_type='image/png'
        )
    except Exception as exception:
        # Exceptions are not JSON-serializable; send their text instead.
        return JSONResponse(
            status_code=500,
            content={'status': 'error', 'message': str(exception)},
        )


@app.get('/generate-account/{captcha_solution}')
def generate_account_route(captcha_solution: str):
    """Finish the sign-up with the given captcha text and return the machine URL."""
    try:
        result = bot.generate_account(captcha_solution)
        if result['status'] == 'success':
            return JSONResponse(
                status_code=200,
                content={'status': result['status'], 'url': result['url']},
            )
        # Anything that is not a success is reported as an error.
        return JSONResponse(
            status_code=500,
            content={
                'status': 'error',
                'message': str(result.get('message', 'unknown error')),
            },
        )
    except Exception as exception:
        return JSONResponse(
            status_code=500,
            content={'status': 'error', 'message': str(exception)},
        )