|
""" |
|
GAIA Credential Validation Script |
|
|
|
This script performs comprehensive validation of all API keys and credentials required |
|
by the GAIA system. It does minimal API calls to verify the validity of each credential |
|
and produces a detailed report of missing, invalid, or expired credentials with |
|
instructions for fixing issues. |
|
|
|
Usage: |
|
python -m src.gaia.utils.validate_all_credentials [--verbose] [--output-json] |
|
|
|
Arguments: |
|
--verbose: Show detailed validation process |
|
--output-json: Save results to a JSON file |
|
""" |
|
|
|
import os |
|
import sys |
|
import json |
|
import time |
|
import logging |
|
import argparse |
|
from typing import Dict, Any, List, Tuple, Optional |
|
from pathlib import Path |
|
import urllib.request |
|
import urllib.error |
|
import requests |
|
from datetime import datetime |
|
|
|
|
|
API_CALL_DELAY = 0.5 |
|
|
|
|
|
logger = logging.getLogger("gaia.credential_validator") |
|
handler = logging.StreamHandler() |
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
|
handler.setFormatter(formatter) |
|
logger.addHandler(handler) |
|
logger.setLevel(logging.INFO) |
|
|
|
|
|
CREDENTIALS = [ |
|
{ |
|
"name": "OPENAI_API_KEY", |
|
"description": "OpenAI API key for language model access", |
|
"required": True, |
|
"url": "https://platform.openai.com/", |
|
"instructions": "Create or log in to your account at OpenAI, navigate to API Keys section, and create a new key" |
|
}, |
|
{ |
|
"name": "SUPABASE_URL", |
|
"description": "Supabase project URL", |
|
"required": True, |
|
"url": "https://app.supabase.com/", |
|
"instructions": "Log in to Supabase dashboard, select your project, navigate to Settings → API" |
|
}, |
|
{ |
|
"name": "SUPABASE_KEY", |
|
"description": "Supabase service role key", |
|
"required": True, |
|
"url": "https://app.supabase.com/", |
|
"instructions": "Log in to Supabase dashboard, select your project, navigate to Settings → API, use service_role key" |
|
}, |
|
{ |
|
"name": "AUTH_SECRET_KEY", |
|
"description": "Secret key for authentication", |
|
"required": True, |
|
"url": None, |
|
"instructions": "Generate a secure random key with: openssl rand -hex 32" |
|
}, |
|
{ |
|
"name": "SERPER_API_KEY", |
|
"description": "Serper API key for web search", |
|
"required": False, |
|
"url": "https://serper.dev/", |
|
"instructions": "Sign up at Serper.dev, navigate to API Keys section, create and copy your key" |
|
}, |
|
{ |
|
"name": "PERPLEXITY_API_KEY", |
|
"description": "Perplexity API key for AI search", |
|
"required": False, |
|
"url": "https://www.perplexity.ai/", |
|
"instructions": "Sign up at Perplexity AI, navigate to account settings to find API section" |
|
}, |
|
{ |
|
"name": "YOUTUBE_API_KEY", |
|
"description": "YouTube Data API key", |
|
"required": False, |
|
"url": "https://console.cloud.google.com/", |
|
"instructions": "Create a project in Google Cloud Console, enable YouTube Data API v3, and create API credentials" |
|
}, |
|
{ |
|
"name": "HF_TOKEN", |
|
"description": "Hugging Face token for deployment", |
|
"required": False, |
|
"url": "https://huggingface.co/settings/tokens", |
|
"instructions": "Create or log in to Hugging Face, navigate to Settings → Access Tokens, create a new token" |
|
} |
|
] |
|
|
|
class CredentialValidator: |
|
"""Validates all credentials and produces a detailed report.""" |
|
|
|
def __init__(self, verbose: bool = False): |
|
""" |
|
Initialize the credential validator. |
|
|
|
Args: |
|
verbose: Whether to show detailed validation process |
|
""" |
|
self.verbose = verbose |
|
self.results = { |
|
"timestamp": datetime.now().isoformat(), |
|
"credentials": {}, |
|
"summary": { |
|
"total": 0, |
|
"valid": 0, |
|
"invalid": 0, |
|
"missing": 0, |
|
"expired": 0, |
|
"not_required": 0 |
|
} |
|
} |
|
|
|
def log(self, message: str): |
|
"""Log a message if verbose mode is enabled.""" |
|
if self.verbose: |
|
logger.info(message) |
|
|
|
def validate_all_credentials(self) -> Dict[str, Any]: |
|
""" |
|
Validate all credentials and return results. |
|
|
|
Returns: |
|
Dictionary containing validation results |
|
""" |
|
self.log("Starting credential validation...") |
|
|
|
for cred in CREDENTIALS: |
|
self.validate_credential(cred) |
|
time.sleep(API_CALL_DELAY) |
|
|
|
|
|
self.results["summary"]["total"] = len(CREDENTIALS) |
|
|
|
valid_count = sum(1 for result in self.results["credentials"].values() |
|
if result.get("status") == "valid") |
|
self.results["summary"]["valid"] = valid_count |
|
|
|
invalid_count = sum(1 for result in self.results["credentials"].values() |
|
if result.get("status") == "invalid") |
|
self.results["summary"]["invalid"] = invalid_count |
|
|
|
missing_count = sum(1 for result in self.results["credentials"].values() |
|
if result.get("status") == "missing") |
|
self.results["summary"]["missing"] = missing_count |
|
|
|
expired_count = sum(1 for result in self.results["credentials"].values() |
|
if result.get("status") == "expired") |
|
self.results["summary"]["expired"] = expired_count |
|
|
|
not_required_count = sum(1 for result in self.results["credentials"].values() |
|
if result.get("required") is False and result.get("status") == "missing") |
|
self.results["summary"]["not_required"] = not_required_count |
|
|
|
self.log("Credential validation complete.") |
|
return self.results |
|
|
|
def validate_credential(self, credential: Dict[str, Any]): |
|
""" |
|
Validate a single credential. |
|
|
|
Args: |
|
credential: Dictionary containing credential information |
|
""" |
|
name = credential["name"] |
|
self.log(f"Validating {name}...") |
|
|
|
value = os.environ.get(name) |
|
|
|
result = { |
|
"name": name, |
|
"description": credential["description"], |
|
"required": credential["required"], |
|
"url": credential["url"], |
|
"instructions": credential["instructions"] |
|
} |
|
|
|
if not value: |
|
result["status"] = "missing" |
|
result["message"] = f"{name} is not set in environment variables" |
|
self.results["credentials"][name] = result |
|
if credential["required"]: |
|
self.log(f"❌ {name}: Missing (REQUIRED)") |
|
else: |
|
self.log(f"⚠️ {name}: Missing (OPTIONAL)") |
|
return |
|
|
|
|
|
masked_value = f"{value[:5]}...{value[-4:]}" if len(value) > 9 else "***" |
|
result["masked_value"] = masked_value |
|
|
|
|
|
is_valid, message, details = self._validate_specific_credential(name, value) |
|
|
|
if is_valid: |
|
result["status"] = "valid" |
|
result["message"] = message |
|
self.log(f"✅ {name}: Valid") |
|
else: |
|
result["status"] = "invalid" |
|
result["message"] = message |
|
result["details"] = details |
|
self.log(f"❌ {name}: Invalid - {message}") |
|
|
|
self.results["credentials"][name] = result |
|
|
|
def _validate_specific_credential(self, name: str, value: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]: |
|
""" |
|
Validate a specific credential with the appropriate API call. |
|
|
|
Args: |
|
name: Credential name |
|
value: Credential value |
|
|
|
Returns: |
|
Tuple containing: |
|
- Boolean indicating whether the credential is valid |
|
- Message describing the validation result |
|
- Optional dictionary with additional details |
|
""" |
|
try: |
|
if name == "OPENAI_API_KEY": |
|
return self._validate_openai_key(value) |
|
elif name == "SUPABASE_URL": |
|
return self._validate_supabase_url(value) |
|
elif name == "SUPABASE_KEY": |
|
return self._validate_supabase_key(value) |
|
elif name == "AUTH_SECRET_KEY": |
|
return self._validate_auth_secret_key(value) |
|
elif name == "SERPER_API_KEY": |
|
return self._validate_serper_key(value) |
|
elif name == "PERPLEXITY_API_KEY": |
|
return self._validate_perplexity_key(value) |
|
elif name == "YOUTUBE_API_KEY": |
|
return self._validate_youtube_key(value) |
|
elif name == "HF_TOKEN": |
|
return self._validate_hf_token(value) |
|
else: |
|
return False, f"Unknown credential: {name}", None |
|
|
|
except Exception as e: |
|
logger.error(f"Error validating {name}: {str(e)}") |
|
return False, f"Validation error: {str(e)}", {"exception": str(e)} |
|
|
|
def _validate_openai_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]: |
|
"""Validate OpenAI API key with a minimal API call.""" |
|
url = "https://api.openai.com/v1/models" |
|
req = urllib.request.Request(url) |
|
req.add_header("Authorization", f"Bearer {api_key}") |
|
|
|
try: |
|
with urllib.request.urlopen(req, timeout=5) as response: |
|
if response.status == 200: |
|
return True, "OpenAI API key is valid", None |
|
else: |
|
return False, f"Unexpected status code: {response.status}", {"status_code": response.status} |
|
except urllib.error.HTTPError as e: |
|
if e.code == 401: |
|
return False, "Invalid API key", {"status_code": e.code} |
|
else: |
|
return False, f"HTTP error: {e.code} {e.reason}", {"status_code": e.code, "reason": e.reason} |
|
except Exception as e: |
|
return False, f"Error validating OpenAI API key: {str(e)}", {"exception": str(e)} |
|
|
|
def _validate_supabase_url(self, url: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]: |
|
"""Validate Supabase URL format and accessibility.""" |
|
if not url.startswith("https://") or not ".supabase.co" in url: |
|
return False, "Invalid Supabase URL format", {"url": url} |
|
|
|
try: |
|
response = requests.head(url, timeout=5) |
|
if response.status_code < 400: |
|
return True, "Supabase URL is valid", None |
|
else: |
|
return False, f"Supabase URL returned status code: {response.status_code}", {"status_code": response.status_code} |
|
except requests.exceptions.RequestException as e: |
|
return False, f"Error connecting to Supabase URL: {str(e)}", {"exception": str(e)} |
|
|
|
def _validate_supabase_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]: |
|
"""Validate Supabase API key with a minimal API call.""" |
|
|
|
url = os.environ.get("SUPABASE_URL") |
|
if not url: |
|
return False, "Cannot validate Supabase key without SUPABASE_URL", None |
|
|
|
health_url = f"{url}/rest/v1/" |
|
headers = { |
|
"apikey": api_key, |
|
"Authorization": f"Bearer {api_key}" |
|
} |
|
|
|
try: |
|
response = requests.get(health_url, headers=headers, timeout=5) |
|
if response.status_code < 300: |
|
return True, "Supabase API key is valid", None |
|
elif response.status_code == 401 or response.status_code == 403: |
|
return False, "Invalid Supabase API key", {"status_code": response.status_code} |
|
else: |
|
return False, f"Unexpected status code: {response.status_code}", {"status_code": response.status_code} |
|
except requests.exceptions.RequestException as e: |
|
return False, f"Error connecting to Supabase API: {str(e)}", {"exception": str(e)} |
|
|
|
def _validate_auth_secret_key(self, key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]: |
|
"""Validate AUTH_SECRET_KEY (simple length and complexity check).""" |
|
if len(key) < 10: |
|
return False, "Auth secret key is too short (should be at least 10 characters)", {"length": len(key)} |
|
|
|
|
|
has_letters = any(c.isalpha() for c in key) |
|
has_numbers = any(c.isdigit() for c in key) |
|
|
|
if not (has_letters and has_numbers): |
|
return False, "Auth secret key is not complex enough (should contain letters and numbers)", None |
|
|
|
return True, "Auth secret key appears to be valid", None |
|
|
|
def _validate_serper_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]: |
|
"""Validate Serper API key with a minimal API call.""" |
|
url = "https://google.serper.dev/search" |
|
headers = { |
|
"X-API-KEY": api_key, |
|
"Content-Type": "application/json" |
|
} |
|
payload = { |
|
"q": "test query", |
|
"gl": "us", |
|
"hl": "en", |
|
"num": 1 |
|
} |
|
|
|
try: |
|
response = requests.post(url, headers=headers, json=payload, timeout=5) |
|
if response.status_code == 200: |
|
return True, "Serper API key is valid", None |
|
elif response.status_code == 401 or response.status_code == 403: |
|
return False, "Invalid Serper API key", {"status_code": response.status_code} |
|
else: |
|
return False, f"Unexpected status code: {response.status_code}", {"status_code": response.status_code} |
|
except requests.exceptions.RequestException as e: |
|
return False, f"Error connecting to Serper API: {str(e)}", {"exception": str(e)} |
|
|
|
def _validate_perplexity_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]: |
|
"""Validate Perplexity API key with a minimal API call.""" |
|
url = "https://api.perplexity.ai/chat/completions" |
|
headers = { |
|
"Authorization": f"Bearer {api_key}", |
|
"Content-Type": "application/json" |
|
} |
|
payload = { |
|
"model": "sonar-small-online", |
|
"messages": [{"role": "user", "content": "Hello"}], |
|
"max_tokens": 5 |
|
} |
|
|
|
try: |
|
response = requests.post(url, headers=headers, json=payload, timeout=5) |
|
if response.status_code == 200: |
|
return True, "Perplexity API key is valid", None |
|
elif response.status_code == 401 or response.status_code == 403: |
|
return False, "Invalid Perplexity API key", {"status_code": response.status_code} |
|
else: |
|
return False, f"Unexpected status code: {response.status_code}", {"status_code": response.status_code} |
|
except requests.exceptions.RequestException as e: |
|
return False, f"Error connecting to Perplexity API: {str(e)}", {"exception": str(e)} |
|
|
|
def _validate_youtube_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]: |
|
"""Validate YouTube API key with a minimal API call.""" |
|
url = f"https://www.googleapis.com/youtube/v3/videos?part=snippet&chart=mostPopular&maxResults=1&key={api_key}" |
|
|
|
try: |
|
response = requests.get(url, timeout=5) |
|
if response.status_code == 200: |
|
return True, "YouTube API key is valid", None |
|
elif response.status_code == 400: |
|
data = response.json() |
|
error = data.get("error", {}).get("message", "Unknown error") |
|
return False, f"YouTube API error: {error}", {"error": error} |
|
elif response.status_code == 403: |
|
return False, "Invalid YouTube API key or quota exceeded", {"status_code": response.status_code} |
|
else: |
|
return False, f"Unexpected status code: {response.status_code}", {"status_code": response.status_code} |
|
except requests.exceptions.RequestException as e: |
|
return False, f"Error connecting to YouTube API: {str(e)}", {"exception": str(e)} |
|
|
|
def _validate_hf_token(self, token: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]: |
|
"""Validate Hugging Face token with a minimal API call.""" |
|
url = "https://huggingface.co/api/whoami" |
|
headers = {"Authorization": f"Bearer {token}"} |
|
|
|
try: |
|
response = requests.get(url, headers=headers, timeout=5) |
|
if response.status_code == 200: |
|
return True, "Hugging Face token is valid", None |
|
elif response.status_code == 401 or response.status_code == 403: |
|
return False, "Invalid Hugging Face token", {"status_code": response.status_code} |
|
else: |
|
return False, f"Unexpected status code: {response.status_code}", {"status_code": response.status_code} |
|
except requests.exceptions.RequestException as e: |
|
return False, f"Error connecting to Hugging Face API: {str(e)}", {"exception": str(e)} |
|
|
|
def generate_report(self) -> str: |
|
""" |
|
Generate a formatted text report of credential validation results. |
|
|
|
Returns: |
|
Formatted text report |
|
""" |
|
report_lines = [ |
|
"======================================================", |
|
" GAIA CREDENTIAL VALIDATION REPORT ", |
|
"======================================================", |
|
f"Timestamp: {self.results['timestamp']}", |
|
"", |
|
"SUMMARY:", |
|
f" Total credentials checked: {self.results['summary']['total']}", |
|
f" Valid: {self.results['summary']['valid']}", |
|
f" Invalid: {self.results['summary']['invalid']}", |
|
f" Missing: {self.results['summary']['missing']}", |
|
f" Missing (not required): {self.results['summary']['not_required']}", |
|
"", |
|
"CREDENTIAL DETAILS:", |
|
"" |
|
] |
|
|
|
|
|
valid_creds = [cred for name, cred in self.results["credentials"].items() |
|
if cred.get("status") == "valid"] |
|
|
|
invalid_creds = [cred for name, cred in self.results["credentials"].items() |
|
if cred.get("status") == "invalid"] |
|
|
|
missing_required_creds = [cred for name, cred in self.results["credentials"].items() |
|
if cred.get("status") == "missing" and cred.get("required") is True] |
|
|
|
missing_optional_creds = [cred for name, cred in self.results["credentials"].items() |
|
if cred.get("status") == "missing" and cred.get("required") is False] |
|
|
|
|
|
if invalid_creds: |
|
report_lines.append("INVALID CREDENTIALS (Action required):") |
|
for cred in invalid_creds: |
|
report_lines.append(f" ❌ {cred['name']}: {cred['message']}") |
|
report_lines.append(f" Description: {cred['description']}") |
|
report_lines.append(f" Instructions: {cred['instructions']}") |
|
if cred.get("url"): |
|
report_lines.append(f" URL: {cred['url']}") |
|
report_lines.append("") |
|
|
|
if missing_required_creds: |
|
report_lines.append("MISSING REQUIRED CREDENTIALS (Action required):") |
|
for cred in missing_required_creds: |
|
report_lines.append(f" ❌ {cred['name']}: Missing") |
|
report_lines.append(f" Description: {cred['description']}") |
|
report_lines.append(f" Instructions: {cred['instructions']}") |
|
if cred.get("url"): |
|
report_lines.append(f" URL: {cred['url']}") |
|
report_lines.append("") |
|
|
|
if valid_creds: |
|
report_lines.append("VALID CREDENTIALS:") |
|
for cred in valid_creds: |
|
report_lines.append(f" ✅ {cred['name']}: {cred['message']}") |
|
if cred.get("masked_value"): |
|
report_lines.append(f" Value: {cred['masked_value']}") |
|
report_lines.append("") |
|
|
|
if missing_optional_creds: |
|
report_lines.append("MISSING OPTIONAL CREDENTIALS:") |
|
for cred in missing_optional_creds: |
|
report_lines.append(f" ⚠️ {cred['name']}: Missing (optional)") |
|
report_lines.append(f" Description: {cred['description']}") |
|
report_lines.append(f" Instructions: {cred['instructions']}") |
|
if cred.get("url"): |
|
report_lines.append(f" URL: {cred['url']}") |
|
report_lines.append("") |
|
|
|
|
|
report_lines.extend([ |
|
"======================================================", |
|
" NEXT STEPS ", |
|
"======================================================", |
|
"" |
|
]) |
|
|
|
if invalid_creds or missing_required_creds: |
|
report_lines.append("REQUIRED ACTIONS:") |
|
for cred in invalid_creds + missing_required_creds: |
|
report_lines.append(f"1. Fix {cred['name']}:") |
|
report_lines.append(f" - {cred['instructions']}") |
|
if cred.get("url"): |
|
report_lines.append(f" - Visit: {cred['url']}") |
|
report_lines.append("") |
|
|
|
report_lines.append("2. Add fixed credentials to your .env file") |
|
report_lines.append("3. Run this validation script again to confirm fixes") |
|
report_lines.append("") |
|
else: |
|
report_lines.append("✅ All required credentials are valid!") |
|
report_lines.append("") |
|
|
|
if missing_optional_creds: |
|
report_lines.append("OPTIONAL ENHANCEMENTS:") |
|
for cred in missing_optional_creds: |
|
report_lines.append(f"- Add {cred['name']} to enable {cred['description'].lower()}") |
|
report_lines.append(f" Instructions: {cred['instructions']}") |
|
if cred.get("url"): |
|
report_lines.append(f" URL: {cred['url']}") |
|
report_lines.append("") |
|
|
|
return "\n".join(report_lines) |
|
|
|
def main(): |
|
parser = argparse.ArgumentParser(description="Validate GAIA credentials") |
|
parser.add_argument("--verbose", action="store_true", help="Show detailed validation process") |
|
parser.add_argument("--output-json", action="store_true", help="Save results to a JSON file") |
|
args = parser.parse_args() |
|
|
|
|
|
if args.verbose: |
|
logger.setLevel(logging.DEBUG) |
|
else: |
|
logger.setLevel(logging.INFO) |
|
|
|
|
|
try: |
|
from dotenv import load_dotenv |
|
load_dotenv() |
|
logger.info("Loaded environment variables from .env file") |
|
except ImportError: |
|
logger.info("python-dotenv not installed, using environment variables directly") |
|
|
|
|
|
validator = CredentialValidator(verbose=args.verbose) |
|
results = validator.validate_all_credentials() |
|
|
|
|
|
report = validator.generate_report() |
|
print(report) |
|
|
|
|
|
if args.output_json: |
|
output_dir = Path("results") |
|
output_dir.mkdir(exist_ok=True) |
|
output_file = output_dir / f"credential_validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" |
|
with open(output_file, "w") as f: |
|
json.dump(results, f, indent=2) |
|
print(f"\nJSON results saved to: {output_file}") |
|
|
|
|
|
if (results["summary"]["invalid"] > 0 or |
|
(results["summary"]["missing"] > 0 and results["summary"]["missing"] != results["summary"]["not_required"])): |
|
return 1 |
|
return 0 |
|
|
|
if __name__ == "__main__": |
|
sys.exit(main()) |