import requests
from lxml import html
from collections import deque
import json
import time
import os

# --- Choose your Selenium setup ---
# OPTION A: Standard Selenium
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
# OPTION B: undetected_chromedriver (Uncomment these if you want to use UC)
# import undetected_chromedriver as uc

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
from PIL import Image
from io import BytesIO


def set_screenshot(driver, images=None):
    """Captures the current page as a PIL image and appends it to `images`."""
    if images is None:
        images = []
    png = driver.get_screenshot_as_png()
    image = Image.open(BytesIO(png))
    images.append(image)
    return images


def get_chrome_options():
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    return options


def set_driver():
    options = get_chrome_options()
    try:
        web_driver = webdriver.Chrome(options=options)
        web_driver.set_window_size(1080, 720)  # Adjust the window size here
    except WebDriverException as e:
        print(f"Error initializing ChromeDriver: {e}")
        return None
    return web_driver


# --- Selenium setup functions (choose one based on your choice above) ---

# OPTION A: Standard Selenium (Use this if you prefer standard selenium)
# def get_chrome_options():
#     options = webdriver.ChromeOptions()
#     options.add_argument("--headless")
#     options.add_argument("--no-sandbox")
#     options.add_argument("--disable-gpu")
#     options.add_argument("--disable-dev-shm-usage")
#     options.add_argument("--window-size=1920,1080")
#     options.add_argument(
#         "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
#     return options


def create_webdriver_instance(browser_type="chrome"):
    if browser_type.lower() == "chrome":
        chrome_options = get_chrome_options()
        try:
            # Assumes chromedriver is in PATH or at a known path (e.g., /usr/bin/chromedriver on GitHub Actions)
            service = Service(executable_path="/usr/bin/chromedriver")
            driver = webdriver.Chrome(service=service, options=chrome_options)
            return driver
        except WebDriverException as e:
            print(f"Error initializing ChromeDriver. Error: {e}")
            return None
    else:
        raise ValueError("Unsupported browser type.")


# OPTION B: undetected_chromedriver (Uncomment this block and comment OPTION A if you want to use UC)
# def get_chrome_options():
#     options = uc.ChromeOptions()
#     options.add_argument("--headless")
#     options.add_argument("--no-sandbox")
#     options.add_argument("--disable-gpu")
#     options.add_argument("--disable-dev-shm-usage")
#     options.add_argument("--window-size=1920,1080")
#     return options

# def create_webdriver_instance(browser_type="chrome"):
#     if browser_type.lower() == "chrome":
#         chrome_options = get_chrome_options()
#         try:
#             driver = uc.Chrome(options=chrome_options)
#             return driver
#         except WebDriverException as e:
#             print(f"Error initializing undetected_chromedriver. Error: {e}")
#             return None
#     else:
#         raise ValueError("Unsupported browser type.")


# --- Resumable Crawling Logic ---

def save_crawl_state(to_visit_deque, visited_set, song_urls_list,
                     state_filename="crawl_state.json",
                     song_pages_json_file="pagalgana_song_pages.json"):
    """Saves the current state of the crawler to JSON files."""
    try:
        with open(song_pages_json_file, 'w', encoding='utf-8') as f:
            json.dump(song_urls_list, f, indent=4)

        crawl_state_data = {
            "to_visit": list(to_visit_deque),
            "visited_urls": list(visited_set)
        }
        with open(state_filename, 'w', encoding='utf-8') as f:
            json.dump(crawl_state_data, f, indent=4)

        print(
            f"--- Crawl state saved. URLs to visit: {len(to_visit_deque)}, Visited: {len(visited_set)}, Song pages found: {len(song_urls_list)} ---")
    except IOError as e:
        print(f"Error saving crawl state: {e}")
    except Exception as e:
        print(f"An unexpected error occurred while saving state: {e}")


def load_crawl_state(state_filename="crawl_state.json",
                     song_pages_json_file="pagalgana_song_pages.json"):
    """Loads the previous crawl state if the state files exist."""
    to_visit_deque = deque()
    visited_set = set()
    song_urls_list = []

    if os.path.exists(song_pages_json_file):
        try:
            with open(song_pages_json_file, 'r', encoding='utf-8') as f:
                song_urls_list = json.load(f)
            print(f"Loaded {len(song_urls_list)} song URLs from '{song_pages_json_file}'.")
        except json.JSONDecodeError:
            print(f"Warning: '{song_pages_json_file}' corrupted or empty. Starting fresh song list.")
            song_urls_list = []
        except Exception as e:
            print(f"Error loading '{song_pages_json_file}': {e}")

    if os.path.exists(state_filename):
        try:
            with open(state_filename, 'r', encoding='utf-8') as f:
                crawl_state_data = json.load(f)
            # JSON stores (url, depth) pairs as lists; convert them back to tuples
            # so membership checks against the deque behave consistently.
            to_visit_deque = deque(tuple(item) for item in crawl_state_data.get("to_visit", []))
            visited_set = set(crawl_state_data.get("visited_urls", []))
            print(f"Loaded crawl state: {len(to_visit_deque)} URLs to visit, {len(visited_set)} visited.")
        except json.JSONDecodeError:
            print(f"Warning: '{state_filename}' corrupted or empty. Starting fresh state.")
            to_visit_deque = deque()
            visited_set = set()
        except Exception as e:
            print(f"Error loading '{state_filename}': {e}")

    return to_visit_deque, visited_set, song_urls_list


def crawl_pagalgana_site(base_url: str, song_pages_json_file: str, max_crawl_depth: int,
                         state_filename: str, save_interval: int, images):
    """
    Crawls Pagalgana.com to find and save song page URLs.
    Supports resuming a crawl.
    """
    # driver = create_webdriver_instance()
    driver = set_driver()
    if not driver:
        print("Failed to initialize WebDriver. Exiting.")
        return [], images  # Return an empty list if WebDriver fails

    to_visit, visited_urls, song_page_urls = load_crawl_state(state_filename, song_pages_json_file)

    if not to_visit and not visited_urls:
        print("No previous crawl state found. Starting fresh.")
        to_visit.append((base_url, 0))
    else:
        print("Resuming crawl from previous state.")
        if base_url not in visited_urls and (base_url, 0) not in to_visit:
            to_visit.appendleft((base_url, 0))

    AUDIO_CONTAINER_XPATH = '//*[@id="audio-container"]'
    LOAD_MORE_BUTTON_XPATH = '//a[@class="button" and contains(@onclick, "loadMoreCategory")]'

    print(f"Starting/Resuming crawl with base: {base_url}, max depth: {max_crawl_depth}")
    print(
        f"Initial Queue size: {len(to_visit)}, Initial Visited size: {len(visited_urls)}, Song page URLs: {len(song_page_urls)}")

    processed_count = 0

    while to_visit:
        current_url, current_depth = to_visit.popleft()

        if current_url in visited_urls:
            continue
        if current_depth > max_crawl_depth:
            print(f"Skipping {current_url} - max depth reached ({max_crawl_depth})")
            continue

        print(f"\n--- Visiting ({current_depth}): {current_url} ---")
        visited_urls.add(current_url)
        processed_count += 1

        try:
            driver.get(current_url)
            time.sleep(3)  # Give the page more time to load and execute JS

            print(f"  Page title: {driver.title}")
            print(f"  Current URL after load: {driver.current_url}")
            images = set_screenshot(driver=driver, images=images)

            # Optional: print HTML snippet for debugging. Remove for cleaner logs in production.
            # print("  --- HTML snippet (first 2000 chars) ---")
            # print(driver.page_source[:2000])
            # print("  --- End HTML snippet ---")

            # Check for Cloudflare challenge (if using standard Selenium)
            if "Attention Required" in driver.title or "cloudflare" in driver.page_source.lower():
                print(
                    "  --> Cloudflare challenge detected! Try switching to undetected_chromedriver or add a longer sleep.")
                print("  --> Skipping current URL due to Cloudflare challenge.")
                images = set_screenshot(driver=driver, images=images)
                continue  # Skip this URL if Cloudflare is blocking it

            # Check if it's a song page
            audio_container_elements = driver.find_elements(By.XPATH, AUDIO_CONTAINER_XPATH)
            if audio_container_elements:
                print(f"  --> FOUND AUDIO CONTAINER! This is a song page: {current_url}")
                if current_url not in song_page_urls:
                    song_page_urls.append(current_url)

            # Handle "Load More" button if present
            load_more_found_and_clicked = False
            while True:
                try:
                    load_more_button = WebDriverWait(driver, 15).until(
                        EC.element_to_be_clickable((By.XPATH, LOAD_MORE_BUTTON_XPATH))
                    )
                    last_height = driver.execute_script("return document.body.scrollHeight")
                    print("  Clicking 'Load More' button...")
                    load_more_button.click()
                    load_more_found_and_clicked = True

                    new_height = last_height
                    scroll_attempts = 0
                    while new_height == last_height and scroll_attempts < 7:
                        time.sleep(2)
                        new_height = driver.execute_script("return document.body.scrollHeight")
                        scroll_attempts += 1

                    if new_height == last_height:
                        print("  No more content loaded after click, or button disappeared.")
                        break
                except (NoSuchElementException, TimeoutException):
                    if not load_more_found_and_clicked:
                        print("  'Load More' button not found or not clickable.")
                    else:
                        print("  'Load More' button no longer present (all content likely loaded).")
                    break
                except Exception as e:
                    print(f"  Error clicking 'Load More': {e}")
                    break

            # After all content is loaded, parse the HTML
            tree = html.fromstring(driver.page_source)

            # Extract nested links from the fully loaded page
            links = tree.xpath('//a/@href')
            print(f"  Found {len(links)} raw links on the page.")

            links_added_to_queue = 0
            for link in links:
                absolute_url = requests.compat.urljoin(current_url, link)
                if "pagalgana.com" in absolute_url and "#" not in absolute_url and "?" not in absolute_url:
                    if not absolute_url.endswith(
                            ('.mp3', '.zip', '.rar', '.jpg', '.png', '.gif', '.pdf', '.txt', '.xml', '.css', '.js')):
                        if absolute_url not in visited_urls and (absolute_url, current_depth + 1) not in to_visit:
                            if absolute_url not in song_page_urls:  # Don't re-add if already identified as a song page
                                to_visit.append((absolute_url, current_depth + 1))
                                links_added_to_queue += 1
            # print(f"  Added {links_added_to_queue} new valid links to the queue from {current_url}.")

        except Exception as e:
            print(f"  An unexpected error occurred for {current_url}: {e}")
        finally:
            if processed_count % save_interval == 0:
                print(f"--- Processed {processed_count} pages. Saving current crawl state... ---")
                save_crawl_state(to_visit, visited_urls, song_page_urls, state_filename, song_pages_json_file)

    print("\n--- Crawl finished. Performing final save of song page URLs. ---")
    save_crawl_state(to_visit, visited_urls, song_page_urls, state_filename, song_pages_json_file)
    print(f"\nCrawl complete. Total {len(song_page_urls)} song pages found and saved to '{song_pages_json_file}'.")

    # Take a final screenshot before shutting the browser down
    images = set_screenshot(driver=driver, images=images)
    driver.quit()

    return song_page_urls, images  # Return the list of discovered song pages


# This __name__ block is for testing `crawler.py` independently
if __name__ == "__main__":
    # Example usage for standalone testing of the crawler
    # When run via main_script.py, this block won't execute
    images = []
    discovered_urls, images = crawl_pagalgana_site(
        base_url="https://pagalgana.com/category/bollywood-mp3-songs.html",
        song_pages_json_file="bollywood_song_pages.json",
        state_filename="bollywood_crawl_state.json",
        max_crawl_depth=2,  # Keep low for testing
        save_interval=5,
        images=images
    )
    print(f"Crawler finished. Discovered {len(discovered_urls)} song URLs.")
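    # Optional illustrative sketch (not part of the original flow): the crawler also
    # returns `images`, a list of PIL.Image screenshots, one per visited page.
    # If you want to keep them for debugging, you could write them to disk like this;
    # the "crawl_screenshots" directory name is just an example.
    # os.makedirs("crawl_screenshots", exist_ok=True)
    # for idx, img in enumerate(images):
    #     img.save(os.path.join("crawl_screenshots", f"page_{idx:04d}.png"))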