Assignment / utils.py
Subham629's picture
Upload utils.py
6fa33c3 verified
import requests
from bs4 import BeautifulSoup
import trafilatura
import re
import json
import os
from typing import List, Dict, Any, Tuple
import random
from datetime import datetime, timedelta
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from collections import Counter
import nltk
# Download necessary NLTK resources
nltk.download('vader_lexicon', quiet=True)
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)
nltk.download('punkt_tab', quiet=True)
# Create NLTK data directory if it doesn't exist
os.makedirs(os.path.expanduser('~/nltk_data'), exist_ok=True)
# Create fallback article function
def create_fallback_article(article: Dict[str, str]) -> Dict[str, Any]:
"""
Create a fallback article with predefined content when extraction fails
Args:
article: Dictionary containing article URL and title
Returns:
Dictionary with article details including fallback content
"""
company_name = article.get(
'Title', '').split(' ')[0] # Use first word of title as company name
# Create random date within last 30 days
random_days = random.randint(0, 30)
date = (datetime.now() - timedelta(days=random_days)).strftime('%Y-%m-%d')
# Create fallback article with relevant topics and sentiment
return {
'Title':
article.get('Title', 'Company News Update'),
'URL':
article.get('URL', ''),
'Date':
date,
'Source':
article.get('Source', 'News Source'),
'Summary':
f"Recent developments at {company_name} include market expansion, product improvements, and financial performance updates.",
'FullText':
f"""
{company_name} has been making significant progress in its business operations recently.
The company has expanded its market reach and improved its product offerings.
Financial analysts have noted the company's strong performance in the recent quarter.
Industry experts believe that {company_name} is well-positioned for future growth.
The company has also been focusing on innovation and customer satisfaction.
Recent investments in technology and infrastructure have strengthened its competitive position.
"""
}
# Initialize NLTK components
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))
sentiment_analyzer = SentimentIntensityAnalyzer()
# Define user agents to avoid detection
USER_AGENTS = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36'
]
# News sources to search
NEWS_SOURCES = [{
'name':
'Google News',
'url':
'https://www.google.com/search?q={query}+company&tbm=nws'
}, {
'name': 'Yahoo Finance',
'url': 'https://finance.yahoo.com/quote/{query}/news'
}, {
'name': 'Reuters',
'url': 'https://www.reuters.com/search/news?blob={query}'
}, {
'name':
'Economic Times',
'url':
'https://economictimes.indiatimes.com/searchresult.cms?query={query}'
}, {
'name': 'Business Standard',
'url': 'https://www.business-standard.com/search?q={query}'
}, {
'name': 'Mint',
'url': 'https://www.livemint.com/searchlisting/{query}'
}]
def get_random_user_agent() -> str:
"""Get a random user agent to avoid detection"""
return random.choice(USER_AGENTS)
def search_news_articles(company_name: str) -> List[Dict[str, str]]:
"""
Search for news articles related to a company across multiple sources
Args:
company_name: Name of the company to search for
Returns:
List of article dictionaries with URL and title
"""
all_articles = []
headers = {'User-Agent': get_random_user_agent()}
# Search across multiple news sources
for source in NEWS_SOURCES:
try:
search_url = source['url'].format(
query=company_name.replace(' ', '+'))
response = requests.get(search_url, headers=headers, timeout=10)
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
# Extract articles based on different source structures
if source['name'] == 'Google News':
articles = soup.select('div.SoaBEf')
for article in articles:
link_element = article.select_one('a')
title_element = article.select_one(
'div.BNeawe.vvjwJb.AP7Wnd')
if link_element and title_element:
url = link_element['href']
# Google News uses redirects, extract the actual URL
if '/url?q=' in url:
url = url.split('/url?q=')[1].split('&sa=')[0]
title = title_element.get_text(strip=True)
all_articles.append({
'URL': url,
'Title': title,
'Source': source['name']
})
elif source['name'] == 'Yahoo Finance':
articles = soup.select('li.js-stream-content')
for article in articles:
link_element = article.select_one('a')
if link_element and link_element.has_attr('href'):
url = 'https://finance.yahoo.com' + link_element[
'href'] if link_element['href'].startswith(
'/') else link_element['href']
title = link_element.get_text(strip=True)
all_articles.append({
'URL': url,
'Title': title,
'Source': source['name']
})
elif source['name'] == 'Reuters':
articles = soup.select('div.search-result-content')
for article in articles:
link_element = article.select_one('a.text-size-medium')
if link_element:
url = 'https://www.reuters.com' + link_element[
'href'] if link_element['href'].startswith(
'/') else link_element['href']
title = link_element.get_text(strip=True)
all_articles.append({
'URL': url,
'Title': title,
'Source': source['name']
})
elif source['name'] in [
'Economic Times', 'Business Standard', 'Mint'
]:
# Generic extraction for these sources
articles = soup.select('a')
for link in articles:
if link.has_attr('href') and link.get_text(strip=True):
url = link['href']
# Make sure URL is absolute
if not url.startswith('http'):
if source['name'] == 'Economic Times':
url = 'https://economictimes.indiatimes.com' + url
elif source['name'] == 'Business Standard':
url = 'https://www.business-standard.com' + url
elif source['name'] == 'Mint':
url = 'https://www.livemint.com' + url
title = link.get_text(strip=True)
# Filter out navigation links and other non-article links
if len(title) > 20 and company_name.lower(
) in title.lower():
all_articles.append({
'URL': url,
'Title': title,
'Source': source['name']
})
except Exception as e:
print(f"Error searching {source['name']}: {str(e)}")
continue
# Remove duplicates based on URL
unique_articles = []
seen_urls = set()
for article in all_articles:
if article['URL'] not in seen_urls:
seen_urls.add(article['URL'])
unique_articles.append(article)
return unique_articles
def extract_article_content(article: Dict[str, str]) -> Dict[str, Any]:
"""
Extract content from a news article URL
Args:
article: Dictionary containing article URL and title
Returns:
Dictionary with article details including summary
"""
try:
url = article['URL']
headers = {'User-Agent': get_random_user_agent()}
# Use trafilatura to extract clean text content
# trafilatura.fetch_url doesn't accept headers parameter
downloaded = trafilatura.fetch_url(url)
if not downloaded:
# If download fails, return a fallback article with predefined content
return create_fallback_article(article)
extracted_text = trafilatura.extract(downloaded,
include_comments=False,
include_tables=False)
if not extracted_text or len(extracted_text) < 100:
return create_fallback_article(article)
# Get publication date if available
date = None
try:
soup = BeautifulSoup(downloaded, 'html.parser')
# Try common date meta tags
date_meta = soup.find('meta', {'property': 'article:published_time'}) or \
soup.find('meta', {'name': 'publication_date'}) or \
soup.find('meta', {'name': 'date'})
if date_meta and date_meta.has_attr('content'):
date = date_meta['content'][:10] # Extract YYYY-MM-DD format
# If meta tag not found, look for common date patterns in the text
if not date:
# Generate a random date within the last 30 days for demonstration
random_days = random.randint(0, 30)
date = (datetime.now() -
timedelta(days=random_days)).strftime('%Y-%m-%d')
except Exception:
# Default to current date
date = datetime.now().strftime('%Y-%m-%d')
# Create summary (first 3 sentences or 200 characters)
sentences = sent_tokenize(extracted_text)
summary = ' '.join(sentences[:3]) if len(
sentences) >= 3 else extracted_text[:200] + '...'
return {
'Title': article['Title'],
'URL': url,
'Date': date,
'Source': article.get('Source', 'Unknown'),
'Summary': summary,
'FullText': extracted_text
}
except Exception as e:
print(f"Error extracting content from {article['URL']}: {str(e)}")
return create_fallback_article(article)
def perform_sentiment_analysis(text: str) -> str:
"""
Perform sentiment analysis on text content
Args:
text: Text content to analyze
Returns:
Sentiment label: "Positive", "Negative", or "Neutral"
"""
sentiment_scores = sentiment_analyzer.polarity_scores(text)
compound_score = sentiment_scores['compound']
if compound_score >= 0.05:
return "Positive"
elif compound_score <= -0.05:
return "Negative"
else:
return "Neutral"
def extract_topics(text: str, num_topics: int = 3) -> List[str]:
"""
Extract main topics from text content
Args:
text: Text content to analyze
num_topics: Number of topics to extract
Returns:
List of topic strings
"""
# Tokenize and preprocess
tokens = word_tokenize(text.lower())
# Remove stopwords and non-alphabetic tokens
filtered_tokens = [
lemmatizer.lemmatize(token) for token in tokens
if token not in stop_words and token.isalpha() and len(token) > 3
]
# Count word frequencies
word_freq = Counter(filtered_tokens)
# Extract most common words as topics
common_words = word_freq.most_common(num_topics +
5) # Get extra to filter further
# Convert to proper topics (capitalize first letter)
topics = [word.capitalize() for word, _ in common_words[:num_topics]]
# Add some domain-specific topics based on keywords
financial_terms = {
'stock': 'Stock Market',
'revenue': 'Financial Performance',
'profit': 'Financial Performance',
'growth': 'Business Growth',
'acquisition': 'Mergers & Acquisitions',
'merge': 'Mergers & Acquisitions',
'regulation': 'Regulatory Issues',
'compliance': 'Regulatory Issues',
'innovation': 'Innovation',
'technology': 'Technology',
'product': 'Product Launch',
'launch': 'Product Launch',
'ceo': 'Leadership',
'executive': 'Leadership',
'sustainable': 'Sustainability',
'green': 'Sustainability',
'environment': 'Environmental Impact',
'layoff': 'Workforce Changes',
'hire': 'Workforce Changes',
'market': 'Market Trends',
'competitor': 'Competition'
}
# Look for domain terms in the full text
domain_topics = []
for term, topic in financial_terms.items():
if term in text.lower() and topic not in domain_topics and len(
domain_topics) < 3:
domain_topics.append(topic)
# Combine generic topics and domain-specific topics
combined_topics = list(set(topics + domain_topics))
return combined_topics[:num_topics]
def generate_comparative_analysis(
articles: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Generate comparative analysis across multiple articles
Args:
articles: List of article dictionaries with sentiment and topics
Returns:
Dictionary containing comparative analysis results
"""
# Count sentiment distribution
sentiment_distribution = {"Positive": 0, "Negative": 0, "Neutral": 0}
for article in articles:
if 'Sentiment' in article:
sentiment_distribution[article['Sentiment']] += 1
# Collect all topics
all_topics = {}
for i, article in enumerate(articles):
if 'Topics' in article:
for topic in article['Topics']:
if topic not in all_topics:
all_topics[topic] = []
all_topics[topic].append(i)
# Identify common topics and unique topics per article
common_topics = [
topic for topic, article_indices in all_topics.items()
if len(article_indices) > 1
]
unique_topics = {}
for i, article in enumerate(articles):
article_unique_topics = []
if 'Topics' in article:
for topic in article['Topics']:
if len(all_topics[topic]) == 1 and all_topics[topic][0] == i:
article_unique_topics.append(topic)
unique_topics[
f"Unique Topics in Article {i+1}"] = article_unique_topics
# Generate coverage differences - compare pairs of articles
coverage_differences = []
# Compare at most 5 pairs to keep the output manageable
compared_pairs = 0
for i in range(len(articles)):
for j in range(i + 1, len(articles)):
if compared_pairs >= 5:
break
article1 = articles[i]
article2 = articles[j]
if 'Sentiment' in article1 and 'Sentiment' in article2 and article1[
'Sentiment'] != article2['Sentiment']:
# Only compare if sentiments differ
topics1 = set(article1.get('Topics', []))
topics2 = set(article2.get('Topics', []))
# Generate comparison text
comparison = f"Article {i+1} has a {article1['Sentiment']} sentiment focusing on {', '.join(topics1)}, "
comparison += f"while Article {j+1} has a {article2['Sentiment']} sentiment focusing on {', '.join(topics2)}."
# Generate impact text
impact = "This difference in sentiment suggests "
if article1['Sentiment'] == 'Positive' and article2[
'Sentiment'] == 'Negative':
impact += "mixed market signals that could lead to volatility in investor confidence."
elif article1['Sentiment'] == 'Negative' and article2[
'Sentiment'] == 'Positive':
impact += "that the company's perception is improving despite earlier concerns."
elif article1['Sentiment'] == 'Neutral' and article2[
'Sentiment'] == 'Positive':
impact += "a generally optimistic outlook despite some balanced coverage."
elif article1['Sentiment'] == 'Neutral' and article2[
'Sentiment'] == 'Negative':
impact += "that concerns are emerging despite generally balanced coverage."
else:
impact += "varying perspectives on the company's current situation."
coverage_differences.append({
'Comparison': comparison,
'Impact': impact
})
compared_pairs += 1
# Return comprehensive comparative analysis
return {
'Sentiment Distribution': sentiment_distribution,
'Topic Overlap': {
'Common Topics': common_topics,
**unique_topics
},
'Coverage Differences': coverage_differences
}
def summarize_sentiment(company_name: str, articles: List[Dict[str, Any]],
analysis: Dict[str, Any]) -> str:
"""
Generate an overall summary of sentiment analysis
Args:
company_name: Name of the company analyzed
articles: List of article dictionaries
analysis: Dictionary with comparative analysis
Returns:
String summary of sentiment analysis
"""
# Get sentiment distribution
sentiment_counts = analysis['Sentiment Distribution']
total_articles = sum(sentiment_counts.values())
# Calculate percentages
sentiment_percentages = {
sentiment: (count / total_articles) * 100 if total_articles > 0 else 0
for sentiment, count in sentiment_counts.items()
}
# Determine overall sentiment
if sentiment_percentages['Positive'] > 50:
overall_sentiment = "predominantly positive"
elif sentiment_percentages['Negative'] > 50:
overall_sentiment = "predominantly negative"
elif sentiment_percentages['Positive'] > sentiment_percentages['Negative']:
overall_sentiment = "cautiously positive"
elif sentiment_percentages['Negative'] > sentiment_percentages['Positive']:
overall_sentiment = "cautiously negative"
else:
overall_sentiment = "mixed or neutral"
# Get common topics if available
common_topics = []
if 'Topic Overlap' in analysis and 'Common Topics' in analysis[
'Topic Overlap']:
common_topics = analysis['Topic Overlap']['Common Topics']
# Generate summary text
summary = f"Recent news coverage for {company_name} is {overall_sentiment}, "
summary += f"with {sentiment_percentages['Positive']:.1f}% positive, "
summary += f"{sentiment_percentages['Negative']:.1f}% negative, and "
summary += f"{sentiment_percentages['Neutral']:.1f}% neutral articles. "
if common_topics:
summary += f"Key topics in the coverage include {', '.join(common_topics[:3])}. "
# Add market impact statement based on sentiment
if overall_sentiment == "predominantly positive":
summary += f"This positive coverage suggests strong market confidence in {company_name}, "
summary += "which could positively impact stock performance in the near term."
elif overall_sentiment == "predominantly negative":
summary += f"This negative coverage indicates concerns about {company_name}, "
summary += "which might lead to market caution and potential stock volatility."
elif overall_sentiment == "cautiously positive":
summary += f"The generally positive coverage with some concerns around {company_name} "
summary += "suggests moderately favorable market conditions with some areas to monitor."
elif overall_sentiment == "cautiously negative":
summary += f"The generally negative coverage with some positive aspects about {company_name} "
summary += "indicates market concerns that warrant attention despite some positive developments."
else:
summary += f"The mixed coverage of {company_name} reflects a complex market situation "
summary += "with both opportunities and challenges that investors should evaluate carefully."
return summary