import requests
from lxml import html
from collections import deque
import json
import time
import os
# --- Choose your Selenium setup ---
# OPTION A: Standard Selenium
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
# OPTION B: undetected_chromedriver (Uncomment these if you want to use UC)
# import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
from PIL import Image
from io import BytesIO
def set_screenshot(driver, images=None):
    # Avoid a mutable default argument; start a fresh list when none is supplied.
    if images is None:
        images = []
    png = driver.get_screenshot_as_png()
    image = Image.open(BytesIO(png))
    images.append(image)
    return images
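# The screenshots collected by set_screenshot() exist only in memory. Below is a
# minimal sketch of how they could be written to disk; save_screenshots() and the
# "screenshots" output directory are illustrative additions, not part of the
# original crawler.
def save_screenshots(images, out_dir="screenshots"):
    os.makedirs(out_dir, exist_ok=True)
    for index, image in enumerate(images):
        # Pillow infers the PNG format from the file extension.
        image.save(os.path.join(out_dir, f"screenshot_{index:04d}.png"))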
def get_chrome_options():
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    return options
def set_driver():
    options = get_chrome_options()
    try:
        web_driver = webdriver.Chrome(options=options)
        web_driver.set_window_size(1080, 720)  # Adjust the window size here
    except WebDriverException as e:
        print(f"Error initializing ChromeDriver: {e}")
        # Return None so callers can check `if not driver`.
        return None
    return web_driver
# --- Selenium setup functions (choose one based on your choice above) ---
# OPTION A: Standard Selenium (Use this if you prefer standard selenium)
# def get_chrome_options():
#     options = webdriver.ChromeOptions()
#     options.add_argument("--headless")
#     options.add_argument("--no-sandbox")
#     options.add_argument("--disable-gpu")
#     options.add_argument("--disable-dev-shm-usage")
#     options.add_argument("--window-size=1920,1080")
#     options.add_argument(
#         "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
#     return options
def create_webdriver_instance(browser_type="chrome"):
    if browser_type.lower() == "chrome":
        chrome_options = get_chrome_options()
        try:
            # Assumes chromedriver is in PATH or at the specified path (e.g., /usr/bin/chromedriver on GitHub Actions)
            service = Service(executable_path="/usr/bin/chromedriver")
            driver = webdriver.Chrome(service=service, options=chrome_options)
            return driver
        except WebDriverException as e:
            print(f"Error initializing ChromeDriver. Error: {e}")
            return None
    else:
        raise ValueError("Unsupported browser type.")
# OPTION B: undetected_chromedriver (Uncomment this block and comment OPTION A if you want to use UC)
# def get_chrome_options():
#     options = uc.ChromeOptions()
#     options.add_argument("--headless")
#     options.add_argument("--no-sandbox")
#     options.add_argument("--disable-gpu")
#     options.add_argument("--disable-dev-shm-usage")
#     options.add_argument("--window-size=1920,1080")
#     return options
# def create_webdriver_instance(browser_type="chrome"):
#     if browser_type.lower() == "chrome":
#         chrome_options = get_chrome_options()
#         try:
#             driver = uc.Chrome(options=chrome_options)
#             return driver
#         except WebDriverException as e:
#             print(f"Error initializing undetected_chromedriver. Error: {e}")
#             return None
#     else:
#         raise ValueError("Unsupported browser type.")
# --- Resumable Crawling Logic ---
def save_crawl_state(to_visit_deque, visited_set, song_urls_list, state_filename="crawl_state.json",
                     song_pages_json_file="pagalgana_song_pages.json"):
    """Saves the current state of the crawler to JSON files."""
    try:
        with open(song_pages_json_file, 'w', encoding='utf-8') as f:
            json.dump(song_urls_list, f, indent=4)
        crawl_state_data = {
            "to_visit": list(to_visit_deque),
            "visited_urls": list(visited_set)
        }
        with open(state_filename, 'w', encoding='utf-8') as f:
            json.dump(crawl_state_data, f, indent=4)
        print(
            f"--- Crawl state saved. URLs to visit: {len(to_visit_deque)}, Visited: {len(visited_set)}, Song pages found: {len(song_urls_list)} ---")
    except IOError as e:
        print(f"Error saving crawl state: {e}")
    except Exception as e:
        print(f"An unexpected error occurred while saving state: {e}")
def load_crawl_state(state_filename="crawl_state.json", song_pages_json_file="pagalgana_song_pages.json"):
    """Loads previous crawl state if files exist."""
    to_visit_deque = deque()
    visited_set = set()
    song_urls_list = []
    if os.path.exists(song_pages_json_file):
        try:
            with open(song_pages_json_file, 'r', encoding='utf-8') as f:
                song_urls_list = json.load(f)
            print(f"Loaded {len(song_urls_list)} song URLs from '{song_pages_json_file}'.")
        except json.JSONDecodeError:
            print(f"Warning: '{song_pages_json_file}' corrupted or empty. Starting fresh song list.")
            song_urls_list = []
        except Exception as e:
            print(f"Error loading '{song_pages_json_file}': {e}")
    if os.path.exists(state_filename):
        try:
            with open(state_filename, 'r', encoding='utf-8') as f:
                crawl_state_data = json.load(f)
            # JSON stores (url, depth) tuples as lists; convert back to tuples so
            # membership checks against newly queued tuples keep working.
            to_visit_deque = deque(tuple(item) for item in crawl_state_data.get("to_visit", []))
            visited_set = set(crawl_state_data.get("visited_urls", []))
            print(f"Loaded crawl state: {len(to_visit_deque)} URLs to visit, {len(visited_set)} visited.")
        except json.JSONDecodeError:
            print(f"Warning: '{state_filename}' corrupted or empty. Starting fresh state.")
            to_visit_deque = deque()
            visited_set = set()
        except Exception as e:
            print(f"Error loading '{state_filename}': {e}")
    return to_visit_deque, visited_set, song_urls_list
def crawl_pagalgana_site(base_url: str, song_pages_json_file: str, max_crawl_depth: int, state_filename: str,
                         save_interval: int, images):
    """
    Crawls Pagalgana.com to find and save song page URLs.
    Supports resuming a crawl.
    """
    # driver = create_webdriver_instance()
    driver = set_driver()
    if not driver:
        print("Failed to initialize WebDriver. Exiting.")
        return [], images  # Return an empty URL list (plus the screenshots so far) if WebDriver fails
    to_visit, visited_urls, song_page_urls = load_crawl_state(state_filename, song_pages_json_file)
    if not to_visit and not visited_urls:
        print("No previous crawl state found. Starting fresh.")
        to_visit.append((base_url, 0))
    else:
        print("Resuming crawl from previous state.")
        if base_url not in visited_urls and (base_url, 0) not in to_visit:
            to_visit.appendleft((base_url, 0))
    AUDIO_CONTAINER_XPATH = '//*[@id="audio-container"]'
    LOAD_MORE_BUTTON_XPATH = '//a[@class="button" and contains(@onclick, "loadMoreCategory")]'
    print(f"Starting/Resuming crawl with base: {base_url}, max depth: {max_crawl_depth}")
    print(
        f"Initial Queue size: {len(to_visit)}, Initial Visited size: {len(visited_urls)}, Song page URLs: {len(song_page_urls)}")
    processed_count = 0
    while to_visit:
        current_url, current_depth = to_visit.popleft()
        if current_url in visited_urls:
            continue
        if current_depth > max_crawl_depth:
            print(f"Skipping {current_url} - max depth reached ({max_crawl_depth})")
            continue
        print(f"\n--- Visiting ({current_depth}): {current_url} ---")
        visited_urls.add(current_url)
        processed_count += 1
        try:
            driver.get(current_url)
            time.sleep(3)  # Give page more time to load and execute JS
            print(f" Page title: {driver.title}")
            print(f" Current URL after load: {driver.current_url}")
            images = set_screenshot(driver=driver, images=images)
            # Optional: print HTML snippet for debugging. Remove for cleaner logs in production.
            # print(" --- HTML snippet (first 2000 chars) ---")
            # print(driver.page_source[:2000])
            # print(" --- End HTML snippet ---")
            # Check for Cloudflare challenge (if using standard Selenium)
            if "Attention Required" in driver.title or "cloudflare" in driver.page_source.lower():
                print(
                    " --> Cloudflare challenge detected! Try switching to undetected_chromedriver or add a longer sleep.")
                print(" --> Skipping current URL due to Cloudflare challenge.")
                images = set_screenshot(driver=driver, images=images)
                continue  # Skip this URL if Cloudflare is blocking it
            # Check if it's a song page
            audio_container_elements = driver.find_elements(By.XPATH, AUDIO_CONTAINER_XPATH)
            if audio_container_elements:
                print(f" --> FOUND AUDIO CONTAINER! This is a song page: {current_url}")
                if current_url not in song_page_urls:
                    song_page_urls.append(current_url)
# Handle "Load More" button if present
load_more_found_and_clicked = False
while True:
try:
load_more_button = WebDriverWait(driver, 15).until(
EC.element_to_be_clickable((By.XPATH, LOAD_MORE_BUTTON_XPATH))
)
last_height = driver.execute_script("return document.body.scrollHeight")
print(" Clicking 'Load More' button...")
load_more_button.click()
load_more_found_and_clicked = True
new_height = last_height
scroll_attempts = 0
while new_height == last_height and scroll_attempts < 7:
time.sleep(2)
new_height = driver.execute_script("return document.body.scrollHeight")
scroll_attempts += 1
if new_height == last_height:
print(" No more content loaded after click, or button disappeared.")
break
except (NoSuchElementException, TimeoutException):
if not load_more_found_and_clicked:
print(" 'Load More' button not found or not clickable.")
else:
print(" 'Load More' button no longer present (all content likely loaded).")
break
except Exception as e:
print(f" Error clicking 'Load More': {e}")
break
            # After all content is loaded, parse the HTML
            tree = html.fromstring(driver.page_source)
            # Extract nested links from the fully loaded page
            links = tree.xpath('//a/@href')
            print(f" Found {len(links)} raw links on the page.")
            links_added_to_queue = 0
            for link in links:
                absolute_url = requests.compat.urljoin(current_url, link)
                if "pagalgana.com" in absolute_url and "#" not in absolute_url and "?" not in absolute_url:
                    if not absolute_url.endswith(
                            ('.mp3', '.zip', '.rar', '.jpg', '.png', '.gif', '.pdf', '.txt', '.xml', '.css', '.js')):
                        if absolute_url not in visited_urls and (absolute_url, current_depth + 1) not in to_visit:
                            if absolute_url not in song_page_urls:  # Don't re-add if already identified as a song page
                                to_visit.append((absolute_url, current_depth + 1))
                                links_added_to_queue += 1
            # print(f" Added {links_added_to_queue} new valid links to the queue from {current_url}.")
        except Exception as e:
            print(f" An unexpected error occurred for {current_url}: {e}")
        finally:
            if processed_count % save_interval == 0:
                print(f"--- Processed {processed_count} pages. Saving current crawl state... ---")
                save_crawl_state(to_visit, visited_urls, song_page_urls, state_filename, song_pages_json_file)
    # Capture one last screenshot while the browser is still open, then shut down.
    images = set_screenshot(driver=driver, images=images)
    driver.quit()
    print("\n--- Crawl finished. Performing final save of song page URLs. ---")
    save_crawl_state(to_visit, visited_urls, song_page_urls, state_filename, song_pages_json_file)
    print(f"\nCrawl complete. Total {len(song_page_urls)} song pages found and saved to '{song_pages_json_file}'.")
    return song_page_urls, images  # Return the discovered song pages and the collected screenshots
# This __name__ block is for testing `crawler.py` independently
if __name__ == "__main__":
# Example usage for standalone testing of the crawler
# When run via main_script.py, this block won't execute
images=[]
discovered_urls,images = crawl_pagalgana_site(
base_url="https://pagalgana.com/category/bollywood-mp3-songs.html",
song_pages_json_file="bollywood_song_pages.json",
state_filename="bollywood_crawl_state.json",
max_crawl_depth=2, # Keep low for testing
save_interval=5,
images=images
)
print(f"Crawler finished. Discovered {len(discovered_urls)} song URLs.")