#!/usr/bin/env python
"""
API Key Loading Diagnostic Script for GAIA

This script diagnoses API key loading issues by:

1. Detecting where API keys are being loaded from (.env files, environment
   variables, config files)
2. Showing the actual loading paths and mechanisms being used
3. Verifying the existence of .env files in all expected locations
4. Testing that each API key loads via the exact same code path used by the
   main system
"""

import os
import sys
import json
import logging
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple, Set
import importlib.util

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger("api_key_diagnostic")

# Add the project root to sys.path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))

# Try to import GAIA configuration components
try:
    from src.gaia.config.default import DEFAULT_CONFIG
    from src.gaia.config.env import load_env_config, ENV_CONFIG_MAPPING
    from src.gaia.config.loader import ConfigLoader, Configuration

    CONFIG_IMPORTS_SUCCESSFUL = True
except ImportError as e:
    logger.error(f"Failed to import GAIA configuration modules: {e}")
    logger.warning("Will attempt to load configuration modules dynamically")
    CONFIG_IMPORTS_SUCCESSFUL = False
    # Fall back to an empty mapping so later loops over ENV_CONFIG_MAPPING
    # do not raise NameError when the imports above are unavailable
    ENV_CONFIG_MAPPING = {}

# List of key API providers and their configuration paths
API_PROVIDERS = {
    "openai": "api.openai.api_key",
    "serper": "api.serper.api_key",
    "perplexity": "api.perplexity.api_key",
    "supabase": "api.supabase.key",
    "huggingface": "api.huggingface.token",
}

# Environment variable names for API keys
API_ENV_VARS = {
    "openai": "OPENAI_API_KEY",
    "serper": "SERPER_API_KEY",
    "perplexity": "PERPLEXITY_API_KEY",
    "supabase": "SUPABASE_KEY",
    "huggingface": "HF_TOKEN",
}
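
# Illustrative .env layout that the checks below look for (variable names come
# from API_ENV_VARS above; the values shown are placeholders, not real keys):
#
#   OPENAI_API_KEY=<your-openai-key>
#   SERPER_API_KEY=<your-serper-key>
#   PERPLEXITY_API_KEY=<your-perplexity-key>
#   SUPABASE_KEY=<your-supabase-key>
#   HF_TOKEN=<your-huggingface-token>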


class ApiKeyDiagnostic:
    """Diagnostic tool for checking API key loading in GAIA."""

    def __init__(self):
        """Initialize the diagnostic tool."""
        self.results = {
            "env_vars_present": {},
            "dotenv_files": {},
            "config_files": {},
            "loaded_config": {},
            "final_api_keys": {},
            "issues": [],
        }

        # Get current working directory and project root
        self.cwd = os.getcwd()
        self.project_root = self._find_project_root()

        # State for tracking loaded configuration
        self.config_loader = None
        self.config = None

    def _find_project_root(self) -> str:
        """Find the project root directory by looking for indicators like pyproject.toml, setup.py, etc."""
        current_dir = Path(self.cwd)

        # Try to find project root indicators
        root_indicators = ["setup.py", "pyproject.toml", "README.md", "src/gaia"]

        while current_dir != current_dir.parent:
            for indicator in root_indicators:
                if (current_dir / indicator).exists():
                    return str(current_dir)
            current_dir = current_dir.parent

        # If no root found, return the current directory
        return self.cwd

    def check_env_variables(self):
        """Check which API-related environment variables are present."""
        logger.info("Checking environment variables...")

        # Check all environment variables related to API keys
        for provider, env_var in API_ENV_VARS.items():
            value = os.environ.get(env_var)
            self.results["env_vars_present"][env_var] = {
                "present": value is not None,
                "value": f"{value[:5]}..." if value else None,
            }
            logger.info(f"Environment variable {env_var}: {'PRESENT' if value else 'NOT PRESENT'}")

        # Check additional environment variables used for configuration
        for env_var in ENV_CONFIG_MAPPING.keys():
            if env_var not in self.results["env_vars_present"]:
                value = os.environ.get(env_var)
                self.results["env_vars_present"][env_var] = {
                    "present": value is not None,
                    "value": f"{value[:5]}..." if value else None,
                }
                if value:
                    logger.info(f"Additional env var {env_var}: PRESENT (maps to {ENV_CONFIG_MAPPING[env_var]})")

    def find_dotenv_files(self):
        """Find .env files in common locations."""
        logger.info("Searching for .env files...")

        # Common locations for .env files
        locations = [
            self.project_root,                            # Project root
            os.path.join(self.project_root, "src"),       # src directory
            os.path.join(self.project_root, "src/gaia"),  # src/gaia directory
            os.path.join(self.project_root, "config"),    # config directory
            os.path.dirname(os.path.abspath(__file__)),   # Current script directory
            self.cwd,                                     # Current working directory
        ]

        for location in locations:
            if os.path.isdir(location):
                dotenv_path = os.path.join(location, ".env")
                exists = os.path.isfile(dotenv_path)
                self.results["dotenv_files"][dotenv_path] = exists

                if exists:
                    logger.info(f"Found .env file: {dotenv_path}")
                    self._check_dotenv_content(dotenv_path)
                else:
                    logger.info(f"No .env file found at: {dotenv_path}")

        # Check whether any .env files were found
        if not any(self.results["dotenv_files"].values()):
            self.results["issues"].append({
                "type": "missing_dotenv",
                "message": "No .env files found in common locations",
                "severity": "WARNING",
            })
            logger.warning("No .env files found in any of the checked locations!")

    def _check_dotenv_content(self, dotenv_path: str):
        """Check the content of a .env file for API keys."""
        try:
            with open(dotenv_path, 'r') as f:
                content = f.read()

            # Check for API key entries
            api_keys_present = {}
            for provider, env_var in API_ENV_VARS.items():
                if f"{env_var}=" in content or f"{env_var} =" in content:
                    api_keys_present[env_var] = True
                    logger.info(f"  - Found {env_var} in {dotenv_path}")
                else:
                    api_keys_present[env_var] = False

            self.results["dotenv_files"][dotenv_path] = {
                "exists": True,
                "api_keys_present": api_keys_present,
            }

            # Add a warning if the .env file exists but contains no API keys
            if not any(api_keys_present.values()):
                self.results["issues"].append({
                    "type": "empty_dotenv",
                    "message": f".env file at {dotenv_path} exists but contains no API keys",
                    "severity": "WARNING",
                })
                logger.warning(f".env file at {dotenv_path} exists but contains no API keys")
        except Exception as e:
            logger.error(f"Error reading .env file {dotenv_path}: {e}")
            self.results["dotenv_files"][dotenv_path] = {
                "exists": True,
                "error": str(e),
            }

    def find_config_files(self):
        """Find configuration files (JSON/YAML) that might contain API settings."""
        logger.info("Searching for configuration files...")

        # Common directories to look for configuration files
        config_dirs = [
            self.project_root,
            os.path.join(self.project_root, "config"),
            os.path.join(self.project_root, "src/gaia/config"),
        ]

        # Config file patterns to look for
        config_patterns = ["*config*.json", "*config*.yaml", "*config*.yml", "*.env.json"]

        config_files = []
        for directory in config_dirs:
            if os.path.isdir(directory):
                for pattern in config_patterns:
                    for file_path in Path(directory).glob(pattern):
                        if file_path.is_file():
                            config_files.append(str(file_path))

        # Check each config file for API keys
        for file_path in config_files:
            try:
                self._check_config_file(file_path)
            except Exception as e:
                logger.error(f"Error checking config file {file_path}: {e}")
                self.results["config_files"][file_path] = {
                    "exists": True,
                    "error": str(e),
                }
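
    # NOTE: the API_PROVIDERS values are dotted paths into nested config mappings.
    # For example, "api.openai.api_key" is expected to resolve against a structure
    # shaped roughly like {"api": {"openai": {"api_key": "<key>"}}}; the helper
    # below walks one dictionary level per dot-separated segment.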

    def _check_config_file(self, file_path: str):
        """Check a config file for API key configurations."""
        try:
            # Determine the file type by extension
            if file_path.endswith('.json'):
                with open(file_path, 'r') as f:
                    config_data = json.load(f)
            elif file_path.endswith(('.yaml', '.yml')):
                import yaml
                with open(file_path, 'r') as f:
                    config_data = yaml.safe_load(f)
            else:
                logger.warning(f"Unsupported config file format: {file_path}")
                return

            # Check for API key entries in the config file
            api_keys_present = {}
            for provider, config_path in API_PROVIDERS.items():
                # Parse the dotted config path
                parts = config_path.split('.')
                value = config_data
                try:
                    for part in parts:
                        value = value.get(part, {})

                    # Check whether an API key exists at this path
                    if isinstance(value, str) and value:
                        api_keys_present[config_path] = True
                        logger.info(f"  - Found {config_path} in {file_path}")
                    else:
                        api_keys_present[config_path] = False
                except (AttributeError, TypeError):
                    api_keys_present[config_path] = False

            self.results["config_files"][file_path] = {
                "exists": True,
                "api_keys_present": api_keys_present,
            }
        except Exception as e:
            logger.error(f"Error processing config file {file_path}: {e}")
            self.results["config_files"][file_path] = {
                "exists": True,
                "error": str(e),
            }

    def load_config_with_gaia_loader(self):
        """Load configuration using GAIA's own config loader."""
        logger.info("Loading configuration using GAIA's config loader...")

        try:
            if CONFIG_IMPORTS_SUCCESSFUL:
                # Use the imported config loader
                self.config_loader = ConfigLoader()
                self.config = self.config_loader.load_config()
                logger.info("Successfully loaded config with GAIA's ConfigLoader")
            else:
                # Try to load the config loader dynamically
                self._load_config_dynamically()

            # Check for each API key in the loaded configuration
            if self.config:
                for provider, config_path in API_PROVIDERS.items():
                    value = self.config.get(config_path)
                    self.results["loaded_config"][config_path] = {
                        "present": value is not None and value != "",
                        "value": f"{value[:5]}..." if value else None,
                    }

                    # Log the result
                    if value:
                        logger.info(f"API key for {provider} loaded successfully via {config_path}")
                    else:
                        logger.warning(f"No API key found for {provider} at {config_path}")
                        self.results["issues"].append({
                            "type": "missing_api_key",
                            "provider": provider,
                            "config_path": config_path,
                            "message": f"No API key found for {provider} at {config_path}",
                            "severity": "WARNING",
                        })
        except Exception as e:
            logger.error(f"Error loading configuration: {e}")
            self.results["issues"].append({
                "type": "config_loading_error",
                "message": f"Error loading configuration: {e}",
                "severity": "ERROR",
            })

    def _load_config_dynamically(self):
        """Attempt to dynamically load the configuration system if imports failed."""
        logger.info("Attempting to dynamically load configuration modules...")

        try:
            # First, try to load the default config
            default_path = os.path.join(self.project_root, "src/gaia/config/default.py")
            if os.path.exists(default_path):
                default_module = self._import_module_from_path(default_path, "default")
                default_config = getattr(default_module, "DEFAULT_CONFIG", {})

                # Now try to load the config loader
                loader_path = os.path.join(self.project_root, "src/gaia/config/loader.py")
                if os.path.exists(loader_path):
                    loader_module = self._import_module_from_path(loader_path, "loader")
                    ConfigLoader = getattr(loader_module, "ConfigLoader", None)

                    if ConfigLoader:
                        self.config_loader = ConfigLoader()
                        self.config = self.config_loader.load_config()
                        logger.info("Successfully loaded config dynamically")
                    else:
                        logger.error("Failed to find ConfigLoader class in loader module")
                else:
                    logger.error(f"Config loader module not found at {loader_path}")
            else:
                logger.error(f"Default config module not found at {default_path}")
        except Exception as e:
            logger.error(f"Error in dynamic loading of config modules: {e}")

    def _import_module_from_path(self, path, module_name):
        """Dynamically import a module from a file path."""
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    def verify_direct_key_loading(self):
        """Verify that each API key can be loaded directly from environment variables."""
        logger.info("Testing direct API key loading from environment variables...")

        # Test loading each API key directly from environment variables
        for provider, env_var in API_ENV_VARS.items():
            value = os.environ.get(env_var)
            self.results["final_api_keys"][provider] = {
                "from_env": value is not None and value != "",
                "env_var": env_var,
                "value": f"{value[:5]}..." if value else None,
            }

            if value:
                logger.info(f"API key for {provider} found in environment variable {env_var}")
            else:
                logger.warning(f"API key for {provider} not found in environment variable {env_var}")

    def check_working_directory(self):
        """Check the working directory and its relationship to config files."""
        logger.info(f"Current working directory: {self.cwd}")
        logger.info(f"Detected project root: {self.project_root}")

        # Check whether the CWD differs from the project root
        if os.path.normpath(self.cwd) != os.path.normpath(self.project_root):
            logger.warning(
                "Working directory is different from project root. "
                "This might affect relative path resolution."
            )
            self.results["issues"].append({
                "type": "working_directory",
                "message": "Working directory is different from project root",
                "cwd": self.cwd,
                "project_root": self.project_root,
                "severity": "WARNING",
            })

    def check_path_differences(self):
        """Check for absolute vs. relative path differences in credential loading."""
        # Look for potential path issues in the .env files and config files
        for dotenv_path, exists in self.results["dotenv_files"].items():
            if isinstance(exists, dict) and exists.get("exists"):
                if os.path.isabs(dotenv_path):
                    # Check whether the absolute path would be resolvable from a relative context
                    rel_path = os.path.relpath(dotenv_path, self.project_root)
                    if rel_path.startswith('..'):
                        self.results["issues"].append({
                            "type": "path_resolution",
                            "message": f".env file at {dotenv_path} might not be resolvable from project root",
                            "abs_path": dotenv_path,
                            "rel_path": rel_path,
                            "severity": "WARNING",
                        })
                        logger.warning(f".env file at {dotenv_path} might not be resolvable from project root")

    def run_diagnostics(self):
        """Run all diagnostic checks."""
        logger.info("Starting GAIA API key loading diagnostics...")

        # Get working directory info
        self.check_working_directory()

        # Check environment variables
        self.check_env_variables()

        # Find .env files
        self.find_dotenv_files()

        # Find configuration files
        self.find_config_files()

        # Load config with GAIA's loader
        self.load_config_with_gaia_loader()

        # Verify direct key loading
        self.verify_direct_key_loading()

        # Check path differences
        self.check_path_differences()

        # Summarize issues
        self._summarize_issues()

        logger.info("Diagnostic checks completed")
        return self.results

    def _summarize_issues(self):
        """Summarize and categorize the detected issues."""
        if not self.results["issues"]:
            logger.info("No issues detected")
            return

        logger.info(f"Found {len(self.results['issues'])} potential issues:")
        for i, issue in enumerate(self.results["issues"], 1):
            logger.info(f"{i}. [{issue['severity']}] {issue['message']}")
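
    # The JSON report written below is simply a dump of self.results, so it
    # contains the keys initialised in __init__: "env_vars_present",
    # "dotenv_files", "config_files", "loaded_config", "final_api_keys",
    # and "issues".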

    def generate_report(self, output_file="api_key_loading_report.json"):
        """Generate a JSON report of the diagnostics."""
        report_path = os.path.join(self.cwd, output_file)

        with open(report_path, 'w') as f:
            json.dump(self.results, f, indent=2)

        logger.info(f"Diagnostic report saved to {report_path}")


def main():
    """Main function to run the diagnostics."""
    diagnostic = ApiKeyDiagnostic()
    results = diagnostic.run_diagnostics()
    diagnostic.generate_report()

    # Print a summary to the console
    print("\n========== API KEY LOADING DIAGNOSTIC SUMMARY ==========")
    print(f"Working directory: {diagnostic.cwd}")
    print(f"Project root: {diagnostic.project_root}")

    print("\nAPI KEY AVAILABILITY:")
    for provider, env_var in API_ENV_VARS.items():
        env_value = os.environ.get(env_var)
        config_value = None
        if diagnostic.config:
            config_value = diagnostic.config.get(API_PROVIDERS[provider])

        if env_value:
            print(f"  {provider}: AVAILABLE (from environment variable {env_var})")
        elif config_value:
            print(f"  {provider}: AVAILABLE (from configuration)")
        else:
            print(f"  {provider}: NOT AVAILABLE")

    print("\nDOTENV FILES:")
    dotenv_found = False
    for dotenv_path, exists in diagnostic.results["dotenv_files"].items():
        if isinstance(exists, dict) and exists.get("exists"):
            dotenv_found = True
            print(f"  Found: {dotenv_path}")
            if exists.get("api_keys_present"):
                keys_found = [k for k, v in exists["api_keys_present"].items() if v]
                if keys_found:
                    print(f"    Contains keys: {', '.join(keys_found)}")
                else:
                    print("    No API keys found in this file")

    if not dotenv_found:
        print("  No .env files found")

    print("\nISSUES:")
    if not diagnostic.results["issues"]:
        print("  No issues detected")
    else:
        for i, issue in enumerate(diagnostic.results["issues"], 1):
            print(f"  {i}. [{issue['severity']}] {issue['message']}")

    print("\nFull diagnostic results saved to api_key_loading_report.json")
    print("===========================================================")


if __name__ == "__main__":
    main()