import gradio as gr import google.generativeai as genai from ddgs import DDGS import requests from bs4 import BeautifulSoup import time from urllib.parse import urlparse import re import json from typing import List, Dict, Any from datetime import datetime import os import tempfile from reportlab.lib.pagesizes import letter, A4 from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak, Table, TableStyle from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import inch from reportlab.lib import colors from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_RIGHT import markdown # Application Constants APP_NAME = "DeepResearchAgent-AI" APP_VERSION = "v2.0" APP_DESCRIPTION = "Advanced AI-Powered Research Assistant" # Enhanced topic detection and search helper functions def detect_topic_category(query: str) -> str: """Detect the category of research topic for specialized search strategies""" politics_keywords = ['politics', 'political', 'government', 'policy', 'election', 'democracy', 'parliament', 'congress', 'senate', 'president', 'minister', 'geopolitics', 'diplomacy', 'foreign policy', 'international relations'] history_keywords = ['history', 'historical', 'ancient', 'medieval', 'world war', 'civilization', 'empire', 'dynasty', 'revolution', 'century', 'era', 'timeline', 'past', 'heritage'] geography_keywords = ['geography', 'geographical', 'country', 'continent', 'ocean', 'mountain', 'river', 'climate', 'population', 'capital', 'border', 'region', 'territory', 'map'] current_affairs_keywords = ['current', 'news', 'today', 'recent', 'latest', 'breaking', 'update', 'happening', '2024', '2025', 'this year', 'now'] technology_keywords = ['technology', 'tech', 'ai', 'artificial intelligence', 'machine learning', 'software', 'hardware', 'computer', 'digital', 'programming', 'coding', 'algorithm', 'data science', 'cybersecurity'] war_keywords = ['war', 'warfare', 'conflict', 'battle', 'military', 'army', 'defense', 'weapon', 'strategy', 'combat', 'invasion', 'occupation', 'siege'] economics_keywords = ['economy', 'economic', 'finance', 'financial', 'market', 'trade', 'business', 'industry', 'company', 'corporation', 'gdp', 'inflation', 'recession'] science_keywords = ['science', 'scientific', 'research', 'study', 'experiment', 'discovery', 'innovation', 'physics', 'chemistry', 'biology', 'medicine', 'health'] query_lower = query.lower() if any(keyword in query_lower for keyword in politics_keywords): return 'politics' elif any(keyword in query_lower for keyword in history_keywords): return 'history' elif any(keyword in query_lower for keyword in geography_keywords): return 'geography' elif any(keyword in query_lower for keyword in current_affairs_keywords): return 'current_affairs' elif any(keyword in query_lower for keyword in technology_keywords): return 'technology' elif any(keyword in query_lower for keyword in war_keywords): return 'war' elif any(keyword in query_lower for keyword in economics_keywords): return 'economics' elif any(keyword in query_lower for keyword in science_keywords): return 'science' else: return 'general' def get_specialized_domains(topic_type: str) -> List[str]: """Get specialized domains based on topic category""" domain_mapping = { 'politics': ['reuters.com', 'bbc.com', 'cnn.com', 'politico.com', 'foreignaffairs.com', 'cfr.org', 'brookings.edu', 'csis.org'], 'history': ['britannica.com', 'history.com', 'nationalgeographic.com', 'smithsonianmag.com', 'historynet.com', 'worldhistory.org'], 'geography': ['nationalgeographic.com', 'worldatlas.com', 'britannica.com', 'cia.gov', 'worldbank.org', 'un.org'], 'current_affairs': ['reuters.com', 'bbc.com', 'cnn.com', 'ap.org', 'npr.org', 'aljazeera.com', 'theguardian.com', 'nytimes.com'], 'technology': ['techcrunch.com', 'wired.com', 'ars-technica.com', 'ieee.org', 'nature.com', 'sciencemag.org', 'mit.edu', 'stanford.edu'], 'war': ['janes.com', 'defensenews.com', 'militarytimes.com', 'csis.org', 'rand.org', 'stratfor.com'], 'economics': ['reuters.com', 'bloomberg.com', 'economist.com', 'ft.com', 'worldbank.org', 'imf.org', 'federalreserve.gov'], 'science': ['nature.com', 'sciencemag.org', 'scientificamerican.com', 'newscientist.com', 'pnas.org', 'cell.com'], 'general': ['wikipedia.org', 'britannica.com', 'reuters.com', 'bbc.com', 'cnn.com'] } return domain_mapping.get(topic_type, domain_mapping['general']) def get_topic_keywords(query: str, topic_type: str) -> List[str]: """Get enhanced keywords based on topic category""" keyword_mapping = { 'politics': ['analysis', 'policy', 'government', 'official', 'statement', 'report', 'briefing', 'summit', 'debate', 'legislation'], 'history': ['timeline', 'chronology', 'facts', 'documented', 'archive', 'primary source', 'historian', 'evidence', 'analysis', 'context'], 'geography': ['facts', 'statistics', 'data', 'demographic', 'topography', 'atlas', 'survey', 'official', 'census', 'coordinates'], 'current_affairs': ['breaking', 'latest', 'update', 'developing', 'live', 'recent', 'today', 'headlines', 'news', 'report'], 'technology': ['innovation', 'breakthrough', 'development', 'advancement', 'research', 'cutting-edge', 'emerging', 'trend', 'future', 'application'], 'war': ['analysis', 'strategy', 'tactics', 'intelligence', 'assessment', 'report', 'conflict', 'situation', 'update', 'briefing'], 'economics': ['analysis', 'forecast', 'data', 'statistics', 'trend', 'market', 'report', 'outlook', 'indicator', 'growth'], 'science': ['research', 'study', 'discovery', 'breakthrough', 'publication', 'peer-reviewed', 'journal', 'findings', 'methodology', 'evidence'], 'general': ['information', 'facts', 'comprehensive', 'detailed', 'overview', 'guide', 'explanation', 'analysis', 'summary', 'background'] } return keyword_mapping.get(topic_type, keyword_mapping['general']) def get_priority_domains_for_topic(topic_type: str) -> List[str]: """Get priority domains for result ranking based on topic""" priority_mapping = { 'politics': ['reuters.com', 'bbc.com', 'cnn.com', 'politico.com', 'foreignaffairs.com', 'cfr.org', 'brookings.edu', 'apnews.com'], 'history': ['britannica.com', 'history.com', 'nationalgeographic.com', 'smithsonianmag.com', 'worldhistory.org', 'historynet.com'], 'geography': ['nationalgeographic.com', 'worldatlas.com', 'britannica.com', 'cia.gov', 'worldbank.org', 'un.org'], 'current_affairs': ['reuters.com', 'bbc.com', 'cnn.com', 'ap.org', 'npr.org', 'aljazeera.com', 'theguardian.com', 'nytimes.com'], 'technology': ['techcrunch.com', 'wired.com', 'ars-technica.com', 'ieee.org', 'nature.com', 'mit.edu', 'stanford.edu', 'acm.org'], 'war': ['janes.com', 'defensenews.com', 'csis.org', 'rand.org', 'stratfor.com', 'cfr.org'], 'economics': ['reuters.com', 'bloomberg.com', 'economist.com', 'ft.com', 'worldbank.org', 'imf.org', 'federalreserve.gov'], 'science': ['nature.com', 'sciencemag.org', 'scientificamerican.com', 'newscientist.com', 'pnas.org', 'cell.com'], 'general': ['wikipedia.org', 'britannica.com', 'reuters.com', 'bbc.com', 'cnn.com', 'nationalgeographic.com'] } return priority_mapping.get(topic_type, priority_mapping['general']) # Sanitize filename for safe file creation def sanitize_filename(filename: str) -> str: """Sanitize filename to remove invalid characters for Windows/Unix systems""" # Remove or replace invalid characters invalid_chars = '<>:"/\\|?*' for char in invalid_chars: filename = filename.replace(char, '_') # Remove multiple consecutive underscores and trim filename = re.sub(r'_+', '_', filename) filename = filename.strip('_') # Limit length to prevent issues if len(filename) > 200: filename = filename[:200] # Ensure it's not empty and add extension if missing if not filename: filename = "research_report" if not filename.endswith('.md'): filename += '.md' return filename # PDF Generation Function def create_pdf_report(content: str, topic: str, sources: List[Dict], filename: str) -> str: """Create a professional PDF report from markdown content""" try: # Create temporary PDF file temp_dir = tempfile.gettempdir() pdf_path = os.path.join(temp_dir, filename.replace('.md', '.pdf')) # Create PDF document doc = SimpleDocTemplate(pdf_path, pagesize=A4, topMargin=1*inch, bottomMargin=1*inch) styles = getSampleStyleSheet() story = [] # Custom styles title_style = ParagraphStyle( 'CustomTitle', parent=styles['Heading1'], fontSize=24, textColor=colors.HexColor('#2C3E50'), spaceAfter=30, alignment=TA_CENTER, fontName='Helvetica-Bold' ) subtitle_style = ParagraphStyle( 'CustomSubtitle', parent=styles['Heading2'], fontSize=14, textColor=colors.HexColor('#34495E'), spaceAfter=20, alignment=TA_CENTER ) header_style = ParagraphStyle( 'CustomHeader', parent=styles['Heading2'], fontSize=16, textColor=colors.HexColor('#2980B9'), spaceAfter=12, spaceBefore=20, fontName='Helvetica-Bold' ) body_style = ParagraphStyle( 'CustomBody', parent=styles['Normal'], fontSize=11, textColor=colors.HexColor('#2C3E50'), spaceAfter=6, alignment=TA_LEFT, leading=14 ) # Header Section story.append(Paragraph(APP_NAME, title_style)) story.append(Paragraph(APP_DESCRIPTION, subtitle_style)) story.append(Spacer(1, 0.2*inch)) # Add decorative line line_data = [['', '']] line_table = Table(line_data, colWidths=[5*inch]) line_table.setStyle(TableStyle([ ('LINEBELOW', (0,0), (-1,-1), 2, colors.HexColor('#3498DB')), ])) story.append(line_table) story.append(Spacer(1, 0.3*inch)) # Research Topic story.append(Paragraph("Research Topic", header_style)) story.append(Paragraph(topic, body_style)) story.append(Spacer(1, 0.2*inch)) # Generation Info current_time = datetime.now().strftime("%B %d, %Y at %I:%M %p") story.append(Paragraph("Generated", header_style)) story.append(Paragraph(f"{current_time}", body_style)) story.append(Spacer(1, 0.2*inch)) # Sources Summary if sources: story.append(Paragraph("Sources Analyzed", header_style)) story.append(Paragraph(f"{len(sources)} reliable sources processed", body_style)) story.append(Spacer(1, 0.3*inch)) story.append(PageBreak()) # Main Content story.append(Paragraph("Research Report", header_style)) story.append(Spacer(1, 0.1*inch)) # Process markdown content lines = content.split('\n') for line in lines: line = line.strip() if not line: story.append(Spacer(1, 6)) continue if line.startswith('# '): story.append(Paragraph(line[2:], header_style)) elif line.startswith('## '): story.append(Paragraph(line[3:], header_style)) elif line.startswith('### '): header_3_style = ParagraphStyle( 'Header3', parent=header_style, fontSize=14, textColor=colors.HexColor('#7F8C8D') ) story.append(Paragraph(line[4:], header_3_style)) elif line.startswith('**') and line.endswith('**'): bold_style = ParagraphStyle( 'Bold', parent=body_style, fontName='Helvetica-Bold' ) story.append(Paragraph(line[2:-2], bold_style)) elif line.startswith('- ') or line.startswith('* '): bullet_style = ParagraphStyle( 'Bullet', parent=body_style, leftIndent=20, bulletIndent=10, bulletText='•', bulletColor=colors.HexColor('#3498DB') ) story.append(Paragraph(line[2:], bullet_style)) elif line.startswith(('1. ', '2. ', '3. ', '4. ', '5. ')): story.append(Paragraph(line, body_style)) else: # Clean basic markdown formatting line = re.sub(r'\*\*(.*?)\*\*', r'\1', line) line = re.sub(r'\*(.*?)\*', r'\1', line) story.append(Paragraph(line, body_style)) # Footer section story.append(PageBreak()) story.append(Paragraph("Sources", header_style)) if sources: for i, source in enumerate(sources[:10], 1): # Limit to 10 sources source_style = ParagraphStyle( 'Source', parent=body_style, fontSize=10, leftIndent=10, spaceAfter=8 ) title = source.get('title', 'No Title')[:100] url = source.get('url', '') story.append(Paragraph(f"{i}. {title}", source_style)) if url: url_style = ParagraphStyle( 'URL', parent=source_style, fontSize=9, textColor=colors.HexColor('#3498DB'), leftIndent=20 ) story.append(Paragraph(url, url_style)) # Footer story.append(Spacer(1, 0.5*inch)) footer_style = ParagraphStyle( 'Footer', parent=styles['Normal'], fontSize=10, textColor=colors.HexColor('#7F8C8D'), alignment=TA_CENTER ) story.append(Paragraph(f"Generated by {APP_NAME} {APP_VERSION} | Advanced AI Research Assistant", footer_style)) # Build PDF doc.build(story) return pdf_path except Exception as e: print(f"PDF generation error: {e}") return None # Validate Gemini API key def validate_api_key(api_key: str) -> tuple[bool, str]: """Validate if the Gemini API key is working""" if not api_key or not api_key.strip(): return False, "❌ API key is empty. Please enter a valid Gemini API key." api_key = api_key.strip() # Basic format checks if len(api_key) < 20: return False, "❌ API key seems too short. Please check that you copied the complete key." if not api_key.replace('-', '').replace('_', '').isalnum(): return False, "❌ API key contains invalid characters. Please check your key format." try: # Test the API key with a simple request genai.configure(api_key=api_key) model = genai.GenerativeModel('gemini-2.0-flash') # Try a minimal test generation with timeout response = model.generate_content("Test", generation_config={"max_output_tokens": 10}) return True, "✅ API key is valid and working!" except Exception as e: error_msg = str(e).lower() print(f"API Key validation error: {e}") # Debug info if "api key not valid" in error_msg or "api_key_invalid" in error_msg: return False, """❌ Invalid API key. Please check your Gemini API key and try again. **Common issues:** • Make sure you copied the ENTIRE key from https://aistudio.google.com/ • Check for extra spaces at the beginning or end • Try refreshing the page and copying the key again • Make sure you're using the correct API key (not mixing up with other services)""" elif "quota" in error_msg or "limit" in error_msg: return False, """❌ API quota exceeded. Your Gemini API usage limit has been reached. **Solutions:** • Check your usage at https://aistudio.google.com/ • Wait for the quota to reset (usually monthly) • Consider upgrading your plan if needed""" elif "permission" in error_msg or "forbidden" in error_msg: return False, """❌ API key doesn't have required permissions. **Solutions:** • Regenerate your API key at https://aistudio.google.com/ • Make sure the API key is enabled for Gemini API • Check if your Google Cloud project has the necessary permissions""" elif "network" in error_msg or "connection" in error_msg or "timeout" in error_msg: return False, """❌ Network error. Please check your internet connection and try again. **Troubleshooting:** • Check your internet connection • Try again in a few minutes • Disable VPN if you're using one • Check if Google services are accessible in your region""" elif "model" in error_msg: return False, """❌ Model not available. The specified Gemini model might not be available. **Solutions:** • Try using a different model (like 'gemini-pro') • Check Gemini API availability at https://status.cloud.google.com/""" else: return False, f"""❌ API key validation failed: {str(e)} **Debugging tips:** • Make sure you're using a valid Gemini API key from https://aistudio.google.com/ • Try creating a new API key if the current one doesn't work • Check the Google Cloud Console for any billing or permission issues""" # Search the web for relevant information using DuckDuckGo with enhanced targeting for diverse topics def web_search(query: str, max_results: int = 15) -> List[Dict[str, str]]: """Enhanced search for diverse topics: Politics, History, Technology, Current Affairs, etc.""" try: with DDGS() as ddgs: all_results = [] # Detect topic category for specialized search topic_type = detect_topic_category(query.lower()) print(f"Detected topic category: {topic_type}") # Strategy 1: Exact phrase search try: exact_results = list(ddgs.text(f'"{query}"', max_results=max_results//3)) all_results.extend(exact_results) print(f"Found {len(exact_results)} results from exact search") except Exception as e: print(f"Exact search error: {e}") # Strategy 2: Topic-specific domain searches specialized_domains = get_specialized_domains(topic_type) for domain in specialized_domains: try: domain_results = list(ddgs.text(f'{query} site:{domain}', max_results=2)) all_results.extend(domain_results) if len(all_results) >= max_results: break except Exception as e: print(f"Domain search error for {domain}: {e}") continue # Strategy 3: Enhanced keyword searches based on topic enhanced_keywords = get_topic_keywords(query, topic_type) for keyword in enhanced_keywords[:5]: try: keyword_results = list(ddgs.text(f'{query} {keyword}', max_results=2)) all_results.extend(keyword_results) if len(all_results) >= max_results: break except Exception as e: print(f"Keyword search error for {keyword}: {e}") continue # Strategy 4: Time-based searches for current affairs if topic_type in ['current_affairs', 'politics', 'technology', 'news']: time_modifiers = ['2024', '2025', 'latest', 'recent', 'current', 'today', 'this year'] for modifier in time_modifiers[:3]: try: time_results = list(ddgs.text(f'{query} {modifier}', max_results=2)) all_results.extend(time_results) if len(all_results) >= max_results: break except Exception as e: print(f"Time-based search error for {modifier}: {e}") continue # Strategy 5: Academic and authoritative sources academic_modifiers = ['analysis', 'research', 'study', 'report', 'comprehensive', 'detailed'] for modifier in academic_modifiers[:3]: try: academic_results = list(ddgs.text(f'{query} {modifier}', max_results=2)) all_results.extend(academic_results) if len(all_results) >= max_results: break except Exception as e: print(f"Academic search error for {modifier}: {e}") continue # Strategy 6: Fallback comprehensive search if len(all_results) < 8: try: general_results = list(ddgs.text(query, max_results=max_results//2)) all_results.extend(general_results) except Exception as e: print(f"General search error: {e}") # Remove duplicates and prioritize authoritative domains seen_urls = set() unique_results = [] priority_domains = get_priority_domains_for_topic(topic_type) # First, add results from priority domains for result in all_results: url = result.get('href', '') if url not in seen_urls and any(domain in url for domain in priority_domains): seen_urls.add(url) unique_results.append(result) if len(unique_results) >= max_results: break # Then add other unique results for result in all_results: url = result.get('href', '') if url not in seen_urls: seen_urls.add(url) unique_results.append(result) if len(unique_results) >= max_results: break print(f"Total unique results found: {len(unique_results)}") return unique_results[:max_results] except Exception as e: print(f"Search error: {e}") # Final fallback - simple search try: with DDGS() as ddgs: results = list(ddgs.text(query, max_results=min(max_results, 5))) print(f"Fallback search found: {len(results)} results") return results except Exception as e2: print(f"Fallback search error: {e2}") return [] # Fetch and extract content from a URL with better error handling def fetch_url_content(url: str) -> str: """Fetch content from a URL and extract meaningful text with enhanced error handling""" try: headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.5', 'Accept-Encoding': 'gzip, deflate', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', } # Increase timeout and add retries response = requests.get(url, headers=headers, timeout=15, allow_redirects=True) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') # Remove unwanted elements for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'iframe', 'noscript']): element.decompose() # Try to get the main content area first main_content = soup.find('main') or soup.find('article') or soup.find('div', class_=['content', 'main', 'body']) if main_content: text = main_content.get_text() else: text = soup.get_text() # Clean up text more thoroughly lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) text = ' '.join(chunk for chunk in chunks if chunk and len(chunk) > 2) # Remove excessive whitespace and clean up text = re.sub(r'\s+', ' ', text) text = text.strip() # Return more content for better analysis - increased from 5000 to 8000 return text[:8000] if text else "" except requests.exceptions.Timeout: print(f"Timeout error for {url} - trying with shorter timeout") try: # Retry with shorter timeout response = requests.get(url, headers=headers, timeout=8, allow_redirects=True) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') text = soup.get_text() text = re.sub(r'\s+', ' ', text.strip()) return text[:5000] if text else "" except Exception as retry_error: print(f"Retry failed for {url}: {retry_error}") return "" except requests.exceptions.RequestException as e: print(f"Request error fetching {url}: {e}") return "" except Exception as e: print(f"Unexpected error fetching {url}: {e}") return "" # Research function using web search and content extraction with enhanced analysis for diverse topics def perform_research(query: str, max_sources: int = 12) -> Dict[str, Any]: """Perform comprehensive research by searching and extracting content from multiple sources""" print(f"🔍 Starting comprehensive research for: {query}") # Detect topic category for better research strategy topic_type = detect_topic_category(query.lower()) print(f"📊 Detected topic category: {topic_type}") # Search for relevant sources with more results to ensure we get at least 10 quality sources search_results = web_search(query, max_results=max_sources*4) # Get more results initially print(f"📊 Found {len(search_results)} potential sources") sources = [] content_chunks = [] successful_fetches = 0 failed_fetches = 0 for i, result in enumerate(search_results): if successful_fetches >= max_sources: break url = result.get('href', '') title = result.get('title', 'No title') # Skip low-quality or duplicate sources if should_skip_source(url, title, sources): print(f"⏭️ Skipping {url} - low quality or duplicate") continue print(f"🌐 Fetching content from {url}") content = fetch_url_content(url) if content and len(content) > 150: # Minimum content threshold # Validate content quality for the specific topic if is_relevant_content(content, query, topic_type): sources.append({ 'title': title, 'url': url, 'content': content, 'topic_type': topic_type }) content_chunks.append(f"SOURCE {successful_fetches + 1} [{topic_type.upper()}]:\nTITLE: {title}\nURL: {url}\nCONTENT:\n{content}\n{'='*100}\n") successful_fetches += 1 print(f"✅ Successfully extracted {len(content)} characters from source {successful_fetches}") else: print(f"⚠️ Content not relevant for {query}") failed_fetches += 1 else: print(f"⚠️ Skipped {url} - insufficient content ({len(content) if content else 0} chars)") failed_fetches += 1 # Add small delay to be respectful time.sleep(0.3) # If we don't have enough sources, try a broader search if successful_fetches < 8: print(f"🔄 Only found {successful_fetches} quality sources, trying broader search...") broader_results = web_search(f"{query} comprehensive analysis", max_results=15) for result in broader_results: if successful_fetches >= max_sources: break url = result.get('href', '') title = result.get('title', 'No title') if should_skip_source(url, title, sources): continue content = fetch_url_content(url) if content and len(content) > 100: sources.append({ 'title': title, 'url': url, 'content': content, 'topic_type': 'additional' }) content_chunks.append(f"ADDITIONAL SOURCE {successful_fetches + 1}:\nTITLE: {title}\nURL: {url}\nCONTENT:\n{content}\n{'='*100}\n") successful_fetches += 1 print(f"✅ Additional source {successful_fetches} added") time.sleep(0.3) research_context = "\n".join(content_chunks) print(f"📝 Research completed: {successful_fetches} sources processed, {failed_fetches} failed") print(f"📊 Total content length: {len(research_context)} characters") return { 'sources': sources, 'research_context': research_context, 'query': query, 'total_sources': successful_fetches, 'topic_type': topic_type, 'failed_sources': failed_fetches } def should_skip_source(url: str, title: str, existing_sources: List[Dict]) -> bool: """Check if a source should be skipped based on quality and duplication""" # Skip if URL already exists existing_urls = [source['url'] for source in existing_sources] if url in existing_urls: return True # Skip low-quality domains low_quality_domains = ['pinterest.com', 'instagram.com', 'facebook.com', 'twitter.com', 'tiktok.com', 'reddit.com'] if any(domain in url for domain in low_quality_domains): return True # Skip if title is too short or generic if len(title) < 10 or title.lower() in ['no title', 'untitled', 'page not found']: return True return False def is_relevant_content(content: str, query: str, topic_type: str) -> bool: """Check if content is relevant to the query and topic type""" content_lower = content.lower() query_words = query.lower().split() # Check if at least 30% of query words appear in content matching_words = sum(1 for word in query_words if word in content_lower) word_relevance = matching_words / len(query_words) if query_words else 0 # Topic-specific relevance keywords topic_relevance_keywords = get_topic_keywords(query, topic_type) topic_matches = sum(1 for keyword in topic_relevance_keywords if keyword.lower() in content_lower) # Content should have reasonable length and relevance return len(content) > 200 and (word_relevance >= 0.3 or topic_matches >= 2) # Generate a research report using Gemini with enhanced topic handling def generate_research_report(research_data: Dict[str, Any], gemini_api_key: str) -> str: """Generate a comprehensive research report using Gemini for diverse topics""" if not gemini_api_key: return "❌ Gemini API key is required to generate the report." # Validate API key first is_valid, validation_message = validate_api_key(gemini_api_key) if not is_valid: return f"❌ {validation_message}" try: # Initialize Gemini (already configured in validation) model = genai.GenerativeModel('gemini-2.0-flash') topic_type = research_data.get('topic_type', 'general') failed_sources = research_data.get('failed_sources', 0) # Create topic-specific prompt prompt = f""" RESEARCH QUERY: {research_data['query']} TOPIC CATEGORY: {topic_type.upper()} TOTAL SOURCES ANALYZED: {research_data.get('total_sources', len(research_data['sources']))} FAILED SOURCES: {failed_sources} COMPREHENSIVE RESEARCH DATA FROM MULTIPLE AUTHORITATIVE SOURCES: {research_data['research_context']} INSTRUCTIONS FOR {topic_type.upper()} RESEARCH REPORT: Based on the above research data, create a comprehensive, well-structured report analyzing ALL the information provided. This is a {topic_type} research topic, so focus on relevant aspects for this domain. Your report structure should include: 1. **EXECUTIVE SUMMARY** - Key findings and main points about {research_data['query']} - Critical insights and takeaways - Brief overview of what the research reveals 2. **DETAILED ANALYSIS** - In-depth examination of all collected information - Multiple perspectives and viewpoints found in sources - Connections between different pieces of information - Contradictions or debates if any exist 3. **BACKGROUND & CONTEXT** - Historical background (if relevant) - Current situation and status - Relevant context that helps understand the topic 4. **KEY FINDINGS & INSIGHTS** - Most important discoveries from the research - Patterns and trends identified - Significant facts and statistics - Expert opinions and analysis 5. **CURRENT STATUS & DEVELOPMENTS** - Latest information and recent developments - Current state of affairs - Recent changes or updates 6. **DIFFERENT PERSPECTIVES** - Various viewpoints found in sources - Debates and discussions around the topic - Conflicting information (if any) 7. **IMPLICATIONS & SIGNIFICANCE** - Why this topic matters - Impact and consequences - Future implications 8. **DETAILED BREAKDOWN** - Specific details from each major source - Technical information (if applicable) - Statistics and data points - Quotes and specific information 9. **CONCLUSIONS** - Summary of what was discovered - Final thoughts and analysis - Gaps in information (if any) 10. **SOURCES & REFERENCES** - List all sources with proper attribution - Include URLs for verification - Note the reliability and type of each source FORMATTING REQUIREMENTS: - Use clear Markdown formatting with headers (##), subheaders (###), and bullet points - Make the content engaging, informative, and well-organized - Include specific details, examples, and quotes from the sources - Highlight important information with **bold text** - Use bullet points for lists and key points - Organize information logically and coherently - If information is conflicting, present both sides - If insufficient information is available for any section, clearly state what could not be determined CONTENT REQUIREMENTS: - Base your analysis ONLY on the provided source content - Do not make assumptions or add information not present in the sources - Include specific details and examples from multiple sources - Synthesize information from all sources, don't just summarize each one separately - Maintain objectivity and present facts as found in sources - If sources contradict each other, present both perspectives - Focus on creating a comprehensive understanding of {research_data['query']} TOPIC-SPECIFIC FOCUS FOR {topic_type.upper()}: {get_topic_specific_instructions(topic_type)} Remember: This report should be thorough, well-researched, and provide real value to someone wanting to understand {research_data['query']} comprehensively. """ response = model.generate_content(prompt) return response.text except Exception as e: error_msg = str(e).lower() print(f"Report generation error: {e}") # Debug info if "api key not valid" in error_msg or "api_key_invalid" in error_msg: return """❌ Invalid API key during report generation. **Common issues:** • Your API key may have expired or been revoked • Check if you copied the complete key • Try regenerating your API key at https://aistudio.google.com/""" elif "quota" in error_msg or "limit" in error_msg: return """❌ API quota exceeded during report generation. **Solutions:** • Check your usage at https://aistudio.google.com/ • Wait for the quota to reset (usually monthly) • Consider upgrading your plan if needed""" elif "permission" in error_msg or "forbidden" in error_msg: return """❌ API key doesn't have required permissions for report generation. **Solutions:** • Regenerate your API key at https://aistudio.google.com/ • Make sure the API key is enabled for Gemini API""" elif "network" in error_msg or "connection" in error_msg or "timeout" in error_msg: return """❌ Network error during report generation. **Troubleshooting:** • Check your internet connection • Try again in a few minutes • The report generation process may take some time""" elif "model" in error_msg: return """❌ Model not available for report generation. **Solutions:** • Try using a different model • Check Gemini API availability at https://status.cloud.google.com/""" else: return f"""❌ Error generating report: {str(e)} **Debugging tips:** • Try with a shorter research topic • Check your internet connection • Make sure your API key has sufficient quota""" def get_topic_specific_instructions(topic_type: str) -> str: """Get specific instructions based on topic category""" instructions = { 'politics': """ - Focus on political implications, policy details, and governmental aspects - Include information about key political figures, parties, and institutions - Analyze policy impacts and political consequences - Present multiple political perspectives objectively - Include information about voting patterns, polls, or public opinion if available """, 'history': """ - Provide chronological context and timeline of events - Include historical significance and long-term impacts - Mention key historical figures, dates, and places - Analyze causes and effects of historical events - Connect historical events to modern implications """, 'geography': """ - Include specific geographical data, coordinates, and locations - Provide demographic, climate, and physical geography information - Discuss economic geography and natural resources - Include maps, borders, and territorial information - Analyze geographical impacts on society and economy """, 'current_affairs': """ - Focus on the most recent developments and breaking news - Include timeline of recent events - Analyze immediate impacts and short-term consequences - Provide context for why this is currently significant - Include quotes from recent statements or press releases """, 'technology': """ - Focus on technical specifications, capabilities, and limitations - Include information about development timeline and key innovators - Analyze technological implications and future potential - Discuss adoption rates, market impact, and competitive landscape - Include technical details and how the technology works """, 'war': """ - Provide strategic analysis and military context - Include information about forces, tactics, and equipment involved - Analyze geopolitical implications and international responses - Discuss humanitarian impacts and civilian consequences - Present timeline of conflict development """, 'economics': """ - Include specific economic data, statistics, and indicators - Analyze market trends, financial impacts, and economic consequences - Discuss effects on different sectors and stakeholders - Include information about economic policies and their outcomes - Provide context about economic significance and implications """, 'science': """ - Focus on scientific methodology, research findings, and evidence - Include information about research institutions and scientists involved - Explain scientific concepts and their implications - Discuss peer review status and scientific consensus - Analyze potential applications and future research directions """ } return instructions.get(topic_type, "Focus on providing comprehensive, factual information with proper context and analysis.") # Main research function def run_research(topic: str, gemini_api_key: str, download_format: str = "markdown"): """Run the complete research process""" if not gemini_api_key.strip(): return "❌ Please enter your Gemini API key.", None, None, gr.update(visible=False), gr.update(visible=False) if not topic.strip(): return "❌ Please enter a research topic.", None, None, gr.update(visible=False), gr.update(visible=False) # First validate the API key is_valid, validation_message = validate_api_key(gemini_api_key) if not is_valid: return f"❌ {validation_message}", None, None, gr.update(visible=False), gr.update(visible=False) try: # Perform research print(f"Starting research for: {topic}") research_data = perform_research(topic) if not research_data['sources']: return "❌ No relevant sources found. Please try a different search term.", None, None, gr.update(visible=False), gr.update(visible=False) print(f"Found {len(research_data['sources'])} sources, generating report...") # Generate report report = generate_research_report(research_data, gemini_api_key) # Check if report generation was successful if report.startswith("❌"): return report, None, None, gr.update(visible=False), gr.update(visible=False) # Create safe downloadable filenames from the TOPIC, not the report content base_filename = sanitize_filename(topic) if not base_filename.endswith('.md'): base_filename = base_filename.replace('.md', '') + '_report.md' pdf_path = None try: # Generate PDF using the original topic for filename pdf_path = create_pdf_report(report, topic, research_data['sources'], base_filename) print(f"PDF generated successfully: {pdf_path}") except Exception as pdf_error: print(f"PDF generation failed: {pdf_error}") # Continue without PDF if it fails print(f"Research completed successfully. MD: {base_filename}") return report, base_filename, pdf_path, gr.update(visible=True), gr.update(visible=True) except Exception as e: print(f"Research error: {e}") # Debug info error_msg = f"❌ An error occurred during research: {str(e)}" return error_msg, None, None, gr.update(visible=False), gr.update(visible=False) # Gradio interface with dark theme def create_interface(): # Dark theme CSS dark_css = """ /* Dark theme base */ .gradio-container { background: linear-gradient(135deg, #1a1a2e 0%, #16213e 50%, #0f3460 100%) !important; min-height: 100vh; color: white !important; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; } /* All blocks and containers */ .block, .gr-box, .gr-form, .gr-panel { background: rgba(255, 255, 255, 0.05) !important; border: 1px solid rgba(255, 255, 255, 0.1) !important; border-radius: 15px !important; backdrop-filter: blur(10px) !important; padding: 1.5rem !important; margin: 0.5rem !important; } /* Text colors - ALL WHITE */ body, p, span, div, label, h1, h2, h3, h4, h5, h6 { color: white !important; } .gr-markdown, .gr-markdown * { color: white !important; background: transparent !important; } .gr-markdown h1, .gr-markdown h2, .gr-markdown h3 { color: #64b5f6 !important; border-bottom: 1px solid rgba(255, 255, 255, 0.2) !important; } /* Input fields */ .gr-textbox, .gr-textbox input, .gr-textbox textarea { background: rgba(255, 255, 255, 0.1) !important; border: 1px solid rgba(255, 255, 255, 0.3) !important; border-radius: 10px !important; color: white !important; padding: 12px !important; } .gr-textbox input::placeholder, .gr-textbox textarea::placeholder { color: rgba(255, 255, 255, 0.6) !important; } .gr-textbox input:focus, .gr-textbox textarea:focus { border-color: #64b5f6 !important; box-shadow: 0 0 10px rgba(100, 181, 246, 0.3) !important; background: rgba(255, 255, 255, 0.15) !important; } /* Buttons */ .gr-button { border-radius: 25px !important; padding: 12px 24px !important; font-weight: 600 !important; text-transform: uppercase !important; letter-spacing: 0.5px !important; transition: all 0.3s ease !important; border: none !important; color: white !important; } .gr-button-primary { background: linear-gradient(135deg, #64b5f6, #42a5f5) !important; box-shadow: 0 4px 15px rgba(100, 181, 246, 0.4) !important; } .gr-button-primary:hover { background: linear-gradient(135deg, #42a5f5, #2196f3) !important; transform: translateY(-2px) !important; box-shadow: 0 6px 20px rgba(100, 181, 246, 0.6) !important; } .gr-button-secondary { background: linear-gradient(135deg, #546e7a, #37474f) !important; box-shadow: 0 4px 15px rgba(84, 110, 122, 0.4) !important; } .gr-button-secondary:hover { background: linear-gradient(135deg, #37474f, #263238) !important; transform: translateY(-2px) !important; } /* Accordion */ .gr-accordion { background: rgba(255, 255, 255, 0.05) !important; border: 1px solid rgba(255, 255, 255, 0.1) !important; border-radius: 12px !important; } .gr-accordion summary { color: white !important; background: rgba(255, 255, 255, 0.1) !important; padding: 1rem !important; border-radius: 10px !important; } /* Feature cards */ .feature-card { background: rgba(100, 181, 246, 0.1) !important; border: 1px solid rgba(100, 181, 246, 0.3) !important; border-radius: 12px !important; padding: 1.5rem !important; margin: 1rem 0 !important; border-left: 4px solid #64b5f6 !important; backdrop-filter: blur(10px) !important; } .feature-card h3, .feature-card h4 { color: #64b5f6 !important; margin-bottom: 1rem !important; } .feature-card ul li { color: rgba(255, 255, 255, 0.9) !important; margin-bottom: 0.5rem !important; } /* Status indicators */ .status-success { background: rgba(76, 175, 80, 0.2) !important; border: 1px solid #4caf50 !important; border-left: 4px solid #4caf50 !important; color: #a5d6a7 !important; } .status-error { background: rgba(244, 67, 54, 0.2) !important; border: 1px solid #f44336 !important; border-left: 4px solid #f44336 !important; color: #ef9a9a !important; } /* Hero section */ .hero-section { background: linear-gradient(135deg, #1565c0, #1976d2, #1e88e5) !important; border-radius: 15px !important; padding: 2rem !important; margin-bottom: 2rem !important; color: white !important; box-shadow: 0 10px 30px rgba(0, 0, 0, 0.3) !important; text-align: center !important; } /* Download section */ .download-section { background: rgba(100, 181, 246, 0.1) !important; border: 1px solid rgba(100, 181, 246, 0.3) !important; border-radius: 12px !important; padding: 1.5rem !important; text-align: center !important; color: white !important; } /* Markdown content area */ .gr-markdown { background: rgba(255, 255, 255, 0.05) !important; border: 1px solid rgba(255, 255, 255, 0.1) !important; border-radius: 10px !important; padding: 1.5rem !important; max-height: 500px !important; overflow-y: auto !important; } /* Responsive design */ @media (max-width: 768px) { .gradio-container { padding: 0.5rem !important; } .block { margin: 0.25rem !important; padding: 1rem !important; } .hero-section { padding: 1rem !important; } .feature-card { padding: 1rem !important; margin: 0.5rem 0 !important; } } /* Scrollbar styling */ ::-webkit-scrollbar { width: 8px; } ::-webkit-scrollbar-track { background: rgba(255, 255, 255, 0.1); border-radius: 4px; } ::-webkit-scrollbar-thumb { background: rgba(100, 181, 246, 0.6); border-radius: 4px; } ::-webkit-scrollbar-thumb:hover { background: rgba(100, 181, 246, 0.8); } """ with gr.Blocks( title=f"{APP_NAME} | Advanced AI Research Assistant", theme=gr.themes.Base( primary_hue="blue", secondary_hue="gray", neutral_hue="slate", text_size="md", radius_size="lg", spacing_size="lg" ).set( body_background_fill="linear-gradient(135deg, #1a1a2e 0%, #16213e 50%, #0f3460 100%)", block_background_fill="rgba(255, 255, 255, 0.05)", block_border_color="rgba(255, 255, 255, 0.1)", block_radius="15px", button_primary_background_fill="linear-gradient(135deg, #64b5f6, #42a5f5)", button_primary_text_color="white", input_background_fill="rgba(255, 255, 255, 0.1)", input_border_color="rgba(255, 255, 255, 0.3)", body_text_color="white", block_label_text_color="white" ), css=dark_css ) as demo: # Hero Section with gr.Row(): with gr.Column(): gr.HTML(f"""

🔬 {APP_NAME}

{APP_DESCRIPTION}

Powered by Google Gemini AI & Advanced Web Research

""") # Features Overview with gr.Row(): with gr.Column(): gr.HTML("""

🎯 What this tool does:

""") # Simple API Key Section with gr.Row(): with gr.Column(): gr.HTML("""

� API Key Setup

Get your free Gemini API key from Google AI Studio

""") with gr.Row(): with gr.Column(scale=3): gemini_key = gr.Textbox( label="🔐 Enter your Gemini API Key", type="password", placeholder="Paste your API key here...", container=True ) with gr.Column(scale=1): validate_btn = gr.Button( "🔍 Validate", variant="secondary", size="lg" ) validation_output = gr.HTML(visible=False) # Main Research Interface gr.HTML("

🔬 Start Your Research

") with gr.Row(): with gr.Column(scale=2): research_topic = gr.Textbox( label="🎯 Research Topic", placeholder="Enter your research topic here... (e.g., 'Latest developments in quantum computing', 'Climate change solutions 2024', 'AI trends in healthcare')", lines=3, container=True ) with gr.Row(): research_btn = gr.Button( "🚀 Start Deep Research", variant="primary", size="lg", scale=2 ) with gr.Column(scale=1): gr.HTML("
") with gr.Column(scale=1): gr.HTML("""

💡 Research Tips:

📊 Research Power:
10+ sources • Topic categorization • Authoritative domains • AI synthesis
""") # Progress and Results Section with gr.Row(): with gr.Column(): progress_html = gr.HTML(visible=False) output = gr.Markdown( value="Your comprehensive research report will appear here...", label="📊 Research Report", container=True, height=400 ) # Download Section with gr.Row(): with gr.Column(): download_section = gr.HTML(visible=False) with gr.Row(): with gr.Column(): download_md_btn = gr.DownloadButton( "📝 Download Markdown", visible=False, variant="secondary", size="lg" ) with gr.Column(): download_pdf_btn = gr.DownloadButton( "📄 Download PDF Report", visible=False, variant="primary", size="lg" ) # Footer gr.HTML(f"""

🔬 {APP_NAME} {APP_VERSION} | Advanced AI Research Assistant

Powered by Google Gemini AI • Built with ❤️ for researchers worldwide

""") # Event Handlers def validate_key_handler(api_key): if not api_key: return gr.update( visible=True, value='

❌ API Key Required

Please enter your Gemini API key above.

' ) is_valid, message = validate_api_key(api_key) if is_valid: return gr.update( visible=True, value=f'

✅ API Key Valid!

{message}

You\'re ready to start researching!

' ) else: return gr.update( visible=True, value=f'

❌ API Key Issue

{message}
' ) def research_handler(topic, api_key): if not api_key.strip(): return ( "❌ Please enter and validate your Gemini API key first.", None, None, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) ) if not topic.strip(): return ( "❌ Please enter a research topic.", None, None, gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) ) # Show progress progress_msg = f"""

🔄 Research in Progress...

📊 Analyzing: {topic}

⏳ This may take 1-2 minutes. Please wait...

""" return run_research(topic, api_key) # Wire up events validate_btn.click( fn=validate_key_handler, inputs=[gemini_key], outputs=[validation_output] ) research_btn.click( fn=run_research, inputs=[research_topic, gemini_key], outputs=[output, download_md_btn, download_pdf_btn, download_md_btn, download_pdf_btn] ) # Download handlers def create_md_file(content): if content and content.strip(): return content return "No content available" def get_pdf_file(pdf_path): if pdf_path and os.path.exists(pdf_path): return pdf_path return None download_md_btn.click( fn=create_md_file, inputs=[output], outputs=[download_md_btn] ) download_pdf_btn.click( fn=get_pdf_file, inputs=[download_pdf_btn], outputs=[download_pdf_btn] ) return demo # Main execution if __name__ == "__main__": demo = create_interface() demo.launch()