Final_Assignment_GAIAAgent / src /gaia /utils /validate_all_credentials.py
JoachimVC's picture
Upload GAIA agent implementation files for assessment
c922f8b
"""
GAIA Credential Validation Script
This script performs comprehensive validation of all API keys and credentials required
by the GAIA system. It does minimal API calls to verify the validity of each credential
and produces a detailed report of missing, invalid, or expired credentials with
instructions for fixing issues.
Usage:
python -m src.gaia.utils.validate_all_credentials [--verbose] [--output-json]
Arguments:
--verbose: Show detailed validation process
--output-json: Save results to a JSON file
"""
import os
import sys
import json
import time
import logging
import argparse
from typing import Dict, Any, List, Tuple, Optional
from pathlib import Path
import urllib.request
import urllib.error
import requests
from datetime import datetime
# Delay (in seconds) inserted between credential checks to avoid rate limiting.
API_CALL_DELAY = 0.5

# Module logger: messages go to a stream handler with a timestamped format.
logger = logging.getLogger("gaia.credential_validator")
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
# logging.getLogger() returns the same logger object every time, so attaching
# unconditionally would stack one more handler (and duplicate every log line)
# each time this module is re-executed, e.g. via importlib.reload.
if not logger.handlers:
    logger.addHandler(handler)
logger.setLevel(logging.INFO)
# Catalogue of every credential the validator knows about. Each entry carries:
#   name         - environment variable that holds the credential
#   description  - short human-readable purpose, shown in the report
#   required     - True when a missing/invalid value must fail the run
#   url          - where the credential can be obtained (None if not applicable)
#   instructions - how to create or find the credential
CREDENTIALS: List[Dict[str, Any]] = [
    dict(
        name="OPENAI_API_KEY",
        description="OpenAI API key for language model access",
        required=True,
        url="https://platform.openai.com/",
        instructions="Create or log in to your account at OpenAI, navigate to API Keys section, and create a new key",
    ),
    dict(
        name="SUPABASE_URL",
        description="Supabase project URL",
        required=True,
        url="https://app.supabase.com/",
        instructions="Log in to Supabase dashboard, select your project, navigate to Settings → API",
    ),
    dict(
        name="SUPABASE_KEY",
        description="Supabase service role key",
        required=True,
        url="https://app.supabase.com/",
        instructions="Log in to Supabase dashboard, select your project, navigate to Settings → API, use service_role key",
    ),
    dict(
        name="AUTH_SECRET_KEY",
        description="Secret key for authentication",
        required=True,
        url=None,
        instructions="Generate a secure random key with: openssl rand -hex 32",
    ),
    dict(
        name="SERPER_API_KEY",
        description="Serper API key for web search",
        required=False,
        url="https://serper.dev/",
        instructions="Sign up at Serper.dev, navigate to API Keys section, create and copy your key",
    ),
    dict(
        name="PERPLEXITY_API_KEY",
        description="Perplexity API key for AI search",
        required=False,
        url="https://www.perplexity.ai/",
        instructions="Sign up at Perplexity AI, navigate to account settings to find API section",
    ),
    dict(
        name="YOUTUBE_API_KEY",
        description="YouTube Data API key",
        required=False,
        url="https://console.cloud.google.com/",
        instructions="Create a project in Google Cloud Console, enable YouTube Data API v3, and create API credentials",
    ),
    dict(
        name="HF_TOKEN",
        description="Hugging Face token for deployment",
        required=False,
        url="https://huggingface.co/settings/tokens",
        instructions="Create or log in to Hugging Face, navigate to Settings → Access Tokens, create a new token",
    ),
]
class CredentialValidator:
    """Validates all credentials and produces a detailed report.

    Each entry of the module-level ``CREDENTIALS`` list is read from the
    environment, checked with a credential-specific validator, and recorded
    in ``self.results``. ``generate_report`` turns those results into text.
    """

    # Credential name -> name of the method that validates it.
    _VALIDATORS = {
        "OPENAI_API_KEY": "_validate_openai_key",
        "SUPABASE_URL": "_validate_supabase_url",
        "SUPABASE_KEY": "_validate_supabase_key",
        "AUTH_SECRET_KEY": "_validate_auth_secret_key",
        "SERPER_API_KEY": "_validate_serper_key",
        "PERPLEXITY_API_KEY": "_validate_perplexity_key",
        "YOUTUBE_API_KEY": "_validate_youtube_key",
        "HF_TOKEN": "_validate_hf_token",
    }

    # Values shorter than this are masked completely: showing the usual
    # 5-character prefix and 4-character suffix would expose most of them.
    _MASK_MIN_LENGTH = 18

    def __init__(self, verbose: bool = False):
        """
        Initialize the credential validator.

        Args:
            verbose: Whether to show detailed validation process
        """
        self.verbose = verbose
        self.results: Dict[str, Any] = {
            "timestamp": datetime.now().isoformat(),
            "credentials": {},
            "summary": {
                "total": 0,
                "valid": 0,
                "invalid": 0,
                "missing": 0,
                "expired": 0,
                "not_required": 0,
            },
        }

    def log(self, message: str) -> None:
        """Log a message if verbose mode is enabled."""
        if self.verbose:
            logger.info(message)

    def validate_all_credentials(self) -> Dict[str, Any]:
        """
        Validate all credentials and return results.

        Returns:
            Dictionary containing validation results
        """
        self.log("Starting credential validation...")
        for cred in CREDENTIALS:
            self.validate_credential(cred)
            entry = self.results["credentials"].get(cred["name"], {})
            # A missing credential triggered no API call, so there is nothing
            # to rate-limit; only pause after credentials that were checked.
            if entry.get("status") != "missing":
                time.sleep(API_CALL_DELAY)

        entries = list(self.results["credentials"].values())
        summary = self.results["summary"]
        summary["total"] = len(CREDENTIALS)
        for status in ("valid", "invalid", "missing", "expired"):
            summary[status] = sum(1 for e in entries if e.get("status") == status)
        # Subset of "missing": optional credentials that are simply not set.
        summary["not_required"] = sum(
            1 for e in entries
            if e.get("required") is False and e.get("status") == "missing"
        )
        self.log("Credential validation complete.")
        return self.results

    def validate_credential(self, credential: Dict[str, Any]) -> None:
        """
        Validate a single credential and store the outcome in ``self.results``.

        Args:
            credential: Dictionary containing credential information
                (keys: name, description, required, url, instructions)
        """
        name = credential["name"]
        self.log(f"Validating {name}...")
        value = os.environ.get(name)
        result = {
            "name": name,
            "description": credential["description"],
            "required": credential["required"],
            "url": credential["url"],
            "instructions": credential["instructions"],
        }
        if not value:
            result["status"] = "missing"
            result["message"] = f"{name} is not set in environment variables"
            self.results["credentials"][name] = result
            if credential["required"]:
                self.log(f"❌ {name}: Missing (REQUIRED)")
            else:
                self.log(f"⚠️ {name}: Missing (OPTIONAL)")
            return

        # Never store or print the raw value; keep only a masked form.
        result["masked_value"] = self._mask_value(value)

        is_valid, message, details = self._validate_specific_credential(name, value)
        result["message"] = message
        if is_valid:
            result["status"] = "valid"
            self.log(f"✅ {name}: Valid")
        else:
            result["status"] = "invalid"
            result["details"] = details
            self.log(f"❌ {name}: Invalid - {message}")
        self.results["credentials"][name] = result

    @classmethod
    def _mask_value(cls, value: str) -> str:
        """Return a display-safe form of ``value``.

        Long values keep their first 5 and last 4 characters; anything
        shorter than ``_MASK_MIN_LENGTH`` is replaced entirely by ``***``.
        """
        if len(value) >= cls._MASK_MIN_LENGTH:
            return f"{value[:5]}...{value[-4:]}"
        return "***"

    @staticmethod
    def _redact(text: str, secret: str) -> str:
        """Replace every occurrence of ``secret`` (raw or URL-quoted) in ``text`` with ``***``."""
        from urllib.parse import quote, quote_plus

        if not secret:
            return text
        for form in (secret, quote(secret, safe=""), quote_plus(secret)):
            text = text.replace(form, "***")
        return text

    @staticmethod
    def _result_from_status(
        status_code: int, ok: bool, valid_message: str, invalid_message: str
    ) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Map an HTTP status to a validation result.

        Args:
            status_code: HTTP status returned by the service
            ok: Whether the caller considers this status a success
            valid_message: Message returned on success
            invalid_message: Message returned for 401/403 responses
        """
        if ok:
            return True, valid_message, None
        if status_code in (401, 403):
            return False, invalid_message, {"status_code": status_code}
        return False, f"Unexpected status code: {status_code}", {"status_code": status_code}

    def _validate_specific_credential(self, name: str, value: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """
        Validate a specific credential with the appropriate API call.

        Args:
            name: Credential name
            value: Credential value

        Returns:
            Tuple containing:
            - Boolean indicating whether the credential is valid
            - Message describing the validation result
            - Optional dictionary with additional details
        """
        method_name = self._VALIDATORS.get(name)
        if method_name is None:
            return False, f"Unknown credential: {name}", None
        try:
            return getattr(self, method_name)(value)
        except Exception as e:
            # Boundary catch: one failing validator must not abort the run.
            logger.error(f"Error validating {name}: {str(e)}")
            return False, f"Validation error: {str(e)}", {"exception": str(e)}

    def _validate_openai_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate OpenAI API key with a minimal API call (list models)."""
        req = urllib.request.Request("https://api.openai.com/v1/models")
        req.add_header("Authorization", f"Bearer {api_key}")
        try:
            with urllib.request.urlopen(req, timeout=5) as response:
                if response.status == 200:
                    return True, "OpenAI API key is valid", None
                return False, f"Unexpected status code: {response.status}", {"status_code": response.status}
        except urllib.error.HTTPError as e:
            if e.code == 401:
                return False, "Invalid API key", {"status_code": e.code}
            return False, f"HTTP error: {e.code} {e.reason}", {"status_code": e.code, "reason": e.reason}
        except Exception as e:
            return False, f"Error validating OpenAI API key: {str(e)}", {"exception": str(e)}

    def _validate_supabase_url(self, url: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate Supabase URL format and accessibility.

        The URL must use https and its *hostname* must end with
        ``.supabase.co``; a substring match anywhere in the URL is not enough.
        """
        from urllib.parse import urlparse

        try:
            parsed = urlparse(url)
            host = parsed.hostname or ""
        except ValueError:
            # urlparse rejects some malformed inputs (e.g. bad IPv6 brackets).
            return False, "Invalid Supabase URL format", {"url": url}
        if parsed.scheme != "https" or not host.endswith(".supabase.co"):
            return False, "Invalid Supabase URL format", {"url": url}
        try:
            response = requests.head(url, timeout=5)
            if response.status_code < 400:  # Any non-error response is good
                return True, "Supabase URL is valid", None
            return False, f"Supabase URL returned status code: {response.status_code}", {"status_code": response.status_code}
        except requests.exceptions.RequestException as e:
            return False, f"Error connecting to Supabase URL: {str(e)}", {"exception": str(e)}

    def _validate_supabase_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate Supabase API key with a minimal API call.

        Requires ``SUPABASE_URL`` to be set, since the key is checked against
        that project's REST endpoint.
        """
        url = os.environ.get("SUPABASE_URL")
        if not url:
            return False, "Cannot validate Supabase key without SUPABASE_URL", None
        # Strip trailing slashes so a URL like "https://x.supabase.co/" does
        # not produce "//rest/v1/".
        health_url = f"{url.rstrip('/')}/rest/v1/"
        headers = {
            "apikey": api_key,
            "Authorization": f"Bearer {api_key}",
        }
        try:
            response = requests.get(health_url, headers=headers, timeout=5)
            return self._result_from_status(
                response.status_code,
                response.status_code < 300,
                "Supabase API key is valid",
                "Invalid Supabase API key",
            )
        except requests.exceptions.RequestException as e:
            return False, f"Error connecting to Supabase API: {str(e)}", {"exception": str(e)}

    def _validate_auth_secret_key(self, key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate AUTH_SECRET_KEY (simple length and complexity check, no network)."""
        if len(key) < 10:
            return False, "Auth secret key is too short (should be at least 10 characters)", {"length": len(key)}
        # Basic complexity check: at least one letter and one digit.
        has_letters = any(c.isalpha() for c in key)
        has_numbers = any(c.isdigit() for c in key)
        if not (has_letters and has_numbers):
            return False, "Auth secret key is not complex enough (should contain letters and numbers)", None
        return True, "Auth secret key appears to be valid", None

    def _validate_serper_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate Serper API key with a minimal API call (one-result search)."""
        url = "https://google.serper.dev/search"
        headers = {
            "X-API-KEY": api_key,
            "Content-Type": "application/json",
        }
        payload = {
            "q": "test query",
            "gl": "us",
            "hl": "en",
            "num": 1,
        }
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=5)
            return self._result_from_status(
                response.status_code,
                response.status_code == 200,
                "Serper API key is valid",
                "Invalid Serper API key",
            )
        except requests.exceptions.RequestException as e:
            return False, f"Error connecting to Serper API: {str(e)}", {"exception": str(e)}

    def _validate_perplexity_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate Perplexity API key with a minimal API call (5-token completion)."""
        url = "https://api.perplexity.ai/chat/completions"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        # NOTE(review): the model name is hard-coded; if the API no longer
        # accepts it, a valid key would presumably surface as an
        # "Unexpected status code" result - confirm against the current
        # Perplexity model list.
        payload = {
            "model": "sonar-small-online",
            "messages": [{"role": "user", "content": "Hello"}],
            "max_tokens": 5,
        }
        try:
            response = requests.post(url, headers=headers, json=payload, timeout=5)
            return self._result_from_status(
                response.status_code,
                response.status_code == 200,
                "Perplexity API key is valid",
                "Invalid Perplexity API key",
            )
        except requests.exceptions.RequestException as e:
            return False, f"Error connecting to Perplexity API: {str(e)}", {"exception": str(e)}

    def _validate_youtube_key(self, api_key: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate YouTube API key with a minimal API call (one popular video).

        The key travels in the query string, so it is passed through
        ``params`` (proper URL encoding) and scrubbed from any error text
        before that text is stored in the results.
        """
        url = "https://www.googleapis.com/youtube/v3/videos"
        params = {
            "part": "snippet",
            "chart": "mostPopular",
            "maxResults": 1,
            "key": api_key,
        }
        try:
            response = requests.get(url, params=params, timeout=5)
            if response.status_code == 200:
                return True, "YouTube API key is valid", None
            if response.status_code == 400:
                try:
                    error = response.json().get("error", {}).get("message", "Unknown error")
                except (ValueError, AttributeError):
                    # Body was not JSON, or not the expected object shape.
                    error = "Unknown error"
                return False, f"YouTube API error: {error}", {"error": error}
            if response.status_code == 403:
                return False, "Invalid YouTube API key or quota exceeded", {"status_code": response.status_code}
            return False, f"Unexpected status code: {response.status_code}", {"status_code": response.status_code}
        except requests.exceptions.RequestException as e:
            # The exception text can embed the request URL, including the key.
            detail = self._redact(str(e), api_key)
            return False, f"Error connecting to YouTube API: {detail}", {"exception": detail}

    def _validate_hf_token(self, token: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Validate Hugging Face token with a minimal API call (whoami)."""
        url = "https://huggingface.co/api/whoami"
        headers = {"Authorization": f"Bearer {token}"}
        try:
            response = requests.get(url, headers=headers, timeout=5)
            return self._result_from_status(
                response.status_code,
                response.status_code == 200,
                "Hugging Face token is valid",
                "Invalid Hugging Face token",
            )
        except requests.exceptions.RequestException as e:
            return False, f"Error connecting to Hugging Face API: {str(e)}", {"exception": str(e)}

    @staticmethod
    def _how_to_fix_lines(cred: Dict[str, Any]) -> List[str]:
        """Return the Description / Instructions / optional URL lines plus a blank line."""
        lines = [
            f" Description: {cred['description']}",
            f" Instructions: {cred['instructions']}",
        ]
        if cred.get("url"):
            lines.append(f" URL: {cred['url']}")
        lines.append("")
        return lines

    def generate_report(self) -> str:
        """
        Generate a formatted text report of credential validation results.

        Sections appear in this order: summary, invalid credentials, missing
        required credentials, valid credentials, missing optional
        credentials, then the next steps.

        Returns:
            Formatted text report
        """
        summary = self.results["summary"]
        entries = list(self.results["credentials"].values())

        def select(status: str, required: Optional[bool] = None) -> List[Dict[str, Any]]:
            return [
                cred for cred in entries
                if cred.get("status") == status
                and (required is None or cred.get("required") is required)
            ]

        # Group by status for better organization
        valid_creds = select("valid")
        invalid_creds = select("invalid")
        missing_required_creds = select("missing", True)
        missing_optional_creds = select("missing", False)

        report_lines = [
            "======================================================",
            " GAIA CREDENTIAL VALIDATION REPORT ",
            "======================================================",
            f"Timestamp: {self.results['timestamp']}",
            "",
            "SUMMARY:",
            f" Total credentials checked: {summary['total']}",
            f" Valid: {summary['valid']}",
            f" Invalid: {summary['invalid']}",
            f" Missing: {summary['missing']}",
            f" Missing (not required): {summary['not_required']}",
            "",
            "CREDENTIAL DETAILS:",
            "",
        ]

        # First show issues that need attention
        if invalid_creds:
            report_lines.append("INVALID CREDENTIALS (Action required):")
            for cred in invalid_creds:
                report_lines.append(f" ❌ {cred['name']}: {cred['message']}")
                report_lines.extend(self._how_to_fix_lines(cred))
        if missing_required_creds:
            report_lines.append("MISSING REQUIRED CREDENTIALS (Action required):")
            for cred in missing_required_creds:
                report_lines.append(f" ❌ {cred['name']}: Missing")
                report_lines.extend(self._how_to_fix_lines(cred))
        if valid_creds:
            report_lines.append("VALID CREDENTIALS:")
            for cred in valid_creds:
                report_lines.append(f" ✅ {cred['name']}: {cred['message']}")
                if cred.get("masked_value"):
                    report_lines.append(f" Value: {cred['masked_value']}")
                report_lines.append("")
        if missing_optional_creds:
            report_lines.append("MISSING OPTIONAL CREDENTIALS:")
            for cred in missing_optional_creds:
                report_lines.append(f" ⚠️ {cred['name']}: Missing (optional)")
                report_lines.extend(self._how_to_fix_lines(cred))

        # Add a section for fixing credential issues
        report_lines.extend([
            "======================================================",
            " NEXT STEPS ",
            "======================================================",
            "",
        ])
        needs_action = invalid_creds + missing_required_creds
        if needs_action:
            report_lines.append("REQUIRED ACTIONS:")
            # Number the steps sequentially: one per credential to fix, then
            # the two closing steps.
            step = 0
            for step, cred in enumerate(needs_action, start=1):
                report_lines.append(f"{step}. Fix {cred['name']}:")
                report_lines.append(f" - {cred['instructions']}")
                if cred.get("url"):
                    report_lines.append(f" - Visit: {cred['url']}")
                report_lines.append("")
            report_lines.append(f"{step + 1}. Add fixed credentials to your .env file")
            report_lines.append(f"{step + 2}. Run this validation script again to confirm fixes")
            report_lines.append("")
        else:
            report_lines.append("✅ All required credentials are valid!")
            report_lines.append("")
        if missing_optional_creds:
            report_lines.append("OPTIONAL ENHANCEMENTS:")
            for cred in missing_optional_creds:
                report_lines.append(f"- Add {cred['name']} to enable {cred['description'].lower()}")
                report_lines.append(f" Instructions: {cred['instructions']}")
                if cred.get("url"):
                    report_lines.append(f" URL: {cred['url']}")
                report_lines.append("")
        return "\n".join(report_lines)
def main(argv: Optional[List[str]] = None) -> int:
    """Run the credential validation from the command line.

    Parses the arguments, optionally loads a .env file, validates every
    credential, prints the text report and, with ``--output-json``, writes
    the raw results to ``results/credential_validation_<timestamp>.json``.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Process exit code: 1 if any credential is invalid or any required
        credential is missing, otherwise 0.
    """
    parser = argparse.ArgumentParser(description="Validate GAIA credentials")
    parser.add_argument("--verbose", action="store_true", help="Show detailed validation process")
    parser.add_argument("--output-json", action="store_true", help="Save results to a JSON file")
    args = parser.parse_args(argv)

    # Configure logging based on verbosity
    logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)

    # Load environment variables from .env file if dotenv is available
    try:
        from dotenv import load_dotenv
    except ImportError:
        logger.info("python-dotenv not installed, using environment variables directly")
    else:
        # Only claim a .env file was loaded when load_dotenv reports that it
        # actually set something.
        if load_dotenv():
            logger.info("Loaded environment variables from .env file")
        else:
            logger.info("No variables loaded from a .env file, using environment variables directly")

    # Run validation
    validator = CredentialValidator(verbose=args.verbose)
    results = validator.validate_all_credentials()

    # Generate and print report
    print(validator.generate_report())

    # Save results to JSON if requested
    if args.output_json:
        output_dir = Path("results")
        output_dir.mkdir(exist_ok=True)
        output_file = output_dir / f"credential_validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        print(f"\nJSON results saved to: {output_file}")

    # "not_required" counts the optional credentials among the missing ones,
    # so the difference is the number of missing *required* credentials.
    summary = results["summary"]
    missing_required = summary["missing"] - summary["not_required"]
    if summary["invalid"] > 0 or missing_required > 0:
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())