# ResearchCopilot - Multi-Agent Research System # Track 3: Agentic Demo Showcase - Gradio MCP Hackathon 2025 import gradio as gr import asyncio import json import time import os from datetime import datetime from typing import Dict, List, Optional, Tuple from dataclasses import dataclass, asdict from enum import Enum import logging import re from abc import ABC, abstractmethod # Load environment variables from .env file # try: # from dotenv import load_dotenv # load_dotenv() # print("✅ Environment variables loaded from .env file") # except ImportError: # print("⚠️ python-dotenv not installed. Install with: pip install python-dotenv") # except Exception as e: # print(f"⚠️ Could not load .env file: {e}") # Import enhanced agents with real API integrations try: from enhanced_agents import EnhancedRetrieverAgent, EnhancedSummarizerAgent, EnhancedCitationAgent, SearchResult ENHANCED_AGENTS_AVAILABLE = True print("✅ Enhanced agents loaded successfully") except ImportError: print("❌ Enhanced agents not found - using basic agents with mock data") ENHANCED_AGENTS_AVAILABLE = False # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Debug: Check if API keys are loaded print("\n🔑 API Key Status:") print(f"Perplexity API: {'✅ Loaded' if os.getenv('PERPLEXITY_API_KEY') else '❌ Missing'}") print(f"Google API: {'✅ Loaded' if os.getenv('GOOGLE_API_KEY') else '❌ Missing'}") print(f"Google Search ID: {'✅ Loaded' if os.getenv('GOOGLE_SEARCH_ENGINE_ID') else '❌ Missing'}") print(f"Claude API: {'✅ Loaded' if os.getenv('ANTHROPIC_API_KEY') else '❌ Missing'}") print(f"OpenAI API: {'✅ Loaded (fallback)' if os.getenv('OPENAI_API_KEY') else '❌ Missing'}") print("=" * 50) class AgentStatus(Enum): IDLE = "idle" THINKING = "thinking" WORKING = "working" COMPLETED = "completed" ERROR = "error" @dataclass class ResearchTask: id: str description: str priority: int dependencies: List[str] status: str = "pending" results: Optional[Dict] = None created_at: str = None def __post_init__(self): if self.created_at is None: self.created_at = datetime.now().isoformat() @dataclass class AgentMessage: agent_id: str message: str timestamp: str status: AgentStatus data: Optional[Dict] = None class BaseAgent(ABC): def __init__(self, agent_id: str, name: str): self.agent_id = agent_id self.name = name self.status = AgentStatus.IDLE self.messages = [] def log_message(self, message: str, data: Optional[Dict] = None): msg = AgentMessage( agent_id=self.agent_id, message=message, timestamp=datetime.now().isoformat(), status=self.status, data=data ) self.messages.append(msg) logger.info(f"[{self.name}] {message}") return msg @abstractmethod async def process(self, input_data: Dict) -> Dict: pass class PlannerAgent(BaseAgent): def __init__(self): super().__init__("planner", "Research Planner") async def process(self, input_data: Dict) -> Dict: self.status = AgentStatus.THINKING query = input_data.get("query", "") self.log_message(f"Analyzing research query: {query}") await asyncio.sleep(1) # Simulate thinking time self.status = AgentStatus.WORKING # Simulate intelligent task breakdown tasks = self._create_research_plan(query) self.log_message(f"Created research plan with {len(tasks)} tasks") self.status = AgentStatus.COMPLETED return { "tasks": tasks, "strategy": self._generate_strategy(query), "estimated_time": len(tasks) * 2, "complexity": self._assess_complexity(query) } def _create_research_plan(self, query: str) -> List[ResearchTask]: # Intelligent task decomposition based on query analysis tasks = [] # Core research task tasks.append(ResearchTask( id="core_search", description=f"Primary research on: {query}", priority=1, dependencies=[] )) # If query mentions specific domains, add specialized searches if any(term in query.lower() for term in ["academic", "paper", "study", "research"]): tasks.append(ResearchTask( id="academic_search", description="Search academic databases and papers", priority=2, dependencies=["core_search"] )) # If query is about recent events, add news search if any(term in query.lower() for term in ["recent", "latest", "current", "2024", "2025"]): tasks.append(ResearchTask( id="news_search", description="Search for recent news and updates", priority=2, dependencies=["core_search"] )) # Always add background context tasks.append(ResearchTask( id="context_search", description="Gather background context and definitions", priority=3, dependencies=["core_search"] )) return tasks def _generate_strategy(self, query: str) -> str: if len(query.split()) < 5: return "Focused search strategy for specific topic" elif any(word in query.lower() for word in ["compare", "vs", "versus", "difference"]): return "Comparative analysis strategy" elif "how" in query.lower(): return "Process-oriented research strategy" else: return "Comprehensive exploratory strategy" def _assess_complexity(self, query: str) -> str: word_count = len(query.split()) if word_count < 5: return "Low" elif word_count < 10: return "Medium" else: return "High" class RetrieverAgent(BaseAgent): def __init__(self): super().__init__("retriever", "Information Retriever") self.search_apis = ["perplexity", "google", "academic"] # Use enhanced agent if available if ENHANCED_AGENTS_AVAILABLE: self.enhanced_agent = None async def process(self, input_data: Dict) -> Dict: self.status = AgentStatus.THINKING task = input_data.get("task") self.log_message(f"Processing retrieval task: {task.description}") self.status = AgentStatus.WORKING # Use enhanced agents with real APIs if available if ENHANCED_AGENTS_AVAILABLE: try: async with EnhancedRetrieverAgent() as enhanced_retriever: # Try real API search first if "academic" in task.id: sources = await enhanced_retriever.search_academic(task.description, 5) elif "news" in task.id: sources = await enhanced_retriever.search_google(f"recent news {task.description}", 5) else: # Use Perplexity for main searches sources = await enhanced_retriever.search_perplexity(task.description, 5) if not sources: # Fallback to Google sources = await enhanced_retriever.search_google(task.description, 5) if sources: self.log_message(f"Retrieved {len(sources)} sources using real APIs") self.status = AgentStatus.COMPLETED # Convert SearchResult objects to dict format results = [] for source in sources: results.append({ "title": source.title, "url": source.url, "snippet": source.snippet, "source_type": source.source_type, "relevance": source.relevance }) return { "sources": results, "search_strategy": self._get_search_strategy(task), "confidence": self._calculate_confidence(results) } except Exception as e: self.log_message(f"API search failed, using mock data: {str(e)}") # Fallback to mock data results = await self._perform_searches(task) self.log_message(f"Retrieved {len(results)} sources (mock data)") self.status = AgentStatus.COMPLETED return { "sources": results, "search_strategy": self._get_search_strategy(task), "confidence": self._calculate_confidence(results) } async def _perform_searches(self, task: ResearchTask) -> List[Dict]: # Simulate different search strategies based on task type await asyncio.sleep(2) # Simulate API call time # Mock search results with realistic structure results = [] if "academic" in task.id: results.extend([ { "title": "Academic Paper on Topic", "url": "https://arxiv.org/paper/123", "snippet": "Comprehensive study showing key findings...", "source_type": "academic", "relevance": 0.95 }, { "title": "Research Publication", "url": "https://journals.example.com/article/456", "snippet": "Peer-reviewed research demonstrating...", "source_type": "academic", "relevance": 0.88 } ]) if "news" in task.id: results.extend([ { "title": "Recent Development in Field", "url": "https://news.example.com/article/789", "snippet": "Latest updates show significant progress...", "source_type": "news", "relevance": 0.82 } ]) # Always add some general results results.extend([ { "title": "Comprehensive Overview", "url": "https://example.com/overview", "snippet": "Detailed analysis covering multiple aspects...", "source_type": "general", "relevance": 0.79 }, { "title": "Expert Analysis", "url": "https://expert.example.com/analysis", "snippet": "Professional insights and recommendations...", "source_type": "expert", "relevance": 0.85 } ]) return results def _get_search_strategy(self, task: ResearchTask) -> str: if "academic" in task.id: return "Academic database search with peer-review filter" elif "news" in task.id: return "Recent news aggregation with date filtering" else: return "Multi-source comprehensive search" def _calculate_confidence(self, results: List[Dict]) -> float: if not results: return 0.0 avg_relevance = sum(r.get("relevance", 0) for r in results) / len(results) source_diversity = len(set(r.get("source_type") for r in results)) return min(1.0, avg_relevance * 0.7 + (source_diversity / 5) * 0.3) class SummarizerAgent(BaseAgent): def __init__(self): super().__init__("summarizer", "Content Summarizer") async def process(self, input_data: Dict) -> Dict: self.status = AgentStatus.THINKING sources = input_data.get("sources", []) self.log_message(f"Summarizing {len(sources)} sources") self.status = AgentStatus.WORKING # Use enhanced agents with real APIs if available if ENHANCED_AGENTS_AVAILABLE: try: # Create enhanced summarizer (no async context manager needed) enhanced_summarizer = EnhancedSummarizerAgent() # Convert dict sources to SearchResult objects search_results = [] for source in sources: search_results.append(SearchResult( title=source.get("title", ""), url=source.get("url", ""), snippet=source.get("snippet", ""), source_type=source.get("source_type", "web"), relevance=source.get("relevance", 0.5) )) # Use synchronous call (KarmaCheck style) summary_result = enhanced_summarizer.summarize_with_claude( search_results, "Research query analysis" ) if summary_result and "summary" in summary_result: # Get the actual API used from the result api_used = summary_result.get("api_used", "AI API") self.log_message(f"Summary generated using {api_used}") self.status = AgentStatus.COMPLETED return summary_result except Exception as e: self.log_message(f"API summarization failed, using mock summary: {str(e)}") # Fallback to mock summary await asyncio.sleep(2) # Simulate processing time summary = self._generate_summary(sources) key_points = self._extract_key_points(sources) self.log_message("Summary generation completed (mock data)") self.status = AgentStatus.COMPLETED return { "summary": summary, "key_points": key_points, "word_count": len(summary.split()), "coverage_score": self._calculate_coverage(sources) } def _generate_summary(self, sources: List[Dict]) -> str: # Simulate intelligent summarization if not sources: return "No sources available for summarization." summary_parts = [] # Group sources by type academic_sources = [s for s in sources if s.get("source_type") == "academic"] news_sources = [s for s in sources if s.get("source_type") == "news"] general_sources = [s for s in sources if s.get("source_type") == "general"] if academic_sources: summary_parts.append( "Academic research indicates significant developments in this field. " "Peer-reviewed studies demonstrate consistent findings across multiple " "research groups, with high confidence in the methodological approaches used." ) if news_sources: summary_parts.append( "Recent developments show ongoing progress and public interest. " "Current trends suggest continued evolution in this area with " "practical implications for stakeholders." ) if general_sources: summary_parts.append( "Comprehensive analysis reveals multiple perspectives and approaches. " "Expert opinions converge on key principles while acknowledging " "areas that require further investigation." ) return " ".join(summary_parts) def _extract_key_points(self, sources: List[Dict]) -> List[str]: key_points = [] if any(s.get("source_type") == "academic" for s in sources): key_points.append("Peer-reviewed research supports main conclusions") if any(s.get("relevance", 0) > 0.9 for s in sources): key_points.append("High-relevance sources identified") if len(sources) > 3: key_points.append("Multiple independent sources confirm findings") key_points.extend([ "Cross-referenced information for accuracy", "Balanced perspective from diverse sources", "Current information reflects latest developments" ]) return key_points def _calculate_coverage(self, sources: List[Dict]) -> float: if not sources: return 0.0 source_types = set(s.get("source_type") for s in sources) high_relevance = sum(1 for s in sources if s.get("relevance", 0) > 0.8) coverage = (len(source_types) / 4) * 0.5 + (high_relevance / len(sources)) * 0.5 return min(1.0, coverage) class CitationAgent(BaseAgent): def __init__(self): super().__init__("citation", "Citation Generator") async def process(self, input_data: Dict) -> Dict: self.status = AgentStatus.THINKING sources = input_data.get("sources", []) self.log_message(f"Generating citations for {len(sources)} sources") self.status = AgentStatus.WORKING # Use enhanced citation agent if available if ENHANCED_AGENTS_AVAILABLE: try: enhanced_citation = EnhancedCitationAgent() # Convert dict sources to SearchResult objects search_results = [] for source in sources: search_results.append(SearchResult( title=source.get("title", ""), url=source.get("url", ""), snippet=source.get("snippet", ""), source_type=source.get("source_type", "web"), relevance=source.get("relevance", 0.5) )) citation_result = enhanced_citation.generate_citations(search_results) if citation_result: self.log_message("Citations generated with multiple formats") self.status = AgentStatus.COMPLETED return citation_result except Exception as e: self.log_message(f"Enhanced citation failed, using basic: {str(e)}") # Fallback to basic citation await asyncio.sleep(1) # Simulate processing time citations = self._generate_citations(sources) bibliography = self._create_bibliography(sources) self.log_message("Citation generation completed") self.status = AgentStatus.COMPLETED return { "citations": citations, "bibliography": bibliography, "citation_count": len(citations), "formats": ["APA", "MLA", "Chicago"] } def _generate_citations(self, sources: List[Dict]) -> List[Dict]: citations = [] for i, source in enumerate(sources, 1): citation = { "id": i, "apa": self._format_apa(source), "mla": self._format_mla(source), "chicago": self._format_chicago(source), "source": source } citations.append(citation) return citations def _format_apa(self, source: Dict) -> str: title = source.get("title", "Unknown Title") url = source.get("url", "") return f"{title}. Retrieved from {url}" def _format_mla(self, source: Dict) -> str: title = source.get("title", "Unknown Title") url = source.get("url", "") return f'"{title}." Web. {datetime.now().strftime("%d %b %Y")}. <{url}>' def _format_chicago(self, source: Dict) -> str: title = source.get("title", "Unknown Title") url = source.get("url", "") return f'"{title}." Accessed {datetime.now().strftime("%B %d, %Y")}. {url}.' def _create_bibliography(self, sources: List[Dict]) -> str: if not sources: return "No sources to cite." bib_entries = [] for source in sources: bib_entries.append(self._format_apa(source)) return "\n\n".join(bib_entries) class ResearchOrchestrator: def __init__(self): self.planner = PlannerAgent() self.retriever = RetrieverAgent() self.summarizer = SummarizerAgent() self.citation_gen = CitationAgent() self.research_state = {} self.activity_log = [] async def conduct_research(self, query: str, progress_callback=None) -> Dict: """Main research orchestration method""" self.activity_log = [] self.research_state = {"query": query, "start_time": datetime.now().isoformat()} try: # Step 1: Planning if progress_callback: progress_callback("🎯 Planning research approach...", 10) plan_result = await self.planner.process({"query": query}) self.research_state["plan"] = plan_result self._log_activity("Planning completed", self.planner.messages[-1]) # Step 2: Information Retrieval if progress_callback: progress_callback("🔍 Gathering information...", 30) all_sources = [] tasks = plan_result["tasks"] for i, task in enumerate(tasks): if progress_callback: progress_callback(f"🔍 Processing: {task.description}", 30 + (i * 20)) retrieval_result = await self.retriever.process({"task": task}) all_sources.extend(retrieval_result["sources"]) self._log_activity(f"Retrieved sources for: {task.description}", self.retriever.messages[-1]) self.research_state["sources"] = all_sources # Step 3: Summarization if progress_callback: progress_callback("📝 Analyzing and summarizing...", 70) summary_result = await self.summarizer.process({"sources": all_sources}) self.research_state["summary"] = summary_result self._log_activity("Summarization completed", self.summarizer.messages[-1]) # Step 4: Citation Generation if progress_callback: progress_callback("📚 Generating citations...", 90) citation_result = await self.citation_gen.process({"sources": all_sources}) self.research_state["citations"] = citation_result self._log_activity("Citations generated", self.citation_gen.messages[-1]) if progress_callback: progress_callback("✅ Research completed!", 100) self.research_state["completion_time"] = datetime.now().isoformat() self.research_state["status"] = "completed" return self.research_state except Exception as e: logger.error(f"Research failed: {str(e)}") self.research_state["status"] = "error" self.research_state["error"] = str(e) return self.research_state def _log_activity(self, description: str, agent_message: AgentMessage): activity = { "timestamp": datetime.now().isoformat(), "description": description, "agent": agent_message.agent_id, "details": agent_message.message } self.activity_log.append(activity) def get_activity_log(self) -> List[Dict]: return self.activity_log # Global orchestrator instance orchestrator = ResearchOrchestrator() def format_research_results(research_state: Dict) -> Tuple[str, str, str, str]: """Format research results for Gradio display""" if research_state.get("status") == "error": error_msg = f"❌ Research failed: {research_state.get('error', 'Unknown error')}" return error_msg, "", "", "" if research_state.get("status") != "completed": return "Research in progress...", "", "", "" # Format summary summary_data = research_state.get("summary", {}) summary_text = f"""# Research Summary {summary_data.get('summary', 'No summary available')} ## Key Findings """ for point in summary_data.get('key_points', []): summary_text += f"• {point}\n" summary_text += f""" ## Research Metrics - Sources analyzed: {len(research_state.get('sources', []))} - Summary length: {summary_data.get('word_count', 0)} words - Coverage score: {summary_data.get('coverage_score', 0):.2f} """ # Format sources sources = research_state.get("sources", []) sources_text = "# Sources Found\n\n" for i, source in enumerate(sources, 1): sources_text += f"""## {i}. {source.get('title', 'Unknown Title')} **URL:** {source.get('url', 'N/A')} **Type:** {source.get('source_type', 'Unknown')} **Relevance:** {source.get('relevance', 0):.2f} **Summary:** {source.get('snippet', 'No summary available')} --- """ # Format citations citations_data = research_state.get("citations", {}) citations_text = "" # Check if we have citations data if citations_data and isinstance(citations_data, dict): bibliography = citations_data.get('bibliography') if bibliography and bibliography.strip(): citations_text += bibliography else: # Fallback: create bibliography from sources if citations failed sources = research_state.get("sources", []) if sources: citations_text += "## Sources Referenced:\n\n" for i, source in enumerate(sources, 1): title = source.get("title", "Unknown Title") url = source.get("url", "") source_type = source.get("source_type", "web") citations_text += f"**[{i}]** {title} \n" citations_text += f"*Source:* {source_type.title()} \n" citations_text += f"*URL:* {url} \n\n" else: citations_text += "No sources available for citation." else: # Create citations from sources directly sources = research_state.get("sources", []) if sources: citations_text += "## Research Sources:\n\n" for i, source in enumerate(sources, 1): title = source.get("title", "Unknown Title") url = source.get("url", "") source_type = source.get("source_type", "web") relevance = source.get("relevance", 0) citations_text += f"**{i}.** {title} \n" citations_text += f"**Type:** {source_type.title()} | **Relevance:** {relevance:.2f} \n" citations_text += f"**URL:** {url} \n\n" else: citations_text += "No sources available for citation." # Format activity log activity_text = "# Research Process Log\n\n" for activity in orchestrator.get_activity_log(): timestamp = datetime.fromisoformat(activity['timestamp']).strftime("%H:%M:%S") activity_text += f"**{timestamp}** - {activity['description']}\n" activity_text += f"*{activity['details']}*\n\n" return summary_text, sources_text, citations_text, activity_text async def conduct_research_async(query: str, progress=gr.Progress()) -> Tuple[str, str, str, str]: """Async wrapper for research with progress updates""" def update_progress(message: str, percent: int): progress(percent/100, desc=message) research_result = await orchestrator.conduct_research(query, update_progress) return format_research_results(research_result) def conduct_research_sync(query: str, progress=gr.Progress()) -> Tuple[str, str, str, str]: """Synchronous wrapper for Gradio""" if not query.strip(): return "Please enter a research query.", "", "", "" # Run async function in event loop try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete(conduct_research_async(query, progress)) def create_interface(): """Create the Gradio interface""" with gr.Blocks( title="ResearchCopilot - Multi-Agent Research System", theme=gr.themes.Soft(), css=""" .gradio-container { max-width: 1200px !important; margin: 0 auto !important; } .research-header { text-align: center; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 2rem; border-radius: 10px; margin-bottom: 2rem; } .agent-status { background: #ffffff !important; border: 2px solid #e0e0e0; border-radius: 8px; padding: 1.5rem; margin: 1rem 0; box-shadow: 0 2px 4px rgba(0,0,0,0.1); } .agent-status h3 { color: #2c3e50 !important; margin-bottom: 1rem; font-size: 1.2rem; } .agent-status ul { color: #2c3e50 !important; list-style-type: none; padding-left: 0; } .agent-status li { color: #2c3e50 !important; margin-bottom: 0.8rem; padding: 0.5rem; background: #f8f9fa; border-radius: 4px; border-left: 4px solid #667eea; } .agent-status strong { color: #667eea !important; } """ ) as interface: # Header gr.HTML("""

🤖 ResearchCopilot

Multi-Agent Research System

Powered by AI agents working together to conduct comprehensive research

Track 3: Agentic Demo Showcase - Gradio MCP Hackathon 2025

""") # Agent Status Overview with gr.Row(): gr.HTML("""

🎯 Research Agents

""") # Main Interface with gr.Row(): with gr.Column(scale=1): query_input = gr.Textbox( label="Research Query", placeholder="Enter your research question (e.g., 'Latest developments in quantum computing for drug discovery')", lines=3 ) research_btn = gr.Button( "🚀 Start Research", variant="primary", size="lg" ) gr.Examples( examples=[ "Impact of artificial intelligence on healthcare diagnostics", "Sustainable energy solutions for urban environments", "Recent advances in quantum computing applications", "Climate change effects on global food security", "Blockchain technology in supply chain management" ], inputs=query_input, label="Example Research Queries" ) # Results Display with gr.Row(): with gr.Column(): with gr.Tabs(): with gr.TabItem("📊 Summary"): summary_output = gr.Markdown( label="Research Summary", value="Enter a research query and click 'Start Research' to begin." ) with gr.TabItem("📚 Sources"): sources_output = gr.Markdown( label="Sources Found", value="Sources will appear here after research is completed." ) with gr.TabItem("📖 Citations"): citations_output = gr.Markdown( label="Citations & Bibliography", value="Citations will be generated automatically." ) with gr.TabItem("🔍 Process Log"): activity_output = gr.Markdown( label="Agent Activity Log", value="Research process will be logged here." ) # Event Handlers research_btn.click( fn=conduct_research_sync, inputs=[query_input], outputs=[summary_output, sources_output, citations_output, activity_output], show_progress=True ) # Footer gr.HTML("""

ResearchCopilot - Demonstrating multi-agent AI collaboration for research tasks

Built for the Gradio Agents & MCP Hackathon 2025 - Track 3: Agentic Demo Showcase

Built with ❤️ using Gradio, Modal, Perplexity API, Claude API, and Multi-Agent Architecture.

""") return interface # Launch the application if __name__ == "__main__": app = create_interface() app.launch( share=False, # Creates public URL for sharing server_name="0.0.0.0", # Localhost access server_port=7860, show_error=True, inbrowser=True # Automatically opens browser )