Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
""" | |
Automated API Testing Script for Stock Monitoring API | |
Tests authentication, endpoints, and security features. | |
Updated for new API architecture with /data/analyze endpoint: | |
- Added comprehensive /data/analyze endpoint testing | |
- Tests intraday data with multiple intervals (5m, 15m, 1h, 4h) | |
- Tests pre/post market data functionality | |
- Tests market status checking | |
- Tests rate limiting (20 req/min) | |
- Tests security features and input validation | |
- Tests multiple tickers optimization | |
- Validates technical indicators on all intervals | |
- Tests daily vs intraday data formats | |
""" | |
import requests | |
import json | |
import time | |
import os | |
from dotenv import load_dotenv | |
# Environment: read the parent directory's .env first, then let
# python-dotenv look for a .env from the current location as well.
load_dotenv(dotenv_path="../.env")
load_dotenv()

PORT = os.environ.get("PORT", "7860")
print(f"Using PORT: {PORT}")

# Target server and credentials used by every test below.
BASE_URL = f"http://localhost:{PORT}"
API_KEY = os.environ.get("API_KEY")
INVALID_API_KEY = "invalid_key_for_testing"

# Header sets: anonymous, correctly authenticated, and deliberately wrong.
# The authenticated variants extend the anonymous one with a Bearer token.
HEADERS_NO_AUTH = {"Content-Type": "application/json"}
HEADERS_VALID_AUTH = {
    **HEADERS_NO_AUTH,
    "Authorization": f"Bearer {API_KEY}",
}
HEADERS_INVALID_AUTH = {
    **HEADERS_NO_AUTH,
    "Authorization": f"Bearer {INVALID_API_KEY}",
}
def print_test_header(test_name):
    """Print a banner announcing a test suite.

    Emits a blank line, a 60-character rule of ``=``, the suite title, and a
    closing rule.

    Args:
        test_name: Human-readable title of the suite about to run.
    """
    rule = "=" * 60
    print("\n" + rule)
    print(f"π§ͺ {test_name}")
    print(rule)
def print_result(endpoint, method, expected_status, actual_status, passed):
    """Report the outcome of a single request check.

    Args:
        endpoint: Path (or label) of the endpoint that was exercised.
        method: HTTP verb used for the request.
        expected_status: Status (or description of statuses) that counts as success.
        actual_status: Status code actually returned.
        passed: Whether the check succeeded.

    Returns:
        ``passed`` unchanged, so callers can fold it into a running flag.
    """
    if passed:
        marker = "β "
    else:
        marker = "β"

    report = [
        f"{marker} {method} {endpoint}",
        f" Expected: {expected_status}, Got: {actual_status}",
    ]
    if not passed:
        report.append(" β TEST FAILED")

    for line in report:
        print(line)
    return passed
def test_health_check():
    """Check that ``GET /`` answers 200 without credentials.

    On success the ``status``, ``timestamp`` and ``database.connected``
    fields of the JSON body are printed for information only.

    Returns:
        True when the endpoint returned 200, False otherwise (including when
        the request or the JSON handling raised).
    """
    print_test_header("Health Check (Public Endpoint)")
    try:
        reply = requests.get(f"{BASE_URL}/", headers=HEADERS_NO_AUTH, timeout=10)
        is_ok = reply.status_code == 200
        print_result("/", "GET", 200, reply.status_code, is_ok)
        if not is_ok:
            return False

        body = reply.json()
        db_info = body.get('database', {})
        print(f" π Status: {body.get('status')}")
        print(f" π Timestamp: {body.get('timestamp')}")
        print(f" πΎ DB Connected: {db_info.get('connected')}")
        return True
    except Exception as e:
        print(f"β Health check failed: {e}")
        return False
def test_public_endpoints():
    """Check the endpoints expected to work without authentication.

    Currently this covers ``GET /tickers?limit=5``, which must return 200.

    Returns:
        True when every check passed, False otherwise.
    """
    print_test_header("Public Endpoints (No Auth Required)")
    suite_ok = True

    # GET /tickers
    try:
        reply = requests.get(
            f"{BASE_URL}/tickers?limit=5",
            headers=HEADERS_NO_AUTH,
            timeout=10,
        )
        got_200 = reply.status_code == 200
        suite_ok = print_result("/tickers", "GET", 200, reply.status_code, got_200) and suite_ok
        if got_200:
            tickers = reply.json()
            print(f" π Returned {len(tickers)} tickers")
    except Exception as e:
        print(f"β GET /tickers failed: {e}")
        suite_ok = False

    return suite_ok
def test_protected_endpoints_no_auth():
    """Check that protected endpoints reject requests carrying no credentials.

    Each listed endpoint is called without an ``Authorization`` header and
    must answer 401 or 403.

    Returns:
        True when every endpoint rejected the request, False otherwise.
    """
    print_test_header("Protected Endpoints - No Auth (Should Fail)")
    all_passed = True
    # (HTTP method, path, JSON payload or None)
    protected_endpoints = [
        ("POST", "/tickers/update", {"force_refresh": False}),
        ("POST", "/tickers/update-async", {"force_refresh": False}),
        ("POST", "/data/download-all", {"force_refresh": False, "force_indicators": False}),
        ("POST", "/data/analyze", {"tickers": ["AAPL"], "period": "1d"}),
        ("GET", "/tasks", None),
        ("DELETE", "/tasks/old", None),
    ]
    for method, endpoint, payload in protected_endpoints:
        try:
            # requests.request dispatches on the verb itself, so no method can
            # fall through a branch chain and leave `response` unbound or
            # pointing at the previous iteration's reply.
            response = requests.request(
                method,
                f"{BASE_URL}{endpoint}",
                headers=HEADERS_NO_AUTH,
                json=payload,
                timeout=10,
            )
            # Should return 403 (Forbidden) or 401 (Unauthorized)
            passed = response.status_code in (401, 403)
            all_passed &= print_result(endpoint, method, "401/403", response.status_code, passed)
        except Exception as e:
            print(f"β {method} {endpoint} failed: {e}")
            all_passed = False
    return all_passed
def test_protected_endpoints_invalid_auth():
    """Check that protected endpoints reject an invalid API key.

    Each listed endpoint is called with a Bearer token that is known to be
    wrong and must answer 401.

    Returns:
        True when every endpoint answered 401, False otherwise.
    """
    print_test_header("Protected Endpoints - Invalid Auth (Should Fail)")
    all_passed = True
    # (HTTP method, path, JSON payload or None)
    protected_endpoints = [
        ("POST", "/tickers/update", {"force_refresh": False}),
        ("POST", "/data/download-all", {"force_refresh": False, "force_indicators": False}),
        ("POST", "/data/analyze", {"tickers": ["AAPL"], "period": "1d"}),
        ("GET", "/tasks", None),
    ]
    for method, endpoint, payload in protected_endpoints:
        try:
            # Dispatch on the verb directly: the previous GET/POST-only branch
            # chain would have re-checked a stale `response` (or raised
            # NameError) for any other method added to the list.
            response = requests.request(
                method,
                f"{BASE_URL}{endpoint}",
                headers=HEADERS_INVALID_AUTH,
                json=payload,
                timeout=10,
            )
            # Should return 401 (Unauthorized)
            passed = response.status_code == 401
            all_passed &= print_result(endpoint, method, "401", response.status_code, passed)
        except Exception as e:
            print(f"β {method} {endpoint} failed: {e}")
            all_passed = False
    return all_passed
def test_protected_endpoints_valid_auth():
    """Exercise protected endpoints with the configured API key.

    Three independent checks run in order, each expecting 200:
    ``GET /tasks``, ``POST /tickers/update-async`` (followed by
    ``GET /tasks/{task_id}`` when a task id is returned), and
    ``DELETE /tasks/old``.

    Returns:
        True when every check passed, False otherwise.
    """
    print_test_header("Protected Endpoints - Valid Auth (Should Succeed)")
    suite_ok = True

    # --- GET /tasks -------------------------------------------------------
    try:
        reply = requests.get(f"{BASE_URL}/tasks", headers=HEADERS_VALID_AUTH, timeout=10)
        got_200 = reply.status_code == 200
        suite_ok = print_result("/tasks", "GET", 200, reply.status_code, got_200) and suite_ok
        if got_200:
            tasks = reply.json()
            print(f" π Found {len(tasks)} tasks")
    except Exception as e:
        print(f"β GET /tasks failed: {e}")
        suite_ok = False

    # --- POST /tickers/update-async (safer than the sync version) ---------
    try:
        reply = requests.post(
            f"{BASE_URL}/tickers/update-async",
            headers=HEADERS_VALID_AUTH,
            json={"force_refresh": False},
            timeout=15,
        )
        started = reply.status_code == 200
        suite_ok = print_result("/tickers/update-async", "POST", 200, reply.status_code, started) and suite_ok

        task_id = None
        if started:
            task_id = reply.json().get("task_id")
            print(f" π Task started: {task_id}")

        # Follow up on the task only when the server handed back an id.
        if task_id:
            time.sleep(1)  # give the task a moment to start
            status_reply = requests.get(
                f"{BASE_URL}/tasks/{task_id}",
                headers=HEADERS_VALID_AUTH,
                timeout=10,
            )
            found = status_reply.status_code == 200
            suite_ok = print_result(f"/tasks/{task_id}", "GET", 200, status_reply.status_code, found) and suite_ok
            if found:
                task_data = status_reply.json()
                print(f" π Task status: {task_data.get('status')}")
    except Exception as e:
        print(f"β POST /tickers/update-async failed: {e}")
        suite_ok = False

    # --- DELETE /tasks/old ------------------------------------------------
    try:
        reply = requests.delete(f"{BASE_URL}/tasks/old", headers=HEADERS_VALID_AUTH, timeout=10)
        got_200 = reply.status_code == 200
        suite_ok = print_result("/tasks/old", "DELETE", 200, reply.status_code, got_200) and suite_ok
        if got_200:
            summary = reply.json()
            print(f" ποΈ Deleted {summary.get('deleted', 0)} old tasks")
    except Exception as e:
        print(f"β DELETE /tasks/old failed: {e}")
        suite_ok = False

    return suite_ok
def test_data_endpoints():
    """Exercise the bulk download endpoint and the per-ticker query endpoint.

    ``POST /data/download-all`` is called with the API key and
    ``GET /data/tickers/AAPL?days=5`` without it; both must return 200.
    Response details (counts, date range, latest close, SMA values) are
    printed for information and do not affect the result.

    Returns:
        True when both requests returned 200, False otherwise.
    """
    print_test_header("Data Endpoints - Valid Auth (Should Succeed)")
    suite_ok = True

    # POST /data/download-all.
    # Per the original notes: the endpoint checks data freshness (<24h) and may
    # answer with a "data is fresh" message instead of downloading again; it
    # accepts force_refresh and force_indicators.
    try:
        reply = requests.post(
            f"{BASE_URL}/data/download-all",
            headers=HEADERS_VALID_AUTH,
            json={"force_refresh": False, "force_indicators": False},
            timeout=120,  # bulk download can be slow
        )
        got_200 = reply.status_code == 200
        suite_ok = print_result("/data/download-all", "POST", 200, reply.status_code, got_200) and suite_ok
        if got_200:
            summary = reply.json()
            span = summary.get('date_range', {})
            print(f" π Processed {summary.get('tickers_processed', 0)} tickers")
            print(f" π Created {summary.get('records_created', 0)} records")
            print(f" π Updated {summary.get('records_updated', 0)} records")
            print(f" π Date range: {span.get('start_date')} to {span.get('end_date')}")
            print(f" π¬ Message: {summary.get('message', 'N/A')}")
    except Exception as e:
        print(f"β POST /data/download-all failed: {e}")
        suite_ok = False

    # GET /data/tickers/{ticker} (called without credentials)
    try:
        reply = requests.get(
            f"{BASE_URL}/data/tickers/AAPL?days=5",
            headers=HEADERS_NO_AUTH,
            timeout=10,
        )
        got_200 = reply.status_code == 200
        suite_ok = print_result("/data/tickers/AAPL", "GET", 200, reply.status_code, got_200) and suite_ok
        if got_200:
            rows = reply.json()
            print(f" π Retrieved {len(rows)} days of AAPL data")
            if rows:
                latest = rows[0]
                print(f" π° Latest close: ${latest.get('close', 0):.2f}")
                # Show whichever moving averages are present on the first row.
                sma_labels = (
                    ('sma_fast', "SMA Fast (10)"),
                    ('sma_med', "SMA Med (20)"),
                    ('sma_slow', "SMA Slow (50)"),
                )
                for field, label in sma_labels:
                    value = latest.get(field)
                    if value is not None:
                        print(f" π {label}: ${value:.2f}")
    except Exception as e:
        print(f"β GET /data/tickers/AAPL failed: {e}")
        suite_ok = False

    return suite_ok
def test_sql_injection_safety():
    """Send SQL-injection style strings as the ``limit`` query parameter.

    Each payload is passed to ``GET /tickers``; a 200 or a 422 response is
    accepted as safe handling.

    Returns:
        True when every payload produced 200 or 422, False otherwise.
    """
    print_test_header("SQL Injection Safety Tests")
    suite_ok = True
    acceptable = (200, 422)

    # Classic injection strings, sent where an integer is expected.
    payloads = (
        "'; DROP TABLE tickers; --",
        "' OR '1'='1",
        "1' UNION SELECT * FROM tasks --",
        "'; DELETE FROM tasks; --",
    )
    for attack in payloads:
        try:
            reply = requests.get(
                f"{BASE_URL}/tickers",
                params={"limit": attack},
                headers=HEADERS_NO_AUTH,
                timeout=10,
            )
            handled = reply.status_code in acceptable
            print_result(f"/tickers?limit={attack[:20]}...", "GET", "200/422", reply.status_code, handled)
            if not handled:
                suite_ok = False
        except Exception as e:
            print(f"β SQL injection test failed: {e}")
            suite_ok = False

    print(" π‘οΈ SQL injection tests completed")
    return suite_ok
def main():
    """Run every test suite in order and print a final summary.

    Returns:
        0 when all suites passed, 1 when any suite failed or when no API key
        is configured (in which case no suite is run).
    """
    print("π§ͺ Starting Stock Monitoring API Tests")
    print(f"π Base URL: {BASE_URL}")
    if not API_KEY:
        print("β API Key not found in environment variables!")
        return 1
    print(f"π API Key: {API_KEY[:10]}...")

    # Suites run in this order; every suite runs even after a failure.
    suites = (
        test_health_check,
        test_public_endpoints,
        test_protected_endpoints_no_auth,
        test_protected_endpoints_invalid_auth,
        test_protected_endpoints_valid_auth,
        test_data_endpoints,
        test_technical_indicators,
        test_analyze_endpoint_daily,
        test_analyze_endpoint_intraday,
        test_analyze_endpoint_validation,
        test_analyze_endpoint_rate_limiting,
        test_analyze_endpoint_security,
        test_sql_injection_safety,
    )
    all_tests_passed = True
    for suite in suites:
        all_tests_passed &= suite()

    # Final results
    rule = "=" * 60
    print("\n" + rule)
    if all_tests_passed:
        success_lines = (
            "π ALL TESTS PASSED! β ",
            "β API Key authentication is working",
            "β Protected endpoints are secure",
            "β SQL injection protection is active",
            "β Public endpoints are accessible",
            "β Bulk data download with freshness check is working",
            "β Technical indicators (SMA 10, 20, 50) are working",
            "β /data/analyze endpoint with daily data is functional",
            "β /data/analyze endpoint with intraday data is functional",
            "β Market status checking is working",
            "β Multiple tickers optimization is working",
            "β Rate limiting (20 req/min) is enforced",
            "β Security headers and input validation are active",
            "β Pre/post market data functionality is working",
            "β Multiple intervals (5m, 15m, 1h, 4h) are supported",
            "β Intraday vs daily datetime formatting is correct",
        )
        for line in success_lines:
            print(line)
    else:
        print("β SOME TESTS FAILED!")
        print("β οΈ Please check the API implementation")
    print(rule)

    if all_tests_passed:
        return 0
    return 1
def test_technical_indicators():
    """Exercise indicator recalculation and inspect the resulting ticker data.

    ``POST /data/download-all`` is called with ``force_indicators=True`` and
    ``GET /data/tickers/AAPL?days=60`` is then fetched; both must return 200.
    The counts of records carrying SMA fields are printed for information
    only and do not influence the result.

    Returns:
        True when both requests returned 200, False otherwise.
    """
    print_test_header("Technical Indicators Tests")
    suite_ok = True

    # Bulk download with indicator recalculation forced on.
    try:
        reply = requests.post(
            f"{BASE_URL}/data/download-all",
            headers=HEADERS_VALID_AUTH,
            json={"force_refresh": False, "force_indicators": True},
            timeout=120,
        )
        got_200 = reply.status_code == 200
        suite_ok = print_result("/data/download-all (force_indicators)", "POST", 200, reply.status_code, got_200) and suite_ok
        if got_200:
            summary = reply.json()
            print(f" π Processed {summary.get('tickers_processed', 0)} tickers")
            print(f" π¬ Message: {summary.get('message', 'N/A')}")
    except Exception as e:
        print(f"β POST /data/download-all (force_indicators) failed: {e}")
        suite_ok = False

    # Ticker data should now carry SMA fields.
    try:
        reply = requests.get(
            f"{BASE_URL}/data/tickers/AAPL?days=60",
            headers=HEADERS_NO_AUTH,
            timeout=10,
        )
        got_200 = reply.status_code == 200
        suite_ok = print_result("/data/tickers/AAPL (indicators validation)", "GET", 200, reply.status_code, got_200) and suite_ok
        if got_200:
            rows = reply.json()
            print(f" π Retrieved {len(rows)} days of AAPL data for indicators test")

            # A record counts when at least one SMA field is populated.
            sma_fields = ('sma_fast', 'sma_med', 'sma_slow')
            indicators_found = sum(
                1
                for row in rows
                if any(row.get(field) is not None for field in sma_fields)
            )
            print(f" π Records with indicators: {indicators_found}/{len(rows)}")

            # With 50+ rows, report how many of the first ten rows have the slow SMA.
            if len(rows) >= 50:
                head = rows[:10]
                sma_slow_count = len([row for row in head if row.get('sma_slow') is not None])
                print(f" π Recent records with SMA Slow (50): {sma_slow_count}/10")
    except Exception as e:
        print(f"β Technical indicators validation failed: {e}")
        suite_ok = False

    return suite_ok
def test_analyze_endpoint_daily():
    """Exercise ``POST /data/analyze`` without intraday options.

    Two requests are made with the API key, one second apart: a single
    ticker over ``1mo`` and three tickers over ``5d``. Both must return 200.
    Response fields and rate-limit headers are printed for information only.

    Returns:
        True when both requests returned 200, False otherwise.
    """
    print_test_header("Data Analyze Endpoint - Daily Data")
    suite_ok = True
    analyze_url = f"{BASE_URL}/data/analyze"

    # Test 1: single ticker
    try:
        reply = requests.post(
            analyze_url,
            headers=HEADERS_VALID_AUTH,
            json={"tickers": ["AAPL"], "period": "1mo"},
            timeout=30,
        )
        got_200 = reply.status_code == 200
        suite_ok = print_result("/data/analyze (single ticker)", "POST", 200, reply.status_code, got_200) and suite_ok
        if got_200:
            body = reply.json()
            span = body.get('date_range', {})
            print(f" π Success: {body.get('success')}")
            print(f" π Tickers: {body.get('tickers')}")
            print(f" π Period: {body.get('period')}")
            print(f" π Interval: {body.get('interval')}")
            print(f" π Intraday: {body.get('intraday')}")
            print(f" π Data points: {body.get('total_data_points')}")
            print(f" π Date range: {span.get('start_date')} to {span.get('end_date')}")

            # Inspect the first data row when there is one.
            rows = body.get('data')
            if rows and len(rows) > 0:
                sample = rows[0]
                print(f" π° Sample close: ${sample.get('close', 0):.2f}")
                print(f" π Has SMA Fast: {sample.get('sma_fast') is not None}")
                print(f" π Has SMA Med: {sample.get('sma_med') is not None}")
                print(f" π Has SMA Slow: {sample.get('sma_slow') is not None}")
                # A 10-character value is reported as the daily (date-only) format.
                datetime_str = sample.get('datetime', '')
                print(f" π DateTime format: {datetime_str} (daily: {len(datetime_str) == 10})")
    except Exception as e:
        print(f"β Single ticker daily test failed: {e}")
        suite_ok = False

    # Small delay to avoid rate limiting
    time.sleep(1)

    # Test 2: several tickers in one request
    try:
        reply = requests.post(
            analyze_url,
            headers=HEADERS_VALID_AUTH,
            json={"tickers": ["AAPL", "MSFT", "GOOGL"], "period": "5d"},
            timeout=40,
        )
        got_200 = reply.status_code == 200
        suite_ok = print_result("/data/analyze (multi ticker)", "POST", 200, reply.status_code, got_200) and suite_ok
        if got_200:
            body = reply.json()
            print(f" π Tickers: {len(body.get('tickers', []))}")
            print(f" π Data points: {body.get('total_data_points')}")
            # Report rate-limit headers when the server sends them.
            rate_limit = reply.headers.get('X-RateLimit-Limit')
            rate_remaining = reply.headers.get('X-RateLimit-Remaining')
            if rate_limit:
                print(f" π¦ Rate limit: {rate_remaining}/{rate_limit}")
    except Exception as e:
        print(f"β Multi ticker daily test failed: {e}")
        suite_ok = False

    return suite_ok
def test_analyze_endpoint_intraday():
    """Exercise ``POST /data/analyze`` with intraday intervals.

    One request per interval (5m, 15m, 1h, 4h) is made for AAPL over a ``1d``
    period with ``intraday=True``; each must return 200. Market status and the
    datetime format of the first row are printed for information only.

    The 5m interval is included so that the suite actually covers every
    interval the script's summary claims to support.

    Returns:
        True when every interval request returned 200, False otherwise.
    """
    print_test_header("Data Analyze Endpoint - Intraday Data")
    all_passed = True
    # (interval value sent to the API, label used in the output)
    intervals_to_test = [
        ("5m", "5-minute intervals"),
        ("15m", "15-minute intervals"),
        ("1h", "1-hour intervals"),
        ("4h", "4-hour intervals"),
    ]
    for interval, description in intervals_to_test:
        try:
            payload = {
                "tickers": ["AAPL"],
                "period": "1d",
                "interval": interval,
                "intraday": True,
            }
            response = requests.post(
                f"{BASE_URL}/data/analyze",
                headers=HEADERS_VALID_AUTH,
                json=payload,
                timeout=30,
            )
            passed = response.status_code == 200
            all_passed &= print_result(f"/data/analyze ({interval})", "POST", 200, response.status_code, passed)
            if passed:
                data = response.json()
                print(f" π {description}: {data.get('total_data_points')} points")
                print(f" π Intraday: {data.get('intraday')}")
                print(f" π Interval: {data.get('interval')}")
                # Check market status
                market_status = data.get('market_status')
                if market_status:
                    print(f" π Market open: {market_status.get('is_open')}")
                    print(f" π Market state: {market_status.get('market_state')}")
                    print(f" π Timezone: {market_status.get('timezone')}")
                # A value longer than 10 characters is reported as including a time part.
                rows = data.get('data')
                if rows:
                    datetime_str = rows[0].get('datetime', '')
                    print(f" π DateTime format: {datetime_str[:19]}... (intraday: {len(datetime_str) > 10})")
        except Exception as e:
            print(f"β Intraday {interval} test failed: {e}")
            all_passed = False
        # Pace the requests even when one of them failed, so a failure does
        # not make the following requests arrive back-to-back.
        time.sleep(2)
    return all_passed
def test_analyze_endpoint_validation():
    """Send malformed payloads to ``POST /data/analyze`` and expect rejection.

    Every case is posted with the API key and must come back with the
    expected status (400 for all current cases). On a mismatch the first
    100 characters of the response body are printed.

    Returns:
        True when every case returned its expected status, False otherwise.
    """
    print_test_header("Data Analyze Endpoint - Input Validation")
    suite_ok = True

    # (payload, expected_status, description)
    bad_inputs = (
        ({"tickers": [], "period": "1d"}, 400, "Empty tickers list"),
        ({"tickers": ["INVALID!!!"], "period": "1d"}, 400, "Invalid ticker characters"),
        ({"tickers": ["AAPL"], "period": "invalid"}, 400, "Invalid period"),
        ({"tickers": ["AAPL"], "period": "1d", "interval": "invalid"}, 400, "Invalid interval"),
        ({"tickers": ["AAPL"], "period": "1y", "interval": "5m", "intraday": True}, 400, "Invalid intraday period"),
        ({"tickers": ["AAPL"], "period": "1d", "interval": "1d", "intraday": True}, 400, "Invalid intraday interval"),
        ({"tickers": ["A" * 50] * 51, "period": "1d"}, 400, "Too many tickers"),
    )
    for body, wanted, label in bad_inputs:
        try:
            reply = requests.post(
                f"{BASE_URL}/data/analyze",
                headers=HEADERS_VALID_AUTH,
                json=body,
                timeout=15,
            )
            matched = reply.status_code == wanted
            suite_ok = print_result(f"/data/analyze ({label})", "POST", wanted, reply.status_code, matched) and suite_ok
            if not matched:
                print(f" π Response: {reply.text[:100]}...")
            # Pause between cases to stay under the rate limit; the
            # "Too many tickers" case is the one case that skips the pause.
            if "Too many tickers" in label:
                continue
            time.sleep(0.5)
        except Exception as e:
            print(f"β Validation test '{label}' failed: {e}")
            suite_ok = False

    return suite_ok
def test_analyze_endpoint_rate_limiting():
    """Check that ``POST /data/analyze`` starts answering 429 under load.

    Requests are fired in quick succession until the first 429 arrives or the
    attempt budget is used up. The budget is deliberately larger than the
    advertised limit of 20 requests per minute: the previous budget of 15 was
    below the limit, so a 429 could only appear if earlier suites had already
    consumed part of the quota, making the outcome depend on run order.

    Returns:
        True when at least one request succeeded and at least one was rate
        limited, False otherwise.
    """
    print_test_header("Data Analyze Endpoint - Rate Limiting")
    all_passed = True
    print(" β° Waiting 5 seconds to start with fresh rate limit...")
    time.sleep(5)
    print(" π¦ Testing rate limit (20 requests/minute)...")

    rate_limit = 20  # advertised requests-per-minute limit
    max_attempts = rate_limit + 5  # must exceed the limit to guarantee a 429

    payload = {"tickers": ["AAPL"], "period": "1d"}
    successful_requests = 0
    rate_limited_requests = 0
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.post(
                f"{BASE_URL}/data/analyze",
                headers=HEADERS_VALID_AUTH,
                json=payload,
                timeout=10,
            )
            if response.status_code == 200:
                successful_requests += 1
            elif response.status_code == 429:
                rate_limited_requests += 1
                print(f" π¦ Rate limited on request {attempt}")
                # Report the server's back-off hint when present.
                retry_after = response.headers.get('Retry-After')
                if retry_after:
                    print(f" β° Retry after: {retry_after} seconds")
                # One 429 proves the limiter works; stop hammering the server.
                break
            else:
                print(f" β οΈ Unexpected status {response.status_code} on request {attempt}")
            # Very small delay between requests
            time.sleep(0.05)
        except Exception as e:
            # A failed request is reported but does not end the run; the final
            # verdict depends only on the success / rate-limited counters.
            print(f"β Rate limit test request {attempt} failed: {e}")

    print(f" β Successful requests: {successful_requests}")
    print(f" π¦ Rate limited requests: {rate_limited_requests}")
    # Rate limiting should kick in
    passed = successful_requests > 0 and rate_limited_requests > 0
    all_passed &= passed
    if passed:
        print(" β Rate limiting is working correctly")
    else:
        print(" β Rate limiting may not be working properly")
    return all_passed
def test_analyze_endpoint_security():
    """Probe ``POST /data/analyze`` with hostile ticker names and check headers.

    After a 10 second pause, each hostile string is sent as the only ticker
    and must be rejected with 400; a 429 reply skips that case without
    failing it. A final valid request then checks that at least two of three
    security headers are present, but only when that request returns 200.

    Returns:
        True when no executed check failed, False otherwise.
    """
    print_test_header("Data Analyze Endpoint - Security Features")
    suite_ok = True
    analyze_url = f"{BASE_URL}/data/analyze"

    print(" β° Waiting 10 seconds before security tests...")
    time.sleep(10)

    # Test 1: injection / XSS / path traversal strings as ticker names
    hostile_tickers = (
        "'; DROP TABLE tickers; --",
        "' OR '1'='1",
        "AAPL'; DELETE FROM tasks; --",
        "<script>alert('xss')</script>",
        "../../etc/passwd",
    )
    for attack in hostile_tickers:
        try:
            reply = requests.post(
                analyze_url,
                headers=HEADERS_VALID_AUTH,
                json={"tickers": [attack], "period": "1d"},
                timeout=15,
            )
            # A rate-limited reply says nothing about validation: skip the case.
            if reply.status_code == 429:
                print(f" π¦ Rate limited during injection test: {attack[:20]}... (skipping)")
                continue
            rejected = reply.status_code == 400
            suite_ok = print_result(f"/data/analyze (injection: {attack[:20]}...)", "POST", 400, reply.status_code, rejected) and suite_ok
            # Small delay to avoid rate limiting
            time.sleep(0.5)
        except Exception as e:
            print(f"β Security test failed: {e}")
            suite_ok = False

    # Test 2: security headers on a normal response
    try:
        reply = requests.post(
            analyze_url,
            headers=HEADERS_VALID_AUTH,
            json={"tickers": ["AAPL"], "period": "1d"},
            timeout=15,
        )
        if reply.status_code == 200:
            security_headers = [
                'X-Content-Type-Options',
                'X-Frame-Options',
                'X-XSS-Protection',
            ]
            present = [name for name in security_headers if name in reply.headers]
            for name in present:
                print(f" π‘οΈ {name}: {reply.headers[name]}")
            headers_found = len(present)
            # At least 2 of the 3 headers are required.
            if headers_found < 2:
                suite_ok = False
            print(f" π‘οΈ Security headers: {headers_found}/{len(security_headers)}")
    except Exception as e:
        print(f"β Security headers test failed: {e}")
        suite_ok = False

    return suite_ok
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status. SystemExit
    # is raised directly because the bare exit() helper is injected by the
    # site module and is missing when Python runs with -S or in frozen builds.
    raise SystemExit(main())