# Assignment5-7 / utils.py — uploaded by Sirapatrwan (commit abf89f6, verified)
import requests
from bs4 import BeautifulSoup
from transformers import pipeline
import gtts
import io
import os
from tts import TextToSpeechConverter
from datetime import datetime
import xml.etree.ElementTree as ET
from fake_useragent import UserAgent
import locale
# Keyword lookup table used by extract_topics(): maps a human-readable topic
# label to the substrings whose occurrence count scores that topic.
news_topics = {
    "Technology": ["tech", "digital", "software", "hardware", "IT"],
    "AI": ["artificial intelligence", "machine learning", "deep learning", "neural network"],
    "Business": ["company", "corporate", "firm", "enterprise", "startup", "market"],
    "Finance": ["finance", "investment", "stock", "economy", "trading", "bank"],
    "Partnership": ["partner", "collaboration", "alliance", "merger", "acquisition"],
    "Social Media": ["social", "platform", "tweet", "facebook", "instagram", "linkedin", "post"],
    "Innovation": ["innovate", "new", "advance", "breakthrough", "disruption"],
    "Outage": ["outage", "downtime", "disrupt", "service interruption"],
    "Launch": ["launch", "release", "introduce", "unveil"],
    "Publicity": ["public", "campaign", "promo", "advertisement"],
    "Privacy": ["privacy", "data", "security", "breach"],
    "Entertainment": ["entertain", "media", "show", "movie", "series"],
    "Leadership": ["promotion", "leader", "executive", "ceo", "chairman", "manager"],
    "Mergers & Acquisitions": ["merger", "acquisition", "buyout", "takeover"]
}
def fetch_news(company, language=None, region=None):
    """Fetch up to 10 Google News RSS articles mentioning *company*.

    Args:
        company: Company name; wrapped in quotes for an exact-phrase query.
        language: Locale tag such as "en-US". When omitted, derived from the
            system locale, falling back to "en-US" if none is configured.
        region: Two-letter country code for the gl/ceid params (default "US").

    Returns:
        A list of dicts with "title", "summary", "link" and "pub_date" keys;
        an empty list when the HTTP request fails.
    """
    base_url = "https://news.google.com/rss/search"
    if not language:
        # locale.getdefaultlocale() can return (None, None); the previous
        # unconditional .replace() crashed with AttributeError in that case.
        system_locale = locale.getdefaultlocale()[0]
        language = system_locale.replace('_', '-').lower() if system_locale else 'en-US'
    region = region or 'US'
    params = {
        "q": f'"{company}"',
        "hl": language,
        "gl": region,
        # ceid pairs the region with the bare language code, e.g. "US:en".
        "ceid": f"{region}:{language.split('-')[0]}"
    }
    # Randomized User-Agent to avoid being served a bot-detection page.
    headers = {"User-Agent": UserAgent().random, "Accept": "application/xml"}
    print(f"Fetching news for {company} with URL: {base_url}?{'&'.join(f'{k}={v}' for k, v in params.items())}")
    response = None  # lets the generic handler below know whether a body exists
    try:
        response = requests.get(base_url, headers=headers, params=params, timeout=15)
        print(f"Response status for {company}: {response.status_code}")
        response.raise_for_status()
        soup = BeautifulSoup(response.content, features="xml")
        if not soup:
            print("Error: BeautifulSoup returned None. Falling back to ElementTree.")
            return parse_with_elementtree(response.content, company)
        items = soup.find_all("item")[:10]
        if not items:
            print(f"No news items found in the RSS feed for {company} with BeautifulSoup.")
            return parse_with_elementtree(response.content, company)
        print(f"Found {len(items)} items with BeautifulSoup.")
        articles = []
        for item in items:
            title = getattr(item.title, 'text', "No title") if item.title else "No title"
            desc = getattr(item.description, 'text', title) if item.description else title
            # NOTE(review): reads the URL from the sibling node of <link>
            # rather than item.link.text — verify against a live feed.
            link = item.link.next_sibling.strip() if item.link and item.link.next_sibling else "No link"
            raw_date = getattr(item.pubDate, 'text', "Date not available") if item.pubDate else "Date not available"
            try:
                pub_date = datetime.strptime(raw_date, "%a, %d %b %Y %H:%M:%S %Z").strftime("%a, %d %b %Y")
            except ValueError:
                pub_date = "Date not available"
            # Descriptions arrive as HTML fragments; reduce them to plain text.
            desc_soup = BeautifulSoup(desc, "html.parser")
            full_text = desc_soup.get_text(separator=" ").strip()
            summary = full_text.replace(title, "").strip()
            summary_words = summary.split()
            # Google News titles typically end with " - <publisher>".
            source = title.split(" - ")[-1].strip() if " - " in title else "Unknown Source"
            final_summary = " ".join(summary_words[:80]) + f" - {source}" if len(summary_words) > 10 else f"{title} - {source}"
            articles.append({
                "title": title,
                "summary": final_summary,
                "link": link,
                "pub_date": pub_date
            })
        print(f"Successfully fetched {len(articles)} articles for {company} with BeautifulSoup")
        return articles
    except requests.exceptions.RequestException as e:
        print(f"Request failed for {company}: {str(e)}")
        return []
    except Exception as e:
        print(f"Error processing news for {company} with BeautifulSoup: {str(e)}. Falling back to ElementTree.")
        # Guard against `response` being unbound when the failure happened
        # before (or inside) the HTTP call itself.
        if response is not None:
            return parse_with_elementtree(response.content, company)
        return []
def parse_with_elementtree(content, company):
    """Parse a Google News RSS payload with the stdlib ElementTree.

    Fallback path used when BeautifulSoup parsing fails in fetch_news().

    Args:
        content: Raw RSS/XML bytes (or str) of the feed.
        company: Company name, used only in log messages.

    Returns:
        A list of up to 10 article dicts ("title", "summary", "link",
        "pub_date"); an empty list when parsing fails or no items exist.
    """
    print("Attempting to parse with ElementTree...")
    try:
        root = ET.fromstring(content)
        items = root.findall(".//item")[:10]
        if not items:
            print(f"No news items found in the RSS feed for {company} with ElementTree")
            return []
        articles = []
        for item in items:
            # An element may exist with empty text (.text is None); the old
            # code then propagated None and crashed on title.split() below.
            title_el = item.find("title")
            title = title_el.text if title_el is not None and title_el.text else "No title"
            desc_el = item.find("description")
            desc = desc_el.text if desc_el is not None and desc_el.text else title
            link_el = item.find("link")
            link = link_el.text if link_el is not None and link_el.text else "No link"
            date_el = item.find("pubDate")
            raw_date = date_el.text if date_el is not None and date_el.text else "Date not available"
            try:
                pub_date = datetime.strptime(raw_date, "%a, %d %b %Y %H:%M:%S %Z").strftime("%a, %d %b %Y")
            except ValueError:
                pub_date = "Date not available"
            # Descriptions are HTML fragments; reduce to plain text.
            desc_soup = BeautifulSoup(desc, "html.parser")
            full_text = desc_soup.get_text(separator=" ").strip()
            summary = full_text if full_text else title
            summary_words = summary.split()
            source = title.split(" - ")[-1].strip() if " - " in title else "Unknown Source"
            final_summary = " ".join(summary_words[:80]) + f" - {source}" if len(summary_words) > 10 else f"{title} - {source}"
            articles.append({
                "title": title,
                "summary": final_summary,
                "link": link,
                "pub_date": pub_date
            })
        print(f"Successfully fetched {len(articles)} articles for {company} with ElementTree")
        return articles
    except Exception as e:
        print(f"Error processing news for {company} with ElementTree: {str(e)}")
        return []
# Module-level DistilBERT SST-2 pipeline, loaded once at import time so that
# repeated analyze_sentiment() calls reuse the same model instance.
sentiment_analyzer = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
def analyze_sentiment(text):
    """Classify *text* as "Positive", "Negative", or "Neutral".

    The model only sees the first 512 characters. Low-confidence predictions
    (score below 0.7) and texts containing "how to" or "review" are coerced
    to "Neutral"; any model failure also yields "Neutral".
    """
    try:
        prediction = sentiment_analyzer(text[:512])[0]
        lowered = text.lower()
        low_confidence = prediction["score"] < 0.7
        looks_informational = "how to" in lowered or "review" in lowered
        if low_confidence or looks_informational:
            return "Neutral"
        return "Positive" if prediction["label"] == "POSITIVE" else "Negative"
    except Exception as e:
        print(f"Sentiment analysis error: {e}")
        return "Neutral"
def extract_topics(text, max_topics=2, topics=None):
    """Rank topic labels by how often their keywords occur in *text*.

    Args:
        text: Free text to scan (matching is case-insensitive, substring-based).
        max_topics: Maximum number of labels to return.
        topics: Optional mapping of label -> keyword list; defaults to the
            module-level ``news_topics`` table (parameterized so callers can
            supply their own taxonomy).

    Returns:
        Up to *max_topics* labels ordered by descending keyword-hit count,
        or ["General News"] when nothing matches.
    """
    if topics is None:
        topics = news_topics
    text_lower = text.lower()
    topic_scores = {}
    for topic, keywords in topics.items():
        # str.count gives substring occurrences, so repeated mentions weigh more.
        count = sum(text_lower.count(keyword.lower()) for keyword in keywords)
        if count > 0:
            topic_scores[topic] = count
    ranked = sorted(topic_scores.items(), key=lambda kv: kv[1], reverse=True)
    return [topic for topic, _ in ranked][:max_topics] if ranked else ["General News"]
# Shared project TTS converter instance used for Hindi audio in generate_tts().
tts_converter = TextToSpeechConverter()
def generate_tts(text, language='hi'):
    """Synthesize *text* to an in-memory audio buffer.

    Hindi ('hi') is routed through the project TextToSpeechConverter; any
    other language value is rendered as English speech via gTTS.

    Returns:
        A BytesIO-like buffer positioned at the start, or None on failure.
    """
    try:
        if language != 'hi':
            # English path: render with gTTS straight into memory.
            speech = gtts.gTTS(text=text, lang='en', slow=False)
            buffer = io.BytesIO()
            speech.write_to_fp(buffer)
            buffer.seek(0)
            return buffer
        outcome = tts_converter.generate_speech(text)
        if not outcome["success"]:
            print(f"Hindi audio error: {outcome['message']}")
            return None
        print(f"Hindi audio generated in memory")
        return outcome["audio_buffer"]
    except Exception as e:
        print(f"Audio generation error for {language}: {str(e)}")
        return None