|
from fastapi import FastAPI, Request, HTTPException, status, Form |
|
from fastapi.responses import StreamingResponse, HTMLResponse, RedirectResponse |
|
import requests |
|
import os |
|
import time |
|
import logging |
|
import random |
|
import json |
|
from datetime import datetime, timedelta |
|
from collections import defaultdict |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
app = FastAPI(title="Instagram Stories API") |
|
|
|
|
|
CACHE_DIR = "/tmp/instagram_cache" |
|
COOKIES_FILE = "/tmp/instagram_cookies.json" |
|
os.makedirs(CACHE_DIR, exist_ok=True) |
|
|
|
|
|
IG_SESSION = None |
|
IG_SESSION_USERNAME = None |
|
IG_SESSION_EXPIRY = None |
|
STORY_CACHE = {} |
|
CACHE_EXPIRY = {} |
|
LAST_REQUEST = {} |
|
|
|
|
|
USER_AGENTS = [ |
|
"Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1", |
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15", |
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" |
|
] |
|
|
|
|
|
def get_cache_key(username): |
|
return username.lower() |
|
|
|
def get_cached_stories(username): |
|
key = get_cache_key(username) |
|
if key in STORY_CACHE and key in CACHE_EXPIRY: |
|
if datetime.now() < CACHE_EXPIRY[key]: |
|
logger.info(f"Cache hit for {username}") |
|
return STORY_CACHE[key] |
|
else: |
|
STORY_CACHE.pop(key, None) |
|
CACHE_EXPIRY.pop(key, None) |
|
return None |
|
|
|
def save_to_cache(username, data, minutes=30): |
|
key = get_cache_key(username) |
|
STORY_CACHE[key] = data |
|
CACHE_EXPIRY[key] = datetime.now() + timedelta(minutes=minutes) |
|
|
|
try: |
|
cache_file = os.path.join(CACHE_DIR, f"{key}.json") |
|
with open(cache_file, 'w') as f: |
|
json.dump({ |
|
"data": data, |
|
"expires": CACHE_EXPIRY[key].isoformat() |
|
}, f) |
|
logger.info(f"Cached {username} stories for {minutes} minutes") |
|
except Exception as e: |
|
logger.warning(f"Failed to save cache: {str(e)}") |
|
|
|
def should_rate_limit_request(username): |
|
key = username.lower() |
|
now = datetime.now() |
|
|
|
if key in LAST_REQUEST: |
|
seconds_since = (now - LAST_REQUEST[key]).total_seconds() |
|
if seconds_since < 60: |
|
logger.warning(f"Rate limiting {username}: {seconds_since:.1f}s since last request") |
|
return True |
|
|
|
LAST_REQUEST[key] = now |
|
return False |
|
|
|
|
|
def get_instagram_session(): |
|
global IG_SESSION, IG_SESSION_EXPIRY |
|
|
|
if IG_SESSION is None or IG_SESSION_EXPIRY is None or datetime.now() > IG_SESSION_EXPIRY: |
|
if os.path.exists(COOKIES_FILE): |
|
try: |
|
with open(COOKIES_FILE, 'r') as f: |
|
data = json.load(f) |
|
|
|
if 'expires' in data and datetime.fromisoformat(data['expires']) > datetime.now(): |
|
IG_SESSION = requests.Session() |
|
|
|
for cookie in data['cookies']: |
|
IG_SESSION.cookies.set( |
|
cookie['name'], |
|
cookie['value'], |
|
domain=cookie.get('domain', '.instagram.com') |
|
) |
|
|
|
IG_SESSION_EXPIRY = datetime.fromisoformat(data['expires']) |
|
global IG_SESSION_USERNAME |
|
IG_SESSION_USERNAME = data.get('username', 'unknown') |
|
|
|
logger.info(f"Loaded Instagram session for {IG_SESSION_USERNAME}") |
|
return IG_SESSION |
|
else: |
|
logger.warning("Saved session expired, needs new import") |
|
except Exception as e: |
|
logger.error(f"Failed to load cookies: {str(e)}") |
|
|
|
if IG_SESSION is not None and IG_SESSION_EXPIRY is not None and datetime.now() < IG_SESSION_EXPIRY: |
|
return IG_SESSION |
|
|
|
return None |
|
|
|
def save_instagram_session(cookies, username, days=30): |
|
"""Save imported cookie data""" |
|
global IG_SESSION, IG_SESSION_EXPIRY, IG_SESSION_USERNAME |
|
|
|
IG_SESSION = requests.Session() |
|
IG_SESSION_EXPIRY = datetime.now() + timedelta(days=days) |
|
IG_SESSION_USERNAME = username |
|
|
|
for cookie in cookies: |
|
IG_SESSION.cookies.set( |
|
cookie['name'], |
|
cookie['value'], |
|
domain=cookie.get('domain', '.instagram.com') |
|
) |
|
|
|
try: |
|
with open(COOKIES_FILE, 'w') as f: |
|
json.dump({ |
|
'cookies': cookies, |
|
'expires': IG_SESSION_EXPIRY.isoformat(), |
|
'username': username |
|
}, f) |
|
|
|
logger.info(f"Saved Instagram session for {username}") |
|
return True |
|
except Exception as e: |
|
logger.error(f"Failed to save cookies: {str(e)}") |
|
return False |
|
|
|
def clear_instagram_session(): |
|
global IG_SESSION, IG_SESSION_EXPIRY, IG_SESSION_USERNAME |
|
|
|
IG_SESSION = None |
|
IG_SESSION_EXPIRY = None |
|
IG_SESSION_USERNAME = None |
|
|
|
if os.path.exists(COOKIES_FILE): |
|
try: |
|
os.remove(COOKIES_FILE) |
|
logger.info("Removed Instagram session cookies") |
|
except Exception as e: |
|
logger.error(f"Failed to remove cookies file: {str(e)}") |
|
|
|
def parse_cookie_string(cookie_str): |
|
"""Parse cookie string into cookie objects""" |
|
cookies = [] |
|
|
|
for cookie_part in cookie_str.split(';'): |
|
cookie_part = cookie_part.strip() |
|
if not cookie_part: |
|
continue |
|
|
|
if '=' in cookie_part: |
|
name, value = cookie_part.split('=', 1) |
|
cookies.append({ |
|
'name': name.strip(), |
|
'value': value.strip(), |
|
'domain': '.instagram.com' |
|
}) |
|
|
|
return cookies |
|
|
|
|
|
def get_instagram_stories(username): |
|
session = get_instagram_session() |
|
if not session: |
|
logger.warning("No valid Instagram session") |
|
return None |
|
|
|
try: |
|
|
|
profile_url = f"https://www.instagram.com/api/v1/users/web_profile_info/?username={username}" |
|
|
|
session.headers.update({ |
|
'User-Agent': random.choice(USER_AGENTS), |
|
'X-IG-App-ID': '936619743392459', |
|
'Referer': f'https://www.instagram.com/{username}/', |
|
}) |
|
|
|
profile_response = session.get(profile_url) |
|
|
|
if profile_response.status_code != 200: |
|
logger.warning(f"Failed to get user profile: {profile_response.status_code}") |
|
|
|
try: |
|
resp_data = profile_response.json() |
|
if 'message' in resp_data: |
|
return {"error": "api_error", "message": resp_data['message']} |
|
except: |
|
pass |
|
|
|
return {"error": "profile_error", "status": profile_response.status_code} |
|
|
|
profile_data = profile_response.json() |
|
|
|
if 'data' not in profile_data or 'user' not in profile_data['data']: |
|
return {"error": "not_found", "message": "User profile not found"} |
|
|
|
user_id = profile_data['data']['user']['id'] |
|
|
|
time.sleep(random.uniform(1, 2)) |
|
|
|
|
|
stories_url = f"https://www.instagram.com/api/v1/feed/user/{user_id}/story/" |
|
|
|
stories_response = session.get(stories_url) |
|
|
|
if stories_response.status_code != 200: |
|
return {"error": "stories_error", "status": stories_response.status_code} |
|
|
|
stories_data = stories_response.json() |
|
|
|
if 'reel' not in stories_data or not stories_data['reel'].get('items'): |
|
return {"error": "no_stories", "message": "No stories found"} |
|
|
|
stories = [] |
|
for item in stories_data['reel']['items']: |
|
story = { |
|
"id": item.get('pk', ''), |
|
"type": "video" if item.get('media_type') == 2 else "image", |
|
"timestamp": datetime.fromtimestamp(item.get('taken_at', 0)).isoformat() |
|
} |
|
|
|
|
|
if story["type"] == "video" and 'video_versions' in item and item['video_versions']: |
|
story["url"] = item['video_versions'][0]['url'] |
|
|
|
if 'view_count' in item: |
|
story["views"] = item['view_count'] |
|
|
|
elif 'image_versions2' in item and 'candidates' in item['image_versions2'] and item['image_versions2']['candidates']: |
|
story["url"] = item['image_versions2']['candidates'][0]['url'] |
|
else: |
|
continue |
|
|
|
stories.append(story) |
|
|
|
if not stories: |
|
return {"error": "no_valid_stories", "message": "No valid stories found"} |
|
|
|
result = { |
|
"data": stories, |
|
"count": len(stories), |
|
"username": username, |
|
"fetched_at": datetime.now().isoformat() |
|
} |
|
|
|
return result |
|
|
|
except Exception as e: |
|
logger.error(f"Error getting stories: {str(e)}") |
|
return {"error": "exception", "message": str(e)} |
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/import", response_class=HTMLResponse) |
|
async def import_page(message: str = None, success: bool = False): |
|
status_class = "success" if success else "error" |
|
status_div = f'<div class="status {status_class}">{message}</div>' if message else "" |
|
|
|
html = """ |
|
<!DOCTYPE html> |
|
<html> |
|
<head> |
|
<title>Import Instagram Session</title> |
|
<style> |
|
body { font-family: Arial; max-width: 700px; margin: 0 auto; padding: 20px; } |
|
.container { background-color: #fff; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); padding: 25px; } |
|
h1 { color: #262626; text-align: center; } |
|
.form-group { margin-bottom: 15px; } |
|
label { display: block; margin-bottom: 10px; } |
|
textarea { width: 100%; padding: 10px; border: 1px solid #dbdbdb; border-radius: 3px; height: 150px; } |
|
input[type="text"] { width: 100%; padding: 10px; border: 1px solid #dbdbdb; border-radius: 3px; } |
|
.btn { background-color: #0095f6; border: none; color: white; padding: 10px 15px; border-radius: 4px; cursor: pointer; font-weight: bold; width: 100%; } |
|
.status { margin-top: 20px; padding: 10px; border-radius: 4px; } |
|
.success { background-color: #e8f5e9; color: #388e3c; } |
|
.error { background-color: #ffebee; color: #d32f2f; } |
|
.instructions { background-color: #f8f9fa; padding: 15px; border-radius: 4px; margin-bottom: 20px; } |
|
.code { font-family: monospace; background-color: #f1f1f1; padding: 2px 5px; border-radius: 3px; } |
|
ol { padding-left: 20px; } |
|
li { margin-bottom: 10px; } |
|
</style> |
|
</head> |
|
<body> |
|
<div class="container"> |
|
<h1>Import Instagram Session</h1> |
|
|
|
<div class="instructions"> |
|
<h3>How to get your Instagram cookies:</h3> |
|
<ol> |
|
<li>Log in to Instagram in your browser</li> |
|
<li>Open browser DevTools (F12 or right-click and select "Inspect")</li> |
|
<li>Go to the "Application" or "Storage" tab</li> |
|
<li>Under "Cookies", find "instagram.com"</li> |
|
<li>Find the <span class="code">sessionid</span> cookie and copy its value</li> |
|
<li>Also copy the <span class="code">ds_user_id</span> and <span class="code">csrftoken</span> cookies if available</li> |
|
<li>Paste the cookies in the format: <span class="code">name=value; name2=value2;</span></li> |
|
</ol> |
|
</div> |
|
|
|
""" + status_div + """ |
|
|
|
<form method="post" action="/import-cookies"> |
|
<div class="form-group"> |
|
<label for="cookies">Paste your Instagram cookies here:</label> |
|
<textarea id="cookies" name="cookies" placeholder="sessionid=YOUR_SESSION_ID; ds_user_id=YOUR_USER_ID; csrftoken=YOUR_CSRF_TOKEN;" required></textarea> |
|
</div> |
|
<div class="form-group"> |
|
<label for="username">Your Instagram username:</label> |
|
<input type="text" id="username" name="username" required> |
|
</div> |
|
<button type="submit" class="btn">Import Session</button> |
|
</form> |
|
</div> |
|
</body> |
|
</html> |
|
""" |
|
|
|
return HTMLResponse(content=html) |
|
|
|
@app.post("/import-cookies") |
|
async def import_cookies(cookies: str = Form(...), username: str = Form(...)): |
|
logger.info(f"Importing cookies for {username}") |
|
|
|
try: |
|
cookie_objects = parse_cookie_string(cookies) |
|
|
|
if not cookie_objects: |
|
logger.warning("No valid cookies found in input") |
|
return import_page(message="No valid cookies found in input") |
|
|
|
|
|
has_sessionid = any(c['name'] == 'sessionid' for c in cookie_objects) |
|
|
|
if not has_sessionid: |
|
logger.warning("Missing essential sessionid cookie") |
|
return import_page(message="Missing essential sessionid cookie") |
|
|
|
|
|
if save_instagram_session(cookie_objects, username): |
|
|
|
session = get_instagram_session() |
|
if session: |
|
logger.info("Session imported successfully") |
|
return RedirectResponse(url="/status", status_code=303) |
|
else: |
|
logger.error("Failed to get session after saving") |
|
return import_page(message="Failed to create session") |
|
else: |
|
logger.error("Failed to save session") |
|
return import_page(message="Failed to save session") |
|
|
|
except Exception as e: |
|
logger.error(f"Import error: {str(e)}") |
|
return import_page(message=f"Import error: {str(e)}") |
|
|
|
@app.get("/status", response_class=HTMLResponse) |
|
async def status_page(): |
|
session = get_instagram_session() |
|
session_valid = session is not None |
|
|
|
cache_count = len(STORY_CACHE) |
|
story_count = sum(len(data.get("data", [])) for data in STORY_CACHE.values()) |
|
|
|
status_badge = 'Connected' if session_valid else 'Disconnected' |
|
status_class = 'connected' if session_valid else 'disconnected' |
|
|
|
if session_valid: |
|
status_info = f"✅ Logged in as <strong>{IG_SESSION_USERNAME}</strong><br>" |
|
status_info += f"Session expires: {IG_SESSION_EXPIRY.strftime('%Y-%m-%d %H:%M:%S')}" |
|
else: |
|
status_info = "❌ No active Instagram session<br>Please import your cookies to use the API" |
|
|
|
html = f""" |
|
<!DOCTYPE html> |
|
<html> |
|
<head> |
|
<title>API Status</title> |
|
<style> |
|
body {{ font-family: Arial; max-width: 600px; margin: 0 auto; padding: 20px; }} |
|
.container {{ background-color: #fff; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); padding: 25px; }} |
|
h1 {{ color: #262626; text-align: center; }} |
|
.status-badge {{ display: inline-block; padding: 5px 10px; border-radius: 4px; font-weight: bold; margin-left: 10px; }} |
|
.connected {{ background-color: #e8f5e9; color: #388e3c; }} |
|
.disconnected {{ background-color: #ffebee; color: #d32f2f; }} |
|
.info {{ background-color: #f5f5f5; border-radius: 4px; padding: 15px; margin: 15px 0; }} |
|
.btn {{ background-color: #0095f6; border: none; color: white; padding: 10px 15px; border-radius: 4px; text-decoration: none; display: inline-block; margin-right: 10px; }} |
|
.btn-danger {{ background-color: #f44336; }} |
|
table {{ width: 100%; border-collapse: collapse; margin-top: 15px; }} |
|
th, td {{ text-align: left; padding: 8px; border-bottom: 1px solid #ddd; }} |
|
</style> |
|
</head> |
|
<body> |
|
<div class="container"> |
|
<h1>Instagram API Status</h1> |
|
|
|
<h2>Connection Status |
|
<span class="status-badge {status_class}">{status_badge}</span> |
|
</h2> |
|
|
|
<div class="info"> |
|
{status_info} |
|
</div> |
|
|
|
<a href="/import" class="btn">{"Re-Import Cookies" if session_valid else "Import Cookies"}</a> |
|
{"<a href='/logout' class='btn btn-danger'>Logout</a>" if session_valid else ""} |
|
|
|
<h2>API Endpoints</h2> |
|
<table> |
|
<tr><th>Endpoint</th><th>Description</th></tr> |
|
<tr><td><code>/stories/{{username}}</code></td><td>Get stories for a user</td></tr> |
|
<tr><td><code>/download/{{url}}</code></td><td>Download media from a URL</td></tr> |
|
<tr><td><code>/status</code></td><td>View API status</td></tr> |
|
</table> |
|
|
|
<h2>Cache Info</h2> |
|
<div class="info"> |
|
<p>Cached users: {cache_count}</p> |
|
<p>Total stories cached: {story_count}</p> |
|
</div> |
|
</div> |
|
</body> |
|
</html> |
|
""" |
|
|
|
return HTMLResponse(content=html) |
|
|
|
@app.get("/logout") |
|
async def logout(): |
|
clear_instagram_session() |
|
return RedirectResponse(url="/status", status_code=303) |
|
|
|
|
|
|
|
|
|
class RateLimiter: |
|
def __init__(self, max_requests: int, time_window: timedelta): |
|
self.max_requests = max_requests |
|
self.time_window = time_window |
|
self.requests: Dict[str, list] = defaultdict(list) |
|
|
|
def _cleanup_old_requests(self, user_ip: str) -> None: |
|
"""Remove requests that are outside the time window.""" |
|
current_time = time.time() |
|
self.requests[user_ip] = [ |
|
timestamp for timestamp in self.requests[user_ip] |
|
if current_time - timestamp < self.time_window.total_seconds() |
|
] |
|
|
|
def is_rate_limited(self, user_ip: str) -> bool: |
|
"""Check if the user has exceeded their rate limit.""" |
|
self._cleanup_old_requests(user_ip) |
|
|
|
|
|
current_count = len(self.requests[user_ip]) |
|
|
|
|
|
current_time = time.time() |
|
self.requests[user_ip].append(current_time) |
|
|
|
|
|
return (current_count + 1) > self.max_requests |
|
|
|
def get_current_count(self, user_ip: str) -> int: |
|
"""Get the current request count for an IP.""" |
|
self._cleanup_old_requests(user_ip) |
|
return len(self.requests[user_ip]) |
|
|
|
|
|
|
|
rate_limiter = RateLimiter( |
|
max_requests=6, |
|
time_window=timedelta(days=1) |
|
) |
|
|
|
def get_user_ip(request: Request) -> str: |
|
"""Helper function to get user's IP address.""" |
|
forwarded = request.headers.get("X-Forwarded-For") |
|
if forwarded: |
|
return forwarded.split(",")[0] |
|
return request.client.host |
|
|
|
|
|
class ApiRotator: |
|
def __init__(self, apis): |
|
self.apis = apis |
|
self.last_successful_index = None |
|
|
|
def get_prioritized_apis(self): |
|
if self.last_successful_index is not None: |
|
|
|
rotated_apis = ( |
|
[self.apis[self.last_successful_index]] + |
|
self.apis[:self.last_successful_index] + |
|
self.apis[self.last_successful_index+1:] |
|
) |
|
return rotated_apis |
|
return self.apis |
|
|
|
def update_last_successful(self, index): |
|
self.last_successful_index = index |
|
|
|
@app.get("/stories/{username}") |
|
async def get_stories(request: Request, username: str, cached: bool = False): |
|
user_ip = get_user_ip(request) |
|
if rate_limiter.is_rate_limited(user_ip): |
|
current_count = rate_limiter.get_current_count(user_ip) |
|
raise HTTPException( |
|
status_code=429, |
|
detail={ |
|
"error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.", |
|
"url": "https://t.me/chrunoss" |
|
} |
|
) |
|
logger.info(f"Request for @{username} stories") |
|
|
|
if cached: |
|
cached_result = get_cached_stories(username) |
|
if cached_result: |
|
return {**cached_result, "from_cache": True} |
|
|
|
try: |
|
if should_rate_limit_request(username): |
|
cached_result = get_cached_stories(username) |
|
if cached_result: |
|
return {**cached_result, "from_cache": True, "rate_limited": True} |
|
else: |
|
raise HTTPException( |
|
status_code=status.HTTP_429_TOO_MANY_REQUESTS, |
|
detail="Rate limit exceeded. Please try again later." |
|
) |
|
|
|
result = get_instagram_stories(username) |
|
|
|
if not result: |
|
raise HTTPException( |
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
detail="No valid Instagram session. Please import your cookies first." |
|
) |
|
|
|
if "error" in result: |
|
if result["error"] == "api_error": |
|
cached_result = get_cached_stories(username) |
|
if cached_result: |
|
return {**cached_result, "from_cache": True, "error_occurred": True} |
|
|
|
raise HTTPException( |
|
status_code=status.HTTP_429_TOO_MANY_REQUESTS, |
|
detail=result.get("message", "Instagram API error") |
|
) |
|
|
|
elif result["error"] in ["profile_error", "not_found"]: |
|
raise HTTPException( |
|
status_code=status.HTTP_404_NOT_FOUND, |
|
detail="Profile not found" |
|
) |
|
|
|
elif result["error"] in ["no_stories", "no_valid_stories"]: |
|
raise HTTPException( |
|
status_code=status.HTTP_404_NOT_FOUND, |
|
detail="No stories found for this user" |
|
) |
|
|
|
else: |
|
raise HTTPException( |
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
detail=result.get("message", "Failed to get stories") |
|
) |
|
|
|
save_to_cache(username, result) |
|
return result |
|
|
|
except HTTPException: |
|
raise |
|
except Exception as e: |
|
logger.error(f"Error: {str(e)}") |
|
|
|
cached_result = get_cached_stories(username) |
|
if cached_result: |
|
return {**cached_result, "from_cache": True, "error_occurred": True} |
|
|
|
raise HTTPException( |
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
detail=str(e) |
|
) |
|
|
|
@app.get("/download/{url:path}") |
|
async def download_media(url: str): |
|
logger.info(f"Download request for media") |
|
|
|
try: |
|
if not url.startswith(("https://", "http://")): |
|
raise HTTPException( |
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
detail="Invalid URL format" |
|
) |
|
|
|
session = get_instagram_session() or requests.Session() |
|
headers = { |
|
"User-Agent": random.choice(USER_AGENTS), |
|
"Referer": "https://www.instagram.com/", |
|
} |
|
|
|
response = session.get(url, headers=headers, stream=True, timeout=15) |
|
response.raise_for_status() |
|
|
|
if "Content-Type" in response.headers: |
|
content_type = response.headers["Content-Type"] |
|
elif url.endswith((".jpg", ".jpeg")): |
|
content_type = "image/jpeg" |
|
elif url.endswith(".mp4"): |
|
content_type = "video/mp4" |
|
else: |
|
content_type = "application/octet-stream" |
|
|
|
logger.info(f"Media downloaded: {content_type}") |
|
|
|
return StreamingResponse( |
|
response.iter_content(chunk_size=8192), |
|
media_type=content_type, |
|
headers={"Content-Disposition": f"attachment; filename={url.split('/')[-1]}"} |
|
) |
|
|
|
except Exception as e: |
|
logger.error(f"Download failed: {str(e)}") |
|
raise HTTPException( |
|
status_code=status.HTTP_502_BAD_GATEWAY, |
|
detail="Failed to download media" |
|
) |
|
|
|
|
|
@app.get("/health") |
|
async def health_check(): |
|
session = get_instagram_session() |
|
return { |
|
"status": "ok", |
|
"authenticated": session is not None, |
|
"username": IG_SESSION_USERNAME, |
|
"session_expires": IG_SESSION_EXPIRY.isoformat() if IG_SESSION_EXPIRY else None, |
|
"cache_items": len(STORY_CACHE), |
|
"timestamp": datetime.now().isoformat() |
|
} |
|
|
|
|
|
@app.on_event("startup") |
|
def startup_event(): |
|
|
|
try: |
|
count = 0 |
|
for filename in os.listdir(CACHE_DIR): |
|
if filename.endswith('.json'): |
|
try: |
|
with open(os.path.join(CACHE_DIR, filename), 'r') as f: |
|
cache_data = json.load(f) |
|
|
|
if "data" in cache_data and "expires" in cache_data: |
|
expires = datetime.fromisoformat(cache_data["expires"]) |
|
if expires > datetime.now(): |
|
key = filename.replace('.json', '') |
|
STORY_CACHE[key] = cache_data["data"] |
|
CACHE_EXPIRY[key] = expires |
|
count += 1 |
|
except Exception as e: |
|
logger.warning(f"Couldn't load cache file {filename}: {e}") |
|
|
|
logger.info(f"Loaded {count} items from cache") |
|
except Exception as e: |
|
logger.error(f"Cache loading error: {e}") |
|
|
|
|
|
get_instagram_session() |