# News aggregation utilities: fetch Google News RSS items for a company,
# score headline sentiment, tag topics by keyword, and synthesize audio.
import io
import locale
import os
import re
import xml.etree.ElementTree as ET
from datetime import datetime

import gtts
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from transformers import pipeline

from tts import TextToSpeechConverter
# Topic -> keyword lists consumed by extract_topics(). Matching is
# case-insensitive; the topic whose keywords occur most often in an
# article's text ranks highest. Insertion order matters only as the
# stable-sort tie-break when two topics score equally.
news_topics = {
    "Technology": ["tech", "digital", "software", "hardware", "IT"],
    "AI": ["artificial intelligence", "machine learning", "deep learning", "neural network"],
    "Business": ["company", "corporate", "firm", "enterprise", "startup", "market"],
    "Finance": ["finance", "investment", "stock", "economy", "trading", "bank"],
    "Partnership": ["partner", "collaboration", "alliance", "merger", "acquisition"],
    "Social Media": ["social", "platform", "tweet", "facebook", "instagram", "linkedin", "post"],
    "Innovation": ["innovate", "new", "advance", "breakthrough", "disruption"],
    "Outage": ["outage", "downtime", "disrupt", "service interruption"],
    "Launch": ["launch", "release", "introduce", "unveil"],
    "Publicity": ["public", "campaign", "promo", "advertisement"],
    "Privacy": ["privacy", "data", "security", "breach"],
    "Entertainment": ["entertain", "media", "show", "movie", "series"],
    "Leadership": ["promotion", "leader", "executive", "ceo", "chairman", "manager"],
    "Mergers & Acquisitions": ["merger", "acquisition", "buyout", "takeover"]
}
def fetch_news(company, language=None, region=None):
    """Fetch up to 10 recent Google News RSS articles mentioning *company*.

    Args:
        company: Company name; wrapped in quotes for an exact-phrase query.
        language: Locale code such as "en-US". Defaults to the system locale,
            falling back to "en-US" when the locale cannot be determined.
        region: Two-letter region code used for the "gl"/"ceid" params
            (default "US").

    Returns:
        A list of dicts with "title", "summary", "link" and "pub_date" keys.
        Returns an empty list on request failure; falls back to
        parse_with_elementtree() when BeautifulSoup parsing yields nothing.
    """
    base_url = "https://news.google.com/rss/search"
    if language is None:
        # locale.getdefaultlocale() may return (None, None); the original
        # called .replace() on that None and crashed before its 'en-US'
        # fallback could apply. (The function is also deprecated in 3.11+.)
        sys_locale = locale.getdefaultlocale()[0]
        language = sys_locale.replace('_', '-').lower() if sys_locale else 'en-US'
    region = region or 'US'
    params = {
        "q": f'"{company}"',
        "hl": language,
        "gl": region,
        "ceid": f"{region}:{language.split('-')[0]}"
    }
    headers = {"User-Agent": UserAgent().random, "Accept": "application/xml"}
    print(f"Fetching news for {company} with URL: {base_url}?{'&'.join(f'{k}={v}' for k, v in params.items())}")
    # Pre-bind so the generic except below can tell whether a response body
    # exists; the original referenced `response` there and could hit a
    # NameError if the request line raised something unexpected.
    response = None
    try:
        response = requests.get(base_url, headers=headers, params=params, timeout=15)
        print(f"Response status for {company}: {response.status_code}")
        response.raise_for_status()
        soup = BeautifulSoup(response.content, features="xml")
        # (The original's `if not soup:` guard was dead code — the
        # constructor never returns None; an empty document simply yields
        # no <item> tags and takes the fallback below.)
        items = soup.find_all("item")[:10]
        if not items:
            print(f"No news items found in the RSS feed for {company} with BeautifulSoup.")
            return parse_with_elementtree(response.content, company)
        print(f"Found {len(items)} items with BeautifulSoup.")
        articles = []
        for item in items:
            title = item.title.text if item.title else "No title"
            desc = item.description.text if item.description else title
            # NOTE(review): with features="xml" the <link> element should
            # parse normally and item.link.text would be the URL; the
            # next_sibling access targets html.parser-style output — confirm
            # against a live feed before changing.
            link = item.link.next_sibling.strip() if item.link and item.link.next_sibling else "No link"
            raw_date = item.pubDate.text if item.pubDate else "Date not available"
            try:
                # RSS dates look like "Mon, 01 Jan 2024 12:00:00 GMT".
                pub_date = datetime.strptime(raw_date, "%a, %d %b %Y %H:%M:%S %Z").strftime("%a, %d %b %Y")
            except ValueError:
                pub_date = "Date not available"
            # Descriptions embed HTML; strip tags, then drop the duplicated
            # headline so the summary is only the remaining body text.
            desc_soup = BeautifulSoup(desc, "html.parser")
            full_text = desc_soup.get_text(separator=" ").strip()
            summary = full_text.replace(title, "").strip()
            summary_words = summary.split()
            # Google News titles end with " - <Publisher>".
            source = title.split(" - ")[-1].strip() if " - " in title else "Unknown Source"
            final_summary = " ".join(summary_words[:80]) + f" - {source}" if len(summary_words) > 10 else f"{title} - {source}"
            articles.append({
                "title": title,
                "summary": final_summary,
                "link": link,
                "pub_date": pub_date
            })
        print(f"Successfully fetched {len(articles)} articles for {company} with BeautifulSoup")
        return articles
    except requests.exceptions.RequestException as e:
        print(f"Request failed for {company}: {str(e)}")
        return []
    except Exception as e:
        print(f"Error processing news for {company} with BeautifulSoup: {str(e)}. Falling back to ElementTree.")
        if response is None:
            return []
        return parse_with_elementtree(response.content, company)
def parse_with_elementtree(content, company):
    """Fallback RSS parser using the stdlib ElementTree.

    Args:
        content: Raw RSS/XML bytes (the HTTP response body).
        company: Company name, used only for log messages.

    Returns:
        A list of dicts with "title", "summary", "link" and "pub_date" keys,
        or an empty list when parsing fails or the feed has no items.
    """
    print("Attempting to parse with ElementTree...")

    def _text(item, tag, default):
        # findtext() returns None when the tag is missing — and an empty
        # string when the tag is present but empty. The original read
        # item.find(tag).text, which is None for an empty element and later
        # crashed title.split(" - "); normalize both cases to the default.
        value = item.findtext(tag)
        return value if value else default

    try:
        root = ET.fromstring(content)
        items = root.findall(".//item")[:10]
        if not items:
            print(f"No news items found in the RSS feed for {company} with ElementTree")
            return []
        articles = []
        for item in items:
            title = _text(item, "title", "No title")
            desc = _text(item, "description", title)
            link = _text(item, "link", "No link")
            raw_date = _text(item, "pubDate", "Date not available")
            try:
                # RSS dates look like "Mon, 01 Jan 2024 12:00:00 GMT".
                pub_date = datetime.strptime(raw_date, "%a, %d %b %Y %H:%M:%S %Z").strftime("%a, %d %b %Y")
            except ValueError:
                pub_date = "Date not available"
            # Descriptions embed HTML; strip tags to plain text.
            desc_soup = BeautifulSoup(desc, "html.parser")
            full_text = desc_soup.get_text(separator=" ").strip()
            summary = full_text if full_text else title
            summary_words = summary.split()
            # Google News titles end with " - <Publisher>".
            source = title.split(" - ")[-1].strip() if " - " in title else "Unknown Source"
            final_summary = " ".join(summary_words[:80]) + f" - {source}" if len(summary_words) > 10 else f"{title} - {source}"
            articles.append({
                "title": title,
                "summary": final_summary,
                "link": link,
                "pub_date": pub_date
            })
        print(f"Successfully fetched {len(articles)} articles for {company} with ElementTree")
        return articles
    except Exception as e:
        print(f"Error processing news for {company} with ElementTree: {str(e)}")
        return []
# Hugging Face sentiment pipeline, built once at import time (downloads the
# model on first run). The SST-2 head emits a POSITIVE/NEGATIVE label plus a
# confidence score, consumed by analyze_sentiment() below.
sentiment_analyzer = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
def analyze_sentiment(text):
    """Classify *text* as "Positive", "Negative" or "Neutral".

    The input is truncated to 512 characters for the model. Low-confidence
    predictions (< 0.7) and how-to/review style texts are forced to
    "Neutral"; any exception also yields "Neutral".
    """
    try:
        prediction = sentiment_analyzer(text[:512])[0]
        lowered = text.lower()
        is_ambiguous = (
            prediction["score"] < 0.7
            or "how to" in lowered
            or "review" in lowered
        )
        if is_ambiguous:
            return "Neutral"
        if prediction["label"] == "POSITIVE":
            return "Positive"
        return "Negative"
    except Exception as e:
        print(f"Sentiment analysis error: {e}")
        return "Neutral"
def extract_topics(text, max_topics=2, topics=None):
    """Return up to *max_topics* topic labels ranked by keyword frequency.

    Args:
        text: Free text (e.g. headline + summary) to scan.
        max_topics: Maximum number of labels to return (default 2).
        topics: Optional mapping of label -> keyword list; defaults to the
            module-level news_topics (new, backward-compatible parameter).

    Returns:
        Topic labels sorted by descending keyword hit count, or
        ["General News"] when no keyword matches.

    Keywords are matched case-insensitively on word boundaries: the original
    used plain substring counting, so "new" matched "news" and "IT" matched
    "item"/"with", inflating unrelated topics.
    """
    if topics is None:
        topics = news_topics
    text_lower = text.lower()
    topic_scores = {}
    for topic, keywords in topics.items():
        count = 0
        for keyword in keywords:
            # \b-anchored so multi-word phrases still match as a unit.
            pattern = r"\b" + re.escape(keyword.lower()) + r"\b"
            count += len(re.findall(pattern, text_lower))
        if count > 0:
            topic_scores[topic] = count
    sorted_topics = sorted(topic_scores.items(), key=lambda x: x[1], reverse=True)
    return [topic for topic, _ in sorted_topics][:max_topics] if sorted_topics else ["General News"]
# Project-local converter instantiated once at import time; used by
# generate_tts() for the Hindi audio path.
tts_converter = TextToSpeechConverter()
def generate_tts(text, language='hi'):
    """Synthesize *text* to speech and return an in-memory audio buffer.

    Hindi ('hi') is routed through the project TextToSpeechConverter; any
    other language code is rendered as English via gTTS. Returns a
    BytesIO-like buffer on success, or None on any failure.
    """
    try:
        if language != 'hi':
            # Non-Hindi path: gTTS writes the audio straight into memory.
            buffer = io.BytesIO()
            speech = gtts.gTTS(text=text, lang='en', slow=False)
            speech.write_to_fp(buffer)
            buffer.seek(0)
            return buffer
        # Hindi path: delegate to the project converter.
        outcome = tts_converter.generate_speech(text)
        if not outcome["success"]:
            print(f"Hindi audio error: {outcome['message']}")
            return None
        print(f"Hindi audio generated in memory")
        return outcome["audio_buffer"]
    except Exception as e:
        print(f"Audio generation error for {language}: {str(e)}")
        return None