"""Gradio web app: search the web, scrape and summarize the top hits, then
answer the user's question via retrieval-augmented generation (RAG)."""

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache

import gradio as gr

from llm import generate_answer
from rag import VectorStore
from scraper import scrape_url
from search import search_google
from summarizer import summarize_text

# Shared vector store holding summaries of scraped pages.
vs = VectorStore()


@lru_cache(maxsize=100)
def cached_scrape(url):
    """Scrape *url*, memoizing the result so repeated questions that surface
    the same page do not re-fetch it.

    NOTE(review): failures are cached too — if scrape_url returns None/'' on
    a transient error, that value sticks for the process lifetime; confirm
    scrape_url's failure contract.
    """
    return scrape_url(url)


async def process_search_results(query):
    """Search Google for *query* and scrape the top hits concurrently.

    Returns a ``(search_results, texts)`` pair where ``texts[i]`` is the
    scraped text for ``search_results[i]``, or ``(None, None)`` when the
    search produced no hits.
    """
    search_results = search_google(query, num_results=5)
    if not search_results:
        return None, None

    # Scraping is blocking I/O, so fan it out on a thread pool while the
    # event loop stays responsive.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=5) as executor:
        scrape_tasks = [
            loop.run_in_executor(executor, cached_scrape, result['url'])
            for result in search_results
        ]
        texts = await asyncio.gather(*scrape_tasks)
    return search_results, texts


async def ask_agent(question, progress=gr.Progress()):
    """Answer *question*: search, scrape, summarize, retrieve, generate.

    Returns a markdown-formatted response listing the sources used; the
    *progress* callable is invoked as ``progress(fraction, desc=...)``.
    """
    progress(0.1, desc="🔍 Searching the web...")
    search_results, texts = await process_search_results(question)
    if not search_results:
        return "I couldn't find any relevant information. Please try a different question."

    progress(0.3, desc="📚 Processing content...")
    # Summarization is CPU/model-bound blocking work; run it off-loop.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as executor:
        summarize_tasks = [
            loop.run_in_executor(executor, summarize_text, text, 100)
            for text in texts
        ]
        summaries = await asyncio.gather(*summarize_tasks)

    # BUGFIX: reset the store before indexing this question's summaries.
    # The `indices` returned by retrieve() below are used to subscript
    # `search_results`, which is only valid when the store contains exactly
    # this question's texts; accumulating across questions produced wrong
    # (or out-of-range) source attributions.
    vs.clear()
    vs.add_texts(summaries)

    progress(0.6, desc="🧠 Finding relevant information...")
    relevant_texts, indices = vs.retrieve(question, top_k=3)
    context = "\n\n".join(relevant_texts)

    progress(0.8, desc="💡 Generating answer...")
    answer = generate_answer(context, question)

    # Assemble the markdown response: answer first, then cited sources,
    # then the remaining (unused) search hits.
    response = f"### 🤖 Assistant\n{answer}\n\n"
    response += "### 🔍 Sources Used in This Answer:\n"
    for idx in indices:
        result = search_results[idx]
        response += f"- [{result['title']}]({result['url']})\n"

    other_indices = [i for i in range(len(search_results)) if i not in indices]
    if other_indices:
        response += "\n### 📚 Other Useful Sources:\n"
        for idx in other_indices:
            result = search_results[idx]
            response += f"- [{result['title']}]({result['url']})\n"

    progress(1.0, desc="✅ Response ready")
    return response


# Gradio interface with progress tracking.
with gr.Blocks(theme=gr.themes.Soft(),
               css=".gradio-container {max-width: 800px}") as demo:
    gr.Markdown("""

🔍 AI Research Assistant

I'll search the web and summarize information for you!

""")
    chatbot = gr.Chatbot(height=400, bubble_full_width=False)
    msg = gr.Textbox(label="Your Question", placeholder="Ask me anything...")
    clear = gr.Button("Clear Conversation")
    status = gr.Textbox("", label="Status", interactive=False)

    async def respond(message, chat_history):
        """Chat handler: run the agent and append the exchange to history."""
        tracker = []
        try:
            # BUGFIX: ask_agent calls progress(0.1, desc="...") with a
            # keyword argument, so the callback's second parameter must be
            # named `desc` — the original `lambda p, d, ...` raised a
            # TypeError on every call, masked by the broad except below.
            response = await ask_agent(
                message,
                progress=lambda p, desc, t=tracker: t.append((p, desc)))
            # NOTE(review): assigning status.value from inside a handler
            # does not refresh the component in the browser; `status` would
            # need to be listed in the event's outputs to actually update.
            if tracker:
                status.value = tracker[-1][1]
        except Exception as e:
            response = f"⚠️ Sorry, I encountered an error: {str(e)[:100]}"
            status.value = "Error occurred"
        chat_history.append((message, response))
        return "", chat_history

    def clear_conversation():
        """Reset the vector store and empty the chat window."""
        # BUGFIX: the original `lambda: (vs.clear(), None)` returned a
        # 2-tuple for a single output component, so the chatbot was set to
        # that tuple instead of being cleared.
        vs.clear()
        return None

    msg.submit(respond, [msg, chatbot], [msg, chatbot])
    clear.click(clear_conversation, None, chatbot, queue=False)

if __name__ == "__main__":
    # NOTE(review): `concurrency_count` is the Gradio 3.x queue API
    # (removed in 4.x in favor of default_concurrency_limit), consistent
    # with the 3.x-only `bubble_full_width` argument above.
    demo.queue(concurrency_count=4).launch()