import time
from fastapi import FastAPI, Request, HTTPException
from fastapi.concurrency import run_in_threadpool
import yt_dlp
import urllib.parse
import os
from datetime import datetime, timedelta
from dotenv import load_dotenv
import tempfile
from pathlib import Path
from collections import defaultdict
import logging
import gc
from typing import Dict, Union
import cloudscraper
from fastapi.staticfiles import StaticFiles

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

load_dotenv()
app = FastAPI()

# Define a global temporary download directory
global_download_dir = tempfile.mkdtemp()
app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads")
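# Anything saved into global_download_dir becomes reachable under the
# /file route; e.g. a file named "clip_20250101.mp4" would (hypothetically)
# be served at GET /file/clip_20250101.mp4.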


# Per-IP rate limiting backed by an in-memory dictionary of timestamps
class RateLimiter:
    def __init__(self, max_requests: int, time_window: timedelta):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests: Dict[str, list] = defaultdict(list)

    def _cleanup_old_requests(self, user_ip: str) -> None:
        """Remove requests that are outside the time window."""
        current_time = time.time()
        self.requests[user_ip] = [
            timestamp for timestamp in self.requests[user_ip]
            if current_time - timestamp < self.time_window.total_seconds()
        ]

    def is_rate_limited(self, user_ip: str) -> bool:
        """Check if the user has exceeded their rate limit."""
        self._cleanup_old_requests(user_ip)
        # Get current count after cleanup
        current_count = len(self.requests[user_ip])
        # Record the current request; note that rejected requests are
        # counted against the limit as well
        current_time = time.time()
        self.requests[user_ip].append(current_time)
        # Check if this request pushes the user past the maximum
        return (current_count + 1) > self.max_requests

    def get_current_count(self, user_ip: str) -> int:
        """Get the current request count for an IP."""
        self._cleanup_old_requests(user_ip)
        return len(self.requests[user_ip])
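
# Minimal usage sketch (hypothetical values, not part of the app flow):
#     limiter = RateLimiter(max_requests=2, time_window=timedelta(seconds=60))
#     limiter.is_rate_limited("203.0.113.7")  # False -- 1st request
#     limiter.is_rate_limited("203.0.113.7")  # False -- 2nd request
#     limiter.is_rate_limited("203.0.113.7")  # True  -- 3rd within 60 s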

# Initialize the rate limiter with 12 requests per IP per day
rate_limiter = RateLimiter(
    max_requests=12,
    time_window=timedelta(days=1)
)


def get_user_ip(request: Request) -> str:
    """Helper function to get the user's IP address."""
    # X-Forwarded-For may hold a comma-separated proxy chain, e.g.
    # "203.0.113.7, 10.0.0.1"; the first entry is the originating client
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        return forwarded.split(",")[0].strip()
    return request.client.host


restricted_domain = "chrunos.com"
TRACT_API = os.getenv("EXTRACT_API")
ALT_API = os.getenv("ALT_API")

target_domains = [
    "pornhub.com",
    "xhamster.com",
    "eporner.com",
    "youporn.com"
]


def extract_video_info(video_url: str) -> Union[list, dict]:
    # Route the target adult domains to the primary extractor API and
    # everything else to the alternate one
    if any(domain in video_url for domain in target_domains):
        EXTRACT_API = TRACT_API
    else:
        EXTRACT_API = ALT_API
    api_url = f'{EXTRACT_API}?url={video_url}'
    logger.info(api_url)
    session = cloudscraper.create_scraper()
    try:
        response = session.get(api_url, timeout=20)
        if response.status_code == 200:
            json_response = response.json()
            result = []
            # Check that the formats list exists and is not empty
            if json_response.get('formats'):
                for format_item in json_response['formats']:
                    format_url = format_item.get('url')
                    format_id = format_item.get('format_id')
                    p_cookies = format_item.get('cookies')
                    if format_id and format_url:
                        result.append({
                            "url": format_url,
                            "format_id": format_id,
                            "cookies": p_cookies
                        })
                title = json_response.get('title')
                logger.info(title)
                if "pornhub.com" in video_url:
                    p_result = [item for item in result if 'hls' in item['format_id']]
                    # Mark the two highest-quality HLS formats as premium-only
                    # (guard against fewer than two HLS entries)
                    if len(p_result) >= 2:
                        last_item = p_result[-1]
                        second_last_item = p_result[-2]
                        last_item["format_id"] = f'{last_item["format_id"]} - Chrunos Shortcuts Premium Only'
                        last_item["url"] = 'https://chrunos.com/premium-shortcuts/'
                        second_last_item["format_id"] = f'{second_last_item["format_id"]} - Chrunos Shortcuts Premium Only'
                        second_last_item["url"] = 'https://chrunos.com/premium-shortcuts/'
                    return p_result
                else:
                    new_result = result
                    # Keep at most three free formats; mark the rest as premium-only
                    if len(new_result) > 3:
                        for i in range(3, len(new_result)):
                            item = new_result[i]
                            item["format_id"] = f'{item["format_id"]} - Chrunos Shortcuts Premium Only'
                            item["url"] = 'https://chrunos.com/premium-shortcuts/'
                    elif 2 <= len(new_result) <= 3:
                        last_item = new_result[-1]
                        last_item["format_id"] = f'{last_item["format_id"]} - Chrunos Shortcuts Premium Only'
                        last_item["url"] = 'https://chrunos.com/premium-shortcuts/'
                    elif len(new_result) == 1:
                        new_item = {
                            "url": "https://chrunos.com/premium-shortcuts/",
                            "format_id": "Best Quality Video - Chrunos Shortcuts Premium Only"
                        }
                        new_result.append(new_item)
                    return new_result
            else:
                if 'url' in json_response:
                    download_url = json_response.get('url')
                    thumbnail_url = json_response.get('thumbnail')
                    return [
                        {"url": download_url,
                         "format_id": "Normal Quality Video"},
                        {"url": thumbnail_url,
                         "format_id": "thumbnail"},
                        {"url": "https://chrunos.com/premium-shortcuts/",
                         "format_id": "Best Quality Video - Chrunos Shortcuts Premium Only"}
                    ]
                return {"error": "No formats available. Report Error on Telegram"}
        else:
            return {"error": f"Request failed with status code {response.status_code}, API: {api_url}"}
    except Exception as e:
        logger.error(f"An error occurred: {e}")
        return {"error": str(e)}
@app.post("/test")
async def test_download(request: Request):
user_ip = get_user_ip(request)
if rate_limiter.is_rate_limited(user_ip):
current_count = rate_limiter.get_current_count(user_ip)
raise HTTPException(
status_code=429,
detail={
"error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.",
"url": "https://t.me/chrunoss"
}
)
data = await request.json()
video_url = data.get('url')
response = extract_video_info(video_url)
return response
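
# Example request against the /test endpoint (assuming the app is served
# locally on port 8000; the payload URL is a placeholder):
#     curl -X POST http://localhost:8000/test \
#          -H "Content-Type: application/json" \
#          -d '{"url": "https://example.com/watch?v=abc123"}'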
@app.post("/hls")
async def download_hls_video(request: Request):
data = await request.json()
hls_url = data.get('url')
base_url = str(request.base_url)
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
ydl_opts = {
'format': 'best',
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'merge_output_format': 'mp4'
}
try:
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([hls_url]))
except Exception as e:
return {"error": f"Download failed: {str(e)}"}
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
if not downloaded_files:
return {"error": "Download failed"}
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{base_url}file/{encoded_filename}"
gc.collect()
return {"url": download_url}