|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
This module implements the core developer interface for pytubefix. |
|
|
|
The problem domain of the :class:`YouTube <YouTube> class focuses almost |
|
exclusively on the developer interface. Pytubefix offloads the heavy lifting to |
|
smaller peripheral modules and functions. |
|
|
|
""" |
|
|
|
import logging |
|
from subprocess import CalledProcessError |
|
from typing import Any, Callable, Dict, List, Optional, Tuple |
|
|
|
import pytubefix |
|
import pytubefix.exceptions as exceptions |
|
from pytubefix import extract, request |
|
from pytubefix import Stream, StreamQuery |
|
from pytubefix.helpers import install_proxy |
|
from pytubefix.innertube import InnerTube |
|
from pytubefix.metadata import YouTubeMetadata |
|
from pytubefix.monostate import Monostate |
|
from pytubefix.botGuard import bot_guard |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class YouTube: |
|
"""Core developer interface for pytubefix.""" |
|
|
|
def __init__( |
|
self, |
|
url: str, |
|
client: str = InnerTube().client_name, |
|
on_progress_callback: Optional[Callable[[Any, bytes, int], None]] = None, |
|
on_complete_callback: Optional[Callable[[Any, Optional[str]], None]] = None, |
|
proxies: Optional[Dict[str, str]] = None, |
|
use_oauth: bool = False, |
|
allow_oauth_cache: bool = True, |
|
token_file: Optional[str] = None, |
|
oauth_verifier: Optional[Callable[[str, str], None]] = None, |
|
use_po_token: Optional[bool] = False, |
|
po_token_verifier: Optional[Callable[[None], Tuple[str, str]]] = None, |
|
): |
|
"""Construct a :class:`YouTube <YouTube>`. |
|
|
|
:param str url: |
|
A valid YouTube watch URL. |
|
:param str client: |
|
(Optional) A YouTube client, |
|
Available: |
|
WEB, WEB_EMBED, WEB_MUSIC, WEB_CREATOR, WEB_SAFARI, |
|
ANDROID, ANDROID_MUSIC, ANDROID_CREATOR, ANDROID_VR, ANDROID_PRODUCER, ANDROID_TESTSUITE, |
|
IOS, IOS_MUSIC, IOS_CREATOR, |
|
MWEB, TV, TV_EMBED, MEDIA_CONNECT. |
|
:param func on_progress_callback: |
|
(Optional) User defined callback function for stream download |
|
progress events. |
|
:param func on_complete_callback: |
|
(Optional) User defined callback function for stream download |
|
complete events. |
|
:param dict proxies: |
|
(Optional) A dict mapping protocol to proxy address which will be used by pytube. |
|
:param bool use_oauth: |
|
(Optional) Prompt the user to authenticate to YouTube. |
|
If allow_oauth_cache is set to True, the user should only be prompted once. |
|
:param bool allow_oauth_cache: |
|
(Optional) Cache OAuth and Po tokens locally on the machine. Defaults to True. |
|
These tokens are only generated if use_oauth is set to True as well. |
|
:param str token_file: |
|
(Optional) Path to the file where the OAuth and Po tokens will be stored. |
|
Defaults to None, which means the tokens will be stored in the pytubefix/__cache__ directory. |
|
:param Callable oauth_verifier: |
|
(optional) Verifier to be used for getting oauth tokens. |
|
Verification URL and User-Code will be passed to it respectively. |
|
(if passed, else default verifier will be used) |
|
:param bool use_po_token: |
|
(Optional) Prompt the user to use the proof of origin token on YouTube. |
|
It must be sent with the API along with the linked visitorData and |
|
then passed as a `po_token` query parameter to affected clients. |
|
If allow_oauth_cache is set to True, the user should only be prompted once. |
|
(Do not use together with `use_oauth=True`) |
|
:param Callable po_token_verifier: |
|
(Optional) Verified used to obtain the visitorData and po_token. |
|
The verifier will return the visitorData and po_token respectively. |
|
(if passed, else default verifier will be used) |
|
""" |
|
|
|
self._js: Optional[str] = None |
|
|
|
|
|
self._js_url: Optional[str] = None |
|
|
|
|
|
self._vid_info: Optional[Dict] = None |
|
self._vid_details: Optional[Dict] = None |
|
|
|
|
|
self._watch_html: Optional[str] = None |
|
self._embed_html: Optional[str] = None |
|
|
|
|
|
self._player_config_args: Optional[Dict] = None |
|
self._age_restricted: Optional[bool] = None |
|
|
|
self._fmt_streams: Optional[List[Stream]] = None |
|
|
|
self._initial_data = None |
|
self._metadata: Optional[YouTubeMetadata] = None |
|
|
|
|
|
self.video_id = extract.video_id(url) |
|
|
|
self.watch_url = f"https://youtube.com/watch?v={self.video_id}" |
|
self.embed_url = f"https://www.youtube.com/embed/{self.video_id}" |
|
|
|
self.client = 'WEB' if use_po_token else client |
|
|
|
|
|
self.client = 'TV' if use_oauth else self.client |
|
|
|
self.fallback_clients = ['TV', 'IOS'] |
|
|
|
self._signature_timestamp: dict = {} |
|
self._visitor_data = None |
|
|
|
|
|
self.stream_monostate = Monostate( |
|
on_progress=on_progress_callback, on_complete=on_complete_callback, youtube=self |
|
) |
|
|
|
if proxies: |
|
install_proxy(proxies) |
|
|
|
self._author = None |
|
self._title = None |
|
self._publish_date = None |
|
|
|
self.use_oauth = use_oauth |
|
self.allow_oauth_cache = allow_oauth_cache |
|
self.token_file = token_file |
|
self.oauth_verifier = oauth_verifier |
|
|
|
self.use_po_token = use_po_token |
|
self.po_token_verifier = po_token_verifier |
|
|
|
self.po_token = None |
|
self._pot = None |
|
|
|
def __repr__(self): |
|
return f'<pytubefix.__main__.YouTube object: videoId={self.video_id}>' |
|
|
|
def __eq__(self, o: object) -> bool: |
|
|
|
return type(o) == type(self) and o.watch_url == self.watch_url |
|
|
|
@property |
|
def watch_html(self): |
|
if self._watch_html: |
|
return self._watch_html |
|
self._watch_html = request.get(url=self.watch_url) |
|
return self._watch_html |
|
|
|
@property |
|
def embed_html(self): |
|
if self._embed_html: |
|
return self._embed_html |
|
self._embed_html = request.get(url=self.embed_url) |
|
return self._embed_html |
|
|
|
@property |
|
def age_restricted(self): |
|
if self._age_restricted: |
|
return self._age_restricted |
|
self._age_restricted = extract.is_age_restricted(self.watch_html) |
|
return self._age_restricted |
|
|
|
@property |
|
def js_url(self): |
|
if self._js_url: |
|
return self._js_url |
|
|
|
if self.age_restricted: |
|
self._js_url = extract.js_url(self.embed_html) |
|
else: |
|
self._js_url = extract.js_url(self.watch_html) |
|
|
|
return self._js_url |
|
|
|
@property |
|
def js(self): |
|
if self._js: |
|
return self._js |
|
|
|
|
|
|
|
if pytubefix.__js_url__ != self.js_url: |
|
self._js = request.get(self.js_url) |
|
pytubefix.__js__ = self._js |
|
pytubefix.__js_url__ = self.js_url |
|
else: |
|
self._js = pytubefix.__js__ |
|
|
|
return self._js |
|
|
|
@property |
|
def visitor_data(self) -> str: |
|
""" |
|
Retrieves the visitorData from the WEB client. |
|
""" |
|
if self._visitor_data: |
|
return self._visitor_data |
|
|
|
if InnerTube(self.client).require_po_token: |
|
try: |
|
logger.debug("Looking for visitorData in initial_data") |
|
self._visitor_data = extract.visitor_data(str(self.initial_data['responseContext'])) |
|
logger.debug('VisitorData obtained successfully') |
|
return self._visitor_data |
|
except (KeyError, pytubefix.exceptions.RegexMatchError): |
|
logger.debug("Unable to obtain visitorData from initial_data. Trying to request from the WEB client") |
|
|
|
logger.debug("Looking for visitorData in InnerTube API") |
|
innertube_response = InnerTube('WEB').player(self.video_id) |
|
try: |
|
self._visitor_data = innertube_response['responseContext']['visitorData'] |
|
except KeyError: |
|
self._visitor_data = innertube_response['responseContext']['serviceTrackingParams'][0]['params'][6]['value'] |
|
logger.debug('VisitorData obtained successfully') |
|
|
|
return self._visitor_data |
|
|
|
@property |
|
def pot(self) -> str: |
|
""" |
|
Retrieves the poToken generated by botGuard. |
|
|
|
This poToken only works for WEB-based clients. |
|
""" |
|
if self._pot: |
|
return self._pot |
|
logger.debug('Invoking botGuard') |
|
try: |
|
self._pot = bot_guard.generate_po_token(visitor_data=self.visitor_data) |
|
logger.debug('PoToken generated successfully') |
|
except Exception as e: |
|
logger.warning('Unable to run botGuard. Skipping poToken generation, reason: ' + e.__str__()) |
|
return self._pot |
|
|
|
@property |
|
def initial_data(self): |
|
if self._initial_data: |
|
return self._initial_data |
|
self._initial_data = extract.initial_data(self.watch_html) |
|
return self._initial_data |
|
|
|
@property |
|
def streaming_data(self): |
|
"""Return streamingData from video info.""" |
|
|
|
|
|
invalid_id_list = ['aQvGIIdgFDM'] |
|
|
|
|
|
|
|
|
|
if 'streamingData' not in self.vid_info or self.vid_info['videoDetails']['videoId'] in invalid_id_list: |
|
original_client = self.client |
|
|
|
|
|
|
|
|
|
|
|
for client in self.fallback_clients: |
|
self.client = client |
|
self.vid_info = None |
|
try: |
|
self.check_availability() |
|
except Exception as e: |
|
continue |
|
if 'streamingData' in self.vid_info: |
|
break |
|
if 'streamingData' not in self.vid_info: |
|
raise exceptions.UnknownVideoError(video_id=self.video_id, |
|
developer_message=f'Streaming data is missing, ' |
|
f'original client: {original_client}, ' |
|
f'fallback clients: {self.fallback_clients}') |
|
|
|
return self.vid_info['streamingData'] |
|
|
|
@property |
|
def fmt_streams(self): |
|
"""Returns a list of streams if they have been initialized. |
|
|
|
If the streams have not been initialized, finds all relevant |
|
streams and initializes them. |
|
""" |
|
self.check_availability() |
|
if self._fmt_streams: |
|
return self._fmt_streams |
|
|
|
self._fmt_streams = [] |
|
|
|
stream_manifest = extract.apply_descrambler(self.streaming_data) |
|
inner_tube = InnerTube(self.client) |
|
if self.po_token: |
|
extract.apply_po_token(stream_manifest, self.vid_info, self.po_token) |
|
|
|
if inner_tube.require_js_player: |
|
|
|
|
|
try: |
|
extract.apply_signature(stream_manifest, self.vid_info, self.js, self.js_url) |
|
except exceptions.ExtractError: |
|
|
|
self._js = None |
|
self._js_url = None |
|
pytubefix.__js__ = None |
|
pytubefix.__js_url__ = None |
|
extract.apply_signature(stream_manifest, self.vid_info, self.js, self.js_url) |
|
|
|
|
|
|
|
for stream in stream_manifest: |
|
video = Stream( |
|
stream=stream, |
|
monostate=self.stream_monostate, |
|
po_token=self.po_token, |
|
video_playback_ustreamer_config=self.video_playback_ustreamer_config |
|
) |
|
self._fmt_streams.append(video) |
|
|
|
self.stream_monostate.title = self.title |
|
self.stream_monostate.duration = self.length |
|
|
|
return self._fmt_streams |
|
|
|
def check_availability(self): |
|
"""Check whether the video is available. |
|
|
|
Raises different exceptions based on why the video is unavailable, |
|
otherwise does nothing. |
|
""" |
|
status, messages = extract.playability_status(self.vid_info) |
|
|
|
if InnerTube(self.client).require_po_token and not self.po_token: |
|
logger.warning(f"The {self.client} client requires PoToken to obtain functional streams, " |
|
f"See more details at https://github.com/JuanBindez/pytubefix/pull/209") |
|
|
|
for reason in messages: |
|
if status == 'UNPLAYABLE': |
|
if reason == ( |
|
'Join this channel to get access to members-only content ' |
|
'like this video, and other exclusive perks.' |
|
): |
|
raise exceptions.MembersOnly(video_id=self.video_id) |
|
|
|
elif reason == 'This live stream recording is not available.': |
|
raise exceptions.RecordingUnavailable(video_id=self.video_id) |
|
|
|
elif reason == ( |
|
'Sorry, something is wrong. This video may be inappropriate for some users. ' |
|
'Sign in to your primary account to confirm your age.' |
|
): |
|
raise exceptions.AgeCheckRequiredAccountError(video_id=self.video_id) |
|
elif reason == ( |
|
'The uploader has not made this video available in your country' |
|
): |
|
raise exceptions.VideoRegionBlocked(video_id=self.video_id) |
|
else: |
|
raise exceptions.VideoUnavailable(video_id=self.video_id) |
|
|
|
elif status == 'LOGIN_REQUIRED': |
|
if reason == ( |
|
'Sign in to confirm your age' |
|
): |
|
raise exceptions.AgeRestrictedError(video_id=self.video_id) |
|
elif reason == ( |
|
'Sign in to confirm you’re not a bot' |
|
): |
|
raise exceptions.BotDetection(video_id=self.video_id) |
|
else: |
|
raise exceptions.LoginRequired(video_id=self.video_id, reason=reason) |
|
|
|
elif status == 'AGE_CHECK_REQUIRED': |
|
if self.use_oauth: |
|
self.age_check() |
|
else: |
|
raise exceptions.AgeCheckRequiredError(video_id=self.video_id) |
|
|
|
elif status == 'LIVE_STREAM_OFFLINE': |
|
raise exceptions.LiveStreamOffline(video_id=self.video_id, reason=reason) |
|
|
|
elif status == 'ERROR': |
|
if reason == 'Video unavailable': |
|
raise exceptions.VideoUnavailable(video_id=self.video_id) |
|
elif reason == 'This video is private': |
|
raise exceptions.VideoPrivate(video_id=self.video_id) |
|
elif reason == 'This video is unavailable': |
|
raise exceptions.VideoUnavailable(video_id=self.video_id) |
|
elif reason == 'This video has been removed by the uploader': |
|
raise exceptions.VideoUnavailable(video_id=self.video_id) |
|
elif reason == 'This video is no longer available because the YouTube account associated with this video has been terminated.': |
|
raise exceptions.VideoUnavailable(video_id=self.video_id) |
|
else: |
|
raise exceptions.UnknownVideoError(video_id=self.video_id, status=status, reason=reason, developer_message=f'Unknown reason type for Error status') |
|
elif status == 'LIVE_STREAM': |
|
raise exceptions.LiveStreamError(video_id=self.video_id) |
|
elif status is None: |
|
pass |
|
else: |
|
raise exceptions.UnknownVideoError(video_id=self.video_id, status=status, reason=reason, developer_message=f'Unknown video status') |
|
|
|
@property |
|
def signature_timestamp(self) -> dict: |
|
"""WEB clients need to be signed with a signature timestamp. |
|
|
|
The signature is found inside the player's base.js. |
|
|
|
:rtype: Dict |
|
""" |
|
if not self._signature_timestamp: |
|
self._signature_timestamp = { |
|
'playbackContext': { |
|
'contentPlaybackContext': { |
|
'signatureTimestamp': extract.signature_timestamp(self.js) |
|
} |
|
} |
|
} |
|
return self._signature_timestamp |
|
|
|
@property |
|
def video_playback_ustreamer_config(self): |
|
return self.vid_info[ |
|
'playerConfig'][ |
|
'mediaCommonConfig'][ |
|
'mediaUstreamerRequestConfig'][ |
|
'videoPlaybackUstreamerConfig'] |
|
|
|
@property |
|
def server_abr_streaming_url(self): |
|
""" |
|
Extract the url for abr server and decrypt the `n` parameter |
|
""" |
|
try: |
|
url = self.vid_info[ |
|
'streamingData'][ |
|
'serverAbrStreamingUrl'] |
|
stream_manifest = [{"url": url}] |
|
extract.apply_signature(stream_manifest, vid_info=self.vid_info, js=self.js, url_js=self.js_url) |
|
return stream_manifest[0]["url"] |
|
except Exception: |
|
return None |
|
|
|
@property |
|
def vid_info(self): |
|
"""Parse the raw vid info and return the parsed result. |
|
|
|
:rtype: Dict[Any, Any] |
|
""" |
|
if self._vid_info: |
|
return self._vid_info |
|
|
|
def call_innertube(): |
|
innertube = InnerTube( |
|
client=self.client, |
|
use_oauth=self.use_oauth, |
|
allow_cache=self.allow_oauth_cache, |
|
token_file=self.token_file, |
|
oauth_verifier=self.oauth_verifier, |
|
use_po_token=self.use_po_token, |
|
po_token_verifier=self.po_token_verifier |
|
) |
|
if innertube.require_js_player: |
|
innertube.innertube_context.update(self.signature_timestamp) |
|
|
|
|
|
if innertube.require_po_token and not self.use_po_token: |
|
logger.debug(f"The {self.client} client requires poToken to obtain functional streams") |
|
logger.debug("Automatically generating poToken") |
|
innertube.insert_po_token(visitor_data=self.visitor_data, po_token=self.pot) |
|
elif not self.use_po_token: |
|
|
|
innertube.insert_visitor_data(visitor_data=self.visitor_data) |
|
|
|
response = innertube.player(self.video_id) |
|
|
|
|
|
if self.use_po_token or innertube.require_po_token: |
|
self.po_token = innertube.access_po_token or self.pot |
|
return response |
|
|
|
innertube_response = call_innertube() |
|
for client in self.fallback_clients: |
|
|
|
|
|
playability_status = innertube_response['playabilityStatus'] |
|
if playability_status['status'] == 'UNPLAYABLE' and 'reason' in playability_status and playability_status['reason'] == 'This video is not available': |
|
logger.warning(f"{self.client} client returned: This video is not available") |
|
self.client = client |
|
logger.warning(f"Switching to client: {client}") |
|
innertube_response = call_innertube() |
|
else: |
|
break |
|
|
|
self._vid_info = innertube_response |
|
if not self._vid_info: |
|
raise pytubefix.exceptions.InnerTubeResponseError(self.video_id, self.client) |
|
|
|
return self._vid_info |
|
|
|
@vid_info.setter |
|
def vid_info(self, value): |
|
self._vid_info = value |
|
|
|
@property |
|
def vid_details(self): |
|
"""Parse the raw vid details and return the parsed result. |
|
|
|
The official player sends a request to the `next` endpoint to obtain some details of the video. |
|
|
|
:rtype: Dict[Any, Any] |
|
""" |
|
if self._vid_details: |
|
return self._vid_details |
|
|
|
innertube = InnerTube( |
|
client='TV' if self.use_oauth else 'WEB', |
|
use_oauth=self.use_oauth, |
|
allow_cache=self.allow_oauth_cache, |
|
token_file=self.token_file, |
|
oauth_verifier=self.oauth_verifier, |
|
use_po_token=self.use_po_token, |
|
po_token_verifier=self.po_token_verifier |
|
) |
|
innertube_response = innertube.next(self.video_id) |
|
self._vid_details = innertube_response |
|
return self._vid_details |
|
|
|
@vid_details.setter |
|
def vid_details(self, value): |
|
self._vid_details = value |
|
|
|
def age_check(self): |
|
"""If the video has any age restrictions, you must confirm that you wish to continue. |
|
|
|
Originally the WEB client was used, but with the implementation of PoToken we switched to MWEB. |
|
""" |
|
|
|
self.client = 'TV' |
|
innertube = InnerTube( |
|
client=self.client, |
|
use_oauth=self.use_oauth, |
|
allow_cache=self.allow_oauth_cache, |
|
token_file=self.token_file, |
|
oauth_verifier=self.oauth_verifier, |
|
use_po_token=self.use_po_token, |
|
po_token_verifier=self.po_token_verifier |
|
) |
|
|
|
if innertube.require_js_player: |
|
innertube.innertube_context.update(self.signature_timestamp) |
|
|
|
innertube.verify_age(self.video_id) |
|
|
|
innertube_response = innertube.player(self.video_id) |
|
|
|
playability_status = innertube_response['playabilityStatus'].get('status', None) |
|
|
|
|
|
if playability_status != 'OK': |
|
if playability_status == 'UNPLAYABLE': |
|
raise exceptions.AgeCheckRequiredAccountError(self.video_id) |
|
else: |
|
raise exceptions.AgeCheckRequiredError(self.video_id) |
|
|
|
self._vid_info = innertube_response |
|
|
|
@property |
|
def caption_tracks(self) -> List[pytubefix.Caption]: |
|
"""Get a list of :class:`Caption <Caption>`. |
|
|
|
:rtype: List[Caption] |
|
""" |
|
|
|
innertube_response = InnerTube( |
|
client='WEB' if not self.use_oauth else self.client, |
|
use_oauth=self.use_oauth, |
|
allow_cache=self.allow_oauth_cache, |
|
token_file=self.token_file, |
|
oauth_verifier=self.oauth_verifier, |
|
use_po_token=self.use_po_token, |
|
po_token_verifier=self.po_token_verifier |
|
).player(self.video_id) |
|
|
|
raw_tracks = ( |
|
innertube_response.get("captions", {}) |
|
.get("playerCaptionsTracklistRenderer", {}) |
|
.get("captionTracks", []) |
|
) |
|
return [pytubefix.Caption(track) for track in raw_tracks] |
|
|
|
@property |
|
def captions(self) -> pytubefix.CaptionQuery: |
|
"""Interface to query caption tracks. |
|
|
|
:rtype: :class:`CaptionQuery <CaptionQuery>`. |
|
""" |
|
return pytubefix.CaptionQuery(self.caption_tracks) |
|
|
|
@property |
|
def chapters(self) -> List[pytubefix.Chapter]: |
|
"""Get a list of :class:`Chapter <Chapter>`. |
|
|
|
:rtype: List[Chapter] |
|
""" |
|
try: |
|
chapters_data = [] |
|
markers_map = self.initial_data['playerOverlays']['playerOverlayRenderer'][ |
|
'decoratedPlayerBarRenderer']['decoratedPlayerBarRenderer']['playerBar'][ |
|
'multiMarkersPlayerBarRenderer']['markersMap'] |
|
for marker in markers_map: |
|
if marker['key'].upper() == 'DESCRIPTION_CHAPTERS': |
|
chapters_data = marker['value']['chapters'] |
|
break |
|
except (KeyError, IndexError): |
|
return [] |
|
|
|
result: List[pytubefix.Chapter] = [] |
|
|
|
for i, chapter_data in enumerate(chapters_data): |
|
chapter_start = int( |
|
chapter_data['chapterRenderer']['timeRangeStartMillis'] / 1000 |
|
) |
|
|
|
if i == len(chapters_data) - 1: |
|
chapter_end = self.length |
|
else: |
|
chapter_end = int( |
|
chapters_data[i + 1]['chapterRenderer']['timeRangeStartMillis'] / 1000 |
|
) |
|
|
|
result.append(pytubefix.Chapter(chapter_data, chapter_end - chapter_start)) |
|
|
|
return result |
|
|
|
@property |
|
def key_moments(self) -> List[pytubefix.KeyMoment]: |
|
"""Get a list of :class:`KeyMoment <KeyMoment>`. |
|
|
|
:rtype: List[KeyMoment] |
|
""" |
|
try: |
|
mutations = self.initial_data['frameworkUpdates']['entityBatchUpdate']['mutations'] |
|
found = False |
|
for mutation in mutations: |
|
if mutation.get('payload', {}).get('macroMarkersListEntity', {}).get('markersList', {}).get( |
|
'markerType') == "MARKER_TYPE_TIMESTAMPS": |
|
key_moments_data = mutation['payload']['macroMarkersListEntity']['markersList']['markers'] |
|
found = True |
|
break |
|
|
|
if not found: |
|
return [] |
|
except (KeyError, IndexError): |
|
return [] |
|
|
|
result: List[pytubefix.KeyMoment] = [] |
|
|
|
for i, key_moment_data in enumerate(key_moments_data): |
|
key_moment_start = int(key_moment_data['startMillis']) // 1000 |
|
|
|
if i == len(key_moments_data) - 1: |
|
key_moment_end = self.length |
|
else: |
|
key_moment_end = int(key_moments_data[i + 1]['startMillis']) // 1000 |
|
|
|
result.append(pytubefix.KeyMoment(key_moment_data, key_moment_end - key_moment_start)) |
|
|
|
return result |
|
|
|
@property |
|
def replayed_heatmap(self) -> List[Dict[str, float]]: |
|
"""Get a list of : `Dict<str, float>`. |
|
|
|
:rtype: List[Dict[str, float]] |
|
""" |
|
try: |
|
mutations = self.initial_data['frameworkUpdates']['entityBatchUpdate']['mutations'] |
|
found = False |
|
for mutation in mutations: |
|
if mutation.get('payload', {}).get('macroMarkersListEntity', {}).get('markersList', {}).get( |
|
'markerType') == "MARKER_TYPE_HEATMAP": |
|
heatmaps_data = mutation['payload']['macroMarkersListEntity']['markersList']['markers'] |
|
found = True |
|
break |
|
|
|
if not found: |
|
return [] |
|
except (KeyError, IndexError): |
|
return [] |
|
|
|
result: List[Dict[str, float]] = [] |
|
|
|
for heatmap_data in heatmaps_data: |
|
heatmap_start = int(heatmap_data['startMillis']) / 1000 |
|
duration = int(heatmap_data['durationMillis']) / 1000 |
|
|
|
norm_intensity = float(heatmap_data['intensityScoreNormalized']) |
|
|
|
result.append({ |
|
"start_seconds": heatmap_start, |
|
"duration": duration, |
|
"norm_intensity": norm_intensity |
|
}) |
|
|
|
return result |
|
|
|
@property |
|
def streams(self) -> StreamQuery: |
|
"""Interface to query both adaptive (DASH) and progressive streams. |
|
|
|
:rtype: :class:`StreamQuery <StreamQuery>`. |
|
""" |
|
self.check_availability() |
|
return StreamQuery(self.fmt_streams) |
|
|
|
@property |
|
def thumbnail_url(self) -> str: |
|
"""Get the thumbnail url image. |
|
|
|
:rtype: str |
|
""" |
|
thumbnail_details = ( |
|
self.vid_info.get("videoDetails", {}) |
|
.get("thumbnail", {}) |
|
.get("thumbnails") |
|
) |
|
if thumbnail_details: |
|
thumbnail_details = thumbnail_details[-1] |
|
return thumbnail_details["url"] |
|
|
|
return f"https://img.youtube.com/vi/{self.video_id}/maxresdefault.jpg" |
|
|
|
@property |
|
def publish_date(self): |
|
"""Get the publish date. |
|
|
|
:rtype: datetime |
|
""" |
|
if self._publish_date: |
|
return self._publish_date |
|
self._publish_date = extract.publish_date(self.watch_html) |
|
return self._publish_date |
|
|
|
@publish_date.setter |
|
def publish_date(self, value): |
|
"""Sets the publish date.""" |
|
self._publish_date = value |
|
|
|
@property |
|
def title(self) -> str: |
|
"""Get the video title. |
|
|
|
:rtype: str |
|
""" |
|
self._author = self.vid_info.get("videoDetails", {}).get( |
|
"author", "unknown" |
|
) |
|
|
|
if self._title: |
|
return self._title |
|
|
|
try: |
|
|
|
|
|
if 'title' in self.vid_info['videoDetails']: |
|
self._title = self.vid_info['videoDetails']['title'] |
|
logger.debug('Found title in vid_info') |
|
else: |
|
if 'singleColumnWatchNextResults' in self.vid_details['contents']: |
|
contents = self.vid_details['contents'][ |
|
'singleColumnWatchNextResults'][ |
|
'results'][ |
|
'results'][ |
|
'contents'][0][ |
|
'itemSectionRenderer'][ |
|
'contents'][0] |
|
|
|
if 'videoMetadataRenderer' in contents: |
|
self._title = contents['videoMetadataRenderer']['title']['runs'][0]['text'] |
|
else: |
|
|
|
self._title = contents['musicWatchMetadataRenderer']['title']['simpleText'] |
|
|
|
|
|
|
|
elif 'twoColumnWatchNextResults' in self.vid_details['contents']: |
|
self._title = self.vid_details['contents'][ |
|
'twoColumnWatchNextResults'][ |
|
'results'][ |
|
'results'][ |
|
'contents'][0][ |
|
'videoPrimaryInfoRenderer'][ |
|
'title'][ |
|
'runs'][0][ |
|
'text'] |
|
|
|
logger.debug('Found title in vid_details') |
|
except KeyError as e: |
|
|
|
|
|
self.check_availability() |
|
raise exceptions.PytubeFixError( |
|
( |
|
f'Exception while accessing title of {self.watch_url}. ' |
|
'Please file a bug report at https://github.com/JuanBindez/pytubefix' |
|
) |
|
) from e |
|
|
|
return self._title |
|
|
|
@title.setter |
|
def title(self, value): |
|
"""Sets the title value.""" |
|
self._title = value |
|
|
|
@property |
|
def description(self) -> str: |
|
"""Get the video description. |
|
|
|
:rtype: str |
|
""" |
|
return self.vid_info.get("videoDetails", {}).get("shortDescription") |
|
|
|
@property |
|
def rating(self) -> float: |
|
"""Get the video average rating. |
|
|
|
:rtype: float |
|
|
|
""" |
|
return self.vid_info.get("videoDetails", {}).get("averageRating") |
|
|
|
@property |
|
def length(self) -> int: |
|
"""Get the video length in seconds. |
|
|
|
:rtype: int |
|
""" |
|
return int(self.vid_info.get('videoDetails', {}).get('lengthSeconds')) |
|
|
|
@property |
|
def views(self) -> int: |
|
"""Get the number of the times the video has been viewed. |
|
|
|
:rtype: int |
|
""" |
|
return int(self.vid_info.get("videoDetails", {}).get("viewCount", "0")) |
|
|
|
@property |
|
def author(self) -> str: |
|
"""Get the video author. |
|
:rtype: str |
|
""" |
|
if self._author: |
|
return self._author |
|
self._author = self.vid_info.get("videoDetails", {}).get( |
|
"author", "unknown" |
|
) |
|
return self._author |
|
|
|
@author.setter |
|
def author(self, value): |
|
"""Set the video author.""" |
|
self._author = value |
|
|
|
@property |
|
def keywords(self) -> List[str]: |
|
"""Get the video keywords. |
|
|
|
:rtype: List[str] |
|
""" |
|
return self.vid_info.get('videoDetails', {}).get('keywords', []) |
|
|
|
@property |
|
def channel_id(self) -> str: |
|
"""Get the video poster's channel id. |
|
|
|
:rtype: str |
|
""" |
|
return self.vid_info.get('videoDetails', {}).get('channelId', None) |
|
|
|
@property |
|
def channel_url(self) -> str: |
|
"""Construct the channel url for the video's poster from the channel id. |
|
|
|
:rtype: str |
|
""" |
|
return f'https://www.youtube.com/channel/{self.channel_id}' |
|
|
|
@property |
|
def likes(self): |
|
"""Get the video likes |
|
|
|
:rtype: str |
|
""" |
|
try: |
|
return self.vid_details[ |
|
'contents'][ |
|
'twoColumnWatchNextResults'][ |
|
'results'][ |
|
'results'][ |
|
'contents'][ |
|
0][ |
|
'videoPrimaryInfoRenderer'][ |
|
'videoActions'][ |
|
'menuRenderer'][ |
|
'topLevelButtons'][ |
|
0][ |
|
'segmentedLikeDislikeButtonViewModel'][ |
|
'likeCountEntity'][ |
|
'likeCountIfLikedNumber'] |
|
except (KeyError, IndexError): |
|
return None |
|
|
|
@property |
|
def metadata(self) -> Optional[YouTubeMetadata]: |
|
"""Get the metadata for the video. |
|
|
|
:rtype: YouTubeMetadata |
|
""" |
|
if not self._metadata: |
|
self._metadata = extract.metadata( |
|
self.initial_data) |
|
return self._metadata |
|
|
|
def register_on_progress_callback(self, func: Callable[[Any, bytes, int], None]): |
|
"""Register a download progress callback function post initialization. |
|
|
|
:param callable func: |
|
A callback function that takes ``stream``, ``chunk``, |
|
and ``bytes_remaining`` as parameters. |
|
|
|
:rtype: None |
|
|
|
""" |
|
self.stream_monostate.on_progress = func |
|
|
|
def register_on_complete_callback(self, func: Callable[[Any, Optional[str]], None]): |
|
"""Register a download complete callback function post initialization. |
|
|
|
:param callable func: |
|
A callback function that takes ``stream`` and ``file_path``. |
|
|
|
:rtype: None |
|
|
|
""" |
|
self.stream_monostate.on_complete = func |
|
|
|
@staticmethod |
|
def from_id(video_id: str) -> "YouTube": |
|
"""Construct a :class:`YouTube <YouTube>` object from a video id. |
|
|
|
:param str video_id: |
|
The video id of the YouTube video. |
|
|
|
:rtype: :class:`YouTube <YouTube>` |
|
""" |
|
return YouTube(f"https://www.youtube.com/watch?v={video_id}") |
|
|