|
import random |
|
import argparse |
|
import gzip |
|
import json |
|
import logging |
|
import os |
|
import shutil |
|
import sys |
|
import datetime as dt |
|
import subprocess |
|
from typing import List, Optional |
|
|
|
import pytubefix.exceptions as exceptions |
|
from pytubefix import __version__ |
|
from pytubefix import CaptionQuery, Playlist, Stream |
|
from pytubefix.helpers import safe_filename, setup_logger |
|
from pytubefix import YouTube |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
def build_playback_report(youtube: YouTube) -> None: |
|
"""Serialize the request data to json for offline debugging. |
|
|
|
:param YouTube youtube: |
|
A YouTube object. |
|
""" |
|
ts = int(dt.datetime.now(dt.timezone.utc).timestamp()) |
|
fp = os.path.join(os.getcwd(), f"yt-video-{youtube.video_id}-{ts}.json.gz") |
|
|
|
js = youtube.js |
|
watch_html = youtube.watch_html |
|
vid_info = youtube.vid_info |
|
|
|
with gzip.open(fp, "wb") as fh: |
|
fh.write( |
|
json.dumps( |
|
{ |
|
"url": youtube.watch_url, |
|
"js": js, |
|
"watch_html": watch_html, |
|
"video_info": vid_info, |
|
} |
|
).encode("utf8"), |
|
) |
|
|
|
def display_progress_bar(bytes_received: int, filesize: int, ch: str = "β", scale: float = 0.55) -> None: |
|
"""Display a simple, pretty progress bar. |
|
|
|
Example: |
|
~~~~~~~~ |
|
PSY - GANGNAM STYLE(αα
‘αΌαα
‘α·αα
³αα
‘αα
΅α―) MV.mp4 |
|
β³ |βββββββββββββββββββββββββββββββββββββββ| 100.0% |
|
|
|
:param int bytes_received: |
|
The delta between the total file size (bytes) and bytes already |
|
written to disk. |
|
:param int filesize: |
|
File size of the media stream in bytes. |
|
:param str ch: |
|
Character to use for presenting progress segment. |
|
:param float scale: |
|
Scale multiplier to reduce progress bar size. |
|
""" |
|
columns = shutil.get_terminal_size().columns |
|
max_width = int(columns * scale) |
|
|
|
filled = int(round(max_width * bytes_received / float(filesize))) |
|
remaining = max_width - filled |
|
progress_bar = ch * filled + " " * remaining |
|
percent = round(100.0 * bytes_received / float(filesize), 1) |
|
text = f" β³ |{progress_bar}| {percent}%\r" |
|
sys.stdout.write(text) |
|
sys.stdout.flush() |
|
|
|
def on_progress(stream: Stream, chunk: bytes, bytes_remaining: int) -> None: |
|
filesize = stream.filesize |
|
bytes_received = filesize - bytes_remaining |
|
display_progress_bar(bytes_received, filesize) |
|
|
|
def _download(stream: Stream, target: Optional[str] = None, filename: Optional[str] = None) -> None: |
|
filesize_megabytes = stream.filesize // 1048576 |
|
print(f"{filename or stream.default_filename} | {filesize_megabytes} MB") |
|
file_path = stream.get_file_path(filename=filename, output_path=target) |
|
if stream.exists_at_path(file_path): |
|
print(f"Already downloaded at:\n{file_path}") |
|
return |
|
|
|
stream.download(output_path=target, filename=filename) |
|
sys.stdout.write("\n") |
|
|
|
def _unique_name(base: str, subtype: str, media_type: str, target: str) -> str: |
|
""" |
|
Given a base name, the file format, and the target directory, will generate |
|
a filename unique for that directory and file format. |
|
|
|
:param str base: |
|
The given base-name. |
|
:param str subtype: |
|
The filetype of the video which will be downloaded. |
|
:param str media_type: |
|
The media_type of the file, ie. "audio" or "video" |
|
:param Path target: |
|
Target directory for download. |
|
""" |
|
counter = 0 |
|
while True: |
|
file_name = f"{base}_{media_type}_{counter}" |
|
file_path = os.path.join(target, f"{file_name}.{subtype}") |
|
if not os.path.exists(file_path): |
|
return file_name |
|
counter += 1 |
|
|
|
def ffmpeg_process(youtube: YouTube, resolution: str, target: Optional[str] = None) -> None: |
|
""" |
|
Decides the correct video stream to download, then calls _ffmpeg_downloader. |
|
|
|
:param YouTube youtube: |
|
A valid YouTube object. |
|
:param str resolution: |
|
YouTube video resolution. |
|
:param str target: |
|
Target directory for download |
|
""" |
|
youtube.register_on_progress_callback(on_progress) |
|
target = target or os.getcwd() |
|
|
|
if resolution == None or resolution == "best": |
|
highest_quality_stream = youtube.streams.filter(progressive=False).order_by("resolution").last() |
|
mp4_stream = youtube.streams.filter(progressive=False, subtype="mp4").order_by("resolution").last() |
|
if highest_quality_stream.resolution == mp4_stream.resolution: |
|
video_stream = mp4_stream |
|
else: |
|
video_stream = highest_quality_stream |
|
else: |
|
video_stream = youtube.streams.filter(progressive=False, resolution=resolution).first() |
|
|
|
if not video_stream: |
|
print(f"No streams found for resolution {resolution}") |
|
return |
|
|
|
audio_stream = youtube.streams.filter(progressive=False).order_by("abr").last() |
|
|
|
video_file_name = _unique_name(youtube.title, "mp4", "video", target) |
|
audio_file_name = _unique_name(youtube.title, "mp4", "audio", target) |
|
|
|
video_path = video_stream.get_file_path(filename=video_file_name, output_path=target) |
|
audio_path = audio_stream.get_file_path(filename=audio_file_name, output_path=target) |
|
|
|
if os.path.exists(video_path) and os.path.exists(audio_path): |
|
print("Already downloaded both video and audio.") |
|
return |
|
|
|
_download(video_stream, target=target, filename=video_file_name) |
|
_download(audio_stream, target=target, filename=audio_file_name) |
|
|
|
|
|
command = ["ffmpeg", "-i", video_path, "-i", audio_path, "-c:v", "copy", "-c:a", "aac", "-strict", "experimental", f"{target}/{youtube.title}.mp4"] |
|
|
|
|
|
subprocess.run(command) |
|
|
|
def download_by_resolution(youtube: YouTube, resolution: str, target: Optional[str] = None) -> None: |
|
"""Download a stream by the specified resolution. |
|
|
|
:param YouTube youtube: |
|
A valid YouTube object. |
|
:param str resolution: |
|
The desired resolution of the stream. |
|
:param Optional[str] target: |
|
The target directory for the download. |
|
""" |
|
print(f"Downloading {resolution}...") |
|
stream = youtube.streams.filter(resolution=resolution).first() |
|
if stream is None: |
|
print(f"No stream found for resolution {resolution}") |
|
else: |
|
_download(stream, target) |
|
|
|
def download_audio(youtube: YouTube, filetype: Optional[str] = "mp4", target: Optional[str] = None) -> None: |
|
"""Download audio stream of a YouTube video. |
|
|
|
:param YouTube youtube: |
|
A valid YouTube object. |
|
:param Optional[str] filetype: |
|
The filetype for the audio. Defaults to "mp4". |
|
:param Optional[str] target: |
|
The target directory for the download. |
|
""" |
|
print("Downloading audio...") |
|
stream = youtube.streams.filter(progressive=False, subtype=filetype).order_by("abr").last() |
|
if stream is None: |
|
print(f"No audio stream found for filetype {filetype}") |
|
else: |
|
_download(stream, target) |
|
|
|
def download_highest_resolution_progressive(youtube: YouTube, resolution: str, target: Optional[str] = None) -> None: |
|
"""Download a YouTube video stream at the highest resolution. |
|
|
|
:param YouTube youtube: |
|
A valid YouTube object. |
|
:param str resolution: |
|
The resolution of the stream. |
|
:param Optional[str] target: |
|
The target directory for the download. |
|
""" |
|
print("Downloading highest resolution progressive stream...") |
|
stream = youtube.streams.filter(progressive=True).order_by("resolution").last() |
|
if stream is None: |
|
print("No progressive stream found.") |
|
else: |
|
_download(stream, target) |
|
|
|
def download_by_itag(youtube: YouTube, itag: int, target: Optional[str] = None) -> None: |
|
"""Download a YouTube stream by its itag. |
|
|
|
:param YouTube youtube: |
|
A valid YouTube object. |
|
:param int itag: |
|
The itag of the desired stream. |
|
:param Optional[str] target: |
|
The target directory for the download. |
|
""" |
|
stream = youtube.streams.get_by_itag(itag) |
|
if stream is None: |
|
print(f"No stream found with itag {itag}.") |
|
else: |
|
print(f"Downloading stream with itag {itag}...") |
|
_download(stream, target) |
|
|
|
def download_caption(youtube: YouTube, lang_code: str, target: Optional[str] = None) -> None: |
|
"""Download captions for a given YouTube video. |
|
|
|
:param YouTube youtube: |
|
A valid YouTube object. |
|
:param str lang_code: |
|
The language code for the desired captions. |
|
:param Optional[str] target: |
|
The target directory for the downloaded captions. |
|
""" |
|
print(f"Downloading captions for language: {lang_code}...") |
|
caption = youtube.captions.get_by_language_code(lang_code) |
|
if caption is None: |
|
print(f"No captions found for language code: {lang_code}.") |
|
else: |
|
caption.download(target) |
|
|
|
def _print_available_captions(captions: List[CaptionQuery]) -> None: |
|
"""Print available captions for a YouTube video. |
|
|
|
:param List[CaptionQuery] captions: |
|
The list of available captions. |
|
""" |
|
print("Available captions:") |
|
for caption in captions: |
|
print(f" - {caption.language_code}: {caption.name}") |
|
|
|
def display_streams(youtube: YouTube) -> None: |
|
"""Display available streams for the given YouTube video. |
|
|
|
:param YouTube youtube: |
|
A valid YouTube object. |
|
""" |
|
print(f"Available streams for {youtube.title}:") |
|
for stream in youtube.streams: |
|
print(f" - {stream}") |
|
|
|
|
|
def _parse_args(parser: argparse.ArgumentParser, args: Optional[List] = None) -> argparse.Namespace: |
|
parser.add_argument("url", help="The YouTube /watch or /playlist url", nargs="?") |
|
parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}") |
|
parser.add_argument("--itag", type=int, help="The itag for the desired stream") |
|
parser.add_argument("-r", "--resolution", type=str, help="The resolution for the desired stream") |
|
parser.add_argument("-l", "--list", action="store_true", help="The list option causes pytubefix cli to return a list of streams available to download") |
|
parser.add_argument("--oauth", action="store_true", help="use oauth token") |
|
parser.add_argument("-v", "--verbose", action="store_true", dest="verbose", help="Set logger output to verbose output.") |
|
parser.add_argument("--logfile", action="store", help="logging debug and error messages into a log file") |
|
parser.add_argument("--build-playback-report", action="store_true", help="Save the html and js to disk") |
|
parser.add_argument("-c", "--caption-code", type=str, help="Download srt captions for given language code. Prints available language codes if no argument given") |
|
parser.add_argument('-lc', '--list-captions', action='store_true', help="List available caption codes for a video") |
|
parser.add_argument("-t", "--target", help="The output directory for the downloaded stream. Default is current working directory") |
|
parser.add_argument("-a", "--audio", const="mp4", nargs="?", help="Download the audio for a given URL at the highest bitrate available. Defaults to mp4 format if none is specified") |
|
parser.add_argument("-f", "--ffmpeg", const="best", nargs="?", help="Downloads the audio and video stream for resolution provided. If no resolution is provided, downloads the best resolution. Runs the command line program ffmpeg to combine the audio and video") |
|
|
|
return parser.parse_args(args) |
|
|
|
def _perform_args_on_youtube(youtube: YouTube, args: argparse.Namespace) -> None: |
|
if len(sys.argv) == 2: |
|
download_highest_resolution_progressive(youtube=youtube, resolution="highest", target=args.target) |
|
|
|
if args.list_captions: |
|
_print_available_captions(youtube.captions) |
|
if args.list: |
|
display_streams(youtube) |
|
|
|
if args.itag: |
|
download_by_itag(youtube=youtube, itag=args.itag, target=args.target) |
|
elif args.caption_code: |
|
download_caption(youtube=youtube, lang_code=args.caption_code, target=args.target) |
|
elif args.resolution: |
|
download_by_resolution(youtube=youtube, resolution=args.resolution, target=args.target) |
|
elif args.audio: |
|
download_audio(youtube=youtube, filetype=args.audio, target=args.target) |
|
|
|
if args.ffmpeg: |
|
ffmpeg_process(youtube=youtube, resolution=args.resolution, target=args.target) |
|
|
|
if args.build_playback_report: |
|
build_playback_report(youtube) |
|
|
|
oauth = False |
|
cache = False |
|
|
|
if args.oauth: |
|
oauth = True |
|
cache = True |
|
|
|
print("Loading video...") |
|
youtube = YouTube(args.url, use_oauth=oauth, allow_oauth_cache=cache) |
|
|
|
download_highest_resolution_progressive(youtube=youtube, resolution="highest", target=args.target) |
|
|
|
|
|
def main(): |
|
parser = argparse.ArgumentParser(description=main.__doc__) |
|
args = _parse_args(parser) |
|
|
|
log_filename = args.logfile if args.verbose else None |
|
setup_logger(logging.DEBUG if args.verbose else logging.INFO, log_filename=log_filename) |
|
|
|
if args.verbose: |
|
logger.debug(f'Pytubefix version: {__version__}') |
|
|
|
if not args.url or "youtu" not in args.url: |
|
parser.print_help() |
|
sys.exit(0) |
|
|
|
if "/playlist" in args.url: |
|
print("Loading playlist...") |
|
playlist = Playlist(args.url) |
|
args.target = args.target or safe_filename(playlist.title) |
|
|
|
for youtube_video in playlist.videos: |
|
try: |
|
_perform_args_on_youtube(youtube_video, args) |
|
except exceptions.PytubeFixError as e: |
|
print(f"There was an error with video: {youtube_video}") |
|
print(e) |
|
|
|
else: |
|
print("Loading video...") |
|
youtube = YouTube(args.url) |
|
_perform_args_on_youtube(youtube, args) |
|
|
|
if __name__ == "__main__": |
|
main() |
|
|