import os
import pandas as pd
from pandasai import Agent, SmartDataframe
from typing import Tuple
from PIL import Image
from pandasai.llm import HuggingFaceTextGen
from dotenv import load_dotenv
from langchain_groq import ChatGroq
from langchain_google_genai import ChatGoogleGenerativeAI
import matplotlib.pyplot as plt
import json
from datetime import datetime
from huggingface_hub import HfApi
import uuid
# FORCE reload environment variables
load_dotenv(override=True)
# Get API keys with explicit None handling and debugging
Groq_Token = os.getenv("GROQ_API_KEY")
hf_token = os.getenv("HF_TOKEN")
gemini_token = os.getenv("GEMINI_TOKEN")
# Debug print (remove in production)
print(f"Debug - Groq Token: {'Present' if Groq_Token else 'Missing'}")
print(f"Debug - Groq Token Value: {Groq_Token[:10] + '...' if Groq_Token else 'None'}")
print(f"Debug - Gemini Token: {'Present' if gemini_token else 'Missing'}")
models = {
"mistral": "mistral-saba-24b",
"llama3.3": "llama-3.3-70b-versatile",
"llama3.1": "llama-3.1-8b-instant",
"gemma2": "gemma2-9b-it",
"gemini-pro": "gemini-1.5-pro"
}
def log_interaction(user_query, model_name, response_content, generated_code, execution_time, error_message=None, is_image=False):
"""Log user interactions to Hugging Face dataset"""
try:
if not hf_token or hf_token.strip() == "":
print("Warning: HF_TOKEN not available, skipping logging")
return
# Create log entry
log_entry = {
"timestamp": datetime.now().isoformat(),
"session_id": str(uuid.uuid4()),
"user_query": user_query,
"model_name": model_name,
"response_content": str(response_content),
"generated_code": generated_code or "",
"execution_time_seconds": execution_time,
"error_message": error_message or "",
"is_image_output": is_image,
"success": error_message is None
}
# Create DataFrame
df = pd.DataFrame([log_entry])
# Create unique filename with timestamp
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
random_id = str(uuid.uuid4())[:8]
filename = f"interaction_log_{timestamp_str}_{random_id}.parquet"
# Save locally first
local_path = f"/tmp/{filename}"
df.to_parquet(local_path, index=False)
# Upload to Hugging Face
api = HfApi(token=hf_token)
api.upload_file(
path_or_fileobj=local_path,
path_in_repo=f"data/{filename}",
repo_id="SustainabilityLabIITGN/VayuBuddy_logs",
repo_type="dataset",
)
# Clean up local file
if os.path.exists(local_path):
os.remove(local_path)
print(f"Successfully logged interaction to HuggingFace: {filename}")
except Exception as e:
print(f"Error logging interaction: {e}")
def preprocess_and_load_df(path: str) -> pd.DataFrame:
"""Load and preprocess the dataframe"""
try:
df = pd.read_csv(path)
df["Timestamp"] = pd.to_datetime(df["Timestamp"])
return df
except Exception as e:
raise Exception(f"Error loading dataframe: {e}")
def load_agent(df: pd.DataFrame, context: str, inference_server: str, name="mistral") -> Agent:
"""Load pandas AI agent with error handling"""
try:
if name == "gemini-pro":
if not gemini_token or gemini_token.strip() == "":
raise ValueError("Gemini API token not available or empty")
llm = ChatGoogleGenerativeAI(
model=models[name],
google_api_key=gemini_token,
temperature=0.1
)
else:
if not Groq_Token or Groq_Token.strip() == "":
raise ValueError("Groq API token not available or empty")
llm = ChatGroq(
model=models[name],
api_key=Groq_Token,
temperature=0.1
)
agent = Agent(df, config={"llm": llm, "enable_cache": False, "options": {"wait_for_model": True}})
if context:
agent.add_message(context)
return agent
except Exception as e:
raise Exception(f"Error loading agent: {e}")
def load_smart_df(df: pd.DataFrame, inference_server: str, name="mistral") -> SmartDataframe:
"""Load smart dataframe with error handling"""
try:
if name == "gemini-pro":
if not gemini_token or gemini_token.strip() == "":
raise ValueError("Gemini API token not available or empty")
llm = ChatGoogleGenerativeAI(
model=models[name],
google_api_key=gemini_token,
temperature=0.1
)
else:
if not Groq_Token or Groq_Token.strip() == "":
raise ValueError("Groq API token not available or empty")
llm = ChatGroq(
model=models[name],
api_key=Groq_Token,
temperature=0.1
)
df = SmartDataframe(df, config={"llm": llm, "max_retries": 5, "enable_cache": False})
return df
except Exception as e:
raise Exception(f"Error loading smart dataframe: {e}")
def get_from_user(prompt):
"""Format user prompt"""
return {"role": "user", "content": prompt}
def ask_agent(agent: Agent, prompt: str) -> dict:
"""Ask agent with comprehensive error handling"""
start_time = datetime.now()
try:
response = agent.chat(prompt)
execution_time = (datetime.now() - start_time).total_seconds()
gen_code = getattr(agent, 'last_code_generated', '')
ex_code = getattr(agent, 'last_code_executed', '')
last_prompt = getattr(agent, 'last_prompt', prompt)
# Log the interaction
log_interaction(
user_query=prompt,
model_name="pandas_ai_agent",
response_content=response,
generated_code=gen_code,
execution_time=execution_time,
error_message=None,
is_image=isinstance(response, str) and any(response.endswith(ext) for ext in ['.png', '.jpg', '.jpeg'])
)
return {
"role": "assistant",
"content": response,
"gen_code": gen_code,
"ex_code": ex_code,
"last_prompt": last_prompt,
"error": None
}
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(e)
# Log the failed interaction
log_interaction(
user_query=prompt,
model_name="pandas_ai_agent",
response_content=f"Error: {error_msg}",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": f"Error: {error_msg}",
"gen_code": "",
"ex_code": "",
"last_prompt": prompt,
"error": error_msg
}
def decorate_with_code(response: dict) -> str:
"""Decorate response with code details"""
gen_code = response.get("gen_code", "No code generated")
last_prompt = response.get("last_prompt", "No prompt")
return f"""
Generated Code
```python
{gen_code}
```
Prompt
{last_prompt}
"""
def show_response(st, response):
"""Display response with error handling"""
try:
with st.chat_message(response["role"]):
content = response.get("content", "No content")
try:
# Try to open as image
image = Image.open(content)
if response.get("gen_code"):
st.markdown(decorate_with_code(response), unsafe_allow_html=True)
st.image(image)
return {"is_image": True}
except:
# Not an image, display as text
if response.get("gen_code"):
display_content = decorate_with_code(response) + f"""
{content}"""
else:
display_content = content
st.markdown(display_content, unsafe_allow_html=True)
return {"is_image": False}
except Exception as e:
st.error(f"Error displaying response: {e}")
return {"is_image": False}
def ask_question(model_name, question):
"""Ask question with comprehensive error handling and logging"""
start_time = datetime.now()
try:
# Reload environment variables to get fresh values
load_dotenv(override=True)
fresh_groq_token = os.getenv("GROQ_API_KEY")
fresh_gemini_token = os.getenv("GEMINI_TOKEN")
print(f"ask_question - Fresh Groq Token: {'Present' if fresh_groq_token else 'Missing'}")
# Check API availability with fresh tokens
if model_name == "gemini-pro":
if not fresh_gemini_token or fresh_gemini_token.strip() == "":
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Missing or empty API token"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="❌ Gemini API token not available or empty",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "❌ Gemini API token not available or empty. Please set GEMINI_TOKEN in your environment variables.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
llm = ChatGoogleGenerativeAI(
model=models[model_name],
google_api_key=fresh_gemini_token,
temperature=0
)
else:
if not fresh_groq_token or fresh_groq_token.strip() == "":
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Missing or empty API token"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="❌ Groq API token not available or empty",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "❌ Groq API token not available or empty. Please set GROQ_API_KEY in your environment variables and restart the application.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
# Test the API key by trying to create the client
try:
llm = ChatGroq(
model=models[model_name],
api_key=fresh_groq_token,
temperature=0.1
)
# Test with a simple call to verify the API key works
test_response = llm.invoke("Test")
print("API key test successful")
except Exception as api_error:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(api_error)
if "organization_restricted" in error_msg.lower() or "unauthorized" in error_msg.lower():
response_content = "❌ API Key Error: Your Groq API key appears to be invalid, expired, or restricted. Please check your API key in the .env file."
log_error_msg = f"API key validation failed: {error_msg}"
else:
response_content = f"❌ API Connection Error: {error_msg}"
log_error_msg = error_msg
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=response_content,
generated_code="",
execution_time=execution_time,
error_message=log_error_msg,
is_image=False
)
return {
"role": "assistant",
"content": response_content,
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": log_error_msg
}
# Check if data file exists
if not os.path.exists("Data.csv"):
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Data file not found"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="❌ Data.csv file not found",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "❌ Data.csv file not found. Please ensure the data file is in the correct location.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
df_check = pd.read_csv("Data.csv")
df_check["Timestamp"] = pd.to_datetime(df_check["Timestamp"])
df_check = df_check.head(5)
new_line = "\n"
parameters = {"font.size": 12, "figure.dpi": 600}
template = f"""```python
import pandas as pd
import matplotlib.pyplot as plt
import uuid
plt.rcParams.update({parameters})
df = pd.read_csv("Data.csv")
df["Timestamp"] = pd.to_datetime(df["Timestamp"])
# Available columns and data types:
{new_line.join(map(lambda x: '# '+x, str(df_check.dtypes).split(new_line)))}
# Question: {question.strip()}
# Generate code to answer the question and save result in 'answer' variable
# If creating a plot, save it with a unique filename and store the filename in 'answer'
# If returning text/numbers, store the result directly in 'answer'
```"""
system_prompt = """You are a helpful assistant that generates Python code for data analysis.
Rules:
1. Always save your final result in a variable called 'answer'
2. If creating a plot, save it with plt.savefig() and store the filename in 'answer'
3. If returning text/numbers, store the result directly in 'answer'
4. Use descriptive variable names and add comments
5. Handle potential errors gracefully
6. For plots, use unique filenames to avoid conflicts
"""
query = f"""{system_prompt}
Complete the following code to answer the user's question:
{template}
"""
# Make API call
if model_name == "gemini-pro":
response = llm.invoke(query)
answer = response.content
else:
response = llm.invoke(query)
answer = response.content
# Extract and execute code
try:
if "```python" in answer:
code_part = answer.split("```python")[1].split("```")[0]
else:
code_part = answer
full_code = f"""
{template.split("```python")[1].split("```")[0]}
{code_part}
"""
# Execute code in a controlled environment
local_vars = {}
global_vars = {
'pd': pd,
'plt': plt,
'os': os,
'uuid': __import__('uuid')
}
exec(full_code, global_vars, local_vars)
# Get the answer
if 'answer' in local_vars:
answer_result = local_vars['answer']
else:
answer_result = "No answer variable found in generated code"
execution_time = (datetime.now() - start_time).total_seconds()
# Determine if output is an image
is_image = isinstance(answer_result, str) and any(answer_result.endswith(ext) for ext in ['.png', '.jpg', '.jpeg'])
# Log successful interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=str(answer_result),
generated_code=full_code,
execution_time=execution_time,
error_message=None,
is_image=is_image
)
return {
"role": "assistant",
"content": answer_result,
"gen_code": full_code,
"ex_code": full_code,
"last_prompt": question,
"error": None
}
except Exception as code_error:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(code_error)
# Log the failed code execution
log_interaction(
user_query=question,
model_name=model_name,
response_content=f"❌ Error executing generated code: {error_msg}",
generated_code=full_code if 'full_code' in locals() else "",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": f"❌ Error executing generated code: {error_msg}",
"gen_code": full_code if 'full_code' in locals() else "",
"ex_code": full_code if 'full_code' in locals() else "",
"last_prompt": question,
"error": error_msg
}
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(e)
# Handle specific API errors
if "organization_restricted" in error_msg:
response_content = "❌ API Organization Restricted: Your API key access has been restricted. Please check your Groq API key or try generating a new one."
log_error_msg = "API access restricted"
elif "rate_limit" in error_msg.lower():
response_content = "❌ Rate limit exceeded. Please wait a moment and try again."
log_error_msg = "Rate limit exceeded"
else:
response_content = f"❌ Error: {error_msg}"
log_error_msg = error_msg
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=response_content,
generated_code="",
execution_time=execution_time,
error_message=log_error_msg,
is_image=False
)
return {
"role": "assistant",
"content": response_content,
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": log_error_msg
}