VayuChat / src.py
AbhayVG's picture
Update src.py
0a856ce verified
raw
history blame
20.3 kB
import os
import pandas as pd
from pandasai import Agent, SmartDataframe
from typing import Tuple
from PIL import Image
from pandasai.llm import HuggingFaceTextGen
from dotenv import load_dotenv
from langchain_groq import ChatGroq
from langchain_google_genai import ChatGoogleGenerativeAI
import matplotlib.pyplot as plt
import json
from datetime import datetime
from huggingface_hub import HfApi
import uuid
# FORCE reload environment variables
load_dotenv(override=True)
# Get API keys with explicit None handling and debugging
Groq_Token = os.getenv("GROQ_API_KEY")
hf_token = os.getenv("HF_TOKEN")
gemini_token = os.getenv("GEMINI_TOKEN")
# Debug print (remove in production)
print(f"Debug - Groq Token: {'Present' if Groq_Token else 'Missing'}")
print(f"Debug - Groq Token Value: {Groq_Token[:10] + '...' if Groq_Token else 'None'}")
print(f"Debug - Gemini Token: {'Present' if gemini_token else 'Missing'}")
models = {
"mistral": "mistral-saba-24b",
"llama3.3": "llama-3.3-70b-versatile",
"llama3.1": "llama-3.1-8b-instant",
"gemma2": "gemma2-9b-it",
"gemini-pro": "gemini-1.5-pro"
}
def log_interaction(user_query, model_name, response_content, generated_code, execution_time, error_message=None, is_image=False):
"""Log user interactions to Hugging Face dataset"""
try:
if not hf_token or hf_token.strip() == "":
print("Warning: HF_TOKEN not available, skipping logging")
return
# Create log entry
log_entry = {
"timestamp": datetime.now().isoformat(),
"session_id": str(uuid.uuid4()),
"user_query": user_query,
"model_name": model_name,
"response_content": str(response_content),
"generated_code": generated_code or "",
"execution_time_seconds": execution_time,
"error_message": error_message or "",
"is_image_output": is_image,
"success": error_message is None
}
# Create DataFrame
df = pd.DataFrame([log_entry])
# Create unique filename with timestamp
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
random_id = str(uuid.uuid4())[:8]
filename = f"interaction_log_{timestamp_str}_{random_id}.parquet"
# Save locally first
local_path = f"/tmp/{filename}"
df.to_parquet(local_path, index=False)
# Upload to Hugging Face
api = HfApi(token=hf_token)
api.upload_file(
path_or_fileobj=local_path,
path_in_repo=f"data/{filename}",
repo_id="SustainabilityLabIITGN/VayuBuddy_logs",
repo_type="dataset",
)
# Clean up local file
if os.path.exists(local_path):
os.remove(local_path)
print(f"Successfully logged interaction to HuggingFace: {filename}")
except Exception as e:
print(f"Error logging interaction: {e}")
def preprocess_and_load_df(path: str) -> pd.DataFrame:
"""Load and preprocess the dataframe"""
try:
df = pd.read_csv(path)
df["Timestamp"] = pd.to_datetime(df["Timestamp"])
return df
except Exception as e:
raise Exception(f"Error loading dataframe: {e}")
def load_agent(df: pd.DataFrame, context: str, inference_server: str, name="mistral") -> Agent:
"""Load pandas AI agent with error handling"""
try:
if name == "gemini-pro":
if not gemini_token or gemini_token.strip() == "":
raise ValueError("Gemini API token not available or empty")
llm = ChatGoogleGenerativeAI(
model=models[name],
google_api_key=gemini_token,
temperature=0.1
)
else:
if not Groq_Token or Groq_Token.strip() == "":
raise ValueError("Groq API token not available or empty")
llm = ChatGroq(
model=models[name],
api_key=Groq_Token,
temperature=0.1
)
agent = Agent(df, config={"llm": llm, "enable_cache": False, "options": {"wait_for_model": True}})
if context:
agent.add_message(context)
return agent
except Exception as e:
raise Exception(f"Error loading agent: {e}")
def load_smart_df(df: pd.DataFrame, inference_server: str, name="mistral") -> SmartDataframe:
"""Load smart dataframe with error handling"""
try:
if name == "gemini-pro":
if not gemini_token or gemini_token.strip() == "":
raise ValueError("Gemini API token not available or empty")
llm = ChatGoogleGenerativeAI(
model=models[name],
google_api_key=gemini_token,
temperature=0.1
)
else:
if not Groq_Token or Groq_Token.strip() == "":
raise ValueError("Groq API token not available or empty")
llm = ChatGroq(
model=models[name],
api_key=Groq_Token,
temperature=0.1
)
df = SmartDataframe(df, config={"llm": llm, "max_retries": 5, "enable_cache": False})
return df
except Exception as e:
raise Exception(f"Error loading smart dataframe: {e}")
def get_from_user(prompt):
"""Format user prompt"""
return {"role": "user", "content": prompt}
def ask_agent(agent: Agent, prompt: str) -> dict:
"""Ask agent with comprehensive error handling"""
start_time = datetime.now()
try:
response = agent.chat(prompt)
execution_time = (datetime.now() - start_time).total_seconds()
gen_code = getattr(agent, 'last_code_generated', '')
ex_code = getattr(agent, 'last_code_executed', '')
last_prompt = getattr(agent, 'last_prompt', prompt)
# Log the interaction
log_interaction(
user_query=prompt,
model_name="pandas_ai_agent",
response_content=response,
generated_code=gen_code,
execution_time=execution_time,
error_message=None,
is_image=isinstance(response, str) and any(response.endswith(ext) for ext in ['.png', '.jpg', '.jpeg'])
)
return {
"role": "assistant",
"content": response,
"gen_code": gen_code,
"ex_code": ex_code,
"last_prompt": last_prompt,
"error": None
}
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(e)
# Log the failed interaction
log_interaction(
user_query=prompt,
model_name="pandas_ai_agent",
response_content=f"Error: {error_msg}",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": f"Error: {error_msg}",
"gen_code": "",
"ex_code": "",
"last_prompt": prompt,
"error": error_msg
}
def decorate_with_code(response: dict) -> str:
"""Decorate response with code details"""
gen_code = response.get("gen_code", "No code generated")
last_prompt = response.get("last_prompt", "No prompt")
return f"""<details>
<summary>Generated Code</summary>
```python
{gen_code}
```
</details>
<details>
<summary>Prompt</summary>
{last_prompt}
"""
def show_response(st, response):
"""Display response with error handling"""
try:
with st.chat_message(response["role"]):
content = response.get("content", "No content")
try:
# Try to open as image
image = Image.open(content)
if response.get("gen_code"):
st.markdown(decorate_with_code(response), unsafe_allow_html=True)
st.image(image)
return {"is_image": True}
except:
# Not an image, display as text
if response.get("gen_code"):
display_content = decorate_with_code(response) + f"""</details>
{content}"""
else:
display_content = content
st.markdown(display_content, unsafe_allow_html=True)
return {"is_image": False}
except Exception as e:
st.error(f"Error displaying response: {e}")
return {"is_image": False}
def ask_question(model_name, question):
"""Ask question with comprehensive error handling and logging"""
start_time = datetime.now()
try:
# Reload environment variables to get fresh values
load_dotenv(override=True)
fresh_groq_token = os.getenv("GROQ_API_KEY")
fresh_gemini_token = os.getenv("GEMINI_TOKEN")
print(f"ask_question - Fresh Groq Token: {'Present' if fresh_groq_token else 'Missing'}")
# Check API availability with fresh tokens
if model_name == "gemini-pro":
if not fresh_gemini_token or fresh_gemini_token.strip() == "":
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Missing or empty API token"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="❌ Gemini API token not available or empty",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "❌ Gemini API token not available or empty. Please set GEMINI_TOKEN in your environment variables.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
llm = ChatGoogleGenerativeAI(
model=models[model_name],
google_api_key=fresh_gemini_token,
temperature=0
)
else:
if not fresh_groq_token or fresh_groq_token.strip() == "":
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Missing or empty API token"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="❌ Groq API token not available or empty",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "❌ Groq API token not available or empty. Please set GROQ_API_KEY in your environment variables and restart the application.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
# Test the API key by trying to create the client
try:
llm = ChatGroq(
model=models[model_name],
api_key=fresh_groq_token,
temperature=0.1
)
# Test with a simple call to verify the API key works
test_response = llm.invoke("Test")
print("API key test successful")
except Exception as api_error:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(api_error)
if "organization_restricted" in error_msg.lower() or "unauthorized" in error_msg.lower():
response_content = "❌ API Key Error: Your Groq API key appears to be invalid, expired, or restricted. Please check your API key in the .env file."
log_error_msg = f"API key validation failed: {error_msg}"
else:
response_content = f"❌ API Connection Error: {error_msg}"
log_error_msg = error_msg
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=response_content,
generated_code="",
execution_time=execution_time,
error_message=log_error_msg,
is_image=False
)
return {
"role": "assistant",
"content": response_content,
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": log_error_msg
}
# Check if data file exists
if not os.path.exists("Data.csv"):
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Data file not found"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="❌ Data.csv file not found",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "❌ Data.csv file not found. Please ensure the data file is in the correct location.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
df_check = pd.read_csv("Data.csv")
df_check["Timestamp"] = pd.to_datetime(df_check["Timestamp"])
df_check = df_check.head(5)
new_line = "\n"
parameters = {"font.size": 12, "figure.dpi": 600}
template = f"""```python
import pandas as pd
import matplotlib.pyplot as plt
import uuid
plt.rcParams.update({parameters})
df = pd.read_csv("Data.csv")
df["Timestamp"] = pd.to_datetime(df["Timestamp"])
# Available columns and data types:
{new_line.join(map(lambda x: '# '+x, str(df_check.dtypes).split(new_line)))}
# Question: {question.strip()}
# Generate code to answer the question and save result in 'answer' variable
# If creating a plot, save it with a unique filename and store the filename in 'answer'
# If returning text/numbers, store the result directly in 'answer'
```"""
system_prompt = """You are a helpful assistant that generates Python code for data analysis.
Rules:
1. Always save your final result in a variable called 'answer'
2. If creating a plot, save it with plt.savefig() and store the filename in 'answer'
3. If returning text/numbers, store the result directly in 'answer'
4. Use descriptive variable names and add comments
5. Handle potential errors gracefully
6. For plots, use unique filenames to avoid conflicts
"""
query = f"""{system_prompt}
Complete the following code to answer the user's question:
{template}
"""
# Make API call
if model_name == "gemini-pro":
response = llm.invoke(query)
answer = response.content
else:
response = llm.invoke(query)
answer = response.content
# Extract and execute code
try:
if "```python" in answer:
code_part = answer.split("```python")[1].split("```")[0]
else:
code_part = answer
full_code = f"""
{template.split("```python")[1].split("```")[0]}
{code_part}
"""
# Execute code in a controlled environment
local_vars = {}
global_vars = {
'pd': pd,
'plt': plt,
'os': os,
'uuid': __import__('uuid')
}
exec(full_code, global_vars, local_vars)
# Get the answer
if 'answer' in local_vars:
answer_result = local_vars['answer']
else:
answer_result = "No answer variable found in generated code"
execution_time = (datetime.now() - start_time).total_seconds()
# Determine if output is an image
is_image = isinstance(answer_result, str) and any(answer_result.endswith(ext) for ext in ['.png', '.jpg', '.jpeg'])
# Log successful interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=str(answer_result),
generated_code=full_code,
execution_time=execution_time,
error_message=None,
is_image=is_image
)
return {
"role": "assistant",
"content": answer_result,
"gen_code": full_code,
"ex_code": full_code,
"last_prompt": question,
"error": None
}
except Exception as code_error:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(code_error)
# Log the failed code execution
log_interaction(
user_query=question,
model_name=model_name,
response_content=f"❌ Error executing generated code: {error_msg}",
generated_code=full_code if 'full_code' in locals() else "",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": f"❌ Error executing generated code: {error_msg}",
"gen_code": full_code if 'full_code' in locals() else "",
"ex_code": full_code if 'full_code' in locals() else "",
"last_prompt": question,
"error": error_msg
}
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(e)
# Handle specific API errors
if "organization_restricted" in error_msg:
response_content = "❌ API Organization Restricted: Your API key access has been restricted. Please check your Groq API key or try generating a new one."
log_error_msg = "API access restricted"
elif "rate_limit" in error_msg.lower():
response_content = "❌ Rate limit exceeded. Please wait a moment and try again."
log_error_msg = "Rate limit exceeded"
else:
response_content = f"❌ Error: {error_msg}"
log_error_msg = error_msg
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=response_content,
generated_code="",
execution_time=execution_time,
error_message=log_error_msg,
is_image=False
)
return {
"role": "assistant",
"content": response_content,
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": log_error_msg
}