|
|
|
""" |
|
API Key Loading Diagnostic Script for GAIA |
|
|
|
This script diagnoses API key loading issues by: |
|
1. Detecting where API keys are being loaded from (.env file, environment variables, config files) |
|
2. Showing the actual loading paths and mechanisms being used |
|
3. Verifying the existence of .env files in all expected locations |
|
4. Testing loading each API key using the exact same code path used by the main system |
|
""" |
|
|
|
import os |
|
import sys |
|
import json |
|
import logging |
|
from pathlib import Path |
|
from typing import Dict, Any, List, Optional, Tuple, Set |
|
import importlib.util |
|
|
|
|
|
# Configure logging once for the whole diagnostic run.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger("api_key_diagnostic")

# Make the repository root (two levels up from this file) importable so the
# `src.gaia.*` package imports below can resolve regardless of the cwd.
_REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.append(_REPO_ROOT)
|
|
|
|
|
# Import GAIA's configuration machinery.  If the package is not importable
# (e.g. the script is run from an unexpected directory or the install is
# broken), record the failure and fall back to loading the modules
# dynamically by file path later on.
try:
    from src.gaia.config.default import DEFAULT_CONFIG
    from src.gaia.config.env import load_env_config, ENV_CONFIG_MAPPING
    from src.gaia.config.loader import ConfigLoader, Configuration
    CONFIG_IMPORTS_SUCCESSFUL = True
except ImportError as e:
    logger.error(f"Failed to import GAIA configuration modules: {e}")
    logger.warning("Will attempt to load configuration modules dynamically")
    # NOTE: when this is False, names like ENV_CONFIG_MAPPING and
    # ConfigLoader are NOT bound at module level.
    CONFIG_IMPORTS_SUCCESSFUL = False
|
|
|
|
|
# Dotted configuration paths (as understood by GAIA's config loader) at
# which each provider's credential is expected to live.
API_PROVIDERS = {
    "openai": "api.openai.api_key",
    "serper": "api.serper.api_key",
    "perplexity": "api.perplexity.api_key",
    "supabase": "api.supabase.key",
    "huggingface": "api.huggingface.token",
}

# Environment variable conventionally holding each provider's credential.
API_ENV_VARS = {
    "openai": "OPENAI_API_KEY",
    "serper": "SERPER_API_KEY",
    "perplexity": "PERPLEXITY_API_KEY",
    "supabase": "SUPABASE_KEY",
    "huggingface": "HF_TOKEN",
}
|
|
|
class ApiKeyDiagnostic:
    """Diagnostic tool for checking API key loading in GAIA.

    Each ``check_*``/``find_*``/``verify_*`` method records its findings in
    ``self.results`` and appends any detected problems to
    ``self.results["issues"]``.  Nothing here mutates the environment or the
    configuration being inspected.
    """

    def __init__(self):
        """Initialize result buckets and locate the relevant directories."""
        self.results = {
            "env_vars_present": {},   # env var name -> {"present", "value"}
            "dotenv_files": {},       # path -> bool, or a detail dict once inspected
            "config_files": {},       # path -> {"exists", ...}
            "loaded_config": {},      # dotted config path -> {"present", "value"}
            "final_api_keys": {},     # provider -> {"from_env", "env_var", "value"}
            "issues": [],             # list of {"type", "message", "severity", ...}
        }

        self.cwd = os.getcwd()
        self.project_root = self._find_project_root()

        # Populated by load_config_with_gaia_loader() on success.
        self.config_loader = None
        self.config = None

    @staticmethod
    def _mask(value) -> Optional[str]:
        """Return a short, safe-to-log preview of a secret, or None if unset.

        BUG FIX: ``str(value)`` guards against truthy non-string config
        values, which would raise TypeError on direct slicing.
        """
        return f"{str(value)[:5]}..." if value else None

    def _find_project_root(self) -> str:
        """Walk upward from the cwd looking for project-root indicator files.

        Returns the first ancestor containing one of the indicators, or the
        cwd itself if none is found before the filesystem root.
        """
        current_dir = Path(self.cwd)
        root_indicators = ["setup.py", "pyproject.toml", "README.md", "src/gaia"]

        # At the filesystem root a path is its own parent, ending the walk.
        while current_dir != current_dir.parent:
            if any((current_dir / indicator).exists() for indicator in root_indicators):
                return str(current_dir)
            current_dir = current_dir.parent

        return self.cwd

    def _record_env_var(self, env_var: str) -> Optional[str]:
        """Record presence (and a masked preview) of one environment variable."""
        value = os.environ.get(env_var)
        self.results["env_vars_present"][env_var] = {
            "present": value is not None,
            "value": self._mask(value),
        }
        return value

    def check_env_variables(self):
        """Check which API-related environment variables are present."""
        logger.info("Checking environment variables...")

        for env_var in API_ENV_VARS.values():
            value = self._record_env_var(env_var)
            logger.info(f"Environment variable {env_var}: {'PRESENT' if value else 'NOT PRESENT'}")

        # BUG FIX: ENV_CONFIG_MAPPING is only bound at module level when the
        # top-of-file GAIA imports succeed.  The original referenced it
        # unconditionally and raised NameError precisely in the broken
        # environment this script exists to diagnose.
        if not CONFIG_IMPORTS_SUCCESSFUL:
            return

        for env_var in ENV_CONFIG_MAPPING.keys():
            if env_var not in self.results["env_vars_present"]:
                value = self._record_env_var(env_var)
                if value:
                    logger.info(f"Additional env var {env_var}: PRESENT (maps to {ENV_CONFIG_MAPPING[env_var]})")

    def find_dotenv_files(self):
        """Look for .env files in the common locations GAIA might read from."""
        logger.info("Searching for .env files...")

        locations = [
            self.project_root,
            os.path.join(self.project_root, "src"),
            os.path.join(self.project_root, "src/gaia"),
            os.path.join(self.project_root, "config"),
            os.path.dirname(os.path.abspath(__file__)),
            self.cwd,
        ]

        for location in locations:
            if not os.path.isdir(location):
                continue
            dotenv_path = os.path.join(location, ".env")
            exists = os.path.isfile(dotenv_path)
            self.results["dotenv_files"][dotenv_path] = exists

            if exists:
                logger.info(f"Found .env file: {dotenv_path}")
                # Replaces the boolean entry with a detail dict.
                self._check_dotenv_content(dotenv_path)
            else:
                logger.info(f"No .env file found at: {dotenv_path}")

        # Detail dicts are truthy and missing files are False, so any() is
        # True iff at least one .env file was found.
        if not any(self.results["dotenv_files"].values()):
            self.results["issues"].append({
                "type": "missing_dotenv",
                "message": "No .env files found in common locations",
                "severity": "WARNING",
            })
            logger.warning("No .env files found in any of the checked locations!")

    def _check_dotenv_content(self, dotenv_path: str):
        """Scan a .env file's text for the known API key variable names."""
        try:
            with open(dotenv_path, 'r', encoding="utf-8") as f:
                content = f.read()

            # Substring match ("VAR=" / "VAR ="); this also matches
            # commented-out assignments, acceptable for a diagnostic.
            api_keys_present = {}
            for env_var in API_ENV_VARS.values():
                found = f"{env_var}=" in content or f"{env_var} =" in content
                api_keys_present[env_var] = found
                if found:
                    logger.info(f"  - Found {env_var} in {dotenv_path}")

            self.results["dotenv_files"][dotenv_path] = {
                "exists": True,
                "api_keys_present": api_keys_present,
            }

            if not any(api_keys_present.values()):
                self.results["issues"].append({
                    "type": "empty_dotenv",
                    "message": f".env file at {dotenv_path} exists but contains no API keys",
                    "severity": "WARNING",
                })
                logger.warning(f".env file at {dotenv_path} exists but contains no API keys")

        except Exception as e:
            logger.error(f"Error reading .env file {dotenv_path}: {e}")
            self.results["dotenv_files"][dotenv_path] = {
                "exists": True,
                "error": str(e),
            }

    def find_config_files(self):
        """Find JSON/YAML configuration files that might contain API settings."""
        logger.info("Searching for configuration files...")

        config_dirs = [
            self.project_root,
            os.path.join(self.project_root, "config"),
            os.path.join(self.project_root, "src/gaia/config"),
        ]

        config_patterns = ["*config*.json", "*config*.yaml", "*config*.yml", "*.env.json"]

        config_files = []
        for directory in config_dirs:
            if os.path.isdir(directory):
                for pattern in config_patterns:
                    for file_path in Path(directory).glob(pattern):
                        if file_path.is_file():
                            config_files.append(str(file_path))

        for file_path in config_files:
            try:
                self._check_config_file(file_path)
            except Exception as e:
                logger.error(f"Error checking config file {file_path}: {e}")
                self.results["config_files"][file_path] = {
                    "exists": True,
                    "error": str(e),
                }

    def _check_config_file(self, file_path: str):
        """Parse a config file and check each provider's dotted key path."""
        try:
            if file_path.endswith('.json'):
                with open(file_path, 'r', encoding="utf-8") as f:
                    config_data = json.load(f)
            elif file_path.endswith(('.yaml', '.yml')):
                # Imported lazily so the script still runs without PyYAML
                # installed when no YAML config files are present.
                import yaml
                with open(file_path, 'r', encoding="utf-8") as f:
                    config_data = yaml.safe_load(f)
            else:
                logger.warning(f"Unsupported config file format: {file_path}")
                return

            api_keys_present = {}
            for config_path in API_PROVIDERS.values():
                # Walk the dotted path ("api.openai.api_key") into the
                # nested dict, tolerating missing levels.
                parts = config_path.split('.')
                value = config_data

                try:
                    for part in parts:
                        value = value.get(part, {})

                    if isinstance(value, str) and value:
                        api_keys_present[config_path] = True
                        logger.info(f"  - Found {config_path} in {file_path}")
                    else:
                        api_keys_present[config_path] = False
                except (AttributeError, TypeError):
                    # A non-dict value was hit partway down the path.
                    api_keys_present[config_path] = False

            self.results["config_files"][file_path] = {
                "exists": True,
                "api_keys_present": api_keys_present,
            }

        except Exception as e:
            logger.error(f"Error processing config file {file_path}: {e}")
            self.results["config_files"][file_path] = {
                "exists": True,
                "error": str(e),
            }

    def load_config_with_gaia_loader(self):
        """Load configuration via GAIA's own ConfigLoader and record API keys.

        Falls back to dynamic module loading when the package imports at
        module load time failed.
        """
        logger.info("Loading configuration using GAIA's config loader...")

        try:
            if CONFIG_IMPORTS_SUCCESSFUL:
                self.config_loader = ConfigLoader()
                self.config = self.config_loader.load_config()
                logger.info("Successfully loaded config with GAIA's ConfigLoader")
            else:
                self._load_config_dynamically()

            if self.config:
                for provider, config_path in API_PROVIDERS.items():
                    value = self.config.get(config_path)
                    self.results["loaded_config"][config_path] = {
                        "present": value is not None and value != "",
                        "value": self._mask(value),
                    }

                    if value:
                        logger.info(f"API key for {provider} loaded successfully via {config_path}")
                    else:
                        logger.warning(f"No API key found for {provider} at {config_path}")
                        self.results["issues"].append({
                            "type": "missing_api_key",
                            "provider": provider,
                            "config_path": config_path,
                            "message": f"No API key found for {provider} at {config_path}",
                            "severity": "WARNING",
                        })
        except Exception as e:
            logger.error(f"Error loading configuration: {e}")
            self.results["issues"].append({
                "type": "config_loading_error",
                "message": f"Error loading configuration: {e}",
                "severity": "ERROR",
            })

    def _load_config_dynamically(self):
        """Fallback: import GAIA's config modules directly by file path."""
        logger.info("Attempting to dynamically load configuration modules...")

        try:
            default_path = os.path.join(self.project_root, "src/gaia/config/default.py")
            if not os.path.exists(default_path):
                logger.error(f"Default config module not found at {default_path}")
                return

            # Imported for its side effects only; the original bound
            # DEFAULT_CONFIG to an unused local.
            self._import_module_from_path(default_path, "default")

            loader_path = os.path.join(self.project_root, "src/gaia/config/loader.py")
            if not os.path.exists(loader_path):
                logger.error(f"Config loader module not found at {loader_path}")
                return

            loader_module = self._import_module_from_path(loader_path, "loader")
            loader_cls = getattr(loader_module, "ConfigLoader", None)
            if loader_cls is None:
                logger.error("Failed to find ConfigLoader class in loader module")
                return

            self.config_loader = loader_cls()
            self.config = self.config_loader.load_config()
            logger.info("Successfully loaded config dynamically")
        except Exception as e:
            logger.error(f"Error in dynamic loading of config modules: {e}")

    def _import_module_from_path(self, path, module_name):
        """Import and execute a module from an explicit file path."""
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    def verify_direct_key_loading(self):
        """Record whether each provider's key is directly readable from the environment."""
        logger.info("Testing direct API key loading from environment variables...")

        for provider, env_var in API_ENV_VARS.items():
            value = os.environ.get(env_var)
            self.results["final_api_keys"][provider] = {
                "from_env": value is not None and value != "",
                "env_var": env_var,
                "value": self._mask(value),
            }

            if value:
                logger.info(f"API key for {provider} found in environment variable {env_var}")
            else:
                logger.warning(f"API key for {provider} not found in environment variable {env_var}")

    def check_working_directory(self):
        """Warn when the cwd differs from the project root (relative paths may break)."""
        logger.info(f"Current working directory: {self.cwd}")
        logger.info(f"Detected project root: {self.project_root}")

        if os.path.normpath(self.cwd) != os.path.normpath(self.project_root):
            logger.warning("Working directory is different from project root. "
                           "This might affect relative path resolution.")
            self.results["issues"].append({
                "type": "working_directory",
                "message": "Working directory is different from project root",
                "cwd": self.cwd,
                "project_root": self.project_root,
                "severity": "WARNING",
            })

    def check_path_differences(self):
        """Flag .env files that sit outside the project root tree."""
        for dotenv_path, exists in self.results["dotenv_files"].items():
            # Only detail dicts (files that were actually found) are checked.
            if isinstance(exists, dict) and exists.get("exists"):
                if os.path.isabs(dotenv_path):
                    rel_path = os.path.relpath(dotenv_path, self.project_root)
                    # A relative path escaping upward means the file lives
                    # outside the project tree.
                    if rel_path.startswith('..'):
                        self.results["issues"].append({
                            "type": "path_resolution",
                            "message": f".env file at {dotenv_path} might not be resolvable from project root",
                            "abs_path": dotenv_path,
                            "rel_path": rel_path,
                            "severity": "WARNING",
                        })
                        logger.warning(f".env file at {dotenv_path} might not be resolvable from project root")

    def run_diagnostics(self):
        """Run all diagnostic checks in order and return the results dict."""
        logger.info("Starting GAIA API key loading diagnostics...")

        self.check_working_directory()
        self.check_env_variables()
        self.find_dotenv_files()
        self.find_config_files()
        self.load_config_with_gaia_loader()
        self.verify_direct_key_loading()
        self.check_path_differences()

        self._summarize_issues()

        logger.info("Diagnostic checks completed")
        return self.results

    def _summarize_issues(self):
        """Log a numbered summary of all detected issues."""
        if not self.results["issues"]:
            logger.info("No issues detected")
            return

        logger.info(f"Found {len(self.results['issues'])} potential issues:")
        for i, issue in enumerate(self.results["issues"], 1):
            logger.info(f"{i}. [{issue['severity']}] {issue['message']}")

    def generate_report(self, output_file="api_key_loading_report.json"):
        """Write the collected results as a JSON report into the cwd."""
        report_path = os.path.join(self.cwd, output_file)
        with open(report_path, 'w', encoding="utf-8") as f:
            json.dump(self.results, f, indent=2)
        logger.info(f"Diagnostic report saved to {report_path}")
|
|
|
|
|
def main():
    """Run the diagnostics and print a human-readable summary to stdout.

    The full machine-readable results are also written to
    ``api_key_loading_report.json`` in the current working directory.
    """
    diagnostic = ApiKeyDiagnostic()
    # Findings accumulate on diagnostic.results; the original bound the
    # return value to an unused local.
    diagnostic.run_diagnostics()
    diagnostic.generate_report()

    print("\n========== API KEY LOADING DIAGNOSTIC SUMMARY ==========")
    print(f"Working directory: {diagnostic.cwd}")
    print(f"Project root: {diagnostic.project_root}")

    print("\nAPI KEY AVAILABILITY:")
    for provider, env_var in API_ENV_VARS.items():
        env_value = os.environ.get(env_var)
        config_value = None
        if diagnostic.config:
            config_value = diagnostic.config.get(API_PROVIDERS[provider])

        # Environment takes precedence over loaded configuration.
        if env_value:
            print(f"  {provider}: AVAILABLE (from environment variable {env_var})")
        elif config_value:
            print(f"  {provider}: AVAILABLE (from configuration)")
        else:
            print(f"  {provider}: NOT AVAILABLE")

    print("\nDOTENV FILES:")
    dotenv_found = False
    for dotenv_path, exists in diagnostic.results["dotenv_files"].items():
        # Entries are plain booleans for missing files and detail dicts
        # for files that were found and inspected.
        if isinstance(exists, dict) and exists.get("exists"):
            dotenv_found = True
            print(f"  Found: {dotenv_path}")

            if exists.get("api_keys_present"):
                keys_found = [k for k, v in exists["api_keys_present"].items() if v]
                if keys_found:
                    print(f"    Contains keys: {', '.join(keys_found)}")
                else:
                    print("    No API keys found in this file")

    if not dotenv_found:
        print("  No .env files found")

    print("\nISSUES:")
    if not diagnostic.results["issues"]:
        print("  No issues detected")
    else:
        for i, issue in enumerate(diagnostic.results["issues"], 1):
            print(f"  {i}. [{issue['severity']}] {issue['message']}")

    print("\nFull diagnostic results saved to api_key_loading_report.json")
    print("===========================================================")
|
|
|
|
|
if __name__ == "__main__": |
|
main() |