"""Gradio app for gambling-website detection: screenshots a URL with
Playwright, extracts on-page text with Tesseract OCR, and classifies the
result with an image/text late-fusion model."""

import os
import re
import subprocess
import time
import traceback
import warnings

import gradio as gr
import pandas as pd
import pytesseract
import torch
import torch.nn as nn
from huggingface_hub import hf_hub_download
from PIL import Image
from playwright.sync_api import sync_playwright
from torchvision import models, transforms
from torchvision.transforms import functional as F
from transformers import AutoTokenizer, BertTokenizerFast

warnings.filterwarnings("ignore")


# URL substrings to block while loading pages (ads, trackers, social widgets,
# cookie/consent pop-ups). Matching is plain substring containment, so broad
# tokens such as "ads" can also block unrelated URLs that merely contain them.
BLOCK_PATTERNS = [
    "doubleclick", "adservice", "googlesyndication", "ads", "adserver", "cookie", "consent",
    "analytics", "tracker", "tracking", "stats", "metric", "telemetry", "social", "facebook",
    "twitter", "linkedin", "pinterest", "popup", "notification", "banner"
]

PAGE_TIMEOUT = 30000          # ms: wait for DOM content on navigation
WAIT_FOR_LOAD_TIMEOUT = 5000  # ms: wait for the network to go idle
CLOUDFLARE_CHECK_KEYWORDS = ["Checking your browser", "Just a moment", "Cloudflare"]
MAX_REDIRECTS = 5


def ensure_http(url):
    """Prefix bare domains with http:// so Playwright can navigate to them."""
    if not url.startswith(('http://', 'https://')):
        return 'http://' + url
    return url


def sanitize_filename(url):
    """Replace characters that are unsafe in filenames with underscores."""
    return re.sub(r'[^\w\-_\. ]', '_', url)
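
# Illustrative example (not executed):
#   sanitize_filename("https://example.com/a?b=1") -> "https___example.com_a_b_1"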


def block_ads_and_cookies(page):
    """Abort any request whose URL matches BLOCK_PATTERNS.

    Helper kept for reuse; take_screenshot() installs its own equivalent blocker.
    """
    def route_intercept(route):
        if any(resource in route.request.url.lower() for resource in BLOCK_PATTERNS):
            route.abort()
        else:
            route.continue_()
    page.route("**/*", route_intercept)


def wait_for_page_stable(page):
    """Wait for the DOM, then make a best-effort wait for the network to go idle."""
    try:
        page.wait_for_load_state('domcontentloaded', timeout=PAGE_TIMEOUT)
        try:
            page.wait_for_load_state('networkidle', timeout=WAIT_FOR_LOAD_TIMEOUT)
        except Exception:
            print("Network not fully idle, continuing anyway...")
        # Give late-rendering content a moment to settle before screenshotting.
        time.sleep(2)
    except Exception as e:
        print(f"⚠️ Page not fully stable: {e}")


def detect_and_bypass_cloudflare(page):
    """If the page looks like a Cloudflare challenge, wait, reload, and re-stabilize."""
    try:
        content = page.content()
        if any(keyword.lower() in content.lower() for keyword in CLOUDFLARE_CHECK_KEYWORDS):
            print("⚡ Detected Cloudflare challenge, waiting 5 seconds...")
            time.sleep(5)
            page.reload()
            wait_for_page_stable(page)
    except Exception as e:
        print(f"⚠️ Failed to bypass Cloudflare: {e}")


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


# The OCR step below runs with lang='ind' (Indonesian), so load the matching
# IndoBERT tokenizer; fall back to plain BERT only if that download fails.
try:
    tokenizer = BertTokenizerFast.from_pretrained("indobenchmark/indobert-base-p1")
except Exception as e:
    print(f"Error loading tokenizer: {e}")
    print("Falling back to default BERT tokenizer")
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')


class ResizePadToSquare:
    """Resize so the longer side is target_size, then zero-pad to a square."""

    def __init__(self, target_size=300):
        self.target_size = target_size

    def __call__(self, img):
        img = img.convert("RGB")
        img.thumbnail((self.target_size, self.target_size), Image.Resampling.BILINEAR)
        delta_w = self.target_size - img.size[0]
        delta_h = self.target_size - img.size[1]
        padding = (delta_w // 2, delta_h // 2, delta_w - delta_w // 2, delta_h - delta_h // 2)
        img = F.pad(img, padding, fill=0, padding_mode='constant')
        return img


transform = transforms.Compose([
    ResizePadToSquare(300),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
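
# Note: 300x300 is the input resolution EfficientNet-B3 was trained at, and the
# mean/std above are the standard ImageNet normalization statistics.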


def ensure_playwright_chromium():
    """Install the Playwright Chromium binary if it is not already present."""
    try:
        print("Checking and installing Playwright Chromium if not present...")
        subprocess.run(["playwright", "install", "chromium"], check=True)
        print("Playwright Chromium installation completed.")
    except Exception as e:
        print("Error during Playwright Chromium installation:", e)
        traceback.print_exc()


ensure_playwright_chromium()


SCREENSHOT_DIR = "screenshots"
os.makedirs(SCREENSHOT_DIR, exist_ok=True)

# Hard-coded Linux path; adjust if Tesseract is installed elsewhere.
pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract'
print("Tesseract OCR initialized.")


class LateFusionModel(nn.Module):
    """Late fusion of a frozen image classifier and a frozen text classifier,
    combined by two learnable scalar weights normalized with softmax."""

    def __init__(self, image_model, text_model):
        super(LateFusionModel, self).__init__()
        self.image_model = image_model
        self.text_model = text_model
        self.image_weight = nn.Parameter(torch.tensor(0.5))
        self.text_weight = nn.Parameter(torch.tensor(0.5))

    def forward(self, images, input_ids, attention_mask):
        # Both branches act as frozen scorers; only the fusion weights sit
        # outside the no_grad block.
        with torch.no_grad():
            image_logits = self.image_model(images).squeeze(1)
            text_logits = self.text_model(input_ids=input_ids, attention_mask=attention_mask).logits.squeeze(1)

        weights = torch.softmax(torch.stack([self.image_weight, self.text_weight]), dim=0)
        fused_logits = weights[0] * image_logits + weights[1] * text_logits

        return fused_logits, image_logits, text_logits, weights
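
# For intuition: with both weights initialized to 0.5, softmax([0.5, 0.5]) gives
# [0.5, 0.5], so fusion starts as a plain average of the two branch logits; the
# softmax keeps the weights positive and summing to 1 however training moves them.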


# Load the fusion model: prefer a local checkpoint, otherwise pull it from the
# Hugging Face Hub. The checkpoint stores the whole pickled module, hence
# weights_only=False.
model_path = "models/best_fusion_model.pt"
if not os.path.exists(model_path):
    model_path = hf_hub_download(repo_id="azzandr/gambling-fusion-model", filename="best_fusion_model.pt")
fusion_model = torch.load(model_path, map_location=device, weights_only=False)

fusion_model.to(device)
fusion_model.eval()
print("Fusion model loaded successfully!")


def build_image_model(state_dict_path):
    """Rebuild the EfficientNet-B3 classifier head and load the trained weights."""
    model = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT)
    num_features = model.classifier[1].in_features
    model.classifier = nn.Linear(num_features, 1)  # single-logit binary head
    model.load_state_dict(torch.load(state_dict_path, map_location=device))
    model.to(device)
    model.eval()
    return model


# Fallback image-only model, used when OCR finds no usable text on the page.
image_model_path = "models/best_image_model_Adam_lr0.0001_bs32_state_dict.pt"
if os.path.exists(image_model_path):
    image_only_model = build_image_model(image_model_path)
    print("Image-only model loaded from state_dict successfully!")
else:
    image_model_path = hf_hub_download(repo_id="azzandr/gambling-image-model",
                                       filename="best_image_model_Adam_lr0.0001_bs32_state_dict.pt")
    image_only_model = build_image_model(image_model_path)
    print("Image-only model loaded from HuggingFace successfully!")


def clean_text(text):
    """Normalize OCR output: strip URLs and noise, keep alphabetic words, and
    return "" when too little text survives to be useful."""
    # Short Indonesian words worth keeping despite the length filter below.
    exceptions = {"di", "ke", "ya"}

    text = re.sub(r"http\S+", "", text)      # drop URLs
    text = re.sub(r"\n", " ", text)          # flatten newlines
    text = re.sub(r"[^a-zA-Z']", " ", text)  # keep letters/apostrophes only
    text = re.sub(r"\s{2,}", " ", text).strip().lower()

    # Drop very short tokens unless whitelisted above.
    words = text.split()
    filtered_words = [w for w in words if (len(w) > 2 or w in exceptions)]
    text = ' '.join(filtered_words)

    # Remove vowel-only and consonant-only tokens (typical OCR garbage) and
    # implausibly long runs, then recollapse whitespace.
    text = re.sub(r'\b[aeiou]+\b', '', text)
    text = re.sub(r'\b[^aeiou\s]+\b', '', text)
    text = re.sub(r'\b\w{20,}\b', '', text)
    text = re.sub(r'\s+', ' ', text).strip()

    # Fewer than 5 surviving words is treated as "no usable text".
    if len(text.split()) < 5:
        print(f"Cleaned text too short ({len(text.split())} words). Ignoring text.")
        return ""
    return text
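
# Illustrative trace (not executed): clean_text("Selamat datang di SITUS judi
# online terpercaya!!! deposit 50rb http://judi.example") yields
# "selamat datang di situs judi online terpercaya deposit" -- the URL, digits,
# punctuation, and the 2-letter leftover "rb" are all stripped.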


def create_browser_context(playwright):
    """Launch headless Chromium with hardening relaxed (sandbox, web security,
    site isolation) so pages render in containers and on hostile sites."""
    return playwright.chromium.launch(
        args=[
            '--disable-features=IsolateOrigins,site-per-process',
            '--disable-web-security',
            '--disable-site-isolation-trials',
            '--disable-setuid-sandbox',
            '--no-sandbox',
            '--disable-gpu',
            '--disable-dev-shm-usage',
            '--disable-extensions',
            '--disable-plugins',
            '--disable-background-timer-throttling',
            '--disable-backgrounding-occluded-windows',
            '--disable-renderer-backgrounding',
            '--no-first-run',
            '--no-default-browser-check',
            '--disable-translate',
            '--disable-ipc-flooding-protection'
        ]
    ).new_context(
        viewport={"width": 1280, "height": 800},
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36",
        ignore_https_errors=True,
        java_script_enabled=True,
        bypass_csp=True,
        extra_http_headers={
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Connection": "keep-alive",
            "DNT": "1",
            "Cache-Control": "no-cache"
        }
    )
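
# Note: the Browser handle itself is not kept; callers close the returned
# context, and the browser is torn down when the sync_playwright() block exits.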


def setup_request_interception(page):
    """Block ad/tracker requests and abort document loads caught in redirect loops.

    Defined as a more thorough alternative; take_screenshot() currently installs
    a simpler blocker instead.
    """
    redirect_urls = set()

    def handle_request(route):
        request = route.request
        url = request.url

        if any(pattern in url.lower() for pattern in BLOCK_PATTERNS):
            print(f"Blocking request to: {url}")
            route.abort()
            return

        # If a document URL was already seen in a redirect chain and the chain
        # is too long, assume a redirect loop and abort.
        if request.resource_type == "document":
            if url in redirect_urls:
                if len(redirect_urls) > MAX_REDIRECTS:
                    print(f"Too many redirects (>{MAX_REDIRECTS}), aborting request")
                    route.abort()
                    return
            redirect_urls.add(url)

        route.continue_()

    def handle_response(response):
        # Track 3xx responses so redirect chains can be detected above.
        if 300 <= response.status <= 399:
            redirect_urls.add(response.url)

    page.on("response", handle_response)
    page.route("**/*", handle_request)


def try_navigation_strategies(page, url):
    """Try progressively more patient page.goto() strategies until one succeeds."""
    strategies = [
        {"wait_until": "commit", "timeout": 15000},
        {"wait_until": "domcontentloaded", "timeout": 10000},
        {"wait_until": "load", "timeout": 20000},
        {"wait_until": "networkidle", "timeout": 30000}
    ]

    for i, strategy in enumerate(strategies):
        try:
            print(f"Trying navigation strategy {i+1}: {strategy}")
            response = page.goto(url, **strategy)
            print(f"Navigation successful with strategy {i+1}")
            return response
        except Exception as e:
            print(f"Strategy {i+1} failed: {e}")
            if "ERR_TOO_MANY_REDIRECTS" in str(e):
                print("Redirect error detected, trying next strategy...")
            elif i == len(strategies) - 1:
                raise

    raise Exception("All navigation strategies failed")
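
# Playwright's wait_until levels, roughly in increasing patience: "commit"
# (response received, document started loading), "domcontentloaded", "load",
# and "networkidle" (no network connections for at least 500 ms).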


def take_screenshot(url):
    """Screenshot a URL with retries; return the saved PNG path, or None on failure."""
    url = ensure_http(url)
    filename = sanitize_filename(url) + '.png'
    filepath = os.path.join(SCREENSHOT_DIR, filename)

    max_retries = 3

    for attempt in range(max_retries):
        try:
            print(f"\n=== [SCREENSHOT ATTEMPT {attempt + 1}/{max_retries}] URL: {url} ===")

            with sync_playwright() as p:
                print("Launching browser with aggressive configuration...")
                context = create_browser_context(p)
                page = context.new_page()

                # Only the first attempt blocks ads/trackers; later attempts
                # load everything in case the blocking itself broke the page.
                if attempt == 0:
                    print("Setting up basic request interception...")

                    def simple_block(route):
                        url_lower = route.request.url.lower()
                        if any(pattern in url_lower for pattern in BLOCK_PATTERNS):
                            route.abort()
                        else:
                            route.continue_()

                    page.route("**/*", simple_block)

                try:
                    if attempt == 0:
                        response = try_navigation_strategies(page, url)
                    elif attempt == 1:
                        print("Trying minimal navigation approach...")
                        response = page.goto(url, wait_until="commit", timeout=10000)
                    else:
                        print("Trying basic navigation...")
                        response = page.goto(url, timeout=15000)

                    if response:
                        print(f"Response status: {response.status}")

                    try:
                        page.wait_for_timeout(3000)
                        if attempt == 0:
                            wait_for_page_stable(page)
                    except Exception as e:
                        print(f"Page stability warning: {e}")

                    print("Taking screenshot...")
                    page.screenshot(path=filepath)

                    context.close()
                    print(f"Screenshot saved successfully to {filepath}")
                    return filepath

                except Exception as nav_error:
                    print(f"Navigation error on attempt {attempt + 1}: {nav_error}")

                    # Even a partially loaded page may be enough to classify.
                    try:
                        if page.url != "about:blank":
                            print("Taking screenshot of partial page...")
                            page.screenshot(path=filepath)
                            if os.path.exists(filepath):
                                print(f"Partial screenshot saved to {filepath}")
                                context.close()
                                return filepath
                    except Exception as screenshot_error:
                        print(f"Failed to take partial screenshot: {screenshot_error}")

                    context.close()

                    if attempt == max_retries - 1:
                        raise nav_error
                    print("Retrying with different approach...")
                    time.sleep(2)
                    continue

        except Exception as e:
            print(f"[ERROR] Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                print(f"All {max_retries} attempts failed for URL: {url}")
                traceback.print_exc()
                return None
            print("Waiting before next attempt...")
            time.sleep(3)
            continue

    return None


def resize_if_needed(image_path, max_mb=1, target_width=720):
    """Downscale the image in place if it exceeds max_mb, preserving aspect ratio."""
    file_size = os.path.getsize(image_path) / (1024 * 1024)  # bytes -> MB
    if file_size > max_mb:
        try:
            with Image.open(image_path) as img:
                width, height = img.size
                if width > target_width:
                    ratio = target_width / float(width)
                    new_height = int(float(height) * ratio)
                    img = img.resize((target_width, new_height), Image.Resampling.LANCZOS)
                    # quality applies to lossy formats; PNG screenshots ignore it.
                    img.save(image_path, optimize=True, quality=85)
                    print(f"Image resized to {target_width}x{new_height}")
        except Exception as e:
            print(f"Resize error: {e}")


def extract_text_from_image(image_path):
    """Run Tesseract OCR (Indonesian language pack) on the screenshot."""
    try:
        resize_if_needed(image_path, max_mb=1, target_width=720)
        text = pytesseract.image_to_string(Image.open(image_path), lang='ind')
        print(f"OCR text extracted with Tesseract: {len(text)} characters")
        return text.strip()
    except Exception as e:
        print(f"Tesseract OCR error: {e}")
        return ""


def prepare_data_for_model(image_path, text):
    """Turn a screenshot and its OCR text into model-ready tensors."""
    image = Image.open(image_path)
    image_tensor = transform(image).unsqueeze(0).to(device)

    clean_text_data = clean_text(text)
    encoding = tokenizer.encode_plus(
        clean_text_data,
        add_special_tokens=True,
        max_length=128,
        padding='max_length',
        truncation=True,
        return_tensors='pt'
    )

    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    return image_tensor, input_ids, attention_mask
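
# Note: max_length=128 with padding + truncation gives the text branch a
# fixed-size input regardless of how much OCR text the page produced.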


def predict_single_url(url):
    """Classify one URL; falls back to the image-only model when OCR yields no text."""
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url

    screenshot_path = take_screenshot(url)
    if not screenshot_path:
        return f"Error: Failed to take screenshot for {url}", None, None, None, None

    raw_text = extract_text_from_image(screenshot_path)
    cleaned_text = clean_text(raw_text) if raw_text.strip() else ""

    if not raw_text.strip():
        # No OCR text: score the screenshot alone.
        print(f"No OCR text found for {url}. Using Image-Only Model.")
        image = Image.open(screenshot_path)
        image_tensor = transform(image).unsqueeze(0).to(device)

        with torch.no_grad():
            image_logits = image_only_model(image_tensor).squeeze(1)
            image_probs = torch.sigmoid(image_logits)

        threshold = 0.6
        is_gambling = image_probs[0] > threshold

        label = "Gambling" if is_gambling else "Non-Gambling"
        confidence = image_probs[0].item() if is_gambling else 1 - image_probs[0].item()
        print(f"[Image-Only] URL: {url}")
        print(f"Prediction: {label} | Confidence: {confidence:.2f}\n")
        return label, f"Confidence: {confidence:.2f}", screenshot_path, raw_text, cleaned_text

    else:
        image_tensor, input_ids, attention_mask = prepare_data_for_model(screenshot_path, raw_text)

        with torch.no_grad():
            fused_logits, image_logits, text_logits, weights = fusion_model(image_tensor, input_ids, attention_mask)
            fused_probs = torch.sigmoid(fused_logits)
            image_probs = torch.sigmoid(image_logits)
            text_probs = torch.sigmoid(text_logits)

        # Flag as gambling only above 0.6 probability (stricter than 0.5).
        threshold = 0.6
        is_gambling = fused_probs[0] > threshold

        label = "Gambling" if is_gambling else "Non-Gambling"
        confidence = fused_probs[0].item() if is_gambling else 1 - fused_probs[0].item()

        print(f"[Fusion Model] URL: {url}")
        print(f"Image Model Prediction Probability: {image_probs[0]:.2f}")
        print(f"Text Model Prediction Probability: {text_probs[0]:.2f}")
        print(f"Fusion Final Prediction: {label} | Confidence: {confidence:.2f}\n")

        return label, f"Confidence: {confidence:.2f}", screenshot_path, raw_text, cleaned_text
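
# Example (assumes the models above loaded successfully):
#   label, conf, shot, raw, clean = predict_single_url("example.com")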


def predict_batch_urls(file_obj):
    """Classify every URL listed (one per line) in an uploaded text file."""
    results = []
    # Depending on the Gradio version, the upload arrives as a file-like
    # object or as a path string; handle both.
    if hasattr(file_obj, "read"):
        content = file_obj.read().decode('utf-8')
    else:
        with open(file_obj, encoding='utf-8') as f:
            content = f.read()
    urls = [line.strip() for line in content.splitlines() if line.strip()]
    for url in urls:
        label, confidence, screenshot_path, raw_text, cleaned_text = predict_single_url(url)
        results.append({"url": url, "label": label, "confidence": confidence,
                        "screenshot_path": screenshot_path, "raw_text": raw_text,
                        "cleaned_text": cleaned_text})

    df = pd.DataFrame(results)
    print(f"Batch prediction completed for {len(urls)} URLs.")
    return df


with gr.Blocks() as app:
    gr.Markdown("# 🕵️ Gambling Website Detection (URL Based)")
    gr.Markdown("### Using Playwright & Tesseract OCR")

    with gr.Tab("Single URL"):
        url_input = gr.Textbox(label="Enter Website URL")
        predict_button = gr.Button("Predict")

        with gr.Row():
            with gr.Column():
                label_output = gr.Label()
                confidence_output = gr.Textbox(label="Confidence", interactive=False)

            with gr.Column():
                screenshot_output = gr.Image(label="Screenshot", type="filepath")

        with gr.Row():
            with gr.Column():
                raw_text_output = gr.Textbox(label="Raw OCR Text", lines=5)
            with gr.Column():
                cleaned_text_output = gr.Textbox(label="Cleaned Text", lines=5)

        predict_button.click(
            fn=predict_single_url,
            inputs=url_input,
            outputs=[label_output, confidence_output, screenshot_output, raw_text_output, cleaned_text_output]
        )

    with gr.Tab("Batch URLs"):
        file_input = gr.File(label="Upload .txt file with URLs (one per line)")
        batch_predict_button = gr.Button("Batch Predict")
        batch_output = gr.DataFrame()

        batch_predict_button.click(fn=predict_batch_urls, inputs=file_input, outputs=batch_output)

if __name__ == "__main__":
    app.launch(server_name="0.0.0.0", server_port=7860)