"""
GAIA Agent Implementation

This module provides the main GAIA Agent class that interfaces with app.py.
It implements a simplified version of the agent that works independently.
"""

import hashlib
import logging
import os
import time
import traceback
from typing import Any, Dict, List, Optional, TypedDict

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("gaia_agent")

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
SERPER_API_KEY = os.getenv("SERPER_API_KEY", "")


class SearchResult(TypedDict, total=False):
    title: str
    link: str
    snippet: str
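

# SERPER_API_KEY and the SearchResult type above suggest a web-search tool that
# this simplified module never wires up. Below is a minimal sketch of what such
# a helper could look like, assuming the third-party `requests` package and
# Serper's POST https://google.serper.dev/search endpoint; it is not called by
# the agent's current code path.
def search_web_sketch(query: str, max_results: int = 5) -> List[SearchResult]:
    """Illustrative only: query Serper and normalize hits into SearchResult."""
    import requests  # deferred import: only needed if this sketch is used

    response = requests.post(
        "https://google.serper.dev/search",
        headers={"X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json"},
        json={"q": query},
        timeout=10,
    )
    response.raise_for_status()
    organic = response.json().get("organic", [])
    return [
        SearchResult(
            title=item.get("title", ""),
            link=item.get("link", ""),
            snippet=item.get("snippet", ""),
        )
        for item in organic[:max_results]
    ]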


class SimpleMemory:
    """Simple in-memory storage for conversation history and cached results."""

    def __init__(self):
        self.conversations = {}
        self.result_cache = {}

    def add_conversation(self, session_id: str, role: str, content: str):
        """Add a message to the conversation history."""
        if session_id not in self.conversations:
            self.conversations[session_id] = []
        self.conversations[session_id].append({
            "role": role,
            "content": content,
            "timestamp": time.time()
        })

    def get_conversation(self, session_id: str, max_messages: int = 10) -> List[Dict[str, Any]]:
        """Get the most recent messages for a session."""
        if session_id not in self.conversations:
            return []
        return self.conversations[session_id][-max_messages:]

    def cache_result(self, key: str, value: Any):
        """Store a result in the cache."""
        self.result_cache[key] = {
            "value": value,
            "timestamp": time.time()
        }

    def get_cached_result(self, key: str, max_age_seconds: int = 3600) -> Optional[Any]:
        """Return a cached result if it exists and has not expired."""
        if key not in self.result_cache:
            return None
        cache_entry = self.result_cache[key]
        age = time.time() - cache_entry["timestamp"]
        if age > max_age_seconds:
            return None
        return cache_entry["value"]

    def clear(self, session_id: Optional[str] = None):
        """Clear memory for a session, or all sessions if none is specified."""
        if session_id:
            if session_id in self.conversations:
                del self.conversations[session_id]
        else:
            self.conversations = {}
            self.result_cache = {}
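
# Example usage (illustrative):
#
#     memory = SimpleMemory()
#     memory.add_conversation("session-1", "user", "Hello")
#     memory.cache_result("greeting", "Hi there")
#     memory.get_cached_result("greeting")                     # -> "Hi there"
#     memory.get_cached_result("greeting", max_age_seconds=0)  # -> None (expired)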


class GAIAAgent:
    """
    GAIA (Grounded AI Assistant) agent with a simplified implementation.
    It provides the simplified interface that app.py interacts with.
    """

    def __init__(self):
        """Initialize the GAIA agent with a simplified configuration."""
        self.memory = SimpleMemory()
        logger.info("GAIA Agent initialized")

    def __call__(self, question: str) -> str:
        """
        Process a question and generate an answer.
        Compatible with the interface expected by app.py.

        Args:
            question (str): The question to process

        Returns:
            str: The answer to the question
        """
        return self.process_question(question)

    def process_question(self, question: str) -> str:
        """
        Process a question and generate an answer.

        Args:
            question (str): The question to process

        Returns:
            str: The answer to the question
        """
        cache_key = f"question_{hashlib.md5(question.encode()).hexdigest()}"

        cached_answer = self.memory.get_cached_result(cache_key)
        if cached_answer is not None:
            logger.info(f"Using cached answer for question: {question[:50]}...")
            return cached_answer

        try:
            if OPENAI_API_KEY:
                logger.info("Using OpenAI for processing")
                answer = self._process_with_openai(question)
            else:
                logger.warning("Using fallback mode (no OpenAI API key provided)")
                answer = self._fallback_processing(question)

            self.memory.cache_result(cache_key, answer)
            return answer

        except Exception as e:
            logger.error(f"Error processing question: {str(e)}")
            logger.error(traceback.format_exc())
            return f"I apologize, but I encountered an error while processing your question: {str(e)}"

    def _process_with_openai(self, question: str) -> str:
        """Placeholder for OpenAI-backed processing."""
        try:
            return (
                f"This is a GAIA Agent response to: '{question}'\n\n"
                "In a real implementation, this would use OpenAI's API to generate "
                "a more sophisticated response based on web search results and other tools."
            )
        except Exception as e:
            logger.error(f"Error in OpenAI processing: {str(e)}")
            logger.error(traceback.format_exc())
            return f"I encountered an error while processing your question: {str(e)}"

    def _fallback_processing(self, question: str) -> str:
        """Simple keyword-based fallback used when no API keys are available."""
        try:
            if "how" in question.lower():
                answer = f"To address '{question.strip('?')}', I would recommend following these steps: 1) Understand the core concepts, 2) Apply a structured approach, 3) Evaluate results, and 4) Refine as needed. Without access to external knowledge at the moment, this is a general framework for how-to questions."
            elif "what" in question.lower():
                answer = f"Regarding '{question.strip('?')}', this typically involves understanding several key factors. While I don't have access to external knowledge at the moment, this type of question usually requires defining terms, establishing context, and examining relevant concepts."
            elif "why" in question.lower():
                answer = f"The question '{question.strip('?')}' relates to causality and explanation. Such questions typically involve understanding underlying mechanisms, historical context, and logical relationships between factors."
            else:
                answer = f"I understand you're asking about '{question.strip('?')}'. To provide a comprehensive answer, I would need web search and other tools, which are currently unavailable. In a full implementation, I would search for relevant information and synthesize it into a coherent response."
            return answer
        except Exception as e:
            logger.error(f"Error in fallback processing: {str(e)}")
            return "I apologize, but I'm currently unable to process your question due to system limitations."

    def query(self, question: str) -> Dict[str, Any]:
        """
        Query the agent with a question and get an answer with metadata.

        Args:
            question (str): The question to answer

        Returns:
            Dict[str, Any]: Dictionary containing the answer and metadata
        """
        start_time = time.time()  # set before the try so the except block can use it
        try:
            answer = self.process_question(question)
            processing_time = time.time() - start_time

            return {
                "question": question,
                "answer": answer,
                "processing_time": processing_time,
                "timestamp": time.time(),
                "status": "success"
            }
        except Exception as e:
            logger.error(f"Error in query: {str(e)}")
            return {
                "question": question,
                "answer": f"Error processing query: {str(e)}",
                "processing_time": time.time() - start_time,
                "timestamp": time.time(),
                "status": "error",
                "error": str(e)
            }

    def clear_memory(self):
        """Clear the agent's memory."""
        self.memory.clear()
        logger.info("Agent memory cleared")