# MIT License # # Copyright (c) 2023 - 2025 Juan Bindez # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. """ This module implements the core developer interface for pytubefix. The problem domain of the :class:`YouTube class focuses almost exclusively on the developer interface. Pytubefix offloads the heavy lifting to smaller peripheral modules and functions. """ import logging from subprocess import CalledProcessError from typing import Any, Callable, Dict, List, Optional, Tuple import pytubefix import pytubefix.exceptions as exceptions from pytubefix import extract, request from pytubefix import Stream, StreamQuery from pytubefix.helpers import install_proxy from pytubefix.innertube import InnerTube from pytubefix.metadata import YouTubeMetadata from pytubefix.monostate import Monostate from pytubefix.botGuard import bot_guard logger = logging.getLogger(__name__) class YouTube: """Core developer interface for pytubefix.""" def __init__( self, url: str, client: str = InnerTube().client_name, on_progress_callback: Optional[Callable[[Any, bytes, int], None]] = None, on_complete_callback: Optional[Callable[[Any, Optional[str]], None]] = None, proxies: Optional[Dict[str, str]] = None, use_oauth: bool = False, allow_oauth_cache: bool = True, token_file: Optional[str] = None, oauth_verifier: Optional[Callable[[str, str], None]] = None, use_po_token: Optional[bool] = False, po_token_verifier: Optional[Callable[[None], Tuple[str, str]]] = None, ): """Construct a :class:`YouTube `. :param str url: A valid YouTube watch URL. :param str client: (Optional) A YouTube client, Available: WEB, WEB_EMBED, WEB_MUSIC, WEB_CREATOR, WEB_SAFARI, ANDROID, ANDROID_MUSIC, ANDROID_CREATOR, ANDROID_VR, ANDROID_PRODUCER, ANDROID_TESTSUITE, IOS, IOS_MUSIC, IOS_CREATOR, MWEB, TV, TV_EMBED, MEDIA_CONNECT. :param func on_progress_callback: (Optional) User defined callback function for stream download progress events. :param func on_complete_callback: (Optional) User defined callback function for stream download complete events. :param dict proxies: (Optional) A dict mapping protocol to proxy address which will be used by pytube. :param bool use_oauth: (Optional) Prompt the user to authenticate to YouTube. If allow_oauth_cache is set to True, the user should only be prompted once. :param bool allow_oauth_cache: (Optional) Cache OAuth and Po tokens locally on the machine. Defaults to True. These tokens are only generated if use_oauth is set to True as well. :param str token_file: (Optional) Path to the file where the OAuth and Po tokens will be stored. Defaults to None, which means the tokens will be stored in the pytubefix/__cache__ directory. :param Callable oauth_verifier: (optional) Verifier to be used for getting oauth tokens. Verification URL and User-Code will be passed to it respectively. (if passed, else default verifier will be used) :param bool use_po_token: (Optional) Prompt the user to use the proof of origin token on YouTube. It must be sent with the API along with the linked visitorData and then passed as a `po_token` query parameter to affected clients. If allow_oauth_cache is set to True, the user should only be prompted once. (Do not use together with `use_oauth=True`) :param Callable po_token_verifier: (Optional) Verified used to obtain the visitorData and po_token. The verifier will return the visitorData and po_token respectively. (if passed, else default verifier will be used) """ # js fetched by js_url self._js: Optional[str] = None # the url to the js, parsed from watch html self._js_url: Optional[str] = None # content fetched from innertube/player self._vid_info: Optional[Dict] = None self._vid_details: Optional[Dict] = None # the html of /watch?v= self._watch_html: Optional[str] = None self._embed_html: Optional[str] = None # inline js in the html containing self._player_config_args: Optional[Dict] = None self._age_restricted: Optional[bool] = None self._fmt_streams: Optional[List[Stream]] = None self._initial_data = None self._metadata: Optional[YouTubeMetadata] = None # video_id part of /watch?v= self.video_id = extract.video_id(url) self.watch_url = f"https://youtube.com/watch?v={self.video_id}" self.embed_url = f"https://www.youtube.com/embed/{self.video_id}" self.client = 'WEB' if use_po_token else client # oauth can only be used by the TV and TV_EMBED client. self.client = 'TV' if use_oauth else self.client self.fallback_clients = ['TV', 'IOS'] self._signature_timestamp: dict = {} self._visitor_data = None # Shared between all instances of `Stream` (Borg pattern). self.stream_monostate = Monostate( on_progress=on_progress_callback, on_complete=on_complete_callback, youtube=self ) if proxies: install_proxy(proxies) self._author = None self._title = None self._publish_date = None self.use_oauth = use_oauth self.allow_oauth_cache = allow_oauth_cache self.token_file = token_file self.oauth_verifier = oauth_verifier self.use_po_token = use_po_token self.po_token_verifier = po_token_verifier self.po_token = None self._pot = None def __repr__(self): return f'' def __eq__(self, o: object) -> bool: # Compare types and urls, if they're same return true, else return false. return type(o) == type(self) and o.watch_url == self.watch_url @property def watch_html(self): if self._watch_html: return self._watch_html self._watch_html = request.get(url=self.watch_url) return self._watch_html @property def embed_html(self): if self._embed_html: return self._embed_html self._embed_html = request.get(url=self.embed_url) return self._embed_html @property def age_restricted(self): if self._age_restricted: return self._age_restricted self._age_restricted = extract.is_age_restricted(self.watch_html) return self._age_restricted @property def js_url(self): if self._js_url: return self._js_url if self.age_restricted: self._js_url = extract.js_url(self.embed_html) else: self._js_url = extract.js_url(self.watch_html) return self._js_url @property def js(self): if self._js: return self._js # If the js_url doesn't match the cached url, fetch the new js and update # the cache; otherwise, load the cache. if pytubefix.__js_url__ != self.js_url: self._js = request.get(self.js_url) pytubefix.__js__ = self._js pytubefix.__js_url__ = self.js_url else: self._js = pytubefix.__js__ return self._js @property def visitor_data(self) -> str: """ Retrieves the visitorData from the WEB client. """ if self._visitor_data: return self._visitor_data if InnerTube(self.client).require_po_token: try: logger.debug("Looking for visitorData in initial_data") self._visitor_data = extract.visitor_data(str(self.initial_data['responseContext'])) logger.debug('VisitorData obtained successfully') return self._visitor_data except (KeyError, pytubefix.exceptions.RegexMatchError): logger.debug("Unable to obtain visitorData from initial_data. Trying to request from the WEB client") logger.debug("Looking for visitorData in InnerTube API") innertube_response = InnerTube('WEB').player(self.video_id) try: self._visitor_data = innertube_response['responseContext']['visitorData'] except KeyError: self._visitor_data = innertube_response['responseContext']['serviceTrackingParams'][0]['params'][6]['value'] logger.debug('VisitorData obtained successfully') return self._visitor_data @property def pot(self) -> str: """ Retrieves the poToken generated by botGuard. This poToken only works for WEB-based clients. """ if self._pot: return self._pot logger.debug('Invoking botGuard') try: self._pot = bot_guard.generate_po_token(visitor_data=self.visitor_data) logger.debug('PoToken generated successfully') except Exception as e: logger.warning('Unable to run botGuard. Skipping poToken generation, reason: ' + e.__str__()) return self._pot @property def initial_data(self): if self._initial_data: return self._initial_data self._initial_data = extract.initial_data(self.watch_html) return self._initial_data @property def streaming_data(self): """Return streamingData from video info.""" # List of YouTube error video IDs invalid_id_list = ['aQvGIIdgFDM'] # If my previously valid video_info doesn't have the streamingData, # or it is an invalid video, # try to get a new video_info with a different client. if 'streamingData' not in self.vid_info or self.vid_info['videoDetails']['videoId'] in invalid_id_list: original_client = self.client # for each fallback client set, revert videodata, and run check_availability, which # will try to get a new video_info with a different client. # if it fails try the next fallback client, and so on. # If none of the clients have valid streamingData, raise an exception. for client in self.fallback_clients: self.client = client self.vid_info = None try: self.check_availability() except Exception as e: continue if 'streamingData' in self.vid_info: break if 'streamingData' not in self.vid_info: raise exceptions.UnknownVideoError(video_id=self.video_id, developer_message=f'Streaming data is missing, ' f'original client: {original_client}, ' f'fallback clients: {self.fallback_clients}') return self.vid_info['streamingData'] @property def fmt_streams(self): """Returns a list of streams if they have been initialized. If the streams have not been initialized, finds all relevant streams and initializes them. """ self.check_availability() if self._fmt_streams: return self._fmt_streams self._fmt_streams = [] stream_manifest = extract.apply_descrambler(self.streaming_data) inner_tube = InnerTube(self.client) if self.po_token: extract.apply_po_token(stream_manifest, self.vid_info, self.po_token) if inner_tube.require_js_player: # If the cached js doesn't work, try fetching a new js file # https://github.com/pytube/pytube/issues/1054 try: extract.apply_signature(stream_manifest, self.vid_info, self.js, self.js_url) except exceptions.ExtractError: # To force an update to the js file, we clear the cache and retry self._js = None self._js_url = None pytubefix.__js__ = None pytubefix.__js_url__ = None extract.apply_signature(stream_manifest, self.vid_info, self.js, self.js_url) # build instances of :class:`Stream ` # Initialize stream objects for stream in stream_manifest: video = Stream( stream=stream, monostate=self.stream_monostate, po_token=self.po_token, video_playback_ustreamer_config=self.video_playback_ustreamer_config ) self._fmt_streams.append(video) self.stream_monostate.title = self.title self.stream_monostate.duration = self.length return self._fmt_streams def check_availability(self): """Check whether the video is available. Raises different exceptions based on why the video is unavailable, otherwise does nothing. """ status, messages = extract.playability_status(self.vid_info) if InnerTube(self.client).require_po_token and not self.po_token: logger.warning(f"The {self.client} client requires PoToken to obtain functional streams, " f"See more details at https://github.com/JuanBindez/pytubefix/pull/209") for reason in messages: if status == 'UNPLAYABLE': if reason == ( 'Join this channel to get access to members-only content ' 'like this video, and other exclusive perks.' ): raise exceptions.MembersOnly(video_id=self.video_id) elif reason == 'This live stream recording is not available.': raise exceptions.RecordingUnavailable(video_id=self.video_id) elif reason == ( 'Sorry, something is wrong. This video may be inappropriate for some users. ' 'Sign in to your primary account to confirm your age.' ): raise exceptions.AgeCheckRequiredAccountError(video_id=self.video_id) elif reason == ( 'The uploader has not made this video available in your country' ): raise exceptions.VideoRegionBlocked(video_id=self.video_id) else: raise exceptions.VideoUnavailable(video_id=self.video_id) elif status == 'LOGIN_REQUIRED': if reason == ( 'Sign in to confirm your age' ): raise exceptions.AgeRestrictedError(video_id=self.video_id) elif reason == ( 'Sign in to confirm you’re not a bot' ): raise exceptions.BotDetection(video_id=self.video_id) else: raise exceptions.LoginRequired(video_id=self.video_id, reason=reason) elif status == 'AGE_CHECK_REQUIRED': if self.use_oauth: self.age_check() else: raise exceptions.AgeCheckRequiredError(video_id=self.video_id) elif status == 'LIVE_STREAM_OFFLINE': raise exceptions.LiveStreamOffline(video_id=self.video_id, reason=reason) elif status == 'ERROR': if reason == 'Video unavailable': raise exceptions.VideoUnavailable(video_id=self.video_id) elif reason == 'This video is private': raise exceptions.VideoPrivate(video_id=self.video_id) elif reason == 'This video is unavailable': raise exceptions.VideoUnavailable(video_id=self.video_id) elif reason == 'This video has been removed by the uploader': raise exceptions.VideoUnavailable(video_id=self.video_id) elif reason == 'This video is no longer available because the YouTube account associated with this video has been terminated.': raise exceptions.VideoUnavailable(video_id=self.video_id) else: raise exceptions.UnknownVideoError(video_id=self.video_id, status=status, reason=reason, developer_message=f'Unknown reason type for Error status') elif status == 'LIVE_STREAM': raise exceptions.LiveStreamError(video_id=self.video_id) elif status is None: pass else: raise exceptions.UnknownVideoError(video_id=self.video_id, status=status, reason=reason, developer_message=f'Unknown video status') @property def signature_timestamp(self) -> dict: """WEB clients need to be signed with a signature timestamp. The signature is found inside the player's base.js. :rtype: Dict """ if not self._signature_timestamp: self._signature_timestamp = { 'playbackContext': { 'contentPlaybackContext': { 'signatureTimestamp': extract.signature_timestamp(self.js) } } } return self._signature_timestamp @property def video_playback_ustreamer_config(self): return self.vid_info[ 'playerConfig'][ 'mediaCommonConfig'][ 'mediaUstreamerRequestConfig'][ 'videoPlaybackUstreamerConfig'] @property def server_abr_streaming_url(self): """ Extract the url for abr server and decrypt the `n` parameter """ try: url = self.vid_info[ 'streamingData'][ 'serverAbrStreamingUrl'] stream_manifest = [{"url": url}] extract.apply_signature(stream_manifest, vid_info=self.vid_info, js=self.js, url_js=self.js_url) return stream_manifest[0]["url"] except Exception: return None @property def vid_info(self): """Parse the raw vid info and return the parsed result. :rtype: Dict[Any, Any] """ if self._vid_info: return self._vid_info def call_innertube(): innertube = InnerTube( client=self.client, use_oauth=self.use_oauth, allow_cache=self.allow_oauth_cache, token_file=self.token_file, oauth_verifier=self.oauth_verifier, use_po_token=self.use_po_token, po_token_verifier=self.po_token_verifier ) if innertube.require_js_player: innertube.innertube_context.update(self.signature_timestamp) # Automatically generates a poToken if innertube.require_po_token and not self.use_po_token: logger.debug(f"The {self.client} client requires poToken to obtain functional streams") logger.debug("Automatically generating poToken") innertube.insert_po_token(visitor_data=self.visitor_data, po_token=self.pot) elif not self.use_po_token: # from 01/22/2025 all clients must send the visitorData in the API request innertube.insert_visitor_data(visitor_data=self.visitor_data) response = innertube.player(self.video_id) # Retrieves the sent poToken if self.use_po_token or innertube.require_po_token: self.po_token = innertube.access_po_token or self.pot return response innertube_response = call_innertube() for client in self.fallback_clients: # Some clients are unable to access certain types of videos # If the video is unavailable for the current client, attempts will be made with fallback clients playability_status = innertube_response['playabilityStatus'] if playability_status['status'] == 'UNPLAYABLE' and 'reason' in playability_status and playability_status['reason'] == 'This video is not available': logger.warning(f"{self.client} client returned: This video is not available") self.client = client logger.warning(f"Switching to client: {client}") innertube_response = call_innertube() else: break self._vid_info = innertube_response if not self._vid_info: raise pytubefix.exceptions.InnerTubeResponseError(self.video_id, self.client) return self._vid_info @vid_info.setter def vid_info(self, value): self._vid_info = value @property def vid_details(self): """Parse the raw vid details and return the parsed result. The official player sends a request to the `next` endpoint to obtain some details of the video. :rtype: Dict[Any, Any] """ if self._vid_details: return self._vid_details innertube = InnerTube( client='TV' if self.use_oauth else 'WEB', use_oauth=self.use_oauth, allow_cache=self.allow_oauth_cache, token_file=self.token_file, oauth_verifier=self.oauth_verifier, use_po_token=self.use_po_token, po_token_verifier=self.po_token_verifier ) innertube_response = innertube.next(self.video_id) self._vid_details = innertube_response return self._vid_details @vid_details.setter def vid_details(self, value): self._vid_details = value def age_check(self): """If the video has any age restrictions, you must confirm that you wish to continue. Originally the WEB client was used, but with the implementation of PoToken we switched to MWEB. """ self.client = 'TV' innertube = InnerTube( client=self.client, use_oauth=self.use_oauth, allow_cache=self.allow_oauth_cache, token_file=self.token_file, oauth_verifier=self.oauth_verifier, use_po_token=self.use_po_token, po_token_verifier=self.po_token_verifier ) if innertube.require_js_player: innertube.innertube_context.update(self.signature_timestamp) innertube.verify_age(self.video_id) innertube_response = innertube.player(self.video_id) playability_status = innertube_response['playabilityStatus'].get('status', None) # If we still can't access the video, raise an exception if playability_status != 'OK': if playability_status == 'UNPLAYABLE': raise exceptions.AgeCheckRequiredAccountError(self.video_id) else: raise exceptions.AgeCheckRequiredError(self.video_id) self._vid_info = innertube_response @property def caption_tracks(self) -> List[pytubefix.Caption]: """Get a list of :class:`Caption `. :rtype: List[Caption] """ innertube_response = InnerTube( client='WEB' if not self.use_oauth else self.client, use_oauth=self.use_oauth, allow_cache=self.allow_oauth_cache, token_file=self.token_file, oauth_verifier=self.oauth_verifier, use_po_token=self.use_po_token, po_token_verifier=self.po_token_verifier ).player(self.video_id) raw_tracks = ( innertube_response.get("captions", {}) .get("playerCaptionsTracklistRenderer", {}) .get("captionTracks", []) ) return [pytubefix.Caption(track) for track in raw_tracks] @property def captions(self) -> pytubefix.CaptionQuery: """Interface to query caption tracks. :rtype: :class:`CaptionQuery `. """ return pytubefix.CaptionQuery(self.caption_tracks) @property def chapters(self) -> List[pytubefix.Chapter]: """Get a list of :class:`Chapter `. :rtype: List[Chapter] """ try: chapters_data = [] markers_map = self.initial_data['playerOverlays']['playerOverlayRenderer'][ 'decoratedPlayerBarRenderer']['decoratedPlayerBarRenderer']['playerBar'][ 'multiMarkersPlayerBarRenderer']['markersMap'] for marker in markers_map: if marker['key'].upper() == 'DESCRIPTION_CHAPTERS': chapters_data = marker['value']['chapters'] break except (KeyError, IndexError): return [] result: List[pytubefix.Chapter] = [] for i, chapter_data in enumerate(chapters_data): chapter_start = int( chapter_data['chapterRenderer']['timeRangeStartMillis'] / 1000 ) if i == len(chapters_data) - 1: chapter_end = self.length else: chapter_end = int( chapters_data[i + 1]['chapterRenderer']['timeRangeStartMillis'] / 1000 ) result.append(pytubefix.Chapter(chapter_data, chapter_end - chapter_start)) return result @property def key_moments(self) -> List[pytubefix.KeyMoment]: """Get a list of :class:`KeyMoment `. :rtype: List[KeyMoment] """ try: mutations = self.initial_data['frameworkUpdates']['entityBatchUpdate']['mutations'] found = False for mutation in mutations: if mutation.get('payload', {}).get('macroMarkersListEntity', {}).get('markersList', {}).get( 'markerType') == "MARKER_TYPE_TIMESTAMPS": key_moments_data = mutation['payload']['macroMarkersListEntity']['markersList']['markers'] found = True break if not found: return [] except (KeyError, IndexError): return [] result: List[pytubefix.KeyMoment] = [] for i, key_moment_data in enumerate(key_moments_data): key_moment_start = int(key_moment_data['startMillis']) // 1000 if i == len(key_moments_data) - 1: key_moment_end = self.length else: key_moment_end = int(key_moments_data[i + 1]['startMillis']) // 1000 result.append(pytubefix.KeyMoment(key_moment_data, key_moment_end - key_moment_start)) return result @property def replayed_heatmap(self) -> List[Dict[str, float]]: """Get a list of : `Dict`. :rtype: List[Dict[str, float]] """ try: mutations = self.initial_data['frameworkUpdates']['entityBatchUpdate']['mutations'] found = False for mutation in mutations: if mutation.get('payload', {}).get('macroMarkersListEntity', {}).get('markersList', {}).get( 'markerType') == "MARKER_TYPE_HEATMAP": heatmaps_data = mutation['payload']['macroMarkersListEntity']['markersList']['markers'] found = True break if not found: return [] except (KeyError, IndexError): return [] result: List[Dict[str, float]] = [] for heatmap_data in heatmaps_data: heatmap_start = int(heatmap_data['startMillis']) / 1000 duration = int(heatmap_data['durationMillis']) / 1000 norm_intensity = float(heatmap_data['intensityScoreNormalized']) result.append({ "start_seconds": heatmap_start, "duration": duration, "norm_intensity": norm_intensity }) return result @property def streams(self) -> StreamQuery: """Interface to query both adaptive (DASH) and progressive streams. :rtype: :class:`StreamQuery `. """ self.check_availability() return StreamQuery(self.fmt_streams) @property def thumbnail_url(self) -> str: """Get the thumbnail url image. :rtype: str """ thumbnail_details = ( self.vid_info.get("videoDetails", {}) .get("thumbnail", {}) .get("thumbnails") ) if thumbnail_details: thumbnail_details = thumbnail_details[-1] # last item has max size return thumbnail_details["url"] return f"https://img.youtube.com/vi/{self.video_id}/maxresdefault.jpg" @property def publish_date(self): """Get the publish date. :rtype: datetime """ if self._publish_date: return self._publish_date self._publish_date = extract.publish_date(self.watch_html) return self._publish_date @publish_date.setter def publish_date(self, value): """Sets the publish date.""" self._publish_date = value @property def title(self) -> str: """Get the video title. :rtype: str """ self._author = self.vid_info.get("videoDetails", {}).get( "author", "unknown" ) if self._title: return self._title try: # Some clients may not return the title in the `player` endpoint, # so if it is not found we will look for it in the `next` endpoint if 'title' in self.vid_info['videoDetails']: self._title = self.vid_info['videoDetails']['title'] logger.debug('Found title in vid_info') else: if 'singleColumnWatchNextResults' in self.vid_details['contents']: contents = self.vid_details['contents'][ 'singleColumnWatchNextResults'][ 'results'][ 'results'][ 'contents'][0][ 'itemSectionRenderer'][ 'contents'][0] if 'videoMetadataRenderer' in contents: self._title = contents['videoMetadataRenderer']['title']['runs'][0]['text'] else: # JSON tree for titles in videos available on YouTube music self._title = contents['musicWatchMetadataRenderer']['title']['simpleText'] # The type of video with this structure is not yet known. # First reported in: https://github.com/JuanBindez/pytubefix/issues/351 elif 'twoColumnWatchNextResults' in self.vid_details['contents']: self._title = self.vid_details['contents'][ 'twoColumnWatchNextResults'][ 'results'][ 'results'][ 'contents'][0][ 'videoPrimaryInfoRenderer'][ 'title'][ 'runs'][0][ 'text'] logger.debug('Found title in vid_details') except KeyError as e: # Check_availability will raise the correct exception in most cases # if it doesn't, ask for a report. self.check_availability() raise exceptions.PytubeFixError( ( f'Exception while accessing title of {self.watch_url}. ' 'Please file a bug report at https://github.com/JuanBindez/pytubefix' ) ) from e return self._title @title.setter def title(self, value): """Sets the title value.""" self._title = value @property def description(self) -> str: """Get the video description. :rtype: str """ return self.vid_info.get("videoDetails", {}).get("shortDescription") @property def rating(self) -> float: """Get the video average rating. :rtype: float """ return self.vid_info.get("videoDetails", {}).get("averageRating") @property def length(self) -> int: """Get the video length in seconds. :rtype: int """ return int(self.vid_info.get('videoDetails', {}).get('lengthSeconds')) @property def views(self) -> int: """Get the number of the times the video has been viewed. :rtype: int """ return int(self.vid_info.get("videoDetails", {}).get("viewCount", "0")) @property def author(self) -> str: """Get the video author. :rtype: str """ if self._author: return self._author self._author = self.vid_info.get("videoDetails", {}).get( "author", "unknown" ) return self._author @author.setter def author(self, value): """Set the video author.""" self._author = value @property def keywords(self) -> List[str]: """Get the video keywords. :rtype: List[str] """ return self.vid_info.get('videoDetails', {}).get('keywords', []) @property def channel_id(self) -> str: """Get the video poster's channel id. :rtype: str """ return self.vid_info.get('videoDetails', {}).get('channelId', None) @property def channel_url(self) -> str: """Construct the channel url for the video's poster from the channel id. :rtype: str """ return f'https://www.youtube.com/channel/{self.channel_id}' @property def likes(self): """Get the video likes :rtype: str """ try: return self.vid_details[ 'contents'][ 'twoColumnWatchNextResults'][ 'results'][ 'results'][ 'contents'][ 0][ 'videoPrimaryInfoRenderer'][ 'videoActions'][ 'menuRenderer'][ 'topLevelButtons'][ 0][ 'segmentedLikeDislikeButtonViewModel'][ 'likeCountEntity'][ 'likeCountIfLikedNumber'] except (KeyError, IndexError): return None @property def metadata(self) -> Optional[YouTubeMetadata]: """Get the metadata for the video. :rtype: YouTubeMetadata """ if not self._metadata: self._metadata = extract.metadata( self.initial_data) # Creating the metadata return self._metadata def register_on_progress_callback(self, func: Callable[[Any, bytes, int], None]): """Register a download progress callback function post initialization. :param callable func: A callback function that takes ``stream``, ``chunk``, and ``bytes_remaining`` as parameters. :rtype: None """ self.stream_monostate.on_progress = func def register_on_complete_callback(self, func: Callable[[Any, Optional[str]], None]): """Register a download complete callback function post initialization. :param callable func: A callback function that takes ``stream`` and ``file_path``. :rtype: None """ self.stream_monostate.on_complete = func @staticmethod def from_id(video_id: str) -> "YouTube": """Construct a :class:`YouTube ` object from a video id. :param str video_id: The video id of the YouTube video. :rtype: :class:`YouTube ` """ return YouTube(f"https://www.youtube.com/watch?v={video_id}")