|
import os |
|
import pandas as pd |
|
from typing import Tuple |
|
from PIL import Image |
|
from dotenv import load_dotenv |
|
from langchain_groq import ChatGroq |
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
import matplotlib.pyplot as plt |
|
import json |
|
from datetime import datetime |
|
from huggingface_hub import HfApi |
|
import uuid |
|
|
|
|
|
# Load variables from a local .env file. override=True lets values from the
# .env file replace variables that are already set in the process environment.
load_dotenv(override=True)

# API credentials read once at import time. ask_question() re-reads the Groq
# and Gemini keys on every call; hf_token is used by log_interaction().
Groq_Token = os.getenv("GROQ_API_KEY")
hf_token = os.getenv("HF_TOKEN")
gemini_token = os.getenv("GEMINI_TOKEN")

# Startup diagnostics. Only report whether a key is present: printing even a
# prefix of the key would leak secret material into stdout / hosted logs.
print(f"Debug - Groq Token: {'Present' if Groq_Token else 'Missing'}")
print(f"Debug - Groq Token Value: {'[redacted]' if Groq_Token else 'None'}")
print(f"Debug - Gemini Token: {'Present' if gemini_token else 'Missing'}")
|
|
|
# Short model label (as passed to ask_question) -> provider model identifier.
# "gemini-pro" is served through ChatGoogleGenerativeAI; every other label is
# sent to ChatGroq. Insertion order is kept as in the original definition.
models = {
    # OpenAI open-weight models
    "gpt-oss-20b": "openai/gpt-oss-20b",
    "gpt-oss-120b": "openai/gpt-oss-120b",
    # Llama 3.x
    "llama3.1": "llama-3.1-8b-instant",
    "llama3.3": "llama-3.3-70b-versatile",
    # DeepSeek distill
    "deepseek-R1": "deepseek-r1-distill-llama-70b",
    # Llama 4
    "llama4 maverik": "meta-llama/llama-4-maverick-17b-128e-instruct",
    "llama4 scout": "meta-llama/llama-4-scout-17b-16e-instruct",
    # Google Gemini
    "gemini-pro": "gemini-1.5-pro",
}
|
|
|
def log_interaction(user_query, model_name, response_content, generated_code, execution_time, error_message=None, is_image=False):
    """Log one user interaction to the Hugging Face logs dataset (best effort).

    The interaction is written as a single-row parquet file in the system
    temporary directory, uploaded to the
    ``SustainabilityLabIITGN/VayuChat_logs`` dataset repo under ``data/``,
    and the temporary file is removed again.

    Logging never raises: if ``hf_token`` is missing/blank the call is skipped
    with a warning, and any other failure is printed and swallowed so that a
    logging problem cannot break the caller.

    Args:
        user_query: The question the user asked.
        model_name: Label of the model that handled the question.
        response_content: Response shown to the user; stored as ``str(...)``.
        generated_code: Code that was generated/executed; ``None`` or empty is
            stored as an empty string.
        execution_time: Elapsed time in seconds.
        error_message: Error text, or ``None`` on success. ``None`` is what
            marks the row as ``success=True``.
        is_image: Whether the response is an image file name.

    Returns:
        None
    """
    # Function-scope import keeps this block self-contained.
    import tempfile

    try:
        if not hf_token or hf_token.strip() == "":
            print("Warning: HF_TOKEN not available, skipping logging")
            return

        log_entry = {
            "timestamp": datetime.now().isoformat(),
            # NOTE: a fresh UUID is generated on every call, so this value
            # identifies a single interaction rather than a browser session.
            "session_id": str(uuid.uuid4()),
            "user_query": user_query,
            "model_name": model_name,
            "response_content": str(response_content),
            "generated_code": generated_code or "",
            "execution_time_seconds": execution_time,
            "error_message": error_message or "",
            "is_image_output": is_image,
            "success": error_message is None,
        }
        df = pd.DataFrame([log_entry])

        # Timestamp plus random suffix keeps file names unique even when
        # several interactions are logged within the same second.
        timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        random_id = str(uuid.uuid4())[:8]
        filename = f"interaction_log_{timestamp_str}_{random_id}.parquet"

        # Use the platform temp dir instead of a hard-coded "/tmp".
        local_path = os.path.join(tempfile.gettempdir(), filename)

        try:
            df.to_parquet(local_path, index=False)

            api = HfApi(token=hf_token)
            api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=f"data/{filename}",
                repo_id="SustainabilityLabIITGN/VayuChat_logs",
                repo_type="dataset",
            )
        finally:
            # Always clean up, including when serialisation or upload fails;
            # otherwise every failed upload would leave a file behind.
            if os.path.exists(local_path):
                os.remove(local_path)

        print(f"Successfully logged interaction to HuggingFace: {filename}")

    except Exception as e:
        # Deliberate best-effort: logging must never propagate an error.
        print(f"Error logging interaction: {e}")
|
|
|
def preprocess_and_load_df(path: str) -> pd.DataFrame:
    """Load a CSV file and parse its ``Timestamp`` column as datetimes.

    Args:
        path: Location of the CSV file to read.

    Returns:
        The loaded DataFrame with ``Timestamp`` converted via
        ``pd.to_datetime``.

    Raises:
        Exception: If the file cannot be read or the ``Timestamp`` column is
            missing/unparseable. The message is prefixed with
            ``"Error loading dataframe: "`` and the underlying error is
            attached as ``__cause__``.
    """
    try:
        df = pd.read_csv(path)
        df["Timestamp"] = pd.to_datetime(df["Timestamp"])
        return df
    except Exception as e:
        # Chain explicitly so the original traceback is preserved as the cause.
        raise Exception(f"Error loading dataframe: {e}") from e
|
|
|
|
|
|
|
def get_from_user(prompt):
    """Wrap *prompt* in a chat message dict attributed to the user role."""
    message = dict(role="user", content=prompt)
    return message
|
|
|
|
|
|
|
|
|
# Instructions sent to the model ahead of the code template. This is a plain
# (non f-) string, so braces are written once: the model must see
# f"plot_{uuid.uuid4().hex[:8]}.png", not a doubled-brace variant that would
# evaluate to a constant file name when copied into generated code.
_SYSTEM_PROMPT = """Generate Python code to answer the user's question about air quality data.

IMPORTANT: Only generate Python code - no explanations, no thinking, just clean code.

WHEN TO USE DIFFERENT OUTPUT TYPES:
- Simple questions asking "Which city", "What month" (1-2 values) → TEXT ANSWERS (store text in 'answer')
- Questions asking "Plot", "Show chart", "Visualize" → PLOTS (store filename in 'answer')
- Questions with tabular data (lists of cities, rates, rankings, comparisons) → DATAFRAMES (store dataframe in 'answer')
- Examples of DATAFRAME outputs:
  * Lists of cities with values (pollution levels, improvement rates)
  * Rankings or comparisons across multiple entities
  * Any result that would be >5 rows of data
  * Calculate/List/Compare operations with multiple results

SAFETY & ROBUSTNESS RULES:
- Always check if data exists before processing: if df.empty: answer = "No data available"
- Handle missing values: use .dropna() or .fillna() appropriately
- Use try-except blocks for risky operations like indexing
- Validate city/location names exist in data before filtering
- Check for empty results after filtering: if filtered_df.empty: answer = "No data found for specified criteria"
- Use .round(2) for numerical results to avoid long decimals
- Handle division by zero: check denominators before division
- Validate date ranges exist in data
- Use proper string formatting for answers with units (μg/m³)

CRITICAL: PANDAS SYNTAX FIXES:
- ALWAYS convert pandas/numpy values to int before using as list indices
- Example: calendar.month_name[int(month_value)] NOT calendar.month_name[month_value]
- Use int() conversion for ANY value used as index: int(row['month']), int(max_idx), etc.
- When accessing pandas iloc results, wrap in int(): int(df.loc[idx, 'column'])
- CORRECT groupby syntax: df.groupby([df['col1'], df['col2'].dt.year]) NOT df.groupby(['col1', 'col2'].dt.year)
- Always reference DataFrame when accessing columns: df['column'].dt.year NOT 'column'].dt.year
- Use proper DataFrame column references in all operations

TECHNICAL REQUIREMENTS:
- Save final result in variable called 'answer'
- For TEXT: Store the direct answer as a string in 'answer'
- For PLOTS: Save with unique filename f"plot_{uuid.uuid4().hex[:8]}.png" and store filename in 'answer'
- For DATAFRAMES: Store the pandas DataFrame directly in 'answer' (e.g., answer = result_df)
- Always use .iloc or .loc properly for pandas indexing
- Close matplotlib figures with plt.close() to prevent memory leaks
- Use proper column name checks before accessing columns
- For dataframes, ensure proper column names and sorting for readability
"""


def _build_code_preamble(question, dtypes_text):
    """Return the fixed code preamble that is shown to the model and executed.

    The question is collapsed onto a single line before it is embedded in a
    ``#`` comment: the preamble is executed verbatim later, so a newline in the
    question would otherwise turn the rest of the question into live code.
    """
    dtype_comments = "\n".join("# " + line for line in dtypes_text.split("\n"))
    one_line_question = " ".join(question.split())
    return f"""
import pandas as pd
import matplotlib.pyplot as plt
import uuid
import calendar
import numpy as np

# Set professional matplotlib styling
plt.rcParams.update({{
    'font.size': 12,
    'figure.dpi': 400,
    'figure.facecolor': 'white',
    'axes.facecolor': 'white',
    'axes.edgecolor': '#e2e8f0',
    'axes.linewidth': 1.2,
    'axes.labelcolor': '#374151',
    'axes.spines.top': False,
    'axes.spines.right': False,
    'axes.spines.left': True,
    'axes.spines.bottom': True,
    'axes.grid': True,
    'grid.color': '#f1f5f9',
    'grid.linewidth': 0.8,
    'grid.alpha': 0.7,
    'xtick.color': '#6b7280',
    'ytick.color': '#6b7280',
    'text.color': '#374151',
    'figure.figsize': [12, 6],
    'axes.prop_cycle': plt.cycler('color', ['#3b82f6', '#ef4444', '#10b981', '#f59e0b', '#8b5cf6', '#06b6d4'])
}})

df = pd.read_csv("Data.csv")
df["Timestamp"] = pd.to_datetime(df["Timestamp"])

# Available columns and data types:
{dtype_comments}

# Question: {one_line_question}
# Generate code to answer the question and save result in 'answer' variable
# If creating a plot, save it with a unique filename and store the filename in 'answer'
# If returning text/numbers, store the result directly in 'answer'
"""


def _extract_generated_code(answer):
    """Return the first ```python fenced block of *answer*, else *answer* itself."""
    if "```python" in answer:
        return answer.split("```python")[1].split("```")[0]
    return answer


def _error_reply(question, model_name, start_time, *, content, error, log_content=None, code=""):
    """Log a failed interaction and build the matching assistant reply dict.

    ``log_content`` is the (shorter) text stored in the log when it differs
    from the user-facing ``content``.
    """
    execution_time = (datetime.now() - start_time).total_seconds()
    log_interaction(
        user_query=question,
        model_name=model_name,
        response_content=content if log_content is None else log_content,
        generated_code=code,
        execution_time=execution_time,
        error_message=error,
        is_image=False,
    )
    return {
        "role": "assistant",
        "content": content,
        "gen_code": code,
        "ex_code": code,
        "last_prompt": question,
        "error": error,
    }


def ask_question(model_name, question):
    """Answer *question* about ``Data.csv`` by generating and running code.

    Steps: reload API keys from the environment, build the chat client for
    ``model_name`` ("gemini-pro" uses ChatGoogleGenerativeAI, everything else
    ChatGroq, validated with a short test call), check that ``Data.csv``
    exists, send the system prompt plus a code template to the model, execute
    the returned code and read the ``answer`` variable it defines. Every
    outcome, success or failure, is passed to ``log_interaction``.

    Args:
        model_name: Key into the module-level ``models`` mapping.
        question: The user's natural-language question.

    Returns:
        dict with keys ``role`` ("assistant"), ``content`` (whatever the
        generated code stored in ``answer``, or an error/help message),
        ``gen_code`` and ``ex_code`` (the executed code, empty on early
        failures), ``last_prompt`` (the original question) and ``error``
        (``None`` on success, otherwise an error description).
        Errors are reported through this dict rather than raised.
    """
    start_time = datetime.now()
    try:
        # Re-read the .env file so keys changed while the app is running are
        # picked up without a restart.
        load_dotenv(override=True)
        fresh_groq_token = os.getenv("GROQ_API_KEY")
        fresh_gemini_token = os.getenv("GEMINI_TOKEN")

        print(f"ask_question - Fresh Groq Token: {'Present' if fresh_groq_token else 'Missing'}")

        if model_name == "gemini-pro":
            if not fresh_gemini_token or fresh_gemini_token.strip() == "":
                return _error_reply(
                    question, model_name, start_time,
                    content="Gemini API token not available or empty. Please set GEMINI_TOKEN in your environment variables.",
                    error="Missing or empty API token",
                    log_content="Gemini API token not available or empty",
                )
            llm = ChatGoogleGenerativeAI(
                model=models[model_name],
                google_api_key=fresh_gemini_token,
                temperature=0,
            )
        else:
            if not fresh_groq_token or fresh_groq_token.strip() == "":
                return _error_reply(
                    question, model_name, start_time,
                    content="Groq API token not available or empty. Please set GROQ_API_KEY in your environment variables and restart the application.",
                    error="Missing or empty API token",
                    log_content="Groq API token not available or empty",
                )

            try:
                llm = ChatGroq(
                    model=models[model_name],
                    api_key=fresh_groq_token,
                    temperature=0.1,
                )
                # Cheap round trip to surface key/permission problems with a
                # dedicated message before the real prompt is sent.
                llm.invoke("Test")
                print("API key test successful")
            except Exception as api_error:
                error_msg = str(api_error)
                lowered = error_msg.lower()
                if "organization_restricted" in lowered or "unauthorized" in lowered:
                    response_content = "API Key Error: Your Groq API key appears to be invalid, expired, or restricted. Please check your API key in the .env file."
                    log_error_msg = f"API key validation failed: {error_msg}"
                else:
                    response_content = f"API Connection Error: {error_msg}"
                    log_error_msg = error_msg
                return _error_reply(
                    question, model_name, start_time,
                    content=response_content,
                    error=log_error_msg,
                )

        if not os.path.exists("Data.csv"):
            return _error_reply(
                question, model_name, start_time,
                content="Data.csv file not found. Please ensure the data file is in the correct location.",
                error="Data file not found",
                log_content="Data.csv file not found",
            )

        # Only the column dtypes are needed for the prompt.
        df_check = pd.read_csv("Data.csv")
        df_check["Timestamp"] = pd.to_datetime(df_check["Timestamp"])
        df_check = df_check.head(5)

        # Build the preamble once and reuse it for both the prompt and the
        # executed code, instead of re-parsing it out of the fenced template
        # (which broke when the question itself contained backticks).
        preamble = _build_code_preamble(question, str(df_check.dtypes))
        template = f"```python{preamble}```"

        query = f"""{_SYSTEM_PROMPT}

Complete the following code to answer the user's question:

{template}
"""

        answer = llm.invoke(query).content

        full_code = ""
        try:
            code_part = _extract_generated_code(answer)
            full_code = f"""
{preamble}
{code_part}
"""

            # A single namespace is used for exec on purpose: with separate
            # globals and locals the code runs as if inside a class body, so
            # functions, lambdas and (before Python 3.12) comprehensions in the
            # generated code could not see top-level names such as `df`.
            namespace = {
                'pd': pd,
                'plt': plt,
                'os': os,
                'uuid': uuid,
                'calendar': __import__('calendar'),
                'np': __import__('numpy'),
            }

            # SECURITY: this executes model-generated code, influenced by the
            # user's question, with full interpreter privileges (including
            # `os`). It is not sandboxed; only run in a trusted/isolated
            # environment.
            exec(full_code, namespace)

            if 'answer' in namespace:
                answer_result = namespace['answer']
            else:
                answer_result = "No answer variable found in generated code"

            execution_time = (datetime.now() - start_time).total_seconds()

            # A string ending in an image extension is treated as a plot file.
            is_image = isinstance(answer_result, str) and answer_result.endswith(('.png', '.jpg', '.jpeg'))

            log_interaction(
                user_query=question,
                model_name=model_name,
                response_content=str(answer_result),
                generated_code=full_code,
                execution_time=execution_time,
                error_message=None,
                is_image=is_image,
            )

            return {
                "role": "assistant",
                "content": answer_result,
                "gen_code": full_code,
                "ex_code": full_code,
                "last_prompt": question,
                "error": None,
            }

        except Exception as code_error:
            error_msg = str(code_error)
            return _error_reply(
                question, model_name, start_time,
                content=f"Error executing generated code: {error_msg}",
                error=error_msg,
                code=full_code,
            )

    except Exception as e:
        error_msg = str(e)
        lowered = error_msg.lower()

        # Case-insensitive matching, consistent with the key-validation branch.
        if "organization_restricted" in lowered:
            response_content = "API Organization Restricted: Your API key access has been restricted. Please check your Groq API key or try generating a new one."
            log_error_msg = "API access restricted"
        elif "rate_limit" in lowered:
            response_content = "Rate limit exceeded. Please wait a moment and try again."
            log_error_msg = "Rate limit exceeded"
        else:
            response_content = f"Error: {error_msg}"
            log_error_msg = error_msg

        return _error_reply(
            question, model_name, start_time,
            content=response_content,
            error=log_error_msg,
        )