# Spaces: Runtime error
# (Hosting-page status text captured with this listing; not part of the program.)
import gradio as gr | |
from search import search_google | |
from scraper import scrape_url | |
from summarizer import summarize_text | |
from rag import VectorStore | |
from llm import generate_answer | |
import asyncio | |
from functools import lru_cache | |
from concurrent.futures import ThreadPoolExecutor | |
# Module-level vector store: one instance is created at import time and is
# shared by every call to ask_agent and by the "Clear Conversation" handler.
vs = VectorStore()
# Cached scraping function
@lru_cache(maxsize=128)
def cached_scrape(url):
    """Return the result of ``scrape_url(url)``, memoized per URL.

    Results are kept in a bounded LRU cache so that repeated requests for
    the same URL reuse the earlier result instead of calling ``scrape_url``
    again. Exceptions raised by ``scrape_url`` propagate to the caller and
    are not cached, so a failed URL is retried on the next call.

    Args:
        url: The address to scrape. It is used as the cache key and must
            therefore be hashable.

    Returns:
        Whatever ``scrape_url(url)`` returns.
    """
    return scrape_url(url)
async def process_search_results(query):
    """Search for ``query`` and scrape every hit concurrently.

    Both the search and the scrapes are synchronous calls, so they are run
    on a thread pool to keep the event loop responsive.

    Args:
        query: The search string passed to ``search_google``.

    Returns:
        A ``(search_results, texts)`` pair of equal-length lists where
        ``texts[i]`` is the scraped content of ``search_results[i]``.
        Results whose scrape raised are dropped from both lists so the two
        stay aligned. ``(None, None)`` is returned when the search yields
        nothing.

    Raises:
        BaseException: The first scrape error, when every scrape failed.
    """
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutor(max_workers=5) as executor:
        # Step 1: search for URLs off the event loop (the call is blocking).
        search_results = await loop.run_in_executor(
            executor, lambda: search_google(query, num_results=5)
        )
        if not search_results:
            return None, None

        # Step 2: scrape each URL in parallel. return_exceptions=True lets
        # every job finish before the pool is shut down, and keeps one bad
        # page from discarding the pages that scraped fine.
        scrape_jobs = [
            loop.run_in_executor(executor, cached_scrape, hit['url'])
            for hit in search_results
        ]
        outcomes = await asyncio.gather(*scrape_jobs, return_exceptions=True)

    scraped = [
        (hit, text)
        for hit, text in zip(search_results, outcomes)
        if not isinstance(text, BaseException)
    ]
    if not scraped:
        # Every scrape failed: surface the first error rather than reporting
        # "no results", so the caller can show what actually went wrong.
        raise outcomes[0]

    kept_results = [hit for hit, _ in scraped]
    texts = [text for _, text in scraped]
    return kept_results, texts
def _format_source_links(results, indices):
    """Render ``results[i]`` for each ``i`` in ``indices`` as Markdown link bullets."""
    return "".join(
        f"- [{results[i]['title']}]({results[i]['url']})\n" for i in indices
    )


async def ask_agent(question, progress=gr.Progress()):
    """Answer ``question`` from freshly searched, scraped and summarized pages.

    Pipeline: search and scrape, summarize each text on a thread pool, add
    the summaries to the shared vector store ``vs``, retrieve the top 3
    entries for the question, and pass them as context to
    ``generate_answer``.

    Args:
        question: The user's question; also used as the search query.
        progress: Callable invoked as ``progress(fraction, desc=...)`` at
            each stage of the pipeline.

    Returns:
        A Markdown string with the answer followed by the sources used and,
        if any remain, the other sources found. A fixed apology string is
        returned when the search produced nothing.
    """
    progress(0.1, desc="π Searching the web...")

    # Process search results
    search_results, texts = await process_search_results(question)
    if not search_results:
        return "I couldn't find any relevant information. Please try a different question."

    progress(0.3, desc="π Processing content...")

    # Step 3: Summarize each text
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as executor:
        summarize_tasks = [
            loop.run_in_executor(executor, summarize_text, text, 100)
            for text in texts
        ]
        summaries = await asyncio.gather(*summarize_tasks)

    # Step 4: Add to vector store
    vs.add_texts(summaries)

    progress(0.6, desc="π§ Finding relevant information...")

    # Step 5: Retrieve top 3 most relevant texts
    relevant_texts, indices = vs.retrieve(question, top_k=3)
    context = "\n\n".join(relevant_texts)

    progress(0.8, desc="π‘ Generating answer...")

    # Step 6: Generate final answer
    # NOTE(review): this call is synchronous and runs on the event loop;
    # consider moving it to an executor if it is slow - confirm it is
    # safe to call from a worker thread first.
    answer = generate_answer(context, question)

    # Only cite indices that address an entry of this request's results.
    # An out-of-range index would raise IndexError after all the work above,
    # and a negative one would silently cite the wrong source.
    # NOTE(review): `vs` is shared across requests, so retrieve() may return
    # positions of texts added by earlier questions - verify against
    # VectorStore and map indices back to sources there if so.
    total = len(search_results)
    used_indices = [idx for idx in indices if 0 <= idx < total]
    used_lookup = set(used_indices)
    other_indices = [i for i in range(total) if i not in used_lookup]

    # Format response
    parts = [
        f"### π€ Assistant\n{answer}\n\n",
        "### π Sources Used in This Answer:\n",
        _format_source_links(search_results, used_indices),
    ]
    if other_indices:
        parts.append("\n### π Other Useful Sources:\n")
        parts.append(_format_source_links(search_results, other_indices))

    progress(1.0, desc="β Response ready")
    return "".join(parts)
# Gradio interface with progress tracking
# NOTE(review): `bubble_full_width` is deprecated in newer Gradio releases -
# confirm against the Gradio version this app is pinned to.
with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 800px}") as demo:
    gr.Markdown("""
<div style="text-align: center">
<h1>π AI Research Assistant</h1>
<p>I'll search the web and summarize information for you!</p>
</div>
""")
    chatbot = gr.Chatbot(height=400, bubble_full_width=False)
    msg = gr.Textbox(label="Your Question", placeholder="Ask me anything...")
    clear = gr.Button("Clear Conversation")
    status = gr.Textbox("", label="Status", interactive=False)

    async def respond(message, chat_history):
        """Handle a submitted question and update the chat and status box.

        Args:
            message: Text from the question box.
            chat_history: Current chatbot value, a list of
                ``(user, assistant)`` pairs (or ``None`` when empty).

        Returns:
            A 3-tuple for the ``[msg, chatbot, status]`` outputs: an empty
            string to clear the question box, the extended history, and the
            status text (last progress description, or "Error occurred").
        """
        tracker = []

        def record_progress(fraction, desc=None):
            # ask_agent reports progress as progress(fraction, desc=...), so
            # the keyword must be named `desc` or the call raises TypeError.
            tracker.append((fraction, desc))

        try:
            response = await ask_agent(message, progress=record_progress)
            # Leave the status box untouched if nothing was reported.
            status_text = tracker[-1][1] if tracker else gr.update()
        except Exception as e:
            response = f"β οΈ Sorry, I encountered an error: {str(e)[:100]}"
            status_text = "Error occurred"

        if chat_history is None:
            chat_history = []
        chat_history.append((message, response))
        # Assigning status.value inside a handler does not reach the browser;
        # the status text has to be returned as an output component value.
        return "", chat_history, status_text

    def reset_conversation():
        """Empty the shared vector store and blank the chat and status box."""
        vs.clear()
        # One value per output component: chatbot, status.
        return None, ""

    msg.submit(respond, [msg, chatbot], [msg, chatbot, status])
    clear.click(reset_conversation, None, [chatbot, status], queue=False)
if __name__ == "__main__":
    # Enable Gradio's request queue with concurrency_count=4, then launch.
    # NOTE(review): `concurrency_count` is not accepted by queue() in newer
    # Gradio releases - confirm against the pinned Gradio version.
    queued_demo = demo.queue(concurrency_count=4)
    queued_demo.launch()