|
import time |
|
from fastapi import FastAPI, Request, HTTPException |
|
from fastapi.concurrency import run_in_threadpool |
|
import yt_dlp |
|
import urllib.parse |
|
import os |
|
from datetime import datetime, timedelta |
|
from dotenv import load_dotenv |
|
import tempfile |
|
from pathlib import Path |
|
from collections import defaultdict |
|
import logging |
|
import gc |
|
from typing import Dict, Any |
|
import cloudscraper |
|
from fastapi.staticfiles import StaticFiles |
|
|
|
|
|
# Emit INFO-and-above log records; `logger` is this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load variables from a .env file (if one exists) into the process
# environment so that later os.getenv() lookups can see them.
load_dotenv()

app = FastAPI()

# A fresh temporary directory is created at import time; whatever is written
# into it is served as static files under the /file URL prefix.
global_download_dir = tempfile.mkdtemp()
app.mount(
    "/file",
    StaticFiles(directory=global_download_dir),
    name="downloads",
)
|
|
|
|
|
class RateLimiter:
    """In-memory sliding-window request limiter keyed by client IP.

    ``requests`` maps an IP to the ``time.time()`` timestamps of the requests
    that were *accepted* for it within the last ``time_window``. Rejected
    requests are not recorded, so a client hammering the service while
    blocked can neither grow its list without bound nor prolong its own
    block. IPs with no timestamps left are removed from the mapping.

    Args:
        max_requests: Number of requests allowed per IP inside one window.
        time_window: Length of the sliding window.
    """

    def __init__(self, max_requests: int, time_window: timedelta):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests: Dict[str, list] = defaultdict(list)

    def _cleanup_old_requests(self, user_ip: str) -> None:
        """Remove requests that are outside the time window."""
        # .get() instead of indexing: indexing a defaultdict would create
        # (and keep) an empty entry for every IP that is merely looked up.
        timestamps = self.requests.get(user_ip)
        if timestamps is None:
            return

        now = time.time()
        window_seconds = self.time_window.total_seconds()
        recent = [stamp for stamp in timestamps if now - stamp < window_seconds]

        if recent:
            self.requests[user_ip] = recent
        else:
            # Nothing left inside the window: forget the IP entirely.
            del self.requests[user_ip]

    def is_rate_limited(self, user_ip: str) -> bool:
        """Register a request attempt and report whether it must be rejected.

        Returns:
            ``False`` if the request is within the limit (it is then counted),
            ``True`` if the IP already used up ``max_requests`` in the window
            (the attempt is then not counted).
        """
        self._cleanup_old_requests(user_ip)

        if len(self.requests.get(user_ip, ())) >= self.max_requests:
            return True

        self.requests[user_ip].append(time.time())
        return False

    def get_current_count(self, user_ip: str) -> int:
        """Return how many accepted requests the IP has inside the window."""
        self._cleanup_old_requests(user_ip)
        return len(self.requests.get(user_ip, ()))
|
|
|
|
|
|
|
# Module-wide limiter instance: at most 12 requests per IP per one-day window.
rate_limiter = RateLimiter(max_requests=12, time_window=timedelta(days=1))
|
|
|
def get_user_ip(request: Request) -> str:
    """Return the client's IP address for use as a rate-limiting key.

    The first entry of the ``X-Forwarded-For`` header is preferred; when the
    header is absent or its first entry is blank, the peer address of the
    connection is used. If the connection exposes no peer information at
    all, the literal ``"unknown"`` is returned so callers always get a
    usable key.

    NOTE(review): ``X-Forwarded-For`` is supplied by the client unless a
    trusted proxy overwrites it - confirm the deployment sits behind one,
    otherwise the limit can be bypassed by varying the header.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        # Header format is "client, proxy1, proxy2"; entries are usually
        # separated by ", ", so surrounding whitespace must be stripped.
        first_hop = forwarded.split(",")[0].strip()
        if first_hop:
            return first_hop

    client = request.client
    if client is None:
        return "unknown"
    return client.host
|
|
|
|
|
|
|
# Not referenced by any code visible in this section of the file.
restricted_domain = "chrunos.com"

# Extraction endpoints taken from the environment; each is None when its
# variable is unset. TRACT_API (read from EXTRACT_API) is used for URLs that
# contain one of `target_domains`, ALT_API for every other URL.
TRACT_API = os.getenv("EXTRACT_API")
ALT_API = os.getenv("ALT_API")
target_domains = ["pornhub.com", "xhamster.com", "eporner.com", "youporn.com"]
|
|
|
_PREMIUM_URL = "https://chrunos.com/premium-shortcuts/"
_PREMIUM_SUFFIX = " - Chrunos Shortcuts Premium Only"
_NO_FORMATS_MESSAGE = "No formats available. Report Error on Telegram"


def _mark_premium(item: Dict[str, Any]) -> None:
    """Relabel *item* as premium-only and replace its URL with the premium page."""
    item["format_id"] = f'{item["format_id"]}{_PREMIUM_SUFFIX}'
    item["url"] = _PREMIUM_URL


def _collect_formats(payload: Dict[str, Any]) -> list:
    """Return url/format_id/cookies dicts for entries having both a URL and an id."""
    collected = []
    for entry in payload["formats"]:
        format_url = entry.get("url")
        format_id = entry.get("format_id")
        if format_id and format_url:
            collected.append({
                "url": format_url,
                "format_id": format_id,
                "cookies": entry.get("cookies"),
            })
    return collected


def _gate_formats(formats: list, video_url: str) -> Any:
    """Split *formats* into free and premium-only entries (mutates the items)."""
    if "pornhub.com" in video_url:
        # Only HLS variants are offered for this host; the last two listed
        # (or fewer, if fewer exist) are reserved for premium users.
        hls_formats = [item for item in formats if "hls" in item["format_id"]]
        if not hls_formats:
            return {"error": _NO_FORMATS_MESSAGE}
        for item in hls_formats[-2:]:
            _mark_premium(item)
        return hls_formats

    if not formats:
        return {"error": _NO_FORMATS_MESSAGE}

    if len(formats) > 3:
        # The first three stay free, everything after them is premium-only.
        for item in formats[3:]:
            _mark_premium(item)
    elif len(formats) >= 2:
        _mark_premium(formats[-1])
    else:
        # A single format stays free; a premium placeholder is added next to it.
        # NOTE(review): "Qaulity" is misspelled but kept byte-for-byte in case
        # clients match on this text.
        formats.append({
            "url": _PREMIUM_URL,
            "format_id": "Best Qaulity Video - Chrunos Shortcuts Premium Only",
        })
    return formats


def _single_url_listing(payload: Dict[str, Any]) -> Any:
    """Build the listing for a response that has a top-level ``url`` but no formats."""
    if "url" not in payload:
        return {"error": _NO_FORMATS_MESSAGE}
    return [
        {"url": payload.get("url"), "format_id": "Normal Quality Video"},
        {"url": payload.get("thumbnail"), "format_id": "thumbnail"},
        {
            "url": _PREMIUM_URL,
            "format_id": "Best Qaulity Video - Chrunos Shortcuts Premium Only",
        },
    ]


def extract_video_info(video_url: str) -> "list[dict[str, Any]] | dict[str, str]":
    """Query the extraction API for *video_url* and return the download options.

    URLs containing one of ``target_domains`` are sent to ``TRACT_API``, all
    others to ``ALT_API``. Part of the returned formats is relabelled as
    premium-only and has its URL replaced by the premium page.

    Args:
        video_url: Page URL of the video to look up.

    Returns:
        On success a list of dicts with ``url`` and ``format_id`` keys (and
        ``cookies`` for entries taken from the API's ``formats`` list).
        On any failure a dict ``{"error": <message>}``: missing URL, non-200
        API status, no usable formats, or an exception during the request.
        This function does not raise for those cases.
    """
    if not isinstance(video_url, str) or not video_url:
        return {"error": "No URL provided"}

    if any(domain in video_url for domain in target_domains):
        extract_api = TRACT_API
    else:
        extract_api = ALT_API

    # Percent-encode the target URL: left raw, any "&" or "#" inside it would
    # be parsed as part of the API request itself and truncate the value.
    api_url = f"{extract_api}?{urllib.parse.urlencode({'url': video_url})}"
    logger.info(api_url)

    session = cloudscraper.create_scraper()
    try:
        response = session.get(api_url, timeout=20)
        if response.status_code != 200:
            # NOTE(review): this message exposes the API endpoint to the
            # caller - confirm that is acceptable.
            return {"error": f"Request failed with status code {response.status_code}, API: {api_url}"}

        payload = response.json()
        if "formats" in payload:
            formats = _collect_formats(payload)
            logger.info(payload.get("title"))
            return _gate_formats(formats, video_url)
        return _single_url_listing(payload)
    except Exception as e:
        # Boundary handler: every failure is reported to the caller as data.
        logger.error(f"An error occurred: {e}")
        return {"error": str(e)}
    finally:
        # Release the scraper's pooled connections.
        session.close()
|
|
|
|
|
@app.post("/test")
async def test_download(request: Request):
    """Return the download options for the video URL in the JSON body.

    Expects a JSON object with a ``url`` key. The request is counted against
    the caller's per-IP limit before the body is read.

    Raises:
        HTTPException: 429 when the caller exceeded the rate limit; 400 when
            the body is not valid JSON or lacks a non-empty string ``url``.
    """
    user_ip = get_user_ip(request)
    if rate_limiter.is_rate_limited(user_ip):
        raise HTTPException(
            status_code=429,
            detail={
                "error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
                "url": "https://t.me/chrunoss"
            }
        )

    try:
        data = await request.json()
    except ValueError as exc:
        # Covers JSON decoding errors (JSONDecodeError is a ValueError).
        raise HTTPException(
            status_code=400,
            detail={"error": "Request body must be valid JSON."},
        ) from exc

    video_url = data.get("url") if isinstance(data, dict) else None
    if not isinstance(video_url, str) or not video_url:
        raise HTTPException(
            status_code=400,
            detail={"error": "A non-empty 'url' string is required."},
        )

    # extract_video_info performs a blocking HTTP request (up to its 20 s
    # timeout); run it in the threadpool so the event loop stays responsive.
    return await run_in_threadpool(extract_video_info, video_url)
|
|
|
|
|
@app.post("/hls")
async def download_hls_video(request: Request):
    """Download the stream at the posted URL as MP4 and return a link to it.

    Expects a JSON object with a ``url`` key. The file is written into
    ``global_download_dir`` and the returned link points at it under the
    ``/file`` prefix of this service.

    Returns:
        ``{"url": <download link>}`` on success, ``{"error": <message>}`` on
        invalid input or when the download fails.

    NOTE(review): nothing in this function deletes downloaded files, and the
    URL is passed to yt-dlp unchecked - confirm cleanup and URL restrictions
    are handled elsewhere.
    """
    try:
        data = await request.json()
    except ValueError:
        return {"error": "Request body must be valid JSON"}

    hls_url = data.get("url") if isinstance(data, dict) else None
    if not isinstance(hls_url, str) or not hls_url:
        return {"error": "No URL provided"}

    base_url = str(request.base_url)

    # The timestamp alone has one-second resolution; two requests arriving in
    # the same second would share a tag and could pick up each other's file.
    # A random suffix makes the tag unique per request.
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    file_tag = f"{timestamp}_{os.urandom(4).hex()}"
    output_template = str(Path(global_download_dir) / f'%(title)s_{file_tag}.%(ext)s')

    ydl_opts = {
        'format': 'best',
        'outtmpl': output_template,
        'quiet': True,
        'no_warnings': True,
        'noprogress': True,
        'merge_output_format': 'mp4'
    }

    def _download() -> None:
        # Context manager so yt-dlp releases its resources when finished.
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([hls_url])

    try:
        # yt-dlp is blocking; keep it off the event loop.
        await run_in_threadpool(_download)
    except Exception as e:
        logger.exception("HLS download failed")
        return {"error": f"Download failed: {str(e)}"}

    downloaded_files = list(Path(global_download_dir).glob(f"*_{file_tag}.mp4"))
    if not downloaded_files:
        return {"error": "Download failed"}

    downloaded_file = downloaded_files[0]
    encoded_filename = urllib.parse.quote(downloaded_file.name)
    download_url = f"{base_url}file/{encoded_filename}"
    # Kept from the original; presumably meant to free download buffers
    # promptly - TODO confirm it is still needed.
    gc.collect()
    return {"url": download_url}
|
|