#!/usr/bin/env python
"""
API Key Loading Diagnostic Script for GAIA
This script diagnoses API key loading issues by:
1. Detecting where API keys are being loaded from (.env file, environment variables, config files)
2. Showing the actual loading paths and mechanisms being used
3. Verifying the existence of .env files in all expected locations
4. Testing that each API key loads through the same code path used by the main system
"""
import os
import sys
import json
import logging
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple, Set
import importlib.util
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("api_key_diagnostic")
# Add the project root (three levels up from src/gaia/utils) to sys.path so the
# "src.gaia.*" imports below resolve when this script is run directly
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")))
# Try to import GAIA configuration components
try:
from src.gaia.config.default import DEFAULT_CONFIG
from src.gaia.config.env import load_env_config, ENV_CONFIG_MAPPING
from src.gaia.config.loader import ConfigLoader, Configuration
CONFIG_IMPORTS_SUCCESSFUL = True
except ImportError as e:
    logger.error(f"Failed to import GAIA configuration modules: {e}")
    logger.warning("Will attempt to load configuration modules dynamically")
    CONFIG_IMPORTS_SUCCESSFUL = False
    # Fallback so check_env_variables() does not raise NameError when the GAIA
    # config modules are unavailable
    ENV_CONFIG_MAPPING = {}
# List of key API providers and their configuration paths
API_PROVIDERS = {
"openai": "api.openai.api_key",
"serper": "api.serper.api_key",
"perplexity": "api.perplexity.api_key",
"supabase": "api.supabase.key",
"huggingface": "api.huggingface.token",
}
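# Illustrative (assumed) nested layout that the dotted paths above are resolved
# against in _check_config_file, e.g. "api.openai.api_key" expects:
#   {"api": {"openai": {"api_key": "..."}}}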
# Environment variable names for API keys
API_ENV_VARS = {
"openai": "OPENAI_API_KEY",
"serper": "SERPER_API_KEY",
"perplexity": "PERPLEXITY_API_KEY",
"supabase": "SUPABASE_KEY",
"huggingface": "HF_TOKEN",
}
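# Example .env layout this script looks for (values are placeholders, not real keys):
#   OPENAI_API_KEY=...
#   SERPER_API_KEY=...
#   PERPLEXITY_API_KEY=...
#   SUPABASE_KEY=...
#   HF_TOKEN=...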
class ApiKeyDiagnostic:
"""Diagnostic tool for checking API key loading in GAIA."""
def __init__(self):
"""Initialize the diagnostic tool."""
self.results = {
"env_vars_present": {},
"dotenv_files": {},
"config_files": {},
"loaded_config": {},
"final_api_keys": {},
"issues": [],
}
# Get current working directory and project root
self.cwd = os.getcwd()
self.project_root = self._find_project_root()
# State for tracking loaded configuration
self.config_loader = None
self.config = None
def _find_project_root(self) -> str:
"""Find the project root directory by looking for indicators like pyproject.toml, setup.py, etc."""
current_dir = Path(self.cwd)
# Try to find project root indicators
root_indicators = ["setup.py", "pyproject.toml", "README.md", "src/gaia"]
while current_dir != current_dir.parent:
for indicator in root_indicators:
if (current_dir / indicator).exists():
return str(current_dir)
current_dir = current_dir.parent
# If no root found, return current directory
return self.cwd
def check_env_variables(self):
"""Check which API-related environment variables are present."""
logger.info("Checking environment variables...")
# Check all environment variables related to API keys
for provider, env_var in API_ENV_VARS.items():
value = os.environ.get(env_var)
self.results["env_vars_present"][env_var] = {
"present": value is not None,
"value": f"{value[:5]}..." if value else None
}
logger.info(f"Environment variable {env_var}: {'PRESENT' if value else 'NOT PRESENT'}")
# Check additional environment variables used for configuration
for env_var in ENV_CONFIG_MAPPING.keys():
if env_var not in self.results["env_vars_present"]:
value = os.environ.get(env_var)
self.results["env_vars_present"][env_var] = {
"present": value is not None,
"value": f"{value[:5]}..." if value else None
}
if value:
logger.info(f"Additional env var {env_var}: PRESENT (maps to {ENV_CONFIG_MAPPING[env_var]})")
def find_dotenv_files(self):
"""Find .env files in common locations."""
logger.info("Searching for .env files...")
# Common locations for .env files
locations = [
self.project_root, # Project root
os.path.join(self.project_root, "src"), # src directory
os.path.join(self.project_root, "src/gaia"), # src/gaia directory
os.path.join(self.project_root, "config"), # config directory
os.path.dirname(os.path.abspath(__file__)), # Current script directory
self.cwd, # Current working directory
]
for location in locations:
if os.path.isdir(location):
dotenv_path = os.path.join(location, ".env")
exists = os.path.isfile(dotenv_path)
self.results["dotenv_files"][dotenv_path] = exists
if exists:
logger.info(f"Found .env file: {dotenv_path}")
self._check_dotenv_content(dotenv_path)
else:
logger.info(f"No .env file found at: {dotenv_path}")
# Check if any .env files were found
if not any(self.results["dotenv_files"].values()):
self.results["issues"].append({
"type": "missing_dotenv",
"message": "No .env files found in common locations",
"severity": "WARNING"
})
logger.warning("No .env files found in any of the checked locations!")
def _check_dotenv_content(self, dotenv_path: str):
"""Check the content of a .env file for API keys."""
try:
with open(dotenv_path, 'r') as f:
content = f.read()
# Check for API key entries
api_keys_present = {}
for provider, env_var in API_ENV_VARS.items():
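            # Simple substring heuristic: matches "KEY=" or "KEY =" anywhere in
            # the file, including commented-out lines, so a hit means the key is
            # probably present rather than definitely loaded.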
if f"{env_var}=" in content or f"{env_var} =" in content:
api_keys_present[env_var] = True
logger.info(f" - Found {env_var} in {dotenv_path}")
else:
api_keys_present[env_var] = False
self.results["dotenv_files"][dotenv_path] = {
"exists": True,
"api_keys_present": api_keys_present
}
# Add warning if .env file exists but no API keys
if not any(api_keys_present.values()):
self.results["issues"].append({
"type": "empty_dotenv",
"message": f".env file at {dotenv_path} exists but contains no API keys",
"severity": "WARNING"
})
logger.warning(f".env file at {dotenv_path} exists but contains no API keys")
except Exception as e:
logger.error(f"Error reading .env file {dotenv_path}: {e}")
self.results["dotenv_files"][dotenv_path] = {
"exists": True,
"error": str(e)
}
def find_config_files(self):
"""Find configuration files (JSON/YAML) that might contain API settings."""
logger.info("Searching for configuration files...")
# Common directories to look for configuration files
config_dirs = [
self.project_root,
os.path.join(self.project_root, "config"),
os.path.join(self.project_root, "src/gaia/config"),
]
# Config file patterns to look for
config_patterns = ["*config*.json", "*config*.yaml", "*config*.yml", "*.env.json"]
config_files = []
for directory in config_dirs:
if os.path.isdir(directory):
for pattern in config_patterns:
for file_path in Path(directory).glob(pattern):
if file_path.is_file():
config_files.append(str(file_path))
# Check each config file for API keys
for file_path in config_files:
try:
self._check_config_file(file_path)
except Exception as e:
logger.error(f"Error checking config file {file_path}: {e}")
self.results["config_files"][file_path] = {
"exists": True,
"error": str(e)
}
def _check_config_file(self, file_path: str):
"""Check a config file for API key configurations."""
try:
# Determine file type by extension
if file_path.endswith('.json'):
with open(file_path, 'r') as f:
config_data = json.load(f)
elif file_path.endswith(('.yaml', '.yml')):
import yaml
with open(file_path, 'r') as f:
config_data = yaml.safe_load(f)
else:
logger.warning(f"Unsupported config file format: {file_path}")
return
# Check for API key entries in the config file
api_keys_present = {}
for provider, config_path in API_PROVIDERS.items():
# Parse the config path
parts = config_path.split('.')
value = config_data
try:
for part in parts:
value = value.get(part, {})
# Check if an API key exists at this path
if isinstance(value, str) and value:
api_keys_present[config_path] = True
logger.info(f" - Found {config_path} in {file_path}")
else:
api_keys_present[config_path] = False
except (AttributeError, TypeError):
api_keys_present[config_path] = False
self.results["config_files"][file_path] = {
"exists": True,
"api_keys_present": api_keys_present
}
except Exception as e:
logger.error(f"Error processing config file {file_path}: {e}")
self.results["config_files"][file_path] = {
"exists": True,
"error": str(e)
}
def load_config_with_gaia_loader(self):
"""Load configuration using GAIA's own config loader."""
logger.info("Loading configuration using GAIA's config loader...")
try:
if CONFIG_IMPORTS_SUCCESSFUL:
# Use imported config loader
self.config_loader = ConfigLoader()
self.config = self.config_loader.load_config()
logger.info("Successfully loaded config with GAIA's ConfigLoader")
else:
# Try to dynamically load the config loader
self._load_config_dynamically()
# Check for each API key in the loaded configuration
if self.config:
for provider, config_path in API_PROVIDERS.items():
value = self.config.get(config_path)
self.results["loaded_config"][config_path] = {
"present": value is not None and value != "",
"value": f"{value[:5]}..." if value else None
}
# Log the result
if value:
logger.info(f"API key for {provider} loaded successfully via {config_path}")
else:
logger.warning(f"No API key found for {provider} at {config_path}")
self.results["issues"].append({
"type": "missing_api_key",
"provider": provider,
"config_path": config_path,
"message": f"No API key found for {provider} at {config_path}",
"severity": "WARNING"
})
except Exception as e:
logger.error(f"Error loading configuration: {e}")
self.results["issues"].append({
"type": "config_loading_error",
"message": f"Error loading configuration: {e}",
"severity": "ERROR"
})
def _load_config_dynamically(self):
"""Attempt to dynamically load the configuration system if imports failed."""
logger.info("Attempting to dynamically load configuration modules...")
try:
# First, try to load default config
default_path = os.path.join(self.project_root, "src/gaia/config/default.py")
if os.path.exists(default_path):
default_module = self._import_module_from_path(default_path, "default")
default_config = getattr(default_module, "DEFAULT_CONFIG", {})
# Now try to load the config loader
loader_path = os.path.join(self.project_root, "src/gaia/config/loader.py")
if os.path.exists(loader_path):
loader_module = self._import_module_from_path(loader_path, "loader")
ConfigLoader = getattr(loader_module, "ConfigLoader", None)
if ConfigLoader:
self.config_loader = ConfigLoader()
self.config = self.config_loader.load_config()
logger.info("Successfully loaded config dynamically")
else:
logger.error("Failed to find ConfigLoader class in loader module")
else:
logger.error(f"Config loader module not found at {loader_path}")
else:
logger.error(f"Default config module not found at {default_path}")
except Exception as e:
logger.error(f"Error in dynamic loading of config modules: {e}")
def _import_module_from_path(self, path, module_name):
"""Dynamically import a module from a file path."""
spec = importlib.util.spec_from_file_location(module_name, path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def verify_direct_key_loading(self):
"""Verify that each API key can be loaded directly from environment variables."""
logger.info("Testing direct API key loading from environment variables...")
# Test loading each API key directly from environment variables
for provider, env_var in API_ENV_VARS.items():
value = os.environ.get(env_var)
self.results["final_api_keys"][provider] = {
"from_env": value is not None and value != "",
"env_var": env_var,
"value": f"{value[:5]}..." if value else None
}
if value:
logger.info(f"API key for {provider} found in environment variable {env_var}")
else:
logger.warning(f"API key for {provider} not found in environment variable {env_var}")
def check_working_directory(self):
"""Check the working directory and its relationship to config files."""
logger.info(f"Current working directory: {self.cwd}")
logger.info(f"Detected project root: {self.project_root}")
# Check if CWD is different from project root
if os.path.normpath(self.cwd) != os.path.normpath(self.project_root):
logger.warning(f"Working directory is different from project root. "
f"This might affect relative path resolution.")
self.results["issues"].append({
"type": "working_directory",
"message": "Working directory is different from project root",
"cwd": self.cwd,
"project_root": self.project_root,
"severity": "WARNING"
})
def check_path_differences(self):
"""Check for absolute vs. relative path differences in credential loading."""
# Look for potential path issues in the .env files and config files
for dotenv_path, exists in self.results["dotenv_files"].items():
if isinstance(exists, dict) and exists.get("exists"):
if os.path.isabs(dotenv_path):
# Check if the absolute path would be resolvable from a relative context
rel_path = os.path.relpath(dotenv_path, self.project_root)
if rel_path.startswith('..'):
self.results["issues"].append({
"type": "path_resolution",
"message": f".env file at {dotenv_path} might not be resolvable from project root",
"abs_path": dotenv_path,
"rel_path": rel_path,
"severity": "WARNING"
})
logger.warning(f".env file at {dotenv_path} might not be resolvable from project root")
def run_diagnostics(self):
"""Run all diagnostic checks."""
logger.info("Starting GAIA API key loading diagnostics...")
# Get working directory info
self.check_working_directory()
# Check environment variables
self.check_env_variables()
# Find .env files
self.find_dotenv_files()
# Find configuration files
self.find_config_files()
# Load config with GAIA loader
self.load_config_with_gaia_loader()
# Verify direct key loading
self.verify_direct_key_loading()
# Check path differences
self.check_path_differences()
# Summarize issues
self._summarize_issues()
logger.info("Diagnostic checks completed")
return self.results
def _summarize_issues(self):
"""Summarize and categorize the detected issues."""
if not self.results["issues"]:
logger.info("No issues detected")
return
logger.info(f"Found {len(self.results['issues'])} potential issues:")
for i, issue in enumerate(self.results["issues"], 1):
logger.info(f"{i}. [{issue['severity']}] {issue['message']}")
def generate_report(self, output_file="api_key_loading_report.json"):
"""Generate a JSON report of the diagnostics."""
report_path = os.path.join(self.cwd, output_file)
with open(report_path, 'w') as f:
json.dump(self.results, f, indent=2)
logger.info(f"Diagnostic report saved to {report_path}")
def main():
"""Main function to run the diagnostics."""
diagnostic = ApiKeyDiagnostic()
results = diagnostic.run_diagnostics()
diagnostic.generate_report()
# Print a summary to the console
print("\n========== API KEY LOADING DIAGNOSTIC SUMMARY ==========")
print(f"Working directory: {diagnostic.cwd}")
print(f"Project root: {diagnostic.project_root}")
print("\nAPI KEY AVAILABILITY:")
for provider, env_var in API_ENV_VARS.items():
env_value = os.environ.get(env_var)
config_value = None
if diagnostic.config:
config_value = diagnostic.config.get(API_PROVIDERS[provider])
if env_value:
print(f" {provider}: AVAILABLE (from environment variable {env_var})")
elif config_value:
print(f" {provider}: AVAILABLE (from configuration)")
else:
print(f" {provider}: NOT AVAILABLE")
print("\nDOTENV FILES:")
dotenv_found = False
for dotenv_path, exists in diagnostic.results["dotenv_files"].items():
if isinstance(exists, dict) and exists.get("exists"):
dotenv_found = True
print(f" Found: {dotenv_path}")
if exists.get("api_keys_present"):
keys_found = [k for k, v in exists["api_keys_present"].items() if v]
if keys_found:
print(f" Contains keys: {', '.join(keys_found)}")
else:
print(f" No API keys found in this file")
if not dotenv_found:
print(" No .env files found")
print("\nISSUES:")
if not diagnostic.results["issues"]:
print(" No issues detected")
else:
for i, issue in enumerate(diagnostic.results["issues"], 1):
print(f" {i}. [{issue['severity']}] {issue['message']}")
print("\nFull diagnostic results saved to api_key_loading_report.json")
print("===========================================================")
if __name__ == "__main__":
main()