"""
GAIA Credential Validation Script

This script performs comprehensive validation of all API keys and credentials
required by the GAIA system. It does minimal API calls to verify the validity
of each credential and produces a detailed report of missing, invalid, or
expired credentials with instructions for fixing issues.

Usage:
    python -m src.gaia.utils.validate_all_credentials [--verbose] [--output-json]

Arguments:
    --verbose: Show detailed validation process
    --output-json: Save results to a JSON file
"""

import os
import sys
import json
import time
import logging
import argparse
from collections import Counter
from typing import Dict, Any, List, Tuple, Optional
from pathlib import Path
import urllib.parse
import urllib.request
import urllib.error
from datetime import datetime

import requests

# Add a small delay between API calls to avoid rate-limiting
API_CALL_DELAY = 0.5

# Timeout (seconds) applied to every outgoing validation request
_REQUEST_TIMEOUT = 5

# Secrets shorter than this are fully masked; showing a prefix/suffix of a
# short secret would reveal most of it.
_MASK_MIN_LENGTH = 20

# Placeholder substituted for a secret found inside an error message
_REDACTED = "***REDACTED***"

# Shape returned by every validator: (is_valid, message, optional details)
ValidationResult = Tuple[bool, str, Optional[Dict[str, Any]]]

# Setup logging
logger = logging.getLogger("gaia.credential_validator")
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Define credential types and validation requirements
CREDENTIALS = [
    {
        "name": "OPENAI_API_KEY",
        "description": "OpenAI API key for language model access",
        "required": True,
        "url": "https://platform.openai.com/",
        "instructions": "Create or log in to your account at OpenAI, navigate to API Keys section, and create a new key"
    },
    {
        "name": "SUPABASE_URL",
        "description": "Supabase project URL",
        "required": True,
        "url": "https://app.supabase.com/",
        "instructions": "Log in to Supabase dashboard, select your project, navigate to Settings → API"
    },
    {
        "name": "SUPABASE_KEY",
        "description": "Supabase service role key",
        "required": True,
        "url": "https://app.supabase.com/",
        "instructions": "Log in to Supabase dashboard, select your project, navigate to Settings → API, use service_role key"
    },
    {
        "name": "AUTH_SECRET_KEY",
        "description": "Secret key for authentication",
        "required": True,
        "url": None,
        "instructions": "Generate a secure random key with: openssl rand -hex 32"
    },
    {
        "name": "SERPER_API_KEY",
        "description": "Serper API key for web search",
        "required": False,
        "url": "https://serper.dev/",
        "instructions": "Sign up at Serper.dev, navigate to API Keys section, create and copy your key"
    },
    {
        "name": "PERPLEXITY_API_KEY",
        "description": "Perplexity API key for AI search",
        "required": False,
        "url": "https://www.perplexity.ai/",
        "instructions": "Sign up at Perplexity AI, navigate to account settings to find API section"
    },
    {
        "name": "YOUTUBE_API_KEY",
        "description": "YouTube Data API key",
        "required": False,
        "url": "https://console.cloud.google.com/",
        "instructions": "Create a project in Google Cloud Console, enable YouTube Data API v3, and create API credentials"
    },
    {
        "name": "HF_TOKEN",
        "description": "Hugging Face token for deployment",
        "required": False,
        "url": "https://huggingface.co/settings/tokens",
        "instructions": "Create or log in to Hugging Face, navigate to Settings → Access Tokens, create a new token"
    }
]


def _redact(text: str, secret: Optional[str]) -> str:
    """Return *text* with every occurrence of *secret* replaced by a placeholder.

    Both the raw secret and its URL-encoded forms are replaced, because HTTP
    client error messages may quote the request URL (which can embed a key).
    """
    if not secret:
        return text
    variants = {
        secret,
        urllib.parse.quote(secret, safe=""),
        urllib.parse.quote_plus(secret),
    }
    # Replace longer variants first so an encoded form is not partially matched.
    for variant in sorted(variants, key=len, reverse=True):
        text = text.replace(variant, _REDACTED)
    return text


def _mask_secret(value: str) -> str:
    """Return a display-safe form of *value* for logs and reports.

    Long secrets keep their first 5 and last 4 characters; anything shorter
    than ``_MASK_MIN_LENGTH`` is fully hidden.
    """
    if len(value) >= _MASK_MIN_LENGTH:
        return f"{value[:5]}...{value[-4:]}"
    return "***"


class CredentialValidator:
    """Validates all credentials and produces a detailed report."""

    def __init__(self, verbose: bool = False):
        """
        Initialize the credential validator.

        Args:
            verbose: Whether to show detailed validation process
        """
        self.verbose = verbose
        self.results = {
            "timestamp": datetime.now().isoformat(),
            "credentials": {},
            "summary": {
                "total": 0,
                "valid": 0,
                "invalid": 0,
                "missing": 0,
                "expired": 0,
                "not_required": 0
            }
        }

    def log(self, message: str):
        """Log a message if verbose mode is enabled."""
        if self.verbose:
            logger.info(message)

    def validate_all_credentials(self) -> Dict[str, Any]:
        """
        Validate all credentials and return results.

        Every entry of ``CREDENTIALS`` is validated in order, then the
        ``summary`` section of the results is recomputed from the recorded
        per-credential statuses.

        Returns:
            Dictionary containing validation results
        """
        self.log("Starting credential validation...")

        for cred in CREDENTIALS:
            self.validate_credential(cred)
            recorded = self.results["credentials"].get(cred["name"], {})
            # Only pause when a check was actually performed; a missing
            # credential triggers no API call, so there is nothing to throttle.
            if recorded.get("status") != "missing":
                time.sleep(API_CALL_DELAY)  # Avoid rate limiting

        # Update summary
        entries = list(self.results["credentials"].values())
        status_counts = Counter(entry.get("status") for entry in entries)
        summary = self.results["summary"]
        summary["total"] = len(CREDENTIALS)
        for status in ("valid", "invalid", "missing", "expired"):
            summary[status] = status_counts[status]
        # "not_required" counts the missing credentials that are optional.
        summary["not_required"] = sum(
            1 for entry in entries
            if entry.get("required") is False and entry.get("status") == "missing"
        )

        self.log("Credential validation complete.")
        return self.results

    def validate_credential(self, credential: Dict[str, Any]):
        """
        Validate a single credential.

        The credential value is read from the environment variable named by
        ``credential["name"]`` and the outcome is stored in
        ``self.results["credentials"]`` under that name.

        Args:
            credential: Dictionary containing credential information
        """
        name = credential["name"]
        self.log(f"Validating {name}...")

        value = os.environ.get(name)
        result = {
            "name": name,
            "description": credential["description"],
            "required": credential["required"],
            "url": credential["url"],
            "instructions": credential["instructions"]
        }

        # Unset and empty values are both treated as missing.
        if not value:
            result["status"] = "missing"
            result["message"] = f"{name} is not set in environment variables"
            self.results["credentials"][name] = result
            if credential["required"]:
                self.log(f"❌ {name}: Missing (REQUIRED)")
            else:
                self.log(f"⚠️ {name}: Missing (OPTIONAL)")
            return

        # Mask the key value for security in logs and results
        result["masked_value"] = _mask_secret(value)

        # Validate the credential
        is_valid, message, details = self._validate_specific_credential(name, value)
        result["message"] = message
        if is_valid:
            result["status"] = "valid"
            self.log(f"✅ {name}: Valid")
        else:
            result["status"] = "invalid"
            result["details"] = details
            self.log(f"❌ {name}: Invalid - {message}")

        self.results["credentials"][name] = result

    def _validate_specific_credential(self, name: str, value: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """
        Validate a specific credential with the appropriate API call.

        Args:
            name: Credential name
            value: Credential value

        Returns:
            Tuple containing:
            - Boolean indicating whether the credential is valid
            - Message describing the validation result
            - Optional dictionary with additional details
        """
        validators = {
            "OPENAI_API_KEY": self._validate_openai_key,
            "SUPABASE_URL": self._validate_supabase_url,
            "SUPABASE_KEY": self._validate_supabase_key,
            "AUTH_SECRET_KEY": self._validate_auth_secret_key,
            "SERPER_API_KEY": self._validate_serper_key,
            "PERPLEXITY_API_KEY": self._validate_perplexity_key,
            "YOUTUBE_API_KEY": self._validate_youtube_key,
            "HF_TOKEN": self._validate_hf_token,
        }
        validator = validators.get(name)
        if validator is None:
            return False, f"Unknown credential: {name}", None

        try:
            return validator(value)
        except Exception as e:
            # Last-resort boundary: a validator must never crash the whole run.
            # The error text is redacted so the secret cannot reach logs/results.
            detail = _redact(str(e), value)
            logger.error(f"Error validating {name}: {detail}")
            return False, f"Validation error: {detail}", {"exception": detail}

    @staticmethod
    def _result_from_status(status_code: int, label: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Map an HTTP status code to a validation result for the key called *label*.

        200 means valid, 401/403 means the key was rejected, anything else is
        reported as an unexpected status.
        """
        if status_code == 200:
            return True, f"{label} is valid", None
        if status_code in (401, 403):
            return False, f"Invalid {label}", {"status_code": status_code}
        return False, f"Unexpected status code: {status_code}", {"status_code": status_code}

    def _validate_openai_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate OpenAI API key with a minimal API call."""
        req = urllib.request.Request("https://api.openai.com/v1/models")
        req.add_header("Authorization", f"Bearer {api_key}")

        try:
            with urllib.request.urlopen(req, timeout=_REQUEST_TIMEOUT) as response:
                if response.status == 200:
                    return True, "OpenAI API key is valid", None
                return False, f"Unexpected status code: {response.status}", {"status_code": response.status}
        except urllib.error.HTTPError as e:
            if e.code == 401:
                return False, "Invalid API key", {"status_code": e.code}
            return False, f"HTTP error: {e.code} {e.reason}", {"status_code": e.code, "reason": e.reason}
        except Exception as e:
            detail = _redact(str(e), api_key)
            return False, f"Error validating OpenAI API key: {detail}", {"exception": detail}

    def _validate_supabase_url(self, url: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate Supabase URL format and accessibility."""
        if not url.startswith("https://") or ".supabase.co" not in url:
            return False, "Invalid Supabase URL format", {"url": url}

        try:
            response = requests.head(url, timeout=_REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as e:
            return False, f"Error connecting to Supabase URL: {str(e)}", {"exception": str(e)}

        if response.status_code < 400:  # Any non-error response is good
            return True, "Supabase URL is valid", None
        return (
            False,
            f"Supabase URL returned status code: {response.status_code}",
            {"status_code": response.status_code},
        )

    def _validate_supabase_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate Supabase API key with a minimal API call."""
        # This requires both the URL and key to be set
        url = os.environ.get("SUPABASE_URL")
        if not url:
            return False, "Cannot validate Supabase key without SUPABASE_URL", None

        # Strip a trailing slash so the path does not become "//rest/v1/".
        health_url = f"{url.rstrip('/')}/rest/v1/"
        headers = {
            "apikey": api_key,
            "Authorization": f"Bearer {api_key}"
        }

        try:
            response = requests.get(health_url, headers=headers, timeout=_REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as e:
            detail = _redact(str(e), api_key)
            return False, f"Error connecting to Supabase API: {detail}", {"exception": detail}

        if response.status_code < 300:
            return True, "Supabase API key is valid", None
        if response.status_code in (401, 403):
            return False, "Invalid Supabase API key", {"status_code": response.status_code}
        return False, f"Unexpected status code: {response.status_code}", {"status_code": response.status_code}

    def _validate_auth_secret_key(self, key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate AUTH_SECRET_KEY (simple length and complexity check)."""
        if len(key) < 10:
            return False, "Auth secret key is too short (should be at least 10 characters)", {"length": len(key)}

        # Basic complexity check
        has_letters = any(c.isalpha() for c in key)
        has_numbers = any(c.isdigit() for c in key)
        if not (has_letters and has_numbers):
            return False, "Auth secret key is not complex enough (should contain letters and numbers)", None

        return True, "Auth secret key appears to be valid", None

    def _validate_serper_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate Serper API key with a minimal API call."""
        url = "https://google.serper.dev/search"
        headers = {
            "X-API-KEY": api_key,
            "Content-Type": "application/json"
        }
        payload = {
            "q": "test query",
            "gl": "us",
            "hl": "en",
            "num": 1
        }

        try:
            response = requests.post(url, headers=headers, json=payload, timeout=_REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as e:
            detail = _redact(str(e), api_key)
            return False, f"Error connecting to Serper API: {detail}", {"exception": detail}

        return self._result_from_status(response.status_code, "Serper API key")

    def _validate_perplexity_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate Perplexity API key with a minimal API call."""
        url = "https://api.perplexity.ai/chat/completions"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": "sonar-small-online",
            "messages": [{"role": "user", "content": "Hello"}],
            "max_tokens": 5
        }

        try:
            response = requests.post(url, headers=headers, json=payload, timeout=_REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as e:
            detail = _redact(str(e), api_key)
            return False, f"Error connecting to Perplexity API: {detail}", {"exception": detail}

        return self._result_from_status(response.status_code, "Perplexity API key")

    def _validate_youtube_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate YouTube API key with a minimal API call."""
        url = "https://www.googleapis.com/youtube/v3/videos"
        params = {
            "part": "snippet",
            "chart": "mostPopular",
            "maxResults": 1,
            "key": api_key
        }

        try:
            response = requests.get(url, params=params, timeout=_REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as e:
            # The key travels in the query string, and connection errors quote
            # the request URL, so the message must be redacted before use.
            detail = _redact(str(e), api_key)
            return False, f"Error connecting to YouTube API: {detail}", {"exception": detail}

        status = response.status_code
        if status == 200:
            return True, "YouTube API key is valid", None
        if status == 400:
            try:
                data = response.json()
            except ValueError:
                # Body was not JSON; fall back to the generic error text below.
                data = {}
            error_info = data.get("error", {}) if isinstance(data, dict) else {}
            error = error_info.get("message", "Unknown error") if isinstance(error_info, dict) else "Unknown error"
            error = _redact(str(error), api_key)
            return False, f"YouTube API error: {error}", {"error": error}
        if status == 403:
            return False, "Invalid YouTube API key or quota exceeded", {"status_code": status}
        return False, f"Unexpected status code: {status}", {"status_code": status}

    def _validate_hf_token(self, token: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate Hugging Face token with a minimal API call."""
        url = "https://huggingface.co/api/whoami"
        headers = {"Authorization": f"Bearer {token}"}

        try:
            response = requests.get(url, headers=headers, timeout=_REQUEST_TIMEOUT)
        except requests.exceptions.RequestException as e:
            detail = _redact(str(e), token)
            return False, f"Error connecting to Hugging Face API: {detail}", {"exception": detail}

        return self._result_from_status(response.status_code, "Hugging Face token")

    @staticmethod
    def _detail_lines(cred: Dict[str, Any], include_description: bool = True) -> List[str]:
        """Return the description/instructions/URL report lines for one credential."""
        lines = []
        if include_description:
            lines.append(f" Description: {cred['description']}")
        lines.append(f" Instructions: {cred['instructions']}")
        if cred.get("url"):
            lines.append(f" URL: {cred['url']}")
        return lines

    def generate_report(self) -> str:
        """
        Generate a formatted text report of credential validation results.

        The report is built from ``self.results``: a summary, the credentials
        grouped by status (problems first), and a "next steps" section.

        Returns:
            Formatted text report
        """
        summary = self.results["summary"]
        report_lines = [
            "======================================================",
            " GAIA CREDENTIAL VALIDATION REPORT ",
            "======================================================",
            f"Timestamp: {self.results['timestamp']}",
            "",
            "SUMMARY:",
            f" Total credentials checked: {summary['total']}",
            f" Valid: {summary['valid']}",
            f" Invalid: {summary['invalid']}",
            f" Missing: {summary['missing']}",
            f" Missing (not required): {summary['not_required']}",
            "",
            "CREDENTIAL DETAILS:",
            ""
        ]

        # Group by status for better organization
        entries = list(self.results["credentials"].values())
        valid_creds = [c for c in entries if c.get("status") == "valid"]
        invalid_creds = [c for c in entries if c.get("status") == "invalid"]
        missing_required_creds = [
            c for c in entries
            if c.get("status") == "missing" and c.get("required") is True
        ]
        missing_optional_creds = [
            c for c in entries
            if c.get("status") == "missing" and c.get("required") is False
        ]

        # First show issues that need attention
        if invalid_creds:
            report_lines.append("INVALID CREDENTIALS (Action required):")
            for cred in invalid_creds:
                report_lines.append(f" ❌ {cred['name']}: {cred['message']}")
                report_lines.extend(self._detail_lines(cred))
                report_lines.append("")

        if missing_required_creds:
            report_lines.append("MISSING REQUIRED CREDENTIALS (Action required):")
            for cred in missing_required_creds:
                report_lines.append(f" ❌ {cred['name']}: Missing")
                report_lines.extend(self._detail_lines(cred))
                report_lines.append("")

        if valid_creds:
            report_lines.append("VALID CREDENTIALS:")
            for cred in valid_creds:
                report_lines.append(f" ✅ {cred['name']}: {cred['message']}")
                if cred.get("masked_value"):
                    report_lines.append(f" Value: {cred['masked_value']}")
                report_lines.append("")

        if missing_optional_creds:
            report_lines.append("MISSING OPTIONAL CREDENTIALS:")
            for cred in missing_optional_creds:
                report_lines.append(f" ⚠️ {cred['name']}: Missing (optional)")
                report_lines.extend(self._detail_lines(cred))
                report_lines.append("")

        # Add a section for fixing credential issues
        report_lines.extend([
            "======================================================",
            " NEXT STEPS ",
            "======================================================",
            ""
        ])

        action_items = invalid_creds + missing_required_creds
        if action_items:
            report_lines.append("REQUIRED ACTIONS:")
            step = 0
            # Number the steps sequentially so each credential gets its own
            # index and the closing steps continue from the last one.
            for step, cred in enumerate(action_items, start=1):
                report_lines.append(f"{step}. Fix {cred['name']}:")
                report_lines.append(f" - {cred['instructions']}")
                if cred.get("url"):
                    report_lines.append(f" - Visit: {cred['url']}")
                report_lines.append("")
            report_lines.append(f"{step + 1}. Add fixed credentials to your .env file")
            report_lines.append(f"{step + 2}. Run this validation script again to confirm fixes")
            report_lines.append("")
        else:
            report_lines.append("✅ All required credentials are valid!")
            report_lines.append("")

        if missing_optional_creds:
            report_lines.append("OPTIONAL ENHANCEMENTS:")
            for cred in missing_optional_creds:
                report_lines.append(f"- Add {cred['name']} to enable {cred['description'].lower()}")
                report_lines.extend(self._detail_lines(cred, include_description=False))
                report_lines.append("")

        return "\n".join(report_lines)


def main() -> int:
    """Run the validator from the command line.

    Parses ``--verbose`` / ``--output-json``, optionally loads a ``.env`` file
    (when python-dotenv is installed), validates every credential, prints the
    report and, if requested, writes the raw results under ``results/``.

    Returns:
        1 if any credential is invalid or any required credential is missing,
        otherwise 0.
    """
    parser = argparse.ArgumentParser(description="Validate GAIA credentials")
    parser.add_argument("--verbose", action="store_true", help="Show detailed validation process")
    parser.add_argument("--output-json", action="store_true", help="Save results to a JSON file")
    args = parser.parse_args()

    # Configure logging based on verbosity
    if args.verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    # Load environment variables from .env file if dotenv is available
    try:
        from dotenv import load_dotenv
        load_dotenv()
        logger.info("Loaded environment variables from .env file")
    except ImportError:
        logger.info("python-dotenv not installed, using environment variables directly")

    # Run validation
    validator = CredentialValidator(verbose=args.verbose)
    results = validator.validate_all_credentials()

    # Generate and print report
    report = validator.generate_report()
    print(report)

    # Save results to JSON if requested
    if args.output_json:
        output_dir = Path("results")
        output_dir.mkdir(exist_ok=True)
        output_file = output_dir / f"credential_validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        print(f"\nJSON results saved to: {output_file}")

    # Return exit code based on validation results
    summary = results["summary"]
    missing_required = summary["missing"] - summary["not_required"]
    if summary["invalid"] > 0 or missing_required > 0:
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())