""" | |
Advanced Research Trend Monitor - Web App Version | |
Based on the notebook implementation with enhanced features | |
""" | |
import json | |
import time | |
from datetime import datetime, timedelta | |
from typing import List, Dict, Any, Optional | |
from collections import defaultdict, Counter | |
import re | |
# Optional imports for advanced features
try:
    import networkx as nx
    HAS_NETWORKX = True
except ImportError:
    HAS_NETWORKX = False
    print("⚠️ NetworkX not available - some advanced features disabled")

try:
    import matplotlib.pyplot as plt
    import seaborn as sns
    HAS_PLOTTING = True
except ImportError:
    HAS_PLOTTING = False
    print("⚠️ Matplotlib/Seaborn not available - plotting features disabled")

try:
    from wordcloud import WordCloud
    HAS_WORDCLOUD = True
except ImportError:
    HAS_WORDCLOUD = False
    print("⚠️ WordCloud not available - word cloud features disabled")

try:
    import numpy as np
    HAS_NUMPY = True
except ImportError:
    HAS_NUMPY = False
    print("⚠️ NumPy not available - some numerical features disabled")

class AdvancedTrendMonitor:
    """Advanced research trend monitoring with temporal analysis and gap detection"""

    def __init__(self, groq_processor=None):
        self.groq_processor = groq_processor
        self.trend_data = {}
        self.keyword_trends = defaultdict(list)
        self.temporal_data = defaultdict(list)
        self.gap_analysis_cache = {}
        print("✅ Advanced Research Trend Monitor initialized!")

    def analyze_temporal_trends(self, papers: List[Dict], timeframe: str = "yearly") -> Dict:
        """Analyze trends over time with sophisticated temporal analysis"""
        try:
            if not papers:
                return {'error': 'No papers provided for temporal analysis'}

            # Group papers by time period
            temporal_groups = defaultdict(list)
            year_counts = defaultdict(int)
            keyword_evolution = defaultdict(lambda: defaultdict(int))

            for paper in papers:
                year = paper.get('year')
                if not year:
                    continue

                # Handle different year formats
                if isinstance(year, str):
                    try:
                        year = int(year)
                    except ValueError:
                        continue

                if year < 1990 or year > 2030:  # Filter unrealistic years
                    continue

                temporal_groups[year].append(paper)
                year_counts[year] += 1

                # Track keyword evolution
                title = paper.get('title', '').lower()
                abstract = paper.get('abstract', '').lower()
                content = f"{title} {abstract}"

                # Extract keywords (simple approach)
                keywords = self._extract_keywords(content)
                for keyword in keywords:
                    keyword_evolution[year][keyword] += 1

            # Calculate trends
            trends = {
                'publication_trend': dict(sorted(year_counts.items())),
                'keyword_evolution': dict(keyword_evolution),
                'temporal_analysis': {},
                'growth_analysis': {},
                'emerging_topics': {},
                'declining_topics': {}
            }

            # Analyze publication growth
            years = sorted(year_counts.keys())
            if len(years) >= 2:
                recent_years = years[-3:]  # Last 3 years
                earlier_years = years[:-3] if len(years) > 3 else years[:-1]

                recent_avg = sum(year_counts[y] for y in recent_years) / len(recent_years)
                earlier_avg = sum(year_counts[y] for y in earlier_years) / len(earlier_years) if earlier_years else 0
                growth_rate = ((recent_avg - earlier_avg) / earlier_avg * 100) if earlier_avg > 0 else 0

                trends['growth_analysis'] = {
                    'recent_average': recent_avg,
                    'earlier_average': earlier_avg,
                    'growth_rate_percent': growth_rate,
                    'trend_direction': 'growing' if growth_rate > 5 else 'declining' if growth_rate < -5 else 'stable'
                }

            # Analyze emerging vs declining topics
            if len(years) >= 2:
                recent_year = years[-1]
                previous_year = years[-2] if len(years) > 1 else years[-1]

                recent_keywords = set(keyword_evolution[recent_year].keys())
                previous_keywords = set(keyword_evolution[previous_year].keys())

                emerging = recent_keywords - previous_keywords
                declining = previous_keywords - recent_keywords

                trends['emerging_topics'] = {
                    'topics': list(emerging)[:10],  # Top 10 emerging
                    'count': len(emerging)
                }
                trends['declining_topics'] = {
                    'topics': list(declining)[:10],  # Top 10 declining
                    'count': len(declining)
                }

            # Temporal analysis summary
            trends['temporal_analysis'] = {
                'total_years': len(years),
                'year_range': f"{min(years)}-{max(years)}" if years else "N/A",
                'peak_year': max(year_counts.items(), key=lambda x: x[1])[0] if year_counts else None,
                'total_papers': sum(year_counts.values()),
                'average_per_year': sum(year_counts.values()) / len(years) if years else 0
            }

            return trends

        except Exception as e:
            return {
                'error': f'Temporal trend analysis failed: {str(e)}',
                'analysis_timestamp': datetime.now().isoformat()
            }

    def detect_research_gaps(self, papers: List[Dict]) -> Dict:
        """Detect research gaps using advanced analysis"""
        try:
            if not papers:
                return {'error': 'No papers provided for gap analysis'}

            # Analyze methodologies
            methodologies = defaultdict(int)
            research_areas = defaultdict(int)
            data_types = defaultdict(int)
            evaluation_methods = defaultdict(int)

            # Common research area keywords
            area_keywords = {
                'natural_language_processing': ['nlp', 'language', 'text', 'linguistic'],
                'computer_vision': ['vision', 'image', 'visual', 'cv'],
                'machine_learning': ['ml', 'learning', 'algorithm', 'model'],
                'deep_learning': ['deep', 'neural', 'network', 'cnn', 'rnn'],
                'reinforcement_learning': ['reinforcement', 'rl', 'agent', 'policy'],
                'robotics': ['robot', 'robotic', 'manipulation', 'control'],
                'healthcare': ['medical', 'health', 'clinical', 'patient'],
                'finance': ['financial', 'trading', 'market', 'economic'],
                'security': ['security', 'privacy', 'attack', 'defense']
            }

            # Methodology keywords
            method_keywords = {
                'supervised_learning': ['supervised', 'classification', 'regression'],
                'unsupervised_learning': ['unsupervised', 'clustering', 'dimensionality'],
                'semi_supervised': ['semi-supervised', 'few-shot', 'zero-shot'],
                'transfer_learning': ['transfer', 'domain adaptation', 'fine-tuning'],
                'federated_learning': ['federated', 'distributed', 'decentralized'],
                'meta_learning': ['meta', 'learning to learn', 'few-shot'],
                'explainable_ai': ['explainable', 'interpretable', 'explanation'],
                'adversarial': ['adversarial', 'robust', 'attack']
            }

            # Analyze papers
            for paper in papers:
                content = f"{paper.get('title', '')} {paper.get('abstract', '')}".lower()

                # Count research areas
                for area, keywords in area_keywords.items():
                    if any(keyword in content for keyword in keywords):
                        research_areas[area] += 1

                # Count methodologies
                for method, keywords in method_keywords.items():
                    if any(keyword in content for keyword in keywords):
                        methodologies[method] += 1

                # Identify data types
                if 'dataset' in content or 'data' in content:
                    if any(word in content for word in ['text', 'corpus', 'language']):
                        data_types['text'] += 1
                    elif any(word in content for word in ['image', 'visual', 'video']):
                        data_types['image'] += 1
                    elif any(word in content for word in ['audio', 'speech', 'sound']):
                        data_types['audio'] += 1
                    elif any(word in content for word in ['sensor', 'iot', 'time series']):
                        data_types['sensor'] += 1
                    else:
                        data_types['tabular'] += 1

            # Identify gaps
            gaps = {
                'methodology_gaps': [],
                'research_area_gaps': [],
                'data_type_gaps': [],
                'interdisciplinary_gaps': [],
                'emerging_gaps': []
            }

            # Find underexplored methodologies
            total_papers = len(papers)
            for method, count in methodologies.items():
                coverage = (count / total_papers) * 100
                if coverage < 5:  # Less than 5% coverage
                    gaps['methodology_gaps'].append({
                        'method': method.replace('_', ' ').title(),
                        'coverage_percent': coverage,
                        'papers_count': count
                    })

            # Find underexplored research areas
            for area, count in research_areas.items():
                coverage = (count / total_papers) * 100
                if coverage < 10:  # Less than 10% coverage
                    gaps['research_area_gaps'].append({
                        'area': area.replace('_', ' ').title(),
                        'coverage_percent': coverage,
                        'papers_count': count
                    })

            # Find underexplored data types
            for dtype, count in data_types.items():
                coverage = (count / total_papers) * 100
                if coverage < 15:  # Less than 15% coverage
                    gaps['data_type_gaps'].append({
                        'data_type': dtype.replace('_', ' ').title(),
                        'coverage_percent': coverage,
                        'papers_count': count
                    })

            # Generate AI-powered gap analysis
            if self.groq_processor:
                ai_analysis = self._generate_ai_gap_analysis(papers, gaps)
                gaps['ai_analysis'] = ai_analysis

            gaps['analysis_summary'] = {
                'total_papers_analyzed': total_papers,
                'methodology_gaps_found': len(gaps['methodology_gaps']),
                'research_area_gaps_found': len(gaps['research_area_gaps']),
                'data_type_gaps_found': len(gaps['data_type_gaps']),
                'analysis_timestamp': datetime.now().isoformat()
            }

            return gaps

        except Exception as e:
            return {
                'error': f'Gap detection failed: {str(e)}',
                'analysis_timestamp': datetime.now().isoformat()
            }

    def generate_trend_report(self, papers: List[Dict]) -> Dict:
        """Generate comprehensive trend report"""
        try:
            if not papers:
                return {'error': 'No papers provided for trend report'}

            print(f"📊 Generating trend report for {len(papers)} papers...")

            # Run all analyses
            temporal_trends = self.analyze_temporal_trends(papers)
            research_gaps = self.detect_research_gaps(papers)

            # Generate keyword trends
            keyword_analysis = self._analyze_keyword_trends(papers)

            # Generate emerging topics
            emerging_topics = self._detect_emerging_topics(papers)

            # Generate AI-powered executive summary
            executive_summary = self._generate_executive_summary(papers, temporal_trends, research_gaps)

            # Compile comprehensive report
            report = {
                'executive_summary': executive_summary,
                'temporal_trends': temporal_trends,
                'research_gaps': research_gaps,
                'keyword_analysis': keyword_analysis,
                'emerging_topics': emerging_topics,
                'report_metadata': {
                    'papers_analyzed': len(papers),
                    'analysis_date': datetime.now().isoformat(),
                    'report_version': '2.0'
                }
            }

            return report

        except Exception as e:
            return {
                'error': f'Trend report generation failed: {str(e)}',
                'analysis_timestamp': datetime.now().isoformat()
            }

    def _extract_keywords(self, content: str) -> List[str]:
        """Extract keywords from content using simple NLP"""
        # Remove common words and extract meaningful terms
        stop_words = {
            'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
            'a', 'an', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has',
            'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might',
            'must', 'shall', 'can', 'this', 'that', 'these', 'those', 'we', 'they', 'our',
            'their', 'using', 'based', 'approach', 'method', 'model', 'paper', 'study',
            'research', 'work', 'results', 'show', 'propose', 'present'
        }

        # Extract words (simple tokenization)
        words = re.findall(r'\b[a-zA-Z]+\b', content.lower())

        # Filter out stop words and very short tokens
        keywords = [word for word in words if len(word) > 3 and word not in stop_words]

        # Return the top keywords by frequency (Counter.keys() preserves insertion
        # order, not frequency, so use most_common to get the actual top terms)
        return [word for word, _ in Counter(keywords).most_common(20)]

    def _analyze_keyword_trends(self, papers: List[Dict]) -> Dict:
        """Analyze keyword trends over time"""
        try:
            keyword_by_year = defaultdict(lambda: defaultdict(int))

            for paper in papers:
                year = paper.get('year')
                if not year:
                    continue
                # Normalize year so keys sort consistently (papers may report it as a string)
                if isinstance(year, str):
                    try:
                        year = int(year)
                    except ValueError:
                        continue

                content = f"{paper.get('title', '')} {paper.get('abstract', '')}".lower()
                keywords = self._extract_keywords(content)

                for keyword in keywords[:10]:  # Top 10 keywords per paper
                    keyword_by_year[year][keyword] += 1

            # Find trending keywords by comparing the two most recent years
            trending_keywords = {}
            all_keywords = set().union(*[counts.keys() for counts in keyword_by_year.values()])
            years = sorted(keyword_by_year.keys())
            for keyword in all_keywords:
                if len(years) >= 2:
                    recent_count = keyword_by_year[years[-1]][keyword]
                    previous_count = keyword_by_year[years[-2]][keyword]
                    if previous_count > 0:
                        trend = ((recent_count - previous_count) / previous_count) * 100
                        trending_keywords[keyword] = trend

            # Get top trending keywords
            top_trending = sorted(trending_keywords.items(), key=lambda x: x[1], reverse=True)[:10]

            return {
                'keyword_evolution': dict(keyword_by_year),
                'trending_keywords': top_trending,
                'analysis_timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            return {
                'error': f'Keyword trend analysis failed: {str(e)}',
                'analysis_timestamp': datetime.now().isoformat()
            }

    def _detect_emerging_topics(self, papers: List[Dict]) -> Dict:
        """Detect emerging research topics"""
        try:
            # Group papers by recent years
            recent_papers = []
            older_papers = []
            current_year = datetime.now().year

            for paper in papers:
                year = paper.get('year')
                if not year:
                    continue
                if isinstance(year, str):
                    try:
                        year = int(year)
                    except ValueError:
                        continue

                if year >= current_year - 2:  # Last 2 years
                    recent_papers.append(paper)
                else:
                    older_papers.append(paper)

            # Extract topics from recent vs older papers
            recent_topics = set()
            older_topics = set()

            for paper in recent_papers:
                content = f"{paper.get('title', '')} {paper.get('abstract', '')}".lower()
                topics = self._extract_keywords(content)
                recent_topics.update(topics[:5])  # Top 5 topics per paper

            for paper in older_papers:
                content = f"{paper.get('title', '')} {paper.get('abstract', '')}".lower()
                topics = self._extract_keywords(content)
                older_topics.update(topics[:5])

            # Find emerging topics (in recent but not in older)
            emerging = recent_topics - older_topics

            return {
                'emerging_topics': list(emerging)[:15],  # Top 15 emerging topics
                'recent_papers_count': len(recent_papers),
                'older_papers_count': len(older_papers),
                'analysis_timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            return {
                'error': f'Emerging topic detection failed: {str(e)}',
                'analysis_timestamp': datetime.now().isoformat()
            }

    def _generate_ai_gap_analysis(self, papers: List[Dict], gaps: Dict) -> str:
        """Generate AI-powered gap analysis"""
        try:
            if not self.groq_processor:
                return "AI analysis not available - Groq processor not initialized"

            # Prepare summary for AI analysis
            summary = f"""
Research Gap Analysis Summary:
- Total Papers Analyzed: {len(papers)}
- Methodology Gaps Found: {len(gaps['methodology_gaps'])}
- Research Area Gaps Found: {len(gaps['research_area_gaps'])}
- Data Type Gaps Found: {len(gaps['data_type_gaps'])}

Top Methodology Gaps:
{', '.join([gap['method'] for gap in gaps['methodology_gaps'][:5]])}

Top Research Area Gaps:
{', '.join([gap['area'] for gap in gaps['research_area_gaps'][:5]])}
"""

            prompt = f"""Based on this research gap analysis, provide insights on:

{summary}

Please provide:
1. **Key Research Gaps**: Most significant gaps and why they matter
2. **Opportunities**: Potential research opportunities in underexplored areas
3. **Recommendations**: Specific recommendations for future research
4. **Priority Areas**: Which gaps should be prioritized and why

Format as a structured analysis."""

            response = self.groq_processor.generate_response(prompt, max_tokens=1500)
            return response

        except Exception as e:
            return f"AI gap analysis failed: {str(e)}"

    def _generate_executive_summary(self, papers: List[Dict], temporal_trends: Dict, research_gaps: Dict) -> str:
        """Generate executive summary of trend analysis"""
        try:
            if not self.groq_processor:
                return "Executive summary not available - Groq processor not initialized"

            # Prepare data for summary
            growth_info = temporal_trends.get('growth_analysis', {})
            gap_summary = research_gaps.get('analysis_summary', {})

            prompt = f"""Generate an executive summary for this research trend analysis:

Papers Analyzed: {len(papers)}
Publication Growth: {growth_info.get('trend_direction', 'unknown')} ({growth_info.get('growth_rate_percent', 0):.1f}%)
Research Gaps Found: {gap_summary.get('methodology_gaps_found', 0)} methodology gaps, {gap_summary.get('research_area_gaps_found', 0)} area gaps

Temporal Analysis:
- Year Range: {temporal_trends.get('temporal_analysis', {}).get('year_range', 'N/A')}
- Peak Year: {temporal_trends.get('temporal_analysis', {}).get('peak_year', 'N/A')}
- Average Papers/Year: {temporal_trends.get('temporal_analysis', {}).get('average_per_year', 0):.1f}

Provide a 3-paragraph executive summary covering:
1. Overall research landscape and trends
2. Key findings and patterns
3. Implications and future directions"""

            response = self.groq_processor.generate_response(prompt, max_tokens=1000)
            return response

        except Exception as e:
            return f"Executive summary generation failed: {str(e)}"

    def get_trend_summary(self) -> Dict:
        """Get summary of all trend data"""
        return {
            'total_trends_tracked': len(self.trend_data),
            'keyword_trends_count': len(self.keyword_trends),
            'temporal_data_points': sum(len(data) for data in self.temporal_data.values()),
            'last_analysis': datetime.now().isoformat()
        }
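

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the web app wiring).
# It exercises the class with a tiny in-memory paper list and no Groq
# processor, so the AI-powered summaries fall back to their "not available"
# messages. The paper dicts below are hypothetical sample data; the keys the
# analyzers actually read are 'title', 'abstract', and 'year'.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample_papers = [
        {
            'title': 'Federated Learning for Clinical Text Classification',
            'abstract': 'We study federated learning on distributed clinical text corpora.',
            'year': 2023,
        },
        {
            'title': 'Adversarial Robustness of Image Classifiers',
            'abstract': 'An analysis of adversarial attacks and defenses for visual models.',
            'year': '2024',  # string years are normalized by the analyzers
        },
    ]

    monitor = AdvancedTrendMonitor()  # no groq_processor: AI summaries are skipped
    report = monitor.generate_trend_report(sample_papers)

    # Print a compact view of the report rather than the full nested dict
    print(json.dumps(report.get('report_metadata', {}), indent=2))
    print("Growth analysis:", report.get('temporal_trends', {}).get('growth_analysis', {}))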