"""LLM-backed question answering over the air-quality dataset in ``Data.csv``.

The module asks a chat model to complete a pandas/matplotlib code template,
executes the returned code, and logs every interaction to a Hugging Face
dataset repository.
"""

import calendar
import json
import os
import tempfile
import time
import uuid
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from huggingface_hub import HfApi
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from PIL import Image

# FORCE reload environment variables
load_dotenv(override=True)

# Get API keys with explicit None handling and debugging
Groq_Token = os.getenv("GROQ_API_KEY")
hf_token = os.getenv("HF_TOKEN")
gemini_token = os.getenv("GEMINI_TOKEN")

# Debug print (remove in production). Only the presence of a token is
# reported; no part of a key value is ever written to stdout.
print(f"Debug - Groq Token: {'Present' if Groq_Token else 'Missing'}")
print(f"Debug - Gemini Token: {'Present' if gemini_token else 'Missing'}")

# Display name -> provider model identifier.
models = {
    "gpt-oss-20b": "openai/gpt-oss-20b",
    "gpt-oss-120b": "openai/gpt-oss-120b",
    "llama3.1": "llama-3.1-8b-instant",
    "llama3.3": "llama-3.3-70b-versatile",
    "deepseek-R1": "deepseek-r1-distill-llama-70b",
    "llama4 maverik": "meta-llama/llama-4-maverick-17b-128e-instruct",
    "llama4 scout": "meta-llama/llama-4-scout-17b-16e-instruct",
    "gemini-pro": "gemini-1.5-pro",
}

# Code that is both shown to the model (as the start of the template) and
# executed in front of the generated code. Kept as a plain string so the
# dict braces need no escaping.
_CODE_PREAMBLE_HEADER = """import pandas as pd
import matplotlib.pyplot as plt
import uuid
import calendar
import numpy as np

# Set professional matplotlib styling
plt.rcParams.update({
    'font.size': 12,
    'figure.dpi': 400,
    'figure.facecolor': 'white',
    'axes.facecolor': 'white',
    'axes.edgecolor': '#e2e8f0',
    'axes.linewidth': 1.2,
    'axes.labelcolor': '#374151',
    'axes.spines.top': False,
    'axes.spines.right': False,
    'axes.spines.left': True,
    'axes.spines.bottom': True,
    'axes.grid': True,
    'grid.color': '#f1f5f9',
    'grid.linewidth': 0.8,
    'grid.alpha': 0.7,
    'xtick.color': '#6b7280',
    'ytick.color': '#6b7280',
    'text.color': '#374151',
    'figure.figsize': [12, 6],
    'axes.prop_cycle': plt.cycler('color', ['#3b82f6', '#ef4444', '#10b981', '#f59e0b', '#8b5cf6', '#06b6d4'])
})

df = pd.read_csv("Data.csv")
df["Timestamp"] = pd.to_datetime(df["Timestamp"])
"""

# NOTE: this is a plain string, not an f-string, so braces are written once.
# Doubled braces would be shown to the model literally and, if copied into
# generated code, would produce a constant (non-unique) plot filename.
_SYSTEM_PROMPT = """Generate Python code to answer the user's question about air quality data.

IMPORTANT: Only generate Python code - no explanations, no thinking, just clean code.

WHEN TO USE DIFFERENT OUTPUT TYPES:
- Simple questions asking "Which city", "What month" (1-2 values) → TEXT ANSWERS (store text in 'answer')
- Questions asking "Plot", "Show chart", "Visualize" → PLOTS (store filename in 'answer')
- Questions with tabular data (lists of cities, rates, rankings, comparisons) → DATAFRAMES (store dataframe in 'answer')
- Examples of DATAFRAME outputs:
  * Lists of cities with values (pollution levels, improvement rates)
  * Rankings or comparisons across multiple entities
  * Any result that would be >5 rows of data
  * Calculate/List/Compare operations with multiple results

SAFETY & ROBUSTNESS RULES:
- Always check if data exists before processing: if df.empty: answer = "No data available"
- Handle missing values: use .dropna() or .fillna() appropriately
- Use try-except blocks for risky operations like indexing
- Validate city/location names exist in data before filtering
- Check for empty results after filtering: if filtered_df.empty: answer = "No data found for specified criteria"
- Use .round(2) for numerical results to avoid long decimals
- Handle division by zero: check denominators before division
- Validate date ranges exist in data
- Use proper string formatting for answers with units (μg/m³)

CRITICAL: PANDAS SYNTAX FIXES:
- ALWAYS convert pandas/numpy values to int before using as list indices
- Example: calendar.month_name[int(month_value)] NOT calendar.month_name[month_value]
- Use int() conversion for ANY value used as index: int(row['month']), int(max_idx), etc.
- When accessing pandas iloc results, wrap in int(): int(df.loc[idx, 'column'])
- CORRECT groupby syntax: df.groupby([df['col1'], df['col2'].dt.year]) NOT df.groupby(['col1', 'col2'].dt.year)
- Always reference DataFrame when accessing columns: df['column'].dt.year NOT 'column'].dt.year
- Use proper DataFrame column references in all operations

TECHNICAL REQUIREMENTS:
- Save final result in variable called 'answer'
- For TEXT: Store the direct answer as a string in 'answer'
- For PLOTS: Save with unique filename f"plot_{uuid.uuid4().hex[:8]}.png" and store filename in 'answer'
- For DATAFRAMES: Store the pandas DataFrame directly in 'answer' (e.g., answer = result_df)
- Always use .iloc or .loc properly for pandas indexing
- Close matplotlib figures with plt.close() to prevent memory leaks
- Use proper column name checks before accessing columns
- For dataframes, ensure proper column names and sorting for readability
"""


def log_interaction(user_query, model_name, response_content, generated_code,
                    execution_time, error_message=None, is_image=False):
    """Log one user interaction to the Hugging Face logs dataset.

    The entry is written as a single-row parquet file and uploaded to the
    ``SustainabilityLabIITGN/VayuChat_logs`` dataset repo. Logging is
    best-effort: any failure is printed and swallowed so that it can never
    break the caller.

    Args:
        user_query: The question the user asked.
        model_name: Display name of the model that was used.
        response_content: The answer (or error text) shown to the user;
            stored as ``str(response_content)``.
        generated_code: The code that was executed, or a falsy value.
        execution_time: Elapsed time of the request in seconds.
        error_message: Error description, or ``None`` on success.
        is_image: Whether the answer is a path to an image file.
    """
    try:
        if not hf_token or hf_token.strip() == "":
            print("Warning: HF_TOKEN not available, skipping logging")
            return

        now = datetime.now()

        log_entry = {
            "timestamp": now.isoformat(),
            # NOTE(review): a fresh UUID is generated for every entry, so this
            # value does not group entries belonging to one user session.
            "session_id": str(uuid.uuid4()),
            "user_query": user_query,
            "model_name": model_name,
            "response_content": str(response_content),
            "generated_code": generated_code or "",
            "execution_time_seconds": execution_time,
            "error_message": error_message or "",
            "is_image_output": is_image,
            "success": error_message is None,
        }
        df = pd.DataFrame([log_entry])

        # Timestamp plus a random suffix keeps filenames unique in the repo.
        timestamp_str = now.strftime("%Y%m%d_%H%M%S")
        random_id = str(uuid.uuid4())[:8]
        filename = f"interaction_log_{timestamp_str}_{random_id}.parquet"

        # The temporary directory is removed even when the upload fails, so
        # failed uploads no longer leave parquet files behind.
        with tempfile.TemporaryDirectory() as tmp_dir:
            local_path = os.path.join(tmp_dir, filename)
            df.to_parquet(local_path, index=False)

            api = HfApi(token=hf_token)
            api.upload_file(
                path_or_fileobj=local_path,
                path_in_repo=f"data/{filename}",
                repo_id="SustainabilityLabIITGN/VayuChat_logs",
                repo_type="dataset",
            )

        print(f"Successfully logged interaction to HuggingFace: {filename}")

    except Exception as e:
        print(f"Error logging interaction: {e}")


def preprocess_and_load_df(path: str) -> pd.DataFrame:
    """Load the CSV at *path* and parse its ``Timestamp`` column as datetimes.

    Raises:
        Exception: If the file cannot be read or the column cannot be parsed.
            The original error is chained as ``__cause__``.
    """
    try:
        df = pd.read_csv(path)
        df["Timestamp"] = pd.to_datetime(df["Timestamp"])
        return df
    except Exception as e:
        raise Exception(f"Error loading dataframe: {e}") from e


def get_from_user(prompt: str) -> Dict[str, str]:
    """Wrap *prompt* in a chat message dict with the ``user`` role."""
    return {"role": "user", "content": prompt}


def _build_reply(content: Any, question: Any, code: str = "",
                 error: Optional[str] = None) -> Dict[str, Any]:
    """Build the assistant message dict returned by ``ask_question``."""
    return {
        "role": "assistant",
        "content": content,
        "gen_code": code,
        "ex_code": code,
        "last_prompt": question,
        "error": error,
    }


def _fail(question: Any, model_name: Any, started: float, content: str,
          error: str, *, logged_content: Optional[str] = None,
          code: str = "") -> Dict[str, Any]:
    """Log a failed interaction and return the matching assistant reply.

    ``logged_content`` lets the log carry a shorter text than the message
    shown to the user; it defaults to ``content``.
    """
    log_interaction(
        user_query=question,
        model_name=model_name,
        response_content=content if logged_content is None else logged_content,
        generated_code=code,
        execution_time=time.perf_counter() - started,
        error_message=error,
        is_image=False,
    )
    return _build_reply(content, question, code=code, error=error)


def _build_code_preamble(question: str, dtypes: Any) -> str:
    """Return the code prefix that is sent to the model and executed.

    Every line of the dtype listing and of the question is emitted as a
    ``#`` comment, so a multi-line question can never become executable code.
    """
    dtype_comments = "\n".join(f"# {line}" for line in str(dtypes).split("\n"))
    question_comment = "\n# ".join(question.strip().splitlines())
    return (
        f"{_CODE_PREAMBLE_HEADER}\n"
        "# Available columns and data types:\n"
        f"{dtype_comments}\n"
        f"# Question: {question_comment}\n"
        "# Generate code to answer the question and save result in 'answer' variable\n"
        "# If creating a plot, save it with a unique filename and store the filename in 'answer'\n"
        "# If returning text/numbers, store the result directly in 'answer'\n"
    )


def _extract_code(answer: str) -> str:
    """Return the first ```python fenced block of *answer*, or all of it."""
    if "```python" in answer:
        return answer.split("```python")[1].split("```")[0]
    return answer


def _describe_groq_error(error_msg: str) -> Tuple[str, str]:
    """Map a Groq client error to ``(user-facing message, logged message)``."""
    lowered = error_msg.lower()
    if "organization_restricted" in lowered or "unauthorized" in lowered:
        return (
            "API Key Error: Your Groq API key appears to be invalid, expired, "
            "or restricted. Please check your API key in the .env file.",
            f"API key validation failed: {error_msg}",
        )
    if "rate_limit" in lowered:
        return (
            "Rate limit exceeded. Please wait a moment and try again.",
            "Rate limit exceeded",
        )
    return f"API Connection Error: {error_msg}", error_msg


def ask_question(model_name, question):
    """Answer *question* about ``Data.csv`` using the model *model_name*.

    The model is asked to complete a pandas/matplotlib code template; the
    completed code is executed and the value it stores in ``answer`` is
    returned. Every outcome, success or failure, is logged through
    ``log_interaction``.

    Args:
        model_name: A key of the module-level ``models`` dict.
        question: The user's natural-language question.

    Returns:
        A dict with keys ``role`` ("assistant"), ``content`` (text, a plot
        filename, or a DataFrame), ``gen_code`` and ``ex_code`` (the executed
        code, empty on early failures), ``last_prompt`` (the question) and
        ``error`` (``None`` on success, otherwise an error description).
        Errors are reported through this dict rather than raised.
    """
    # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
    started = time.perf_counter()

    try:
        # Reload environment variables to get fresh values
        load_dotenv(override=True)
        fresh_groq_token = os.getenv("GROQ_API_KEY")
        fresh_gemini_token = os.getenv("GEMINI_TOKEN")

        print(f"ask_question - Fresh Groq Token: {'Present' if fresh_groq_token else 'Missing'}")

        if model_name not in models:
            return _fail(
                question, model_name, started,
                content=(
                    f"Unknown model '{model_name}'. "
                    f"Available models: {', '.join(models)}"
                ),
                error=f"Unknown model: {model_name}",
            )

        uses_gemini = model_name == "gemini-pro"

        # Check API availability with fresh tokens
        if uses_gemini:
            if not fresh_gemini_token or not fresh_gemini_token.strip():
                return _fail(
                    question, model_name, started,
                    content=(
                        "Gemini API token not available or empty. Please set "
                        "GEMINI_TOKEN in your environment variables."
                    ),
                    error="Missing or empty API token",
                    logged_content="Gemini API token not available or empty",
                )
        elif not fresh_groq_token or not fresh_groq_token.strip():
            return _fail(
                question, model_name, started,
                content=(
                    "Groq API token not available or empty. Please set "
                    "GROQ_API_KEY in your environment variables and restart "
                    "the application."
                ),
                error="Missing or empty API token",
                logged_content="Groq API token not available or empty",
            )

        # Checked before any network call so a missing file costs no API quota.
        if not os.path.exists("Data.csv"):
            return _fail(
                question, model_name, started,
                content=(
                    "Data.csv file not found. Please ensure the data file is "
                    "in the correct location."
                ),
                error="Data file not found",
                logged_content="Data.csv file not found",
            )

        df_check = pd.read_csv("Data.csv")
        df_check["Timestamp"] = pd.to_datetime(df_check["Timestamp"])
        df_check = df_check.head(5)

        preamble = _build_code_preamble(question, df_check.dtypes)
        template = f"```python\n{preamble}\n```"
        query = (
            f"{_SYSTEM_PROMPT}\n\n"
            "Complete the following code to answer the user's question:\n\n"
            f"{template}\n"
        )

        # Make API call
        if uses_gemini:
            llm = ChatGoogleGenerativeAI(
                model=models[model_name],
                google_api_key=fresh_gemini_token,
                temperature=0,
            )
            answer = llm.invoke(query).content
        else:
            # No separate "Test" probe request: key and connection problems
            # surface on the real call and are classified here, which halves
            # the number of API requests per question.
            try:
                llm = ChatGroq(
                    model=models[model_name],
                    api_key=fresh_groq_token,
                    temperature=0.1,
                )
                answer = llm.invoke(query).content
            except Exception as api_error:
                response_content, log_error_msg = _describe_groq_error(str(api_error))
                return _fail(
                    question, model_name, started,
                    content=response_content,
                    error=log_error_msg,
                )

        # Extract and execute code
        full_code = ""
        try:
            full_code = f"\n{preamble}\n{_extract_code(answer)}\n"

            # SECURITY: this executes model-generated code with full
            # interpreter privileges. It is NOT a sandbox; run the application
            # only where that is acceptable.
            #
            # A single namespace is used on purpose. With separate globals and
            # locals the code runs as if inside a class body, so functions,
            # lambdas and (before Python 3.12) comprehensions in the generated
            # code cannot see top-level names such as `df`.
            namespace = {
                "pd": pd,
                "plt": plt,
                "os": os,
                "uuid": uuid,
                "calendar": calendar,
                "np": np,
            }
            exec(full_code, namespace)

            if "answer" in namespace:
                answer_result = namespace["answer"]
            else:
                answer_result = "No answer variable found in generated code"

        except Exception as code_error:
            error_msg = str(code_error)
            return _fail(
                question, model_name, started,
                content=f"Error executing generated code: {error_msg}",
                error=error_msg,
                code=full_code,
            )

        # Determine if output is an image
        is_image = isinstance(answer_result, str) and answer_result.endswith(
            (".png", ".jpg", ".jpeg")
        )

        # Log successful interaction
        log_interaction(
            user_query=question,
            model_name=model_name,
            response_content=str(answer_result),
            generated_code=full_code,
            execution_time=time.perf_counter() - started,
            error_message=None,
            is_image=is_image,
        )
        return _build_reply(answer_result, question, code=full_code, error=None)

    except Exception as e:
        error_msg = str(e)

        # Handle specific API errors
        if "organization_restricted" in error_msg:
            response_content = (
                "API Organization Restricted: Your API key access has been "
                "restricted. Please check your Groq API key or try generating "
                "a new one."
            )
            log_error_msg = "API access restricted"
        elif "rate_limit" in error_msg.lower():
            response_content = "Rate limit exceeded. Please wait a moment and try again."
            log_error_msg = "Rate limit exceeded"
        else:
            response_content = f"Error: {error_msg}"
            log_error_msg = error_msg

        return _fail(
            question, model_name, started,
            content=response_content,
            error=log_error_msg,
        )