VayuChat / src.py
Nipun's picture
Fix pandas syntax errors in system prompt
e4a1677
import os
import pandas as pd
from typing import Tuple
from PIL import Image
from dotenv import load_dotenv
from langchain_groq import ChatGroq
from langchain_google_genai import ChatGoogleGenerativeAI
import matplotlib.pyplot as plt
import json
from datetime import datetime
from huggingface_hub import HfApi
import uuid
# FORCE reload environment variables
load_dotenv(override=True)
# Get API keys with explicit None handling and debugging
Groq_Token = os.getenv("GROQ_API_KEY")
hf_token = os.getenv("HF_TOKEN")
gemini_token = os.getenv("GEMINI_TOKEN")
# Debug print (remove in production)
print(f"Debug - Groq Token: {'Present' if Groq_Token else 'Missing'}")
print(f"Debug - Groq Token Value: {Groq_Token[:10] + '...' if Groq_Token else 'None'}")
print(f"Debug - Gemini Token: {'Present' if gemini_token else 'Missing'}")
models = {
"gpt-oss-20b": "openai/gpt-oss-20b",
"gpt-oss-120b": "openai/gpt-oss-120b",
"llama3.1": "llama-3.1-8b-instant",
"llama3.3": "llama-3.3-70b-versatile",
"deepseek-R1": "deepseek-r1-distill-llama-70b",
"llama4 maverik":"meta-llama/llama-4-maverick-17b-128e-instruct",
"llama4 scout":"meta-llama/llama-4-scout-17b-16e-instruct",
"gemini-pro": "gemini-1.5-pro"
}
def log_interaction(user_query, model_name, response_content, generated_code, execution_time, error_message=None, is_image=False):
"""Log user interactions to Hugging Face dataset"""
try:
if not hf_token or hf_token.strip() == "":
print("Warning: HF_TOKEN not available, skipping logging")
return
# Create log entry
log_entry = {
"timestamp": datetime.now().isoformat(),
"session_id": str(uuid.uuid4()),
"user_query": user_query,
"model_name": model_name,
"response_content": str(response_content),
"generated_code": generated_code or "",
"execution_time_seconds": execution_time,
"error_message": error_message or "",
"is_image_output": is_image,
"success": error_message is None
}
# Create DataFrame
df = pd.DataFrame([log_entry])
# Create unique filename with timestamp
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
random_id = str(uuid.uuid4())[:8]
filename = f"interaction_log_{timestamp_str}_{random_id}.parquet"
# Save locally first
local_path = f"/tmp/{filename}"
df.to_parquet(local_path, index=False)
# Upload to Hugging Face
api = HfApi(token=hf_token)
api.upload_file(
path_or_fileobj=local_path,
path_in_repo=f"data/{filename}",
repo_id="SustainabilityLabIITGN/VayuChat_logs",
repo_type="dataset",
)
# Clean up local file
if os.path.exists(local_path):
os.remove(local_path)
print(f"Successfully logged interaction to HuggingFace: {filename}")
except Exception as e:
print(f"Error logging interaction: {e}")
def preprocess_and_load_df(path: str) -> pd.DataFrame:
"""Load and preprocess the dataframe"""
try:
df = pd.read_csv(path)
df["Timestamp"] = pd.to_datetime(df["Timestamp"])
return df
except Exception as e:
raise Exception(f"Error loading dataframe: {e}")
def get_from_user(prompt):
"""Format user prompt"""
return {"role": "user", "content": prompt}
def ask_question(model_name, question):
"""Ask question with comprehensive error handling and logging"""
start_time = datetime.now()
try:
# Reload environment variables to get fresh values
load_dotenv(override=True)
fresh_groq_token = os.getenv("GROQ_API_KEY")
fresh_gemini_token = os.getenv("GEMINI_TOKEN")
print(f"ask_question - Fresh Groq Token: {'Present' if fresh_groq_token else 'Missing'}")
# Check API availability with fresh tokens
if model_name == "gemini-pro":
if not fresh_gemini_token or fresh_gemini_token.strip() == "":
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Missing or empty API token"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="Gemini API token not available or empty",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "Gemini API token not available or empty. Please set GEMINI_TOKEN in your environment variables.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
llm = ChatGoogleGenerativeAI(
model=models[model_name],
google_api_key=fresh_gemini_token,
temperature=0
)
else:
if not fresh_groq_token or fresh_groq_token.strip() == "":
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Missing or empty API token"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="Groq API token not available or empty",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "Groq API token not available or empty. Please set GROQ_API_KEY in your environment variables and restart the application.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
# Test the API key by trying to create the client
try:
llm = ChatGroq(
model=models[model_name],
api_key=fresh_groq_token,
temperature=0.1
)
# Test with a simple call to verify the API key works
test_response = llm.invoke("Test")
print("API key test successful")
except Exception as api_error:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(api_error)
if "organization_restricted" in error_msg.lower() or "unauthorized" in error_msg.lower():
response_content = "API Key Error: Your Groq API key appears to be invalid, expired, or restricted. Please check your API key in the .env file."
log_error_msg = f"API key validation failed: {error_msg}"
else:
response_content = f"API Connection Error: {error_msg}"
log_error_msg = error_msg
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=response_content,
generated_code="",
execution_time=execution_time,
error_message=log_error_msg,
is_image=False
)
return {
"role": "assistant",
"content": response_content,
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": log_error_msg
}
# Check if data file exists
if not os.path.exists("Data.csv"):
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = "Data file not found"
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content="Data.csv file not found",
generated_code="",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": "Data.csv file not found. Please ensure the data file is in the correct location.",
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": error_msg
}
df_check = pd.read_csv("Data.csv")
df_check["Timestamp"] = pd.to_datetime(df_check["Timestamp"])
df_check = df_check.head(5)
new_line = "\n"
template = f"""```python
import pandas as pd
import matplotlib.pyplot as plt
import uuid
import calendar
import numpy as np
# Set professional matplotlib styling
plt.rcParams.update({{
'font.size': 12,
'figure.dpi': 400,
'figure.facecolor': 'white',
'axes.facecolor': 'white',
'axes.edgecolor': '#e2e8f0',
'axes.linewidth': 1.2,
'axes.labelcolor': '#374151',
'axes.spines.top': False,
'axes.spines.right': False,
'axes.spines.left': True,
'axes.spines.bottom': True,
'axes.grid': True,
'grid.color': '#f1f5f9',
'grid.linewidth': 0.8,
'grid.alpha': 0.7,
'xtick.color': '#6b7280',
'ytick.color': '#6b7280',
'text.color': '#374151',
'figure.figsize': [12, 6],
'axes.prop_cycle': plt.cycler('color', ['#3b82f6', '#ef4444', '#10b981', '#f59e0b', '#8b5cf6', '#06b6d4'])
}})
df = pd.read_csv("Data.csv")
df["Timestamp"] = pd.to_datetime(df["Timestamp"])
# Available columns and data types:
{new_line.join(map(lambda x: '# '+x, str(df_check.dtypes).split(new_line)))}
# Question: {question.strip()}
# Generate code to answer the question and save result in 'answer' variable
# If creating a plot, save it with a unique filename and store the filename in 'answer'
# If returning text/numbers, store the result directly in 'answer'
```"""
system_prompt = """Generate Python code to answer the user's question about air quality data.
IMPORTANT: Only generate Python code - no explanations, no thinking, just clean code.
WHEN TO USE DIFFERENT OUTPUT TYPES:
- Simple questions asking "Which city", "What month" (1-2 values) → TEXT ANSWERS (store text in 'answer')
- Questions asking "Plot", "Show chart", "Visualize" → PLOTS (store filename in 'answer')
- Questions with tabular data (lists of cities, rates, rankings, comparisons) → DATAFRAMES (store dataframe in 'answer')
- Examples of DATAFRAME outputs:
* Lists of cities with values (pollution levels, improvement rates)
* Rankings or comparisons across multiple entities
* Any result that would be >5 rows of data
* Calculate/List/Compare operations with multiple results
SAFETY & ROBUSTNESS RULES:
- Always check if data exists before processing: if df.empty: answer = "No data available"
- Handle missing values: use .dropna() or .fillna() appropriately
- Use try-except blocks for risky operations like indexing
- Validate city/location names exist in data before filtering
- Check for empty results after filtering: if filtered_df.empty: answer = "No data found for specified criteria"
- Use .round(2) for numerical results to avoid long decimals
- Handle division by zero: check denominators before division
- Validate date ranges exist in data
- Use proper string formatting for answers with units (μg/m³)
CRITICAL: PANDAS SYNTAX FIXES:
- ALWAYS convert pandas/numpy values to int before using as list indices
- Example: calendar.month_name[int(month_value)] NOT calendar.month_name[month_value]
- Use int() conversion for ANY value used as index: int(row['month']), int(max_idx), etc.
- When accessing pandas iloc results, wrap in int(): int(df.loc[idx, 'column'])
- CORRECT groupby syntax: df.groupby([df['col1'], df['col2'].dt.year]) NOT df.groupby(['col1', 'col2'].dt.year)
- Always reference DataFrame when accessing columns: df['column'].dt.year NOT 'column'].dt.year
- Use proper DataFrame column references in all operations
TECHNICAL REQUIREMENTS:
- Save final result in variable called 'answer'
- For TEXT: Store the direct answer as a string in 'answer'
- For PLOTS: Save with unique filename f"plot_{{uuid.uuid4().hex[:8]}}.png" and store filename in 'answer'
- For DATAFRAMES: Store the pandas DataFrame directly in 'answer' (e.g., answer = result_df)
- Always use .iloc or .loc properly for pandas indexing
- Close matplotlib figures with plt.close() to prevent memory leaks
- Use proper column name checks before accessing columns
- For dataframes, ensure proper column names and sorting for readability
"""
query = f"""{system_prompt}
Complete the following code to answer the user's question:
{template}
"""
# Make API call
if model_name == "gemini-pro":
response = llm.invoke(query)
answer = response.content
else:
response = llm.invoke(query)
answer = response.content
# Extract and execute code
try:
if "```python" in answer:
code_part = answer.split("```python")[1].split("```")[0]
else:
code_part = answer
full_code = f"""
{template.split("```python")[1].split("```")[0]}
{code_part}
"""
# Execute code in a controlled environment
local_vars = {}
global_vars = {
'pd': pd,
'plt': plt,
'os': os,
'uuid': __import__('uuid'),
'calendar': __import__('calendar'),
'np': __import__('numpy')
}
exec(full_code, global_vars, local_vars)
# Get the answer
if 'answer' in local_vars:
answer_result = local_vars['answer']
else:
answer_result = "No answer variable found in generated code"
execution_time = (datetime.now() - start_time).total_seconds()
# Determine if output is an image
is_image = isinstance(answer_result, str) and any(answer_result.endswith(ext) for ext in ['.png', '.jpg', '.jpeg'])
# Log successful interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=str(answer_result),
generated_code=full_code,
execution_time=execution_time,
error_message=None,
is_image=is_image
)
return {
"role": "assistant",
"content": answer_result,
"gen_code": full_code,
"ex_code": full_code,
"last_prompt": question,
"error": None
}
except Exception as code_error:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(code_error)
# Log the failed code execution
log_interaction(
user_query=question,
model_name=model_name,
response_content=f"Error executing generated code: {error_msg}",
generated_code=full_code if 'full_code' in locals() else "",
execution_time=execution_time,
error_message=error_msg,
is_image=False
)
return {
"role": "assistant",
"content": f"Error executing generated code: {error_msg}",
"gen_code": full_code if 'full_code' in locals() else "",
"ex_code": full_code if 'full_code' in locals() else "",
"last_prompt": question,
"error": error_msg
}
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
error_msg = str(e)
# Handle specific API errors
if "organization_restricted" in error_msg:
response_content = "API Organization Restricted: Your API key access has been restricted. Please check your Groq API key or try generating a new one."
log_error_msg = "API access restricted"
elif "rate_limit" in error_msg.lower():
response_content = "Rate limit exceeded. Please wait a moment and try again."
log_error_msg = "Rate limit exceeded"
else:
response_content = f"Error: {error_msg}"
log_error_msg = error_msg
# Log the failed interaction
log_interaction(
user_query=question,
model_name=model_name,
response_content=response_content,
generated_code="",
execution_time=execution_time,
error_message=log_error_msg,
is_image=False
)
return {
"role": "assistant",
"content": response_content,
"gen_code": "",
"ex_code": "",
"last_prompt": question,
"error": log_error_msg
}