# FinanceNews / gfnews.py
from typing import List, Dict, Optional
import requests
from bs4 import BeautifulSoup
from urllib.parse import quote, urlparse, parse_qs
import time
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from math import ceil
class GoogleBusinessNews:
    """Scrape Google News search results restricted to a set of financial/business news domains."""
BASE_URL = "https://www.google.com/search"
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"Mozilla/5.0 (Linux; Android 10; SM-A505F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Mobile Safari/537.36"
]
financial_business_news_domains = [
"economictimes.indiatimes.com",
"business-standard.com"
]
SELECTORS = {
"title": "div.n0jPhd",
"url": "a.WlydOe",
"description": "div.GI74Re",
"date": "div.rbYSKb",
"source": "div.NUnG9d"
}
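    # The class names above (and the "SoaBEf" result container used in
    # extract_articles) are tied to Google's current News results markup and
    # may break without notice when Google changes its HTML.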
    def __init__(self, financial_business_news_domains: List[str], max_articles: int = 50, max_retries: int = 3):
        """
        Initialize the scraper with configuration options.

        Args:
            financial_business_news_domains: Domains the search is restricted to via `site:` operators.
            max_articles: Maximum number of articles to collect across all pages.
            max_retries: Maximum number of retry attempts per page request.
        """
        self.articles_per_page = 100  # Google returns at most ~100 results per request via `num`
        self.max_pages = ceil(max_articles / self.articles_per_page)
        self.max_articles = max_articles
        self.max_retries = max_retries
        self.financial_business_news_domains = financial_business_news_domains
        self.proxies = [
            # Optional proxy rotation, e.g. {"http": "http://207.244.217.165:6712"};
            # leave the list empty to send requests directly.
        ]
    def construct_url(
        self,
        query: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        page: int = 0,
        hl: str = "en",
        lr: str = "lang_en",
        num: Optional[int] = None,
        sort_by_date: bool = False,
    ) -> str:
        """Build a Google News search URL for the given query, date range and page."""
        if num is None:
            num = self.articles_per_page
        if start_date is None:
            start_date = datetime.today() - timedelta(days=1)
        if end_date is None:
            end_date = datetime.today()  # Default to the current date
        # Custom date range filter: tbs=cdr:1 with cd_min/cd_max in MM/DD/YYYY format
        date_filter = (
            f"cdr:1,"
            f"cd_min:{start_date.strftime('%m/%d/%Y')},"
            f"cd_max:{end_date.strftime('%m/%d/%Y')}"
        )
        tbs_parts = [date_filter]
        if sort_by_date:
            tbs_parts.append("sbd:1")  # Sort results by date instead of relevance
        params = {
            # Restrict the query to the configured domains via `site:` operators
            "q": quote(query + " " + " OR ".join([f"site:{x}" for x in self.financial_business_news_domains])),
            "tbm": "nws",  # Google News vertical
            "tbs": ",".join(tbs_parts),
            "start": page * num,  # Result offset for pagination
            "hl": hl,
            "lr": lr,
            "num": str(num),
        }
        # Build the URL manually; the query value is already percent-encoded above
        return f"{self.BASE_URL}?{'&'.join([f'{k}={v}' for k, v in params.items()])}"
    def get_random_delay(self) -> float:
        """Return a random delay of 5 to 15 seconds between requests to avoid detection."""
        return random.uniform(5, 15)
def get_headers(self):
"""Return a random User-Agent."""
return {
"User-Agent": random.choice(self.USER_AGENTS),
"Accept-Language": "en-US,en;q=0.9",
}
def is_captcha_page(self, html: str) -> bool:
"""Check if the response contains a CAPTCHA."""
return "Our systems have detected unusual traffic" in html
def parse_date(self, date_str: Optional[str]) -> Optional[str]:
"""
Convert relative date strings (e.g., '1 day ago', '2 weeks ago', '1 month ago')
or absolute date strings ('24 Mar 2023', '2023-03-24') to YYYY-MM-DD format.
"""
if not date_str:
return None
date_str = date_str.lower().strip()
today = datetime.today()
try:
if "ago" in date_str:
date_str = date_str.replace("ago", "").strip()
if "hour" in date_str or "minute" in date_str or "second" in date_str:
return today.strftime("%Y-%m-%d")
if "day" in date_str:
days = int(date_str.split()[0])
return (today - timedelta(days=days)).strftime("%Y-%m-%d")
if "week" in date_str:
weeks = int(date_str.split()[0])
return (today - timedelta(weeks=weeks)).strftime("%Y-%m-%d")
if "month" in date_str:
months = int(date_str.split()[0])
return (today - relativedelta(months=months)).strftime("%Y-%m-%d")
if "year" in date_str:
years = int(date_str.split()[0])
return (today - relativedelta(years=years)).strftime("%Y-%m-%d")
try:
return datetime.strptime(date_str, "%Y-%m-%d").strftime("%Y-%m-%d")
except ValueError:
pass
try:
return datetime.strptime(date_str, "%d %b %Y").strftime("%Y-%m-%d") # e.g., "24 Mar 2023"
except ValueError:
pass
try:
return datetime.strptime(date_str, "%d %B %Y").strftime("%Y-%m-%d") # e.g., "24 March 2023"
except ValueError:
pass
except Exception as e:
print(f"Failed to parse date '{date_str}': {e}")
return None
def extract_articles(self, html: str) -> List[Dict[str, Optional[str]]]:
"""Extract article details from the HTML."""
soup = BeautifulSoup(html, "html.parser")
articles = []
for container in soup.find_all("div", class_="SoaBEf"):
article = {
"title": self._safe_extract(container, self.SELECTORS["title"], "text"),
"url": self._clean_url(self._safe_extract(container, self.SELECTORS["url"], "href")),
"source": self._safe_extract(container, self.SELECTORS["source"], "text"),
"date": self.parse_date(self._safe_extract(container, self.SELECTORS["date"], "text")),
"description": self._safe_extract(container, self.SELECTORS["description"], "text"),
}
if article["url"]:
articles.append(article)
return articles
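    # Each dict returned by extract_articles has "title", "url", "source", "date"
    # (normalised to YYYY-MM-DD) and "description" keys; results without a URL are skipped.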
def _clean_url(self, url: Optional[str]) -> Optional[str]:
"""Clean and extract the actual URL from Google's redirect links."""
if url and url.startswith("/url?"):
parsed = urlparse(url)
qs = parse_qs(parsed.query)
return qs.get("q", [url])[0]
return url
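    # Google News sometimes wraps result links as "/url?q=<target>&...";
    # _clean_url extracts the target from the "q" query parameter in that case.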
def _safe_extract(self, parent, selector: str, attr: str) -> Optional[str]:
"""Safely extract text or attributes from an element."""
try:
element = parent.select_one(selector)
if not element:
return None
if attr == "text":
return element.get_text().strip()
return element.get(attr, "")
except Exception as e:
print(f"Failed to extract {selector}: {e}")
return None
    def scrape(self, query: str, start_date: datetime, end_date: datetime) -> List[Dict[str, Optional[str]]]:
        """
        Scrape Google News articles matching the query within the given date range,
        up to self.max_articles results.
        """
all_articles = []
empty_page_count = 0
for page in range(self.max_pages):
if len(all_articles) >= self.max_articles:
print(f"Reached article limit ({self.max_articles}). Stopping.")
break
time.sleep(self.get_random_delay())
url = self.construct_url(query, start_date, end_date, page)
retries = 0
while retries < self.max_retries:
try:
print(f"Fetching page {page + 1}: {url}")
response = requests.get(
url,
headers=self.get_headers(),
proxies=random.choice(self.proxies) if self.proxies else None,
timeout=30,
)
response.raise_for_status()
if self.is_captcha_page(response.text):
print("CAPTCHA detected. Stopping scraping.")
return all_articles
articles = self.extract_articles(response.text)
if not articles:
empty_page_count += 1
print(f"No articles found on page {page + 1}. Empty count: {empty_page_count}")
if empty_page_count >= 2: # Stop if two consecutive pages are empty
print("No more articles found. Stopping.")
return all_articles
else:
empty_page_count = 0 # Reset if we find articles
all_articles.extend(articles)
print(f"Page {page + 1}: Added {len(articles)} articles")
break
except requests.exceptions.RequestException as e:
retries += 1
print(f"Request failed (attempt {retries}/{self.max_retries}): {e}")
if retries < self.max_retries:
time.sleep(2**retries)
else:
print("Max retries reached. Stopping.")
return all_articles
return all_articles[:self.max_articles]
if __name__ == "__main__":
    scraper = GoogleBusinessNews(
        GoogleBusinessNews.financial_business_news_domains, max_articles=50
    )
    res = scraper.scrape("reliance industry", datetime(2025, 1, 1), datetime(2025, 2, 1))
    print(res)
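    # A minimal sketch of persisting the results as a local JSON dump (the
    # "articles.json" filename is illustrative, not part of the original script):
    # import json
    # with open("articles.json", "w", encoding="utf-8") as fh:
    #     json.dump(res, fh, ensure_ascii=False, indent=2)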