# app.py — Chrunos video extraction/download API (FastAPI).
import time
from fastapi import FastAPI, Request, HTTPException
from fastapi.concurrency import run_in_threadpool
import yt_dlp
import urllib.parse
import os
from datetime import datetime, timedelta
from dotenv import load_dotenv
import tempfile
from pathlib import Path
from collections import defaultdict
import logging
import gc
from typing import Dict, Any
import cloudscraper
from fastapi.staticfiles import StaticFiles
# Module-level setup: logging, .env configuration, the FastAPI app, and a
# process-lifetime scratch directory for completed downloads.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
load_dotenv()
app = FastAPI()
# Define a global temporary download directory
# (never cleaned up explicitly; lives for the lifetime of the process).
global_download_dir = tempfile.mkdtemp()
# Completed downloads are served back to clients at /file/<filename>.
app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads")
# Per-IP sliding-window rate limiting.
class RateLimiter:
    """Sliding-window rate limiter keyed by client IP.

    Keeps a list of accepted-request timestamps per IP and counts only
    those that fall inside the configured time window.
    """

    def __init__(self, max_requests: int, time_window: timedelta):
        self.max_requests = max_requests
        self.time_window = time_window
        # ip -> list of epoch timestamps of accepted requests
        self.requests: Dict[str, list] = defaultdict(list)

    def _cleanup_old_requests(self, user_ip: str) -> None:
        """Remove requests that are outside the time window."""
        cutoff = time.time() - self.time_window.total_seconds()
        self.requests[user_ip] = [
            timestamp for timestamp in self.requests[user_ip]
            if timestamp > cutoff
        ]

    def is_rate_limited(self, user_ip: str) -> bool:
        """Record this request and report whether the IP exceeded its limit.

        Fix: a rejected request is no longer appended to the history, so a
        rate-limited client does not keep consuming quota with every retry
        (previously rejected attempts were recorded too, which could keep a
        busy client blocked longer than intended).
        """
        self._cleanup_old_requests(user_ip)
        if len(self.requests[user_ip]) >= self.max_requests:
            return True
        self.requests[user_ip].append(time.time())
        return False

    def get_current_count(self, user_ip: str) -> int:
        """Get the current request count for an IP (inside the window)."""
        self._cleanup_old_requests(user_ip)
        return len(self.requests[user_ip])
# Allow up to 12 requests per day per client IP.
rate_limiter = RateLimiter(
    max_requests=12,
    time_window=timedelta(days=1)
)
def get_user_ip(request: Request) -> str:
    """Best-effort client IP: first X-Forwarded-For hop, else the socket peer."""
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        # Fix: the header is "client, proxy1, proxy2" — the first hop may
        # carry a leading space after the comma, which would make every
        # proxied client share one malformed rate-limit key. Strip it.
        return forwarded.split(",")[0].strip()
    # request.client can be None (e.g. some ASGI test clients); fall back
    # to a sentinel rather than raising AttributeError.
    return request.client.host if request.client else "unknown"
# Domain blocked from downloads (appears unused in the visible code — verify).
restricted_domain = "chrunos.com"
# Primary extractor endpoint. NOTE(review): the variable name drops the
# leading "EX" of the EXTRACT_API env var — looks like a typo, but it is
# used consistently below, so renaming would touch callers.
TRACT_API = os.getenv("EXTRACT_API")
# Fallback extractor endpoint for all other sites.
ALT_API = os.getenv("ALT_API")
# URLs containing any of these domains are routed to TRACT_API.
target_domains = [
    "pornhub.com",
    "xhamster.com",
    "eporner.com",
    "youporn.com"
]
def extract_video_info(video_url: str) -> Any:
    """Query the external extractor API for *video_url*.

    Returns a list of format dicts ({"url", "format_id", ...}) on success,
    or a {"error": ...} dict on any failure. Lower-quality formats pass
    through unchanged; the best ones are replaced by a premium-upsell link.
    (Fix: the original annotation said `-> str`, but the function has always
    returned a list or a dict.)
    """
    # Adult sites go to the primary extractor; everything else to the fallback.
    if any(domain in video_url for domain in target_domains):
        extract_api = TRACT_API
    else:
        extract_api = ALT_API
    api_url = f'{extract_api}?url={video_url}'
    logger.info(api_url)
    session = cloudscraper.create_scraper()
    try:
        response = session.get(api_url, timeout=20)
        if response.status_code != 200:
            return {"error": f"Request failed with status code {response.status_code}, API: {api_url}"}
        json_response = response.json()
        result = []
        # 检查 formats 列表是否存在且不为空
        # (Check whether the formats list exists and is non-empty.)
        if 'formats' in json_response:
            # Keep only entries that have both an id and a URL.
            for format_item in json_response['formats']:
                format_url = format_item.get('url')
                format_id = format_item.get('format_id')
                p_cookies = format_item.get('cookies')
                if format_id and format_url:
                    result.append({
                        "url": format_url,
                        "format_id": format_id,
                        "cookies": p_cookies
                    })
            title = json_response.get('title')
            logger.info(title)
            if "pornhub.com" in video_url:
                p_result = [item for item in result if 'hls' in item['format_id']]
                # Fix: the original indexed p_result[-1] and p_result[-2]
                # unconditionally and raised IndexError whenever fewer than
                # two HLS formats were found; the slice handles 0/1/2+ safely.
                for item in p_result[-2:]:
                    item["format_id"] = f'{item["format_id"]} - Chrunos Shortcuts Premium Only'
                    item["url"] = 'https://chrunos.com/premium-shortcuts/'
                return p_result
            new_result = result
            if len(new_result) > 3:
                # Keep the first three formats free; upsell the rest.
                for item in new_result[3:]:
                    item["format_id"] = f'{item["format_id"]} - Chrunos Shortcuts Premium Only'
                    item["url"] = 'https://chrunos.com/premium-shortcuts/'
            elif 2 <= len(new_result) <= 3:
                last_item = new_result[-1]
                last_item["format_id"] = f'{last_item["format_id"]} - Chrunos Shortcuts Premium Only'
                last_item["url"] = 'https://chrunos.com/premium-shortcuts/'
            elif len(new_result) == 1:
                new_result.append({
                    "url": "https://chrunos.com/premium-shortcuts/",
                    "format_id": "Best Qaulity Video - Chrunos Shortcuts Premium Only"
                })
            else:
                # Fix: an empty formats list used to fall through and return
                # [], which callers could not distinguish from a real result.
                return {"error": "No formats available. Report Error on Telegram"}
            return new_result
        # No 'formats' key: some extractors return a single direct URL.
        if 'url' in json_response:
            return [
                {"url": json_response.get('url'),
                 "format_id": "Normal Quality Video"
                 },
                {"url": json_response.get('thumbnail'),
                 "format_id": "thumbnail"},
                {"url": "https://chrunos.com/premium-shortcuts/",
                 "format_id": "Best Qaulity Video - Chrunos Shortcuts Premium Only"}
            ]
        return {"error": "No formats available. Report Error on Telegram"}
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return {"error": str(e)}
@app.post("/test")
async def test_download(request: Request):
    """Rate-limited endpoint: extract formats for the posted {"url": ...}.

    Raises HTTP 429 when the client IP is over quota and HTTP 400 when no
    URL is supplied; otherwise returns extract_video_info's result as-is.
    """
    user_ip = get_user_ip(request)
    if rate_limiter.is_rate_limited(user_ip):
        # (Fix: the original also computed get_current_count here and never
        # used it — dropped the dead call.)
        raise HTTPException(
            status_code=429,
            detail={
                "error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
                "url": "https://t.me/chrunoss"
            }
        )
    data = await request.json()
    video_url = data.get('url')
    # Fix: reject a missing/empty url instead of passing None downstream,
    # where it would hit `domain in video_url` and raise TypeError.
    if not video_url:
        raise HTTPException(status_code=400, detail={"error": "No URL provided"})
    return extract_video_info(video_url)
@app.post("/hls")
async def download_hls_video(request: Request):
    """Download an HLS stream server-side with yt-dlp and return a local URL.

    Expects a JSON body {"url": ...}; on success returns {"url": ...}
    pointing at the file served from the /file static mount, otherwise
    an {"error": ...} dict. NOTE(review): unlike /test, this endpoint is
    not rate-limited — confirm whether that is intentional.
    """
    data = await request.json()
    hls_url = data.get('url')
    # Fix: the original passed a missing url straight to yt-dlp as [None],
    # producing an opaque downloader error instead of a clear message.
    if not hls_url:
        return {"error": "No URL provided"}
    base_url = str(request.base_url)
    # The timestamp makes the output filename unique so we can find the
    # finished file afterwards with a glob.
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
    ydl_opts = {
        'format': 'best',
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
        'noprogress': True,
        'merge_output_format': 'mp4'
    }
    try:
        # yt-dlp is blocking; keep it off the event loop.
        await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([hls_url]))
    except Exception as e:
        return {"error": f"Download failed: {str(e)}"}
    downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
    if not downloaded_files:
        return {"error": "Download failed"}
    downloaded_file = downloaded_files[0]
    encoded_filename = urllib.parse.quote(downloaded_file.name)
    download_url = f"{base_url}file/{encoded_filename}"
    # Nudge GC to release yt-dlp's working memory on constrained hosts.
    gc.collect()
    return {"url": download_url}