from typing import List, Dict, Optional
import requests
from bs4 import BeautifulSoup
from urllib.parse import quote, urlparse, parse_qs
import time
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from math import ceil


class GoogleBussinessNews:
    BASE_URL = "https://www.google.com/search"
    USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Linux; Android 10; SM-A505F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Mobile Safari/537.36",
    ]
    financial_business_news_domains = [
        "economictimes.indiatimes.com",
        "business-standard.com",
    ]
    SELECTORS = {
        "title": "div.n0jPhd",
        "url": "a.WlydOe",
        "description": "div.GI74Re",
        "date": "div.rbYSKb",
        "source": "div.NUnG9d",
    }
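    # NOTE: the result container class ("SoaBEf", used in extract_articles) and the
    # SELECTORS above are the obfuscated class names Google currently renders in its
    # news results markup; they are not a stable API and may need updating whenever
    # Google changes its HTML.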
    def __init__(self, financial_business_news_domains: Optional[List[str]] = None, max_articles: int = 50, max_retries: int = 3):
        """
        Initialize the scraper with configuration options.
        """
        # Results requested per results page (passed to Google as the `num` parameter).
        self.article_per_pages = 100
        self.max_pages = ceil(max_articles / self.article_per_pages)
        self.max_articles = max_articles
        self.max_retries = max_retries
        # Fall back to the class-level domain list when none is supplied.
        self.financial_business_news_domains = (
            financial_business_news_domains
            if financial_business_news_domains is not None
            else GoogleBussinessNews.financial_business_news_domains
        )
        self.proxies = [
            # {"http": "http://207.244.217.165:6712"},
        ]
    def construct_url(
        self,
        query: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        page: int = 0,
        hl: str = "en",
        lr: str = "lang_en",
        num: Optional[int] = None,
        sort_by_date: bool = False,
    ) -> str:
        """Build a Google News search URL for the query, date range and page."""
        if num is None:
            num = self.article_per_pages
        if start_date is None:
            start_date = datetime.today() - timedelta(days=1)
        if end_date is None:
            end_date = datetime.today()  # Current date
        # Custom date-range filter understood by Google's `tbs` parameter.
        date_filter = (
            f"cdr:1,"
            f"cd_min:{start_date.strftime('%m/%d/%Y')},"
            f"cd_max:{end_date.strftime('%m/%d/%Y')}"
        )
        tbs_parts = [date_filter]
        if sort_by_date:
            tbs_parts.append("sbd:1")
        params = {
            # Restrict the query to the configured business-news domains.
            "q": quote(query + " " + " OR ".join(f"site:{x}" for x in self.financial_business_news_domains)),
            "tbm": "nws",
            "tbs": ",".join(tbs_parts),
            "start": page * num,
            "hl": hl,
            "lr": lr,
            "num": str(num),
        }
        # Build the final URL by joining the query parameters.
        return f"{self.BASE_URL}?{'&'.join(f'{k}={v}' for k, v in params.items())}"
    def get_random_delay(self) -> float:
        """Generate a longer random delay between requests to avoid detection."""
        return random.uniform(5, 15)

    def get_headers(self):
        """Return request headers with a randomly chosen User-Agent."""
        return {
            "User-Agent": random.choice(self.USER_AGENTS),
            "Accept-Language": "en-US,en;q=0.9",
        }

    def is_captcha_page(self, html: str) -> bool:
        """Check if the response contains a CAPTCHA."""
        return "Our systems have detected unusual traffic" in html
    def parse_date(self, date_str: Optional[str]) -> Optional[str]:
        """
        Convert relative date strings (e.g., '1 day ago', '2 weeks ago', '1 month ago')
        or absolute date strings ('24 Mar 2023', '2023-03-24') to YYYY-MM-DD format.
        """
        if not date_str:
            return None
        date_str = date_str.lower().strip()
        today = datetime.today()
        try:
            if "ago" in date_str:
                date_str = date_str.replace("ago", "").strip()
                if "hour" in date_str or "minute" in date_str or "second" in date_str:
                    return today.strftime("%Y-%m-%d")
                if "day" in date_str:
                    days = int(date_str.split()[0])
                    return (today - timedelta(days=days)).strftime("%Y-%m-%d")
                if "week" in date_str:
                    weeks = int(date_str.split()[0])
                    return (today - timedelta(weeks=weeks)).strftime("%Y-%m-%d")
                if "month" in date_str:
                    months = int(date_str.split()[0])
                    return (today - relativedelta(months=months)).strftime("%Y-%m-%d")
                if "year" in date_str:
                    years = int(date_str.split()[0])
                    return (today - relativedelta(years=years)).strftime("%Y-%m-%d")
            # Absolute date formats.
            try:
                return datetime.strptime(date_str, "%Y-%m-%d").strftime("%Y-%m-%d")
            except ValueError:
                pass
            try:
                return datetime.strptime(date_str, "%d %b %Y").strftime("%Y-%m-%d")  # e.g., "24 Mar 2023"
            except ValueError:
                pass
            try:
                return datetime.strptime(date_str, "%d %B %Y").strftime("%Y-%m-%d")  # e.g., "24 March 2023"
            except ValueError:
                pass
        except Exception as e:
            print(f"Failed to parse date '{date_str}': {e}")
        return None
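    # For illustration (assuming today is 2025-03-10): parse_date("2 days ago")
    # returns "2025-03-08", parse_date("3 weeks ago") returns "2025-02-17", and
    # parse_date("24 Mar 2023") returns "2023-03-24"; unparseable strings return None.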
    def extract_articles(self, html: str) -> List[Dict[str, Optional[str]]]:
        """Extract article details from the HTML."""
        soup = BeautifulSoup(html, "html.parser")
        articles = []
        for container in soup.find_all("div", class_="SoaBEf"):
            article = {
                "title": self._safe_extract(container, self.SELECTORS["title"], "text"),
                "url": self._clean_url(self._safe_extract(container, self.SELECTORS["url"], "href")),
                "source": self._safe_extract(container, self.SELECTORS["source"], "text"),
                "date": self.parse_date(self._safe_extract(container, self.SELECTORS["date"], "text")),
                "description": self._safe_extract(container, self.SELECTORS["description"], "text"),
            }
            if article["url"]:
                articles.append(article)
        return articles
    def _clean_url(self, url: Optional[str]) -> Optional[str]:
        """Clean and extract the actual URL from Google's redirect links."""
        if url and url.startswith("/url?"):
            parsed = urlparse(url)
            qs = parse_qs(parsed.query)
            return qs.get("q", [url])[0]
        return url
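    # For illustration: Google wraps result links as redirects such as
    # "/url?q=https://www.business-standard.com/some-article&sa=U&ved=..."
    # (hypothetical example); _clean_url returns the embedded target,
    # "https://www.business-standard.com/some-article".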
    def _safe_extract(self, parent, selector: str, attr: str) -> Optional[str]:
        """Safely extract text or attributes from an element."""
        try:
            element = parent.select_one(selector)
            if not element:
                return None
            if attr == "text":
                return element.get_text().strip()
            return element.get(attr, "")
        except Exception as e:
            print(f"Failed to extract {selector}: {e}")
            return None
    def scrape(self, query: str, start_date: datetime, end_date: datetime) -> List[Dict[str, Optional[str]]]:
        """
        Scrape Google News articles based on the query and date range.
        """
        all_articles = []
        empty_page_count = 0
        for page in range(self.max_pages):
            if len(all_articles) >= self.max_articles:
                print(f"Reached article limit ({self.max_articles}). Stopping.")
                break
            time.sleep(self.get_random_delay())
            url = self.construct_url(query, start_date, end_date, page)
            retries = 0
            while retries < self.max_retries:
                try:
                    print(f"Fetching page {page + 1}: {url}")
                    response = requests.get(
                        url,
                        headers=self.get_headers(),
                        proxies=random.choice(self.proxies) if self.proxies else None,
                        timeout=30,
                    )
                    response.raise_for_status()
                    if self.is_captcha_page(response.text):
                        print("CAPTCHA detected. Stopping scraping.")
                        return all_articles
                    articles = self.extract_articles(response.text)
                    if not articles:
                        empty_page_count += 1
                        print(f"No articles found on page {page + 1}. Empty count: {empty_page_count}")
                        if empty_page_count >= 2:  # Stop if two consecutive pages are empty
                            print("No more articles found. Stopping.")
                            return all_articles
                    else:
                        empty_page_count = 0  # Reset if we find articles
                        all_articles.extend(articles)
                        print(f"Page {page + 1}: Added {len(articles)} articles")
                    break
                except requests.exceptions.RequestException as e:
                    retries += 1
                    print(f"Request failed (attempt {retries}/{self.max_retries}): {e}")
                    if retries < self.max_retries:
                        time.sleep(2 ** retries)  # Exponential backoff between retries
                    else:
                        print("Max retries reached. Stopping.")
                        return all_articles
        return all_articles[:self.max_articles]
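    # Each returned item is a dict with the keys "title", "url", "source",
    # "date" (normalised to YYYY-MM-DD, or None) and "description".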
if __name__ == "__main__":
    # Use the class-level domain list and cap the run at 50 articles.
    scraper = GoogleBussinessNews(max_articles=50)
    res = scraper.scrape("reliance industry", datetime(2025, 1, 1), datetime(2025, 2, 1))
    print(res)
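# A minimal follow-up sketch (commented out; the file name is illustrative):
# since every article dict shares the same keys, the results can be written
# straight to CSV with the standard library.
#
#   import csv
#
#   with open("reliance_news.csv", "w", newline="", encoding="utf-8") as f:
#       writer = csv.DictWriter(f, fieldnames=["title", "url", "source", "date", "description"])
#       writer.writeheader()
#       writer.writerows(res)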