playwright / app.py
ahmetalper's picture
Update app.py
e9f2517 verified
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
from fastapi import FastAPI
from random import choice
from io import BytesIO
from time import sleep
from PIL import Image
import requests
#-------------------------------------------------- << RUN >> --------------------------------------------------#
# uvicorn app:app
#-------------------------------------------------- << VARIABLES >> --------------------------------------------------#
#-------------------------------------------------- << FUNCTIONS >> --------------------------------------------------#
def get_email():
try:
url = 'https://temp-mail107.p.rapidapi.com/domains'
headers = {
'x-rapidapi-key': 'd3349ed2b8msh2a16ceeb6b2a192p173237jsn4ff2c7c6467e',
'x-rapidapi-host': 'temp-mail107.p.rapidapi.com'
}
response = requests.get(url, headers = headers)
data = response.json()
domains = data['data']
domain = choice(domains)
url = 'https://temp-mail107.p.rapidapi.com/create-account'
payload = { 'domain': domain }
headers = {
'x-rapidapi-key': 'd3349ed2b8msh2a16ceeb6b2a192p173237jsn4ff2c7c6467e',
'x-rapidapi-host': 'temp-mail107.p.rapidapi.com',
'Content-Type': 'application/json'
}
response = requests.post(url, json = payload, headers = headers)
data = response.json()
email = data['data']['address']
return email
except Exception as error:
print(f'error : {error}')
def get_verification_url(email):
try:
url = "https://temp-mail107.p.rapidapi.com/check-message"
querystring = {"address" : email}
headers = {
"x-rapidapi-key": "d3349ed2b8msh2a16ceeb6b2a192p173237jsn4ff2c7c6467e",
"x-rapidapi-host": "temp-mail107.p.rapidapi.com"
}
response = requests.get(url, headers = headers, params = querystring)
data = response.json()
while data['data'] == None:
sleep(2)
response = requests.get(url, headers = headers, params = querystring)
data = response.json()
html = data['data']['html']
soup = BeautifulSoup(html, 'html.parser')
verification_url = soup.find('a').get('href')
return verification_url
except Exception as error:
print(f'error : {error}')
class Bot:
def __init__(self):
self.playwright = None
self.browser = None
self.page = None
def get_captcha(self):
if self.page:
self.page.close()
if self.browser:
self.browser.close()
if self.playwright:
self.playwright.stop()
self.playwright = sync_playwright().start()
self.browser = self.playwright.chromium.launch(headless = True, args = ['--start-maximized'])
self.page = self.browser.new_page(no_viewport = True)
self.page.goto('https://www.thinkdiffusion.com/api/auth/signup')
self.page.wait_for_timeout(1000)
captcha_element = self.page.query_selector('body > div.c526f8a4f.c8a792eff > main > section > div > div > div > form > div.c8b219657.c9a020846 > div.c8b219657.c1f2d1695 > div > div.cc9af1055.cddcd8014 > div.c4026b06e.cb1295f34 > img')
captcha_base64 = captcha_element.screenshot(type = 'png')
captcha_image = Image.open(BytesIO(captcha_base64))
captcha_buffer = BytesIO()
captcha_image.save(captcha_buffer, format = 'png')
captcha_buffer.seek(0)
return captcha_buffer
def generate_account(self, captcha_solution):
if self.playwright and self.browser and self.page:
if captcha_solution.strip() != '':
try:
email = get_email()
password = 'Qwerty12321*'
self.page.locator('#email').press_sequentially(email)
self.page.locator('#password').press_sequentially(password)
self.page.locator('#captcha').press_sequentially(captcha_solution)
self.page.locator('body > div.c526f8a4f.c8a792eff > main > section > div > div > div > form > div.c2f62a829 > button').click()
verification_url = get_verification_url(email)
self.page.goto(verification_url)
self.page.wait_for_timeout(3000)
cookies = self.page.context.cookies()
csrf_token = [cookie['value'] for cookie in cookies if cookie['name'] == 'csrf_token'][0]
app_session = [cookie['value'] for cookie in cookies if cookie['name'] == 'appSession'][0]
self.page.close()
self.browser.close()
self.playwright.stop()
self.page = None
self.browser = None
self.playwright = None
headers = {
'user-agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36'
}
data = {
'civitaiModelVersionId' : None,
'presetSlug' : 'turbo-comfy-exp',
'runningTime' : 1200,
'teamId' : None
}
cookies = {
'csrf_token' : csrf_token,
'appSession' : app_session
}
url = 'https://www.thinkdiffusion.com/api/machines/create'
response = requests.post(url = url, headers = headers, cookies = cookies, data = data)
data = response.json()
subdomain = data['subdomain']
url = f'https://{subdomain}.thinkdiffusion.xyz'
return {'status' : 'success', 'url' : url}
except Exception as exception:
self.page.close()
self.browser.close()
self.playwright.stop()
self.page = None
self.browser = None
self.playwright = None
return {'status' : 'error', 'message' : str(exception)}
else:
return {'status' : 'error', 'message' : 'captcha solution cannot be empty.'}
else:
return {'status' : 'error', 'message' : 'no browser available'}
#-------------------------------------------------- << APP >> --------------------------------------------------#
app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins = ['*'], allow_credentials = True, allow_methods = ['*'], allow_headers = ['*'])
#-------------------------------------------------- << BOT >> --------------------------------------------------#
bot = Bot()
#-------------------------------------------------- << ROUTES >> --------------------------------------------------#
@app.get('/')
def main_route():
try:
return JSONResponse(status_code = 200, content = {'status' : 'success'})
except Exception as exception:
return JSONResponse(status_code = 500, content = {'status' : 'error', 'message' : exception})
@app.get('/get-captcha')
def get_captcha_route():
try:
captcha_buffer = bot.get_captcha()
return StreamingResponse(status_code = 200, content = captcha_buffer, media_type = 'image/png')
except Exception as exception:
return JSONResponse(status_code = 500, content = {'status' : 'error', 'message' : exception})
@app.get('/generate-account/{captcha_solution}')
def generate_account_route(captcha_solution: str):
try:
response = bot.generate_account(captcha_solution)
if response['status'] == 'success':
return JSONResponse(status_code = 200, content = {'status' : response['status'], 'url' : response['url']})
if response['status'] == 'error':
return JSONResponse(status_code = 500, content = {'status' : response['status'], 'message' : response['message']})
except Exception as exception:
return JSONResponse(status_code = 500, content = {'status' : 'error', 'message' : exception})