import requests
from lxml import html
from collections import deque
import json
import time
import os
# --- Choose your Selenium setup ---
# OPTION A: Standard Selenium
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
# OPTION B: undetected_chromedriver (Uncomment these if you want to use UC)
# import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException
from PIL import Image
from io import BytesIO
def set_screenshot(driver, images=None):
    """Capture the current browser viewport and append it to the images list."""
    if images is None:  # Avoid a mutable default argument; create a fresh list per call.
        images = []
    png = driver.get_screenshot_as_png()
    image = Image.open(BytesIO(png))
    images.append(image)
    return images
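# Illustrative helper (an addition, not part of the original flow): write the
# screenshots collected by set_screenshot() to disk as PNG files. The filename
# prefix and numbering scheme are assumptions; adjust as needed.
def save_screenshots(images, prefix="screenshot"):
    for i, image in enumerate(images):
        image.save(f"{prefix}_{i:03d}.png")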
def get_chrome_options():
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    return options
def set_driver():
    options = get_chrome_options()
    try:
        web_driver = webdriver.Chrome(options=options)
        web_driver.set_window_size(1080, 720)  # Adjust the window size here
    except WebDriverException as e:
        # Return None on failure so callers can check `if not driver:` reliably.
        print(f"Error initializing ChromeDriver: {e}")
        return None
    return web_driver
# --- Selenium setup functions (choose one based on your choice above) ---
# OPTION A: Standard Selenium (Use this if you prefer standard selenium)
# def get_chrome_options():
#     options = webdriver.ChromeOptions()
#     options.add_argument("--headless")
#     options.add_argument("--no-sandbox")
#     options.add_argument("--disable-gpu")
#     options.add_argument("--disable-dev-shm-usage")
#     options.add_argument("--window-size=1920,1080")
#     options.add_argument(
#         "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
#     return options
def create_webdriver_instance(browser_type="chrome"):
    if browser_type.lower() == "chrome":
        chrome_options = get_chrome_options()
        try:
            # Assumes chromedriver is in PATH or at the specified path (e.g., /usr/bin/chromedriver on GitHub Actions)
            service = Service(executable_path="/usr/bin/chromedriver")
            driver = webdriver.Chrome(service=service, options=chrome_options)
            return driver
        except WebDriverException as e:
            print(f"Error initializing ChromeDriver. Error: {e}")
            return None
    else:
        raise ValueError("Unsupported browser type.")
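# Quick, optional smoke test for the driver factory (commented out; an
# illustrative addition rather than part of the crawler itself):
# drv = create_webdriver_instance("chrome")
# if drv:
#     drv.get("https://pagalgana.com")
#     print(drv.title)
#     drv.quit()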
# OPTION B: undetected_chromedriver (Uncomment this block and comment OPTION A if you want to use UC)
# def get_chrome_options():
#     options = uc.ChromeOptions()
#     options.add_argument("--headless")
#     options.add_argument("--no-sandbox")
#     options.add_argument("--disable-gpu")
#     options.add_argument("--disable-dev-shm-usage")
#     options.add_argument("--window-size=1920,1080")
#     return options
# def create_webdriver_instance(browser_type="chrome"):
#     if browser_type.lower() == "chrome":
#         chrome_options = get_chrome_options()
#         try:
#             driver = uc.Chrome(options=chrome_options)
#             return driver
#         except WebDriverException as e:
#             print(f"Error initializing undetected_chromedriver. Error: {e}")
#             return None
#     else:
#         raise ValueError("Unsupported browser type.")
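# Note: OPTION B additionally requires the undetected-chromedriver package
# (typically installed with `pip install undetected-chromedriver`) and the
# corresponding import near the top of this file to be uncommented.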
# --- Resumable Crawling Logic ---
def save_crawl_state(to_visit_deque, visited_set, song_urls_list, state_filename="crawl_state.json",
                     song_pages_json_file="pagalgana_song_pages.json"):
    """Saves the current state of the crawler to JSON files."""
    try:
        with open(song_pages_json_file, 'w', encoding='utf-8') as f:
            json.dump(song_urls_list, f, indent=4)
        crawl_state_data = {
            "to_visit": list(to_visit_deque),
            "visited_urls": list(visited_set)
        }
        with open(state_filename, 'w', encoding='utf-8') as f:
            json.dump(crawl_state_data, f, indent=4)
        print(
            f"--- Crawl state saved. URLs to visit: {len(to_visit_deque)}, Visited: {len(visited_set)}, Song pages found: {len(song_urls_list)} ---")
    except IOError as e:
        print(f"Error saving crawl state: {e}")
    except Exception as e:
        print(f"An unexpected error occurred while saving state: {e}")
def load_crawl_state(state_filename="crawl_state.json", song_pages_json_file="pagalgana_song_pages.json"):
    """Loads previous crawl state if files exist."""
    to_visit_deque = deque()
    visited_set = set()
    song_urls_list = []
    if os.path.exists(song_pages_json_file):
        try:
            with open(song_pages_json_file, 'r', encoding='utf-8') as f:
                song_urls_list = json.load(f)
            print(f"Loaded {len(song_urls_list)} song URLs from '{song_pages_json_file}'.")
        except json.JSONDecodeError:
            print(f"Warning: '{song_pages_json_file}' corrupted or empty. Starting fresh song list.")
            song_urls_list = []
        except Exception as e:
            print(f"Error loading '{song_pages_json_file}': {e}")
    if os.path.exists(state_filename):
        try:
            with open(state_filename, 'r', encoding='utf-8') as f:
                crawl_state_data = json.load(f)
            # JSON stores the (url, depth) tuples as lists; convert them back to
            # tuples so membership checks like `(url, depth) in to_visit` keep working.
            to_visit_deque = deque(tuple(item) for item in crawl_state_data.get("to_visit", []))
            visited_set = set(crawl_state_data.get("visited_urls", []))
            print(f"Loaded crawl state: {len(to_visit_deque)} URLs to visit, {len(visited_set)} visited.")
        except json.JSONDecodeError:
            print(f"Warning: '{state_filename}' corrupted or empty. Starting fresh state.")
            to_visit_deque = deque()
            visited_set = set()
        except Exception as e:
            print(f"Error loading '{state_filename}': {e}")
    return to_visit_deque, visited_set, song_urls_list
def crawl_pagalgana_site(base_url: str, song_pages_json_file: str, max_crawl_depth: int, state_filename: str,
                         save_interval: int, images):
    """
    Crawls Pagalgana.com to find and save song page URLs.
    Supports resuming a crawl.
    """
    # driver = create_webdriver_instance()
    driver = set_driver()
    if not driver:
        print("Failed to initialize WebDriver. Exiting.")
        return [], images  # Return an empty result so callers can still unpack (urls, images)
    to_visit, visited_urls, song_page_urls = load_crawl_state(state_filename, song_pages_json_file)
    if not to_visit and not visited_urls:
        print("No previous crawl state found. Starting fresh.")
        to_visit.append((base_url, 0))
    else:
        print("Resuming crawl from previous state.")
        if base_url not in visited_urls and (base_url, 0) not in to_visit:
            to_visit.appendleft((base_url, 0))
    AUDIO_CONTAINER_XPATH = '//*[@id="audio-container"]'
    LOAD_MORE_BUTTON_XPATH = '//a[@class="button" and contains(@onclick, "loadMoreCategory")]'
    print(f"Starting/Resuming crawl with base: {base_url}, max depth: {max_crawl_depth}")
    print(
        f"Initial Queue size: {len(to_visit)}, Initial Visited size: {len(visited_urls)}, Song page URLs: {len(song_page_urls)}")
    processed_count = 0
    while to_visit:
        current_url, current_depth = to_visit.popleft()
        if current_url in visited_urls:
            continue
        if current_depth > max_crawl_depth:
            print(f"Skipping {current_url} - max depth reached ({max_crawl_depth})")
            continue
        print(f"\n--- Visiting ({current_depth}): {current_url} ---")
        visited_urls.add(current_url)
        processed_count += 1
        try:
            driver.get(current_url)
            time.sleep(3)  # Give the page more time to load and execute JS
            print(f" Page title: {driver.title}")
            print(f" Current URL after load: {driver.current_url}")
            images = set_screenshot(driver=driver, images=images)
            # Optional: print HTML snippet for debugging. Remove for cleaner logs in production.
            # print(" --- HTML snippet (first 2000 chars) ---")
            # print(driver.page_source[:2000])
            # print(" --- End HTML snippet ---")
            # Check for Cloudflare challenge (if using standard Selenium)
            if "Attention Required" in driver.title or "cloudflare" in driver.page_source.lower():
                print(
                    " --> Cloudflare challenge detected! Try switching to undetected_chromedriver or add a longer sleep.")
                print(" --> Skipping current URL due to Cloudflare challenge.")
                images = set_screenshot(driver=driver, images=images)
                continue  # Skip this URL if Cloudflare is blocking it
            # Check if it's a song page
            audio_container_elements = driver.find_elements(By.XPATH, AUDIO_CONTAINER_XPATH)
            if audio_container_elements:
                print(f" --> FOUND AUDIO CONTAINER! This is a song page: {current_url}")
                if current_url not in song_page_urls:
                    song_page_urls.append(current_url)
            # Handle "Load More" button if present
            load_more_found_and_clicked = False
            while True:
                try:
                    load_more_button = WebDriverWait(driver, 15).until(
                        EC.element_to_be_clickable((By.XPATH, LOAD_MORE_BUTTON_XPATH))
                    )
                    last_height = driver.execute_script("return document.body.scrollHeight")
                    print(" Clicking 'Load More' button...")
                    load_more_button.click()
                    load_more_found_and_clicked = True
                    new_height = last_height
                    scroll_attempts = 0
                    while new_height == last_height and scroll_attempts < 7:
                        time.sleep(2)
                        new_height = driver.execute_script("return document.body.scrollHeight")
                        scroll_attempts += 1
                    if new_height == last_height:
                        print(" No more content loaded after click, or button disappeared.")
                        break
                except (NoSuchElementException, TimeoutException):
                    if not load_more_found_and_clicked:
                        print(" 'Load More' button not found or not clickable.")
                    else:
                        print(" 'Load More' button no longer present (all content likely loaded).")
                    break
                except Exception as e:
                    print(f" Error clicking 'Load More': {e}")
                    break
            # After all content is loaded, parse the HTML
            tree = html.fromstring(driver.page_source)
            # Extract nested links from the fully loaded page
            links = tree.xpath('//a/@href')
            print(f" Found {len(links)} raw links on the page.")
            links_added_to_queue = 0
            for link in links:
                absolute_url = requests.compat.urljoin(current_url, link)
                if "pagalgana.com" in absolute_url and "#" not in absolute_url and "?" not in absolute_url:
                    if not absolute_url.endswith(
                            ('.mp3', '.zip', '.rar', '.jpg', '.png', '.gif', '.pdf', '.txt', '.xml', '.css', '.js')):
                        if absolute_url not in visited_urls and (absolute_url, current_depth + 1) not in to_visit:
                            if absolute_url not in song_page_urls:  # Don't re-add if already identified as a song page
                                to_visit.append((absolute_url, current_depth + 1))
                                links_added_to_queue += 1
            # print(f" Added {links_added_to_queue} new valid links to the queue from {current_url}.")
        except Exception as e:
            print(f" An unexpected error occurred for {current_url}: {e}")
        finally:
            if processed_count % save_interval == 0:
                print(f"--- Processed {processed_count} pages. Saving current crawl state... ---")
                save_crawl_state(to_visit, visited_urls, song_page_urls, state_filename, song_pages_json_file)
    # Capture one last screenshot while the browser session is still open, then shut it down.
    images = set_screenshot(driver=driver, images=images)
    driver.quit()
    print("\n--- Crawl finished. Performing final save of song page URLs. ---")
    save_crawl_state(to_visit, visited_urls, song_page_urls, state_filename, song_pages_json_file)
    print(f"\nCrawl complete. Total {len(song_page_urls)} song pages found and saved to '{song_pages_json_file}'.")
    return song_page_urls, images  # Return the discovered song pages and the captured screenshots
# This __name__ block is for testing `crawler.py` independently
if __name__ == "__main__":
    # Example usage for standalone testing of the crawler
    # When run via main_script.py, this block won't execute
    images = []
    discovered_urls, images = crawl_pagalgana_site(
        base_url="https://pagalgana.com/category/bollywood-mp3-songs.html",
        song_pages_json_file="bollywood_song_pages.json",
        state_filename="bollywood_crawl_state.json",
        max_crawl_depth=2,  # Keep low for testing
        save_interval=5,
        images=images
    )
    print(f"Crawler finished. Discovered {len(discovered_urls)} song URLs.")