ticker_monitor_api / tests /test_api.py
dromerosm's picture
Enhance API tests for new /data/analyze endpoint
c7fa0d0
#!/usr/bin/env python3
"""
Automated API Testing Script for Stock Monitoring API
Tests authentication, endpoints, and security features.
Updated for new API architecture with /data/analyze endpoint:
- Added comprehensive /data/analyze endpoint testing
- Tests intraday data with multiple intervals (5m, 15m, 1h, 4h)
- Tests pre/post market data functionality
- Tests market status checking
- Tests rate limiting (20 req/min)
- Tests security features and input validation
- Tests multiple tickers optimization
- Validates technical indicators on all intervals
- Tests daily vs intraday data formats
"""
import requests
import json
import time
import os
from dotenv import load_dotenv
# Environment: read ../.env first (path is relative to the working
# directory), then let python-dotenv perform its default lookup.
load_dotenv(dotenv_path="../.env")
load_dotenv()

PORT = os.getenv("PORT", "7860")
print(f"Using PORT: {PORT}")

# Target server and credentials shared by every request in this script.
BASE_URL = f"http://localhost:{PORT}"
API_KEY = os.getenv("API_KEY")
INVALID_API_KEY = "invalid_key_for_testing"

# Header sets: anonymous, valid bearer token, deliberately wrong bearer token.
HEADERS_NO_AUTH = {"Content-Type": "application/json"}
HEADERS_VALID_AUTH = dict(HEADERS_NO_AUTH, Authorization=f"Bearer {API_KEY}")
HEADERS_INVALID_AUTH = dict(HEADERS_NO_AUTH, Authorization=f"Bearer {INVALID_API_KEY}")
def print_test_header(test_name):
    """Print a banner for a test suite.

    Emits a blank line, a 60-character rule of ``=``, a title line containing
    *test_name*, and a closing rule. Returns ``None``.
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f"πŸ§ͺ {test_name}")
    print(rule)
def print_result(endpoint, method, expected_status, actual_status, passed):
    """Report the outcome of one request and hand *passed* back to the caller.

    Prints an icon plus ``METHOD endpoint``, then the expected and actual
    status values; a failure adds an extra "TEST FAILED" line. The return
    value is *passed* unchanged so callers can fold it into a running flag.
    """
    icon = "❌"
    if passed:
        icon = "βœ…"
    print(f"{icon} {method} {endpoint}")
    print(f" Expected: {expected_status}, Got: {actual_status}")
    if passed:
        return passed
    print(" ❌ TEST FAILED")
    return passed
def test_health_check():
    """Probe ``GET /`` without credentials and expect HTTP 200.

    On success the ``status``, ``timestamp`` and ``database.connected``
    fields of the JSON body are echoed. Returns True when the status code
    is 200; any exception is reported and counted as a failure.
    """
    print_test_header("Health Check (Public Endpoint)")
    try:
        reply = requests.get(f"{BASE_URL}/", headers=HEADERS_NO_AUTH, timeout=10)
        ok = reply.status_code == 200
        print_result("/", "GET", 200, reply.status_code, ok)
        if not ok:
            return ok
        body = reply.json()
        print(f" πŸ“Š Status: {body.get('status')}")
        print(f" πŸ• Timestamp: {body.get('timestamp')}")
        db_info = body.get('database', {})
        print(f" πŸ’Ύ DB Connected: {db_info.get('connected')}")
        return ok
    except Exception as e:
        print(f"❌ Health check failed: {e}")
        return False
def test_public_endpoints():
    """Check that ``GET /tickers`` is reachable without authentication.

    Requests five tickers anonymously, expects HTTP 200 and prints how many
    items came back. Returns True only if the request succeeded.
    """
    print_test_header("Public Endpoints (No Auth Required)")
    suite_ok = True

    # GET /tickers
    try:
        reply = requests.get(f"{BASE_URL}/tickers?limit=5", headers=HEADERS_NO_AUTH, timeout=10)
        ok = reply.status_code == 200
        suite_ok &= print_result("/tickers", "GET", 200, reply.status_code, ok)
        if ok:
            tickers = reply.json()
            print(f" πŸ“ˆ Returned {len(tickers)} tickers")
    except Exception as e:
        print(f"❌ GET /tickers failed: {e}")
        suite_ok = False

    return suite_ok
def test_protected_endpoints_no_auth():
    """Call every protected endpoint anonymously and expect a rejection.

    Each (method, path, JSON payload) case is sent with no Authorization
    header; the case passes when the server answers 401 or 403. Returns
    True only if every case was rejected.
    """
    print_test_header("Protected Endpoints - No Auth (Should Fail)")
    cases = (
        ("POST", "/tickers/update", {"force_refresh": False}),
        ("POST", "/tickers/update-async", {"force_refresh": False}),
        ("POST", "/data/download-all", {"force_refresh": False, "force_indicators": False}),
        ("POST", "/data/analyze", {"tickers": ["AAPL"], "period": "1d"}),
        ("GET", "/tasks", None),
        ("DELETE", "/tasks/old", None),
    )
    rejection_codes = (401, 403)
    suite_ok = True

    for method, endpoint, payload in cases:
        try:
            # requests.request dispatches on the verb; a None payload sends no body.
            reply = requests.request(
                method,
                f"{BASE_URL}{endpoint}",
                headers=HEADERS_NO_AUTH,
                json=payload,
                timeout=10,
            )
            rejected = reply.status_code in rejection_codes
            suite_ok &= print_result(endpoint, method, "401/403", reply.status_code, rejected)
        except Exception as e:
            print(f"❌ {method} {endpoint} failed: {e}")
            suite_ok = False

    return suite_ok
def test_protected_endpoints_invalid_auth():
    """Call protected endpoints with a bogus bearer token and expect 401.

    Each (method, path, JSON payload) case is sent with the invalid-key
    headers. Returns True only if every case answered exactly 401.
    """
    print_test_header("Protected Endpoints - Invalid Auth (Should Fail)")
    cases = (
        ("POST", "/tickers/update", {"force_refresh": False}),
        ("POST", "/data/download-all", {"force_refresh": False, "force_indicators": False}),
        ("POST", "/data/analyze", {"tickers": ["AAPL"], "period": "1d"}),
        ("GET", "/tasks", None),
    )
    suite_ok = True

    for method, endpoint, payload in cases:
        try:
            # requests.request dispatches on the verb; a None payload sends no body.
            reply = requests.request(
                method,
                f"{BASE_URL}{endpoint}",
                headers=HEADERS_INVALID_AUTH,
                json=payload,
                timeout=10,
            )
            unauthorized = reply.status_code == 401
            suite_ok &= print_result(endpoint, method, "401", reply.status_code, unauthorized)
        except Exception as e:
            print(f"❌ {method} {endpoint} failed: {e}")
            suite_ok = False

    return suite_ok
def test_protected_endpoints_valid_auth():
    """Exercise the task-related protected endpoints with the real API key.

    Three independent steps, each expecting HTTP 200:
    list tasks, start an asynchronous ticker update (then poll the returned
    task id once), and delete old tasks. Returns True only if every request
    that was made returned 200 and none raised.
    """
    print_test_header("Protected Endpoints - Valid Auth (Should Succeed)")
    suite_ok = True

    # Step 1: GET /tasks
    try:
        reply = requests.get(f"{BASE_URL}/tasks", headers=HEADERS_VALID_AUTH, timeout=10)
        ok = reply.status_code == 200
        suite_ok &= print_result("/tasks", "GET", 200, reply.status_code, ok)
        if ok:
            tasks = reply.json()
            print(f" πŸ“‹ Found {len(tasks)} tasks")
    except Exception as e:
        print(f"❌ GET /tasks failed: {e}")
        suite_ok = False

    # Step 2: POST /tickers/update-async (safer than the synchronous variant)
    try:
        reply = requests.post(
            f"{BASE_URL}/tickers/update-async",
            headers=HEADERS_VALID_AUTH,
            json={"force_refresh": False},
            timeout=15,
        )
        ok = reply.status_code == 200
        suite_ok &= print_result("/tickers/update-async", "POST", 200, reply.status_code, ok)
        task_id = None
        if ok:
            task_id = reply.json().get("task_id")
            print(f" πŸš€ Task started: {task_id}")
        if task_id:
            # Follow up on the task that was just created.
            time.sleep(1)  # give the task a moment to start
            task_path = f"/tasks/{task_id}"
            reply = requests.get(f"{BASE_URL}{task_path}", headers=HEADERS_VALID_AUTH, timeout=10)
            ok = reply.status_code == 200
            suite_ok &= print_result(task_path, "GET", 200, reply.status_code, ok)
            if ok:
                task_info = reply.json()
                print(f" πŸ“Š Task status: {task_info.get('status')}")
    except Exception as e:
        print(f"❌ POST /tickers/update-async failed: {e}")
        suite_ok = False

    # Step 3: DELETE /tasks/old
    try:
        reply = requests.delete(f"{BASE_URL}/tasks/old", headers=HEADERS_VALID_AUTH, timeout=10)
        ok = reply.status_code == 200
        suite_ok &= print_result("/tasks/old", "DELETE", 200, reply.status_code, ok)
        if ok:
            summary = reply.json()
            print(f" πŸ—‘οΈ Deleted {summary.get('deleted', 0)} old tasks")
    except Exception as e:
        print(f"❌ DELETE /tasks/old failed: {e}")
        suite_ok = False

    return suite_ok
def test_data_endpoints():
    """Exercise the bulk download endpoint and the per-ticker query endpoint.

    Step 1 posts to ``/data/download-all`` with the API key and prints the
    summary fields of the reply. Step 2 fetches five days of AAPL data
    anonymously and prints the latest close plus any SMA values present.
    Returns True only if both requests returned 200 and nothing raised.
    """
    print_test_header("Data Endpoints - Valid Auth (Should Succeed)")
    suite_ok = True

    # Step 1: POST /data/download-all.
    # Per the original author's notes: the endpoint checks whether data is
    # <24h old and may answer with a "data is fresh" message instead of
    # downloading; it accepts force_refresh and force_indicators.
    try:
        reply = requests.post(
            f"{BASE_URL}/data/download-all",
            headers=HEADERS_VALID_AUTH,
            json={"force_refresh": False, "force_indicators": False},
            timeout=120,  # bulk download may be slow
        )
        ok = reply.status_code == 200
        suite_ok &= print_result("/data/download-all", "POST", 200, reply.status_code, ok)
        if ok:
            summary = reply.json()
            print(f" πŸ“Š Processed {summary.get('tickers_processed', 0)} tickers")
            print(f" πŸ“ˆ Created {summary.get('records_created', 0)} records")
            print(f" πŸ”„ Updated {summary.get('records_updated', 0)} records")
            start_date = summary.get('date_range', {}).get('start_date')
            end_date = summary.get('date_range', {}).get('end_date')
            print(f" πŸ“… Date range: {start_date} to {end_date}")
            print(f" πŸ’¬ Message: {summary.get('message', 'N/A')}")
    except Exception as e:
        print(f"❌ POST /data/download-all failed: {e}")
        suite_ok = False

    # Step 2: GET /data/tickers/{ticker} (public endpoint)
    try:
        reply = requests.get(f"{BASE_URL}/data/tickers/AAPL?days=5", headers=HEADERS_NO_AUTH, timeout=10)
        ok = reply.status_code == 200
        suite_ok &= print_result("/data/tickers/AAPL", "GET", 200, reply.status_code, ok)
        if ok:
            rows = reply.json()
            print(f" πŸ“Š Retrieved {len(rows)} days of AAPL data")
            if rows:
                latest = rows[0]
                print(f" πŸ’° Latest close: ${latest.get('close', 0):.2f}")
                # Report whichever moving averages the record carries.
                sma_fields = (
                    ("SMA Fast (10)", 'sma_fast'),
                    ("SMA Med (20)", 'sma_med'),
                    ("SMA Slow (50)", 'sma_slow'),
                )
                for label, field in sma_fields:
                    value = latest.get(field)
                    if value is not None:
                        print(f" πŸ“Š {label}: ${value:.2f}")
    except Exception as e:
        print(f"❌ GET /data/tickers/AAPL failed: {e}")
        suite_ok = False

    return suite_ok
def test_sql_injection_safety():
    """Send SQL-injection strings as the ``limit`` query parameter of /tickers.

    A probe passes when the server answers 200 or 422; anything else, or an
    exception, fails the suite. Returns True only if every probe passed.
    """
    print_test_header("SQL Injection Safety Tests")
    probes = (
        "'; DROP TABLE tickers; --",
        "' OR '1'='1",
        "1' UNION SELECT * FROM tasks --",
        "'; DELETE FROM tasks; --",
    )
    acceptable = (200, 422)
    suite_ok = True

    for injection in probes:
        try:
            reply = requests.get(
                f"{BASE_URL}/tickers",
                params={"limit": injection},
                headers=HEADERS_NO_AUTH,
                timeout=10,
            )
            # Either a validation error or a normal answer counts as safe handling.
            handled = reply.status_code in acceptable
            label = f"/tickers?limit={injection[:20]}..."
            print_result(label, "GET", "200/422", reply.status_code, handled)
            suite_ok &= handled
        except Exception as e:
            print(f"❌ SQL injection test failed: {e}")
            suite_ok = False

    print(" πŸ›‘οΈ SQL injection tests completed")
    return suite_ok
def main():
    """Run every test suite in order and print a final summary.

    Returns 1 immediately when no API key is configured. Otherwise all
    suites run (a failure does not stop later suites) and the function
    returns 0 if every suite passed, 1 if any failed.
    """
    print("πŸ§ͺ Starting Stock Monitoring API Tests")
    print(f"πŸ”— Base URL: {BASE_URL}")
    if not API_KEY:
        print("❌ API Key not found in environment variables!")
        return 1
    # NOTE(review): this echoes the first 10 characters of the key to stdout;
    # confirm that is acceptable wherever these logs end up.
    print(f"πŸ”‘ API Key: {API_KEY[:10]}...")

    suites = (
        test_health_check,
        test_public_endpoints,
        test_protected_endpoints_no_auth,
        test_protected_endpoints_invalid_auth,
        test_protected_endpoints_valid_auth,
        test_data_endpoints,
        test_technical_indicators,
        test_analyze_endpoint_daily,
        test_analyze_endpoint_intraday,
        test_analyze_endpoint_validation,
        test_analyze_endpoint_rate_limiting,
        test_analyze_endpoint_security,
        test_sql_injection_safety,
    )
    all_tests_passed = True
    for suite in suites:
        # &= (not `and`) so every suite still runs after an earlier failure.
        all_tests_passed &= suite()

    # Final results
    rule = '=' * 60
    print(f"\n{rule}")
    if all_tests_passed:
        success_lines = (
            "πŸŽ‰ ALL TESTS PASSED! βœ…",
            "βœ… API Key authentication is working",
            "βœ… Protected endpoints are secure",
            "βœ… SQL injection protection is active",
            "βœ… Public endpoints are accessible",
            "βœ… Bulk data download with freshness check is working",
            "βœ… Technical indicators (SMA 10, 20, 50) are working",
            "βœ… /data/analyze endpoint with daily data is functional",
            "βœ… /data/analyze endpoint with intraday data is functional",
            "βœ… Market status checking is working",
            "βœ… Multiple tickers optimization is working",
            "βœ… Rate limiting (20 req/min) is enforced",
            "βœ… Security headers and input validation are active",
            "βœ… Pre/post market data functionality is working",
            "βœ… Multiple intervals (5m, 15m, 1h, 4h) are supported",
            "βœ… Intraday vs daily datetime formatting is correct",
        )
        for line in success_lines:
            print(line)
    else:
        print("❌ SOME TESTS FAILED!")
        print("⚠️ Please check the API implementation")
    print(rule)

    if all_tests_passed:
        return 0
    return 1
def test_technical_indicators():
    """Trigger an indicator recalculation and inspect AAPL data for SMA fields.

    Step 1 posts to ``/data/download-all`` with ``force_indicators=True``.
    Step 2 fetches 60 days of AAPL data and prints how many records carry
    any SMA value. Returns True only if both requests returned 200 and
    nothing raised.

    NOTE(review): the indicator counts are only printed, never asserted, so
    missing indicators do not fail this suite — confirm that is intended.
    """
    print_test_header("Technical Indicators Tests")
    suite_ok = True

    # Step 1: force indicator recalculation through the bulk endpoint.
    try:
        reply = requests.post(
            f"{BASE_URL}/data/download-all",
            headers=HEADERS_VALID_AUTH,
            json={"force_refresh": False, "force_indicators": True},
            timeout=120,
        )
        ok = reply.status_code == 200
        suite_ok &= print_result("/data/download-all (force_indicators)", "POST", 200, reply.status_code, ok)
        if ok:
            summary = reply.json()
            print(f" πŸ“Š Processed {summary.get('tickers_processed', 0)} tickers")
            print(f" πŸ’¬ Message: {summary.get('message', 'N/A')}")
    except Exception as e:
        print(f"❌ POST /data/download-all (force_indicators) failed: {e}")
        suite_ok = False

    # Step 2: look for indicator fields in the ticker data.
    try:
        reply = requests.get(f"{BASE_URL}/data/tickers/AAPL?days=60", headers=HEADERS_NO_AUTH, timeout=10)
        ok = reply.status_code == 200
        suite_ok &= print_result("/data/tickers/AAPL (indicators validation)", "GET", 200, reply.status_code, ok)
        if ok:
            rows = reply.json()
            print(f" πŸ“Š Retrieved {len(rows)} days of AAPL data for indicators test")
            sma_fields = ('sma_fast', 'sma_med', 'sma_slow')
            # Count records that carry at least one of the three SMA values.
            with_indicators = sum(
                1
                for row in rows
                if any(row.get(field) is not None for field in sma_fields)
            )
            print(f" πŸ“ˆ Records with indicators: {with_indicators}/{len(rows)}")
            # With 50+ records, report SMA Slow coverage of the first ten
            # (assumed by the original author to be the most recent ones).
            if len(rows) >= 50:
                head = rows[:10]
                slow_hits = len([row for row in head if row.get('sma_slow') is not None])
                print(f" πŸ“Š Recent records with SMA Slow (50): {slow_hits}/10")
    except Exception as e:
        print(f"❌ Technical indicators validation failed: {e}")
        suite_ok = False

    return suite_ok
def test_analyze_endpoint_daily():
    """Exercise ``POST /data/analyze`` with daily data.

    Case 1 requests one month of AAPL; case 2 requests five days for three
    tickers. Both expect HTTP 200 and print selected fields of the reply.
    Returns True only if both requests returned 200 and nothing raised.
    """
    print_test_header("Data Analyze Endpoint - Daily Data")
    analyze_url = f"{BASE_URL}/data/analyze"
    suite_ok = True

    # Case 1: single ticker
    try:
        reply = requests.post(
            analyze_url,
            headers=HEADERS_VALID_AUTH,
            json={"tickers": ["AAPL"], "period": "1mo"},
            timeout=30,
        )
        ok = reply.status_code == 200
        suite_ok &= print_result("/data/analyze (single ticker)", "POST", 200, reply.status_code, ok)
        if ok:
            body = reply.json()
            print(f" πŸ“Š Success: {body.get('success')}")
            print(f" πŸ“ˆ Tickers: {body.get('tickers')}")
            print(f" πŸ“… Period: {body.get('period')}")
            print(f" πŸ“Š Interval: {body.get('interval')}")
            print(f" πŸ• Intraday: {body.get('intraday')}")
            print(f" πŸ“Š Data points: {body.get('total_data_points')}")
            start_date = body.get('date_range', {}).get('start_date')
            end_date = body.get('date_range', {}).get('end_date')
            print(f" πŸ“… Date range: {start_date} to {end_date}")
            # Inspect the first record, if any.
            if body.get('data') and len(body['data']) > 0:
                first = body['data'][0]
                print(f" πŸ’° Sample close: ${first.get('close', 0):.2f}")
                print(f" πŸ“Š Has SMA Fast: {first.get('sma_fast') is not None}")
                print(f" πŸ“Š Has SMA Med: {first.get('sma_med') is not None}")
                print(f" πŸ“Š Has SMA Slow: {first.get('sma_slow') is not None}")
                # A 10-character value is treated as a date-only (daily) stamp.
                stamp = first.get('datetime', '')
                print(f" πŸ“… DateTime format: {stamp} (daily: {len(stamp) == 10})")
    except Exception as e:
        print(f"❌ Single ticker daily test failed: {e}")
        suite_ok = False

    # Brief pause to stay clear of the rate limiter.
    time.sleep(1)

    # Case 2: several tickers
    try:
        reply = requests.post(
            analyze_url,
            headers=HEADERS_VALID_AUTH,
            json={"tickers": ["AAPL", "MSFT", "GOOGL"], "period": "5d"},
            timeout=40,
        )
        ok = reply.status_code == 200
        suite_ok &= print_result("/data/analyze (multi ticker)", "POST", 200, reply.status_code, ok)
        if ok:
            body = reply.json()
            print(f" πŸ“Š Tickers: {len(body.get('tickers', []))}")
            print(f" πŸ“ˆ Data points: {body.get('total_data_points')}")
            # Show rate-limit headers when the server sends them.
            limit_header = reply.headers.get('X-RateLimit-Limit')
            remaining_header = reply.headers.get('X-RateLimit-Remaining')
            if limit_header:
                print(f" 🚦 Rate limit: {remaining_header}/{limit_header}")
    except Exception as e:
        print(f"❌ Multi ticker daily test failed: {e}")
        suite_ok = False

    return suite_ok
def test_analyze_endpoint_intraday():
    """Exercise ``POST /data/analyze`` with intraday data at several intervals.

    For each of 15m, 1h and 4h a one-day AAPL request is sent with
    ``intraday=True``; HTTP 200 is expected and selected reply fields are
    printed. Returns True only if every request returned 200 and nothing
    raised.
    """
    print_test_header("Data Analyze Endpoint - Intraday Data")
    interval_cases = (
        ("15m", "15-minute intervals"),
        ("1h", "1-hour intervals"),
        ("4h", "4-hour intervals"),
    )
    suite_ok = True

    for interval, description in interval_cases:
        try:
            request_body = {
                "tickers": ["AAPL"],
                "period": "1d",
                "interval": interval,
                "intraday": True,
            }
            reply = requests.post(
                f"{BASE_URL}/data/analyze",
                headers=HEADERS_VALID_AUTH,
                json=request_body,
                timeout=30,
            )
            ok = reply.status_code == 200
            suite_ok &= print_result(f"/data/analyze ({interval})", "POST", 200, reply.status_code, ok)
            if ok:
                body = reply.json()
                print(f" πŸ“Š {description}: {body.get('total_data_points')} points")
                print(f" πŸ• Intraday: {body.get('intraday')}")
                print(f" πŸ“Š Interval: {body.get('interval')}")
                # Market status block, when present.
                market = body.get('market_status')
                if market:
                    print(f" πŸ“ˆ Market open: {market.get('is_open')}")
                    print(f" πŸ“Š Market state: {market.get('market_state')}")
                    print(f" 🌍 Timezone: {market.get('timezone')}")
                # A stamp longer than 10 characters is treated as including a time part.
                if body.get('data') and len(body['data']) > 0:
                    first = body['data'][0]
                    stamp = first.get('datetime', '')
                    print(f" πŸ“… DateTime format: {stamp[:19]}... (intraday: {len(stamp) > 10})")
            # Pause between intervals to stay clear of the rate limiter.
            time.sleep(2)
        except Exception as e:
            print(f"❌ Intraday {interval} test failed: {e}")
            suite_ok = False

    return suite_ok
def test_analyze_endpoint_validation():
    """Send malformed payloads to ``POST /data/analyze`` and expect HTTP 400.

    Each case is (payload, expected status, description). On a mismatch the
    first 100 characters of the reply body are printed. Returns True only if
    every case produced its expected status and nothing raised.
    """
    print_test_header("Data Analyze Endpoint - Input Validation")
    bad_requests = (
        ({"tickers": [], "period": "1d"}, 400, "Empty tickers list"),
        ({"tickers": ["INVALID!!!"], "period": "1d"}, 400, "Invalid ticker characters"),
        ({"tickers": ["AAPL"], "period": "invalid"}, 400, "Invalid period"),
        ({"tickers": ["AAPL"], "period": "1d", "interval": "invalid"}, 400, "Invalid interval"),
        ({"tickers": ["AAPL"], "period": "1y", "interval": "5m", "intraday": True}, 400, "Invalid intraday period"),
        ({"tickers": ["AAPL"], "period": "1d", "interval": "1d", "intraday": True}, 400, "Invalid intraday interval"),
        ({"tickers": ["A" * 50] * 51, "period": "1d"}, 400, "Too many tickers"),
    )
    suite_ok = True

    for payload, expected_status, description in bad_requests:
        try:
            reply = requests.post(
                f"{BASE_URL}/data/analyze",
                headers=HEADERS_VALID_AUTH,
                json=payload,
                timeout=15,
            )
            matched = reply.status_code == expected_status
            suite_ok &= print_result(
                f"/data/analyze ({description})", "POST", expected_status, reply.status_code, matched
            )
            if not matched:
                print(f" πŸ“„ Response: {reply.text[:100]}...")
            # Pause between cases to avoid the rate limiter; the
            # "Too many tickers" case is the only one that skips the pause.
            if "Too many tickers" not in description:
                time.sleep(0.5)
        except Exception as e:
            print(f"❌ Validation test '{description}' failed: {e}")
            suite_ok = False

    return suite_ok
def test_analyze_endpoint_rate_limiting():
    """Fire a burst of requests at ``POST /data/analyze`` and expect a 429.

    Up to 15 requests are sent 50 ms apart; the burst stops at the first
    429. The suite passes only when at least one request succeeded AND at
    least one was rate limited.

    NOTE(review): 15 requests is below the advertised 20/minute limit, so
    this presumably passes only because earlier suites already consumed
    part of the quota — confirm.
    """
    print_test_header("Data Analyze Endpoint - Rate Limiting")
    print(" ⏰ Waiting 5 seconds to start with fresh rate limit...")
    time.sleep(5)
    print(" 🚦 Testing rate limit (20 requests/minute)...")

    payload = {"tickers": ["AAPL"], "period": "1d"}
    ok_count = 0
    throttled_count = 0

    for attempt in range(1, 16):  # moderate burst, not meant to exhaust the quota
        try:
            reply = requests.post(
                f"{BASE_URL}/data/analyze",
                headers=HEADERS_VALID_AUTH,
                json=payload,
                timeout=10,
            )
            code = reply.status_code
            if code == 429:
                throttled_count += 1
                print(f" 🚦 Rate limited on request {attempt}")
                retry_after = reply.headers.get('Retry-After')
                if retry_after:
                    print(f" ⏰ Retry after: {retry_after} seconds")
                break
            if code == 200:
                ok_count += 1
            else:
                print(f" ⚠️ Unexpected status {code} on request {attempt}")
            # Tiny gap between requests.
            time.sleep(0.05)
        except Exception as e:
            print(f"❌ Rate limit test request {attempt} failed: {e}")

    print(f" βœ… Successful requests: {ok_count}")
    print(f" 🚦 Rate limited requests: {throttled_count}")

    # The limiter must have let something through and then kicked in.
    limiter_seen = ok_count > 0 and throttled_count > 0
    if limiter_seen:
        print(" βœ… Rate limiting is working correctly")
    else:
        print(" ❌ Rate limiting may not be working properly")
    return limiter_seen
def test_analyze_endpoint_security():
    """Check input sanitisation and security headers of ``POST /data/analyze``.

    Test 1 sends hostile strings (SQL injection, XSS, path traversal) as
    ticker names and expects HTTP 400; a 429 reply skips that probe.

    Test 2 sends a valid request and requires at least two of the headers
    X-Content-Type-Options, X-Frame-Options and X-XSS-Protection on a 200
    reply. A 429 reply is reported as skipped; any other non-200 status is
    recorded as a failure instead of passing silently.

    Returns True only if no executed check failed and nothing raised.
    """
    print_test_header("Data Analyze Endpoint - Security Features")
    all_passed = True
    print(" ⏰ Waiting 10 seconds before security tests...")
    time.sleep(10)

    # Test 1: hostile input in ticker names
    injection_attempts = [
        "'; DROP TABLE tickers; --",
        "' OR '1'='1",
        "AAPL'; DELETE FROM tasks; --",
        "<script>alert('xss')</script>",
        "../../etc/passwd",
    ]
    for injection in injection_attempts:
        try:
            payload = {"tickers": [injection], "period": "1d"}
            response = requests.post(
                f"{BASE_URL}/data/analyze",
                headers=HEADERS_VALID_AUTH,
                json=payload,
                timeout=15,
            )
            # A rate-limited reply says nothing about validation; skip the probe.
            if response.status_code == 429:
                print(f" 🚦 Rate limited during injection test: {injection[:20]}... (skipping)")
                continue
            # Malicious input must be rejected with a validation error.
            passed = response.status_code == 400
            all_passed &= print_result(
                f"/data/analyze (injection: {injection[:20]}...)", "POST", 400, response.status_code, passed
            )
            # Small delay to avoid rate limiting
            time.sleep(0.5)
        except Exception as e:
            print(f"❌ Security test failed: {e}")
            all_passed = False

    # Test 2: security headers on a normal reply
    try:
        payload = {"tickers": ["AAPL"], "period": "1d"}
        response = requests.post(
            f"{BASE_URL}/data/analyze",
            headers=HEADERS_VALID_AUTH,
            json=payload,
            timeout=15,
        )
        if response.status_code == 200:
            security_headers = [
                'X-Content-Type-Options',
                'X-Frame-Options',
                'X-XSS-Protection',
            ]
            headers_found = 0
            for header in security_headers:
                if header in response.headers:
                    headers_found += 1
                    print(f" πŸ›‘οΈ {header}: {response.headers[header]}")
            passed = headers_found >= 2  # at least 2 security headers
            all_passed &= passed
            print(f" πŸ›‘οΈ Security headers: {headers_found}/{len(security_headers)}")
        elif response.status_code == 429:
            # Same policy as the injection probes: rate limiting is not a failure,
            # but say so instead of skipping without a trace.
            print(" 🚦 Rate limited during security headers check (skipping)")
        else:
            # Previously any non-200 reply left the header check unevaluated
            # and the suite still passed.
            all_passed &= print_result(
                "/data/analyze (security headers)", "POST", 200, response.status_code, False
            )
    except Exception as e:
        print(f"❌ Security headers test failed: {e}")
        all_passed = False

    return all_passed
if __name__ == "__main__":
    # `exit` is a helper installed by the `site` module for interactive
    # sessions and is not guaranteed to exist (e.g. under `python -S`).
    # Raising SystemExit propagates main()'s return code without it.
    raise SystemExit(main())