import gradio as gr
import asyncio
import threading
import queue
import os
import time
import json
from datetime import datetime
from modules.input_handler import validate_input
from modules.retriever import perform_search
from modules.context_enhancer import add_weather_context, add_space_weather_context
from modules.analyzer import analyze_with_model
from modules.formatter import format_output
from modules.citation import generate_citations, format_citations
from modules.server_cache import get_cached_result, cache_result
from modules.status_logger import log_request
from modules.server_monitor import ServerMonitor
from modules.rag.rag_chain import RAGChain
from modules.rag.vector_store import VectorStore
from langchain.docstore.document import Document
server_monitor = ServerMonitor()
rag_chain = RAGChain()
vector_store = VectorStore()
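# Note: these are module-level singletons shared across all Gradio sessions, so
# the RAG chain and vector store hold one shared document index for every user.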
# Cat-themed greeting function
def get_cat_greeting():
    """Generate a cat-themed greeting to test if the AI is operational"""
    return (
        "Hello there! I'm a sophisticated AI research assistant, but right now I'm just a random cat preparing to make biscuits "
        "(that's cat slang for getting ready to do something awesome!). Today is " + datetime.now().strftime("%A, %B %d, %Y") + ". "
        "I'm purring with excitement to help you with your research questions! "
        "Meow... what delicious knowledge shall we hunt down today? "
        "Please ask me anything, and I'll pounce on the best information for you!"
    )
# Startup check function optimized for Hugging Face endpoint
async def perform_startup_check():
    """Perform startup checks to verify Hugging Face endpoint status"""
    try:
        # Check 1: Verify Hugging Face endpoint is responding
        test_prompt = "Hello, this is a startup check. Please respond with 'OK' if you're operational."
        # Stream only a few chunks so the check stays fast
        stream = analyze_with_model(test_prompt)
        response_parts = []
        # Collect the first few chunks to verify operation
        chunks_received = 0
        for chunk in stream:
            response_parts.append(chunk)
            chunks_received += 1
            if chunks_received >= 3:  # Just need a few chunks to confirm operation
                break
        full_response = "".join(response_parts)
        # If we got a response, the server is likely operational
        if full_response:
            return {
                "status": "operational",
                "message": "✅ Hugging Face endpoint is operational and ready to assist!",
                "details": f"Received response: {full_response[:50]}..."
            }
        else:
            return {
                "status": "warning",
                "message": "⚠️ Endpoint responded but with empty content. May need attention.",
                "details": "Endpoint connection established but no content returned."
            }
    except Exception as e:
        error_msg = str(e)
        if "503" in error_msg:
            return {
                "status": "initializing",
                "message": "⏳ Hugging Face endpoint is currently initializing (503 error detected)",
                "details": "The model server is warming up. Please wait approximately 5 minutes before asking questions."
            }
        elif "timeout" in error_msg.lower():
            return {
                "status": "timeout",
                "message": "⏰ Endpoint connection timed out",
                "details": "Connection to the Hugging Face model timed out. This may indicate server initialization."
            }
        else:
            return {
                "status": "error",
                "message": "❌ Endpoint check failed",
                "details": f"Error during startup check: {error_msg}"
            }
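# Gradio callbacks in this app are synchronous, so the async startup check is
# driven on a daemon thread with its own event loop; the UI then polls
# get_result() without blocking.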
# Thread-safe wrapper for startup check
class StartupCheckWrapper:
    def __init__(self, coroutine):
        self.coroutine = coroutine
        self.result = None
        self.exception = None
        self.completed = False
        self.thread = threading.Thread(target=self._run)
        self.thread.daemon = True
        self.thread.start()

    def _run(self):
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                self.result = loop.run_until_complete(self.coroutine)
            except Exception as e:
                self.exception = e
        except Exception as e:
            self.exception = e
        finally:
            self.completed = True

    def get_result(self):
        if not self.completed:
            return {"status": "checking", "message": "🔄 Performing startup checks...", "details": "Please wait while we verify system status."}
        if self.exception:
            return {"status": "error", "message": "❌ Startup check failed", "details": str(self.exception)}
        return self.result
def run_startup_check():
    """Run the startup check asynchronously"""
    coroutine = perform_startup_check()
    wrapper = StartupCheckWrapper(coroutine)
    return wrapper
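# Main research pipeline: validate input -> cache lookup -> run web search and
# context fetches concurrently -> optional RAG -> stream the model answer ->
# append citations -> cache the final output.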
# Enhanced streaming with markdown support
async def research_assistant(query, history, use_rag=False):
    log_request("Research started", query=query, use_rag=use_rag)
    # Add a typing indicator
    history.append((query, "🔍 Searching for information..."))
    yield history
    cached = get_cached_result(query)
    if cached:
        log_request("Cache hit", query=query)
        history[-1] = (query, cached)
        yield history
        return
    try:
        validated_query = validate_input(query)
    except ValueError as e:
        error_msg = f"⚠️ Input Error: {str(e)}"
        history[-1] = (query, error_msg)
        yield history
        return
    # Run context enhancement and search in parallel
    history[-1] = (query, "🌐 Gathering context...")
    yield history
    # Get weather and space weather context (but don't include it in the prompt yet)
    weather_task = asyncio.create_task(add_weather_context())
    space_weather_task = asyncio.create_task(add_space_weather_context())
    search_task = asyncio.create_task(asyncio.to_thread(perform_search, validated_query))
    weather_data = await weather_task
    space_weather_data = await space_weather_task
    search_results = await search_task
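    # The three awaits above resolve concurrently: asyncio.create_task schedules
    # each coroutine immediately, so the total wait is roughly the slowest of the three.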
    # Handle search errors
    if isinstance(search_results, list) and len(search_results) > 0 and "error" in search_results[0]:
        error_msg = f"🔍 Search Error: {search_results[0]['error']}"
        history[-1] = (query, error_msg)
        yield history
        return
    # Format search content for the LLM
    search_content = ""
    answer_content = ""
    for result in search_results:
        if result.get("type") == "answer":
            answer_content = f"Direct Answer: {result['content']}\n\n"
        elif result.get("type") == "source":
            search_content += f"Source: {result['content']}\n\n"
    # Only include context if it seems relevant to the query
    context_section = ""
    lower_query = validated_query.lower()
    # Check if weather might be relevant
    weather_keywords = ["weather", "temperature", "climate", "rain", "snow", "sun", "storm", "wind", "humidity"]
    if any(keyword in lower_query for keyword in weather_keywords):
        context_section += f"\nCurrent Weather Context: {weather_data}"
    # Check if space weather might be relevant
    space_keywords = ["space", "solar", "sun", "satellite", "astronomy", "cosmic", "radiation", "flare"]
    if any(keyword in lower_query for keyword in space_keywords):
        context_section += f"\nSpace Weather Context: {space_weather_data}"
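    # Note: "sun" appears in both keyword lists, so a query mentioning it pulls
    # in weather and space-weather context alike.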
    # Build the enriched input
    enriched_input = f"{validated_query}\n\n{answer_content}Search Results:\n{search_content}{context_section}"
    # If RAG is enabled, use it
    if use_rag:
        history[-1] = (query, "📚 Searching document database...")
        yield history
        rag_result = rag_chain.query(validated_query)
        if rag_result["status"] == "success":
            # Use the RAG chain's prompt and attach the top retrieved excerpts.
            # (enriched_input has already absorbed context_section above, so the
            # excerpts are appended here to actually reach the model.)
            enriched_input = rag_result["prompt"]
            enriched_input += "\n\nDocument Context:\n" + "\n\n".join(
                doc.page_content for doc in rag_result["context_docs"][:2]
            )
    server_status = server_monitor.check_server_status()
    if not server_status["available"]:
        wait_time = server_status["estimated_wait"]
        response = (
            f"⏳ **Server Initializing** ⏳\n\n"
            f"The Hugging Face model server is currently starting up. This happens automatically after periods of inactivity.\n\n"
            f"**Estimated wait time: {wait_time} minutes**\n\n"
            f"**What you can do:**\n"
            f"- Wait {wait_time} minutes and try again\n"
            f"- Try a simpler query, which might process faster\n"
            f"- Check back shortly - the server will be ready soon!\n\n"
            f"*Technical Details: {server_status['message']}*"
        )
        history[-1] = (query, response)
        yield history
        return
    try:
        history[-1] = (query, "🧠 Analyzing information with Hugging Face model...")
        yield history
        stream = analyze_with_model(enriched_input)
        full_response = ""
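        # Streaming strategy: accumulate chunks in a small buffer and re-render
        # the chat only when the buffer fills or a newline arrives, which keeps
        # the UI responsive on fast token streams.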
        # Buffer for smoother streaming
        buffer = ""
        buffer_threshold = 20  # Characters before yielding
        for chunk in stream:
            buffer += chunk
            # Yield when the buffer is large enough or we have a complete line
            if len(buffer) > buffer_threshold or '\n' in buffer:
                full_response += buffer
                history[-1] = (query, full_response)
                yield history
                buffer = ""
                # Small delay for smoother streaming
                await asyncio.sleep(0.01)
        # Flush any remaining buffer
        if buffer:
            full_response += buffer
            history[-1] = (query, full_response)
            yield history
        citations = generate_citations(search_results)
        citation_text = format_citations(citations)
        full_output = full_response + citation_text
        cache_result(query, full_output)
        server_monitor.report_success()
        log_request("Research completed", result_length=len(full_output))
        history[-1] = (query, full_output)
        yield history
    except Exception as e:
        server_monitor.report_failure()
        error_response = f"🤖 **Unexpected Error** 🤖\n\nAn unexpected error occurred:\n\n{str(e)}"
        history[-1] = (query, error_response)
        yield history
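# research_assistant is an async generator, but the Gradio handlers below are
# plain (sync) generators; the bridge beneath drains the async stream on a
# daemon thread and hands items over via a queue of (tag, payload) pairs.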
# Thread-safe wrapper for async generator
class AsyncGeneratorWrapper:
    def __init__(self, async_gen):
        self.async_gen = async_gen
        self.queue = queue.Queue()
        self.thread = threading.Thread(target=self._run)
        self.thread.daemon = True
        self.thread.start()

    def _run(self):
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            async def consume():
                try:
                    async for item in self.async_gen:
                        self.queue.put(("item", item))
                except Exception as e:
                    self.queue.put(("error", e))
                finally:
                    self.queue.put(("done", None))

            loop.run_until_complete(consume())
        except Exception as e:
            self.queue.put(("error", e))
        finally:
            # Guarantee the consumer sees exactly one terminating sentinel
            if self.queue.empty() or self.queue.queue[-1][0] != "done":
                self.queue.put(("done", None))
    def __iter__(self):
        return self

    def __next__(self):
        item_type, item = self.queue.get()
        if item_type == "item":
            return item
        if item_type == "error":
            raise item
        # "done" ends iteration
        raise StopIteration
def research_assistant_wrapper(query, history, use_rag):
    async_gen = research_assistant(query, history, use_rag)
    wrapper = AsyncGeneratorWrapper(async_gen)
    return wrapper
# Document upload function
def upload_documents(files):
    """Upload and process documents for RAG"""
    try:
        documents = []
        for file in files:
            # For PDF files
            if file.name.endswith('.pdf'):
                from PyPDF2 import PdfReader
                reader = PdfReader(file.name)
                text = ""
                for page in reader.pages:
                    # extract_text() can return None for image-only pages
                    text += page.extract_text() or ""
                documents.append(Document(page_content=text, metadata={"source": file.name}))
            # For text files
            else:
                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
                    text = f.read()
                documents.append(Document(page_content=text, metadata={"source": file.name}))
        result = vector_store.add_documents(documents)
        if result["status"] == "success":
            return f"✅ Successfully added {result['count']} document chunks to the knowledge base!"
        else:
            return f"❌ Error adding documents: {result['message']}"
    except Exception as e:
        return f"❌ Error processing documents: {str(e)}"
# Performance dashboard data
def get_performance_stats():
    """Get performance statistics from Redis"""
    try:
        stats = server_monitor.get_system_stats()
        if "error" in stats:
            return {"status": "error", "message": stats["error"]}
        # Add more detailed stats
        stats["current_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        stats["uptime"] = "Calculating..."
        return stats
    except Exception as e:
        return {"status": "error", "message": str(e)}
# Global variable to store startup check result
startup_check_result = None

# Gradio Interface with all enhancements
with gr.Blocks(
    theme=gr.themes.Soft(primary_hue="amber", secondary_hue="orange"),
    title="AI Research Assistant"
) as demo:
    # State management
    chat_history = gr.State([])
gr.Markdown("# π§ AI Research Assistant") | |
gr.Markdown("This advanced AI assistant combines web search with contextual awareness to answer complex questions. " | |
"It provides weather and space weather context only when relevant to your query.") | |
with gr.Tabs(): | |
with gr.TabItem("π¬ Chat"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("## System Status") | |
status_display = gr.Markdown("π Checking system status...") | |
check_btn = gr.Button("π Refresh Status") | |
gr.Markdown("## How to Use") | |
gr.Markdown(""" | |
1. Enter a research question in the input box | |
2. Toggle 'Use Document Knowledge' to enable RAG | |
3. Click Submit or press Enter | |
4. Watch as the response streams in real-time | |
5. Review sources at the end of each response | |
## Features | |
- π Web search integration | |
- π€οΈ Context-aware weather data (only when relevant) | |
- π Context-aware space weather data (only when relevant) | |
- π RAG (Retrieval-Augmented Generation) with document database | |
- β‘ Real-time streaming from Hugging Face endpoint | |
- π Real-time citations | |
""") | |
                with gr.Column(scale=2):
                    chatbot = gr.Chatbot(
                        height=500,
                        label="Research Conversation",
                        latex_delimiters=[{"left": "$$", "right": "$$", "display": True}],
                        bubble_full_width=False
                    )
                    msg = gr.Textbox(
                        label="Research Question",
                        placeholder="Ask a complex research question...",
                        lines=3
                    )
                    use_rag = gr.Checkbox(
                        label="📚 Use Document Knowledge (RAG)",
                        value=False,
                        info="Enable to search uploaded documents for context"
                    )
                    with gr.Row():
                        submit_btn = gr.Button("Submit Research Query", variant="primary")
                        clear_btn = gr.Button("Clear Conversation")
                    examples = gr.Examples(
                        examples=[
                            "What are the latest developments in quantum computing?",
                            "How does climate change affect ocean currents?",
                            "Explain the significance of the James Webb Space Telescope findings",
                            "What are the economic implications of renewable energy adoption?",
                            "How do solar flares affect satellite communications?"
                        ],
                        inputs=msg,
                        label="Example Questions"
                    )
with gr.TabItem("π Document Management"): | |
gr.Markdown("## Upload Documents for RAG") | |
gr.Markdown("Upload PDF or text files to add them to the knowledge base for document-based queries.") | |
file_upload = gr.File( | |
file_types=[".pdf", ".txt"], | |
file_count="multiple", | |
label="Upload Documents" | |
) | |
upload_btn = gr.Button("π€ Upload Documents") | |
upload_output = gr.Textbox(label="Upload Status", interactive=False) | |
clear_docs_btn = gr.Button("ποΈ Clear All Documents") | |
gr.Markdown("## Current Documents") | |
doc_list = gr.Textbox( | |
label="Document List", | |
value="No documents uploaded yet", | |
interactive=False | |
) | |
with gr.TabItem("π Performance"): | |
perf_refresh_btn = gr.Button("π Refresh Stats") | |
perf_display = gr.JSON(label="System Statistics") | |
    def update_status():
        """Update the system status display"""
        global startup_check_result
        if startup_check_result is None:
            startup_check_result = run_startup_check()
        result = startup_check_result.get_result()
        # Format the status display based on the result
        if result["status"] == "operational":
            cat_greeting = get_cat_greeting()
            status_md = f"""
✅ **Hugging Face endpoint is operational and ready to assist!**

🐾 **Cat Greeting:**

*{cat_greeting}*

✅ **Ready for your questions!** Ask anything and I'll pounce on the best information for you.
"""
        elif result["status"] == "initializing":
            status_md = """
⏳ **Hugging Face endpoint is currently initializing (503 error detected)**

⏳ **Estimated wait time:** 5 minutes

While you wait, why not prepare some treats? I'll be ready to hunt for knowledge soon!
"""
        elif result["status"] == "checking":
            status_md = "🔄 Performing startup checks..."
        else:
            status_md = f"""
❌ **Endpoint check failed**

📋 **Details:** {result['details']}
"""
        return status_md
    def refresh_status():
        """Refresh the startup check"""
        global startup_check_result
        startup_check_result = run_startup_check()
        return update_status()

    def respond(message, history, use_rag_flag):
        # Get streaming response
        for updated_history in research_assistant_wrapper(message, history, use_rag_flag):
            yield updated_history, update_status()

    def clear_conversation():
        return [], []

    def update_performance_stats():
        stats = get_performance_stats()
        return stats
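    # Note: chat_history is read as input state but only clear_conversation
    # writes it back; streamed turns are rendered straight into the chatbot.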
    # Set initial status on load
    demo.load(update_status, outputs=status_display)
    demo.load(update_performance_stats, outputs=perf_display)

    # Button interactions
    check_btn.click(refresh_status, outputs=status_display)
    submit_btn.click(
        respond,
        [msg, chat_history, use_rag],
        [chatbot, status_display]
    )
    msg.submit(
        respond,
        [msg, chat_history, use_rag],
        [chatbot, status_display]
    )
    clear_btn.click(clear_conversation, outputs=[chat_history, chatbot])

    # Document management
    upload_btn.click(upload_documents, file_upload, upload_output)
    clear_docs_btn.click(lambda: vector_store.delete_collection(), None, upload_output)

    # Performance dashboard
    perf_refresh_btn.click(update_performance_stats, outputs=perf_display)
if __name__ == "__main__":
    # Print public link information to logs
    print("===== Application Starting =====")
    print("Creating public link for Hugging Face Space...")
    print("Using Hugging Face Inference API endpoint for optimal performance")
    print("Once the app launches, a public link will be available")
    print("================================")
    # Launch with public sharing enabled
    demo.launch(share=True)
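# Note: on Hugging Face Spaces the app is already served at a public URL, and
# Gradio typically ignores share=True there; the flag mainly matters when
# running this file locally.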