from typing import List, Dict, Optional
import requests
from bs4 import BeautifulSoup
from urllib.parse import quote, urlparse, parse_qs
import time
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from math import ceil




class GoogleBusinessNews:
    """Scrape Google News search results restricted to selected business-news domains."""

    BASE_URL = "https://www.google.com/search"

    USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Linux; Android 10; SM-A505F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.0.0 Mobile Safari/537.36"
    ]
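    # get_headers() picks one of the user agents above at random for each request
    # so that outgoing traffic looks less uniform.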

    # Default domains to restrict results to; can be overridden via __init__.
    financial_business_news_domains = [
        "economictimes.indiatimes.com",
        "business-standard.com"
    ]


    SELECTORS = {
        "title": "div.n0jPhd",
        "url": "a.WlydOe",
        "description": "div.GI74Re",
        "date": "div.rbYSKb",
        "source": "div.NUnG9d"
    }
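    # NOTE: these class names mirror Google's news-results markup at the time of
    # writing; Google changes them periodically, so if every extracted field comes
    # back None, re-inspect the results page and update the selectors.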

    def __init__(self, financial_business_news_domains: Optional[List[str]] = None,
                 max_articles: int = 50, max_retries: int = 3):
        """
        Initialize the scraper. If no domain list is given, the class-level
        default list is used. max_articles caps the total number of articles
        collected; max_retries caps request attempts per page.
        """
        self.articles_per_page = 100  # Google serves at most 100 results per page
        self.max_pages = ceil(max_articles / self.articles_per_page)
        self.max_articles = max_articles
        self.max_retries = max_retries
        # Fall back to the class-level default list when no domains are supplied.
        self.financial_business_news_domains = (
            financial_business_news_domains or self.financial_business_news_domains
        )
        self.proxies = [
            # {"http": "http://207.244.217.165:6712"},  # optional proxy pool
        ]
    

    def construct_url(
        self,
        query: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        page: int = 0,
        hl: str = "en",
        lr: str = "lang_en",
        num: Optional[int] = None,
        sort_by_date: bool = False
    ) -> str:
        """Build a Google News search URL for the query, date range, and page."""
        if num is None:
            num = self.articles_per_page

        # Default to the last 24 hours when no explicit range is given.
        if start_date is None:
            start_date = datetime.today() - timedelta(days=1)
        if end_date is None:
            end_date = datetime.today()
            
        # Google encodes the custom date range in the "tbs" parameter: cdr:1
        # enables the range, cd_min/cd_max bound it (MM/DD/YYYY), and sbd:1
        # sorts results by date instead of by relevance.
        date_filter = (
            f"cdr:1,"
            f"cd_min:{start_date.strftime('%m/%d/%Y')},"
            f"cd_max:{end_date.strftime('%m/%d/%Y')}"
        )

        tbs_parts = [date_filter]

        if sort_by_date:
            tbs_parts.append("sbd:1")
            
        params = {
            # Restrict results to the configured domains, e.g.
            # "reliance industry (site:economictimes.indiatimes.com OR site:business-standard.com)"
            "q": quote(query + " (" + " OR ".join(f"site:{d}" for d in self.financial_business_news_domains) + ")"),
            "tbm": "nws",               # news vertical
            "tbs": ",".join(tbs_parts),
            "start": page * num,        # result offset for pagination
            "hl": hl,
            "lr": lr,
            "num": str(num),
        }

        # Assemble the final URL, e.g.
        # https://www.google.com/search?q=...&tbm=nws&tbs=cdr:1,cd_min:01/01/2025,cd_max:02/01/2025&start=0&hl=en&lr=lang_en&num=100
        return f"{self.BASE_URL}?{'&'.join(f'{k}={v}' for k, v in params.items())}"

    def get_random_delay(self) -> float:
        """Generate a longer random delay between requests to avoid detection."""
        return random.uniform(5, 15)
    
    def get_headers(self):
        """Return a random User-Agent."""
        return {
            "User-Agent": random.choice(self.USER_AGENTS),
            "Accept-Language": "en-US,en;q=0.9",
        }

    def is_captcha_page(self, html: str) -> bool:
        """Detect Google's "unusual traffic" interstitial, which precedes a CAPTCHA."""
        return "Our systems have detected unusual traffic" in html

    def parse_date(self, date_str: Optional[str]) -> Optional[str]:
        """
        Convert relative date strings (e.g., '1 day ago', '2 weeks ago', '1 month ago')
        or absolute date strings ('24 Mar 2023', '2023-03-24') to YYYY-MM-DD format.
        """
        if not date_str:
            return None

        date_str = date_str.lower().strip()
        today = datetime.today()

        try:
            if "ago" in date_str:
                date_str = date_str.replace("ago", "").strip()

            if "hour" in date_str or "minute" in date_str or "second" in date_str:
                return today.strftime("%Y-%m-%d")
            
            if "day" in date_str:
                days = int(date_str.split()[0])
                return (today - timedelta(days=days)).strftime("%Y-%m-%d")

            if "week" in date_str:
                weeks = int(date_str.split()[0])
                return (today - timedelta(weeks=weeks)).strftime("%Y-%m-%d")

            if "month" in date_str:
                months = int(date_str.split()[0])
                return (today - relativedelta(months=months)).strftime("%Y-%m-%d")

            if "year" in date_str:
                years = int(date_str.split()[0])
                return (today - relativedelta(years=years)).strftime("%Y-%m-%d")

            try:
                return datetime.strptime(date_str, "%Y-%m-%d").strftime("%Y-%m-%d")
            except ValueError:
                pass

            try:
                return datetime.strptime(date_str, "%d %b %Y").strftime("%Y-%m-%d")  # e.g., "24 Mar 2023"
            except ValueError:
                pass

            try:
                return datetime.strptime(date_str, "%d %B %Y").strftime("%Y-%m-%d")  # e.g., "24 March 2023"
            except ValueError:
                pass

        except Exception as e:
            print(f"Failed to parse date '{date_str}': {e}")

        return None


    def extract_articles(self, html: str) -> List[Dict[str, Optional[str]]]:
        """Extract article details from the HTML."""
        soup = BeautifulSoup(html, "html.parser")
        articles = []

        for container in soup.find_all("div", class_="SoaBEf"):
            article = {
                "title": self._safe_extract(container, self.SELECTORS["title"], "text"),
                "url": self._clean_url(self._safe_extract(container, self.SELECTORS["url"], "href")),
                "source": self._safe_extract(container, self.SELECTORS["source"], "text"),
                "date": self.parse_date(self._safe_extract(container, self.SELECTORS["date"], "text")),
                "description": self._safe_extract(container, self.SELECTORS["description"], "text"),
            }

            if article["url"]:
                articles.append(article)

        return articles

    def _clean_url(self, url: Optional[str]) -> Optional[str]:
        """Clean and extract the actual URL from Google's redirect links."""
        if url and url.startswith("/url?"):
            parsed = urlparse(url)
            qs = parse_qs(parsed.query)
            return qs.get("q", [url])[0]
        return url

    def _safe_extract(self, parent, selector: str, attr: str) -> Optional[str]:
        """Safely extract text or attributes from an element."""
        try:
            element = parent.select_one(selector)
            if not element:
                return None
            if attr == "text":
                return element.get_text().strip()
            return element.get(attr, "")
        except Exception as e:
            print(f"Failed to extract {selector}: {e}")
            return None

    def scrape(self, query: str, start_date: datetime, end_date: datetime) -> List[Dict[str, Optional[str]]]:
        """
        Scrape Google News articles based on the query and date range.
        """
        all_articles = []

        empty_page_count = 0
        for page in range(self.max_pages):
            if len(all_articles) >= self.max_articles:
                print(f"Reached article limit ({self.max_articles}). Stopping.")
                break

            time.sleep(self.get_random_delay())
            url = self.construct_url(query, start_date, end_date, page)

            retries = 0
            while retries < self.max_retries:
                try:
                    print(f"Fetching page {page + 1}: {url}")
                    response = requests.get(
                        url,
                        headers=self.get_headers(),
                        proxies=random.choice(self.proxies) if self.proxies else None,
                        timeout=30,
                    )
                    response.raise_for_status()

                    if self.is_captcha_page(response.text):
                        print("CAPTCHA detected. Stopping scraping.")
                        return all_articles

                    articles = self.extract_articles(response.text)
                    if not articles:
                        empty_page_count += 1
                        print(f"No articles found on page {page + 1}. Empty count: {empty_page_count}")
                        if empty_page_count >= 2:  # Stop if two consecutive pages are empty
                            print("No more articles found. Stopping.")
                            return all_articles
                    else:
                        empty_page_count = 0  # Reset if we find articles

                    all_articles.extend(articles)
                    print(f"Page {page + 1}: Added {len(articles)} articles")
                    break

                except requests.exceptions.RequestException as e:
                    retries += 1
                    print(f"Request failed (attempt {retries}/{self.max_retries}): {e}")
                    if retries < self.max_retries:
                        time.sleep(2 ** retries)  # exponential backoff: 2s, 4s, 8s, ...
                    else:
                        print("Max retries reached. Stopping.")
                        return all_articles

        return all_articles[:self.max_articles]
    

if __name__ == "__main__":
    scrapper = GoogleBussinessNews(50)
    res = scrapper.scrape("reliance industry", datetime(2025,1,1), datetime(2025,2,1))
    print(res)
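
    # Illustrative follow-up (not part of the original script): persist the
    # scraped articles to a CSV file using only the standard library; the
    # "reliance_news.csv" filename is an arbitrary choice.
    import csv

    if res:
        with open("reliance_news.csv", "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(
                f, fieldnames=["title", "url", "source", "date", "description"]
            )
            writer.writeheader()
            writer.writerows(res)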