#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Small YouTube Data API v3 helper: OAuth credentials handling, video upload,
and listing of videos / playlists / video categories.

https://github.com/googleapis/google-api-python-client
https://console.cloud.google.com/apis/api/youtube.googleapis.com/quotas?inv=1&invt=Ab3P9A&project=youtube-manager-20250703
"""
import json
import os
import argparse
from pathlib import Path
import logging
from typing import List, Optional

import httplib2
from googleapiclient.discovery import build, Resource
from googleapiclient.http import MediaFileUpload, HttpRequest
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from tqdm import tqdm

from project_settings import project_path, environment

logger = logging.getLogger("toolbox")


class VideoManager(object):
    """
    Thin wrapper around the YouTube Data API client.

    Credentials are resolved, in order of preference, from
    ``authorized_user_info`` (an in-memory dict), ``authorized_user_file``
    (a JSON file on disk), or an interactive OAuth flow driven by
    ``client_secrets_file``.

    https://developers.google.com/youtube/v3/docs/videos?hl=zh-cn

    scopes:
    https://developers.google.com/identity/protocols/oauth2/scopes?hl=zh-cn#youtube

    https://developers.google.com/youtube/v3/guides/uploading_a_video?hl=zh-cn
    """

    def __init__(self,
                 client_secrets_file: Optional[str] = None,
                 authorized_user_file: Optional[str] = None,
                 authorized_user_info: Optional[dict] = None,
                 scopes: Optional[List[str]] = None,
                 api_service_name: str = "youtube",
                 api_version: str = "v3",
                 ):
        """
        Store the configuration and immediately build the API client.

        :param client_secrets_file: OAuth client secrets JSON used for the interactive flow.
        :param authorized_user_file: JSON file holding previously authorized user credentials.
        :param authorized_user_info: already-parsed authorized user credentials.
        :param scopes: OAuth scopes requested by the interactive flow; a default set is used when falsy.
        :param api_service_name: service name passed to ``googleapiclient.discovery.build``.
        :param api_version: API version passed to ``googleapiclient.discovery.build``.
        :raises AssertionError: when no credential source at all is provided.
        """
        if client_secrets_file is None and authorized_user_file is None and authorized_user_info is None:
            raise AssertionError(
                "one of client_secrets_file, authorized_user_file or "
                "authorized_user_info must be provided"
            )
        self.client_secrets_file = client_secrets_file
        self.authorized_user_info = authorized_user_info
        self.authorized_user_file = authorized_user_file

        # https://developers.google.com/identity/protocols/oauth2/scopes?hl=zh-cn#youtube
        self.scopes = scopes or [
            "https://www.googleapis.com/auth/youtube",
            # "https://www.googleapis.com/auth/youtube.force-ssl",
            "https://www.googleapis.com/auth/youtube.upload",
            "https://www.googleapis.com/auth/youtube.readonly",
            "https://www.googleapis.com/auth/youtube.channel-memberships.creator",
            # "https://www.googleapis.com/auth/youtubepartner",
            # "https://www.googleapis.com/auth/youtubepartner-channel-audit",
        ]
        self.api_service_name = api_service_name
        self.api_version = api_version

        self.client = self.get_client()

    @staticmethod
    def save_credentials(credentials: Credentials, filename: str) -> str:
        """
        Write ``credentials`` to ``filename`` as indented JSON.

        Missing parent directories are created.

        :return: the ``filename`` that was written.
        """
        # Round-trip through json so the file is pretty-printed rather than
        # the single-line string returned by ``to_json()``.
        state = json.loads(credentials.to_json())

        Path(filename).parent.mkdir(parents=True, exist_ok=True)
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2)
        return filename

    def get_credentials(self) -> Credentials:
        """
        Resolve user credentials and refresh them when they are not valid.

        Resolution order: ``authorized_user_info``, then an existing
        ``authorized_user_file``, then the interactive local-server OAuth flow
        based on ``client_secrets_file``. Newly obtained or refreshed
        credentials are persisted to ``authorized_user_file`` when one is set.

        :raises FileNotFoundError: when ``authorized_user_file`` does not exist
            and no ``client_secrets_file`` is available to run the OAuth flow.
        """
        user_file = self.authorized_user_file

        if self.authorized_user_info is not None:
            credentials = Credentials.from_authorized_user_info(self.authorized_user_info)
        elif user_file is not None and os.path.exists(user_file):
            # The explicit None check matters: os.path.exists(None) raises TypeError.
            credentials = Credentials.from_authorized_user_file(user_file)
        else:
            if self.client_secrets_file is None:
                raise FileNotFoundError(
                    f"authorized user file not found: {user_file}; "
                    f"provide client_secrets_file to run the OAuth flow"
                )
            flow = InstalledAppFlow.from_client_secrets_file(
                client_secrets_file=self.client_secrets_file,
                scopes=self.scopes,
            )
            credentials: Credentials = flow.run_local_server(port=0)
            # Only persist when there is somewhere to persist to.
            if user_file is not None:
                self.save_credentials(credentials, user_file)

        # expired check
        if not credentials.valid:
            credentials.refresh(Request())
            if user_file is not None:
                self.save_credentials(credentials, user_file)
        return credentials

    def get_client(self) -> Resource:
        """Authenticate via OAuth 2.0 and build the API client."""
        credentials = self.get_credentials()

        client: Resource = build(
            serviceName=self.api_service_name,
            version=self.api_version,
            credentials=credentials
        )
        return client

    def list_video_categories(self, region_code: str = "TW"):
        """
        Return the raw ``videoCategories.list`` response for a region.

        https://developers.google.com/youtube/v3/docs/videoCategories/list

        :param region_code: value sent as ``regionCode``; defaults to ``"TW"``.
        """
        video_categories: Resource = self.client.videoCategories()
        request: HttpRequest = video_categories.list(
            part="snippet",
            alt="json",
            regionCode=region_code,
        )
        js = request.execute()
        return js

    def upload_video(self, filename: str, metadata: dict):
        """
        Upload ``filename`` as a resumable, chunked upload and return its video id.

        https://developers.google.com/youtube/v3/docs/videos/insert?hl=zh-cn

        :param filename: path of the video file to upload.
        :param metadata: request body; its top-level keys are also used as the ``part`` list.
        :return: the ``id`` field of the insert response.
        """
        logger.info(f"start to upload a video: {filename}")
        media = MediaFileUpload(
            filename,
            mimetype="video/*",
            resumable=True,
            chunksize=10 * 1024 * 1024  # upload in 10 MB chunks
            # chunksize=256 * 1024
        )

        videos: Resource = self.client.videos()
        request: HttpRequest = videos.insert(
            part=",".join(metadata.keys()),
            body=metadata,
            media_body=media
        )

        # The context manager guarantees the bar is closed even if a chunk fails.
        with tqdm(total=100, desc="Uploading") as progress_bar:
            progress_rate = 0
            response = None
            while response is None:
                status, response = request.next_chunk()
                if status:
                    progress_rate_ = int(status.progress() * 100)
                    # tqdm.update() takes an increment, not an absolute value.
                    progress_bar.update(progress_rate_ - progress_rate)
                    progress_rate = progress_rate_
                    logger.info(f"uploading: {progress_rate_}%, , filename: {filename}")
            # The final chunk comes back with status=None, so fill the bar here.
            progress_bar.update(100 - progress_rate)

        # Read the id first: nesting the same quote type inside an f-string
        # is a syntax error before Python 3.12.
        video_id = response["id"]
        logger.info(f"upload video finished: {video_id}, filename: {filename}")
        return video_id

    def list_video(self):
        """
        Return every ``search.list`` video item of the authenticated user's channel.

        Pages are followed through ``nextPageToken`` until exhausted. An empty
        list is returned (with a warning) when the account has no channel.
        """
        channels: Resource = self.client.channels()
        channels_response = channels.list(
            mine=True,
            part="contentDetails"
        ).execute()

        channel_items = channels_response.get("items") or []
        if not channel_items:
            logger.warning("no channel found for the authenticated user")
            return []
        channel_id = channel_items[0]["id"]

        search_engine: Resource = self.client.search()
        videos = []
        next_page_token = None
        while True:
            search_response = search_engine.list(
                channelId=channel_id,
                type="video",
                part="id,snippet",
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            videos.extend(search_response.get("items", []))
            next_page_token = search_response.get("nextPageToken")

            if not next_page_token:
                break
        return videos

    def list_playlists(self):
        """
        Return a summary row for every playlist owned by the authenticated user.

        Pages are followed through ``nextPageToken`` so that more than one page
        (50 items) of playlists is returned.

        :return: list of dicts with the keys ``playlist_id``, ``channel_id``,
            ``title`` and ``channel_title``.
        """
        playlists: Resource = self.client.playlists()

        result = []
        next_page_token = None
        while True:
            response = playlists.list(
                part="snippet,contentDetails",
                maxResults=50,
                mine=True,
                pageToken=next_page_token
            ).execute()

            for item in response.get("items", []):
                snippet = item["snippet"]
                result.append({
                    "playlist_id": item["id"],
                    "channel_id": snippet["channelId"],
                    "title": snippet["title"],
                    "channel_title": snippet["channelTitle"],
                })

            next_page_token = response.get("nextPageToken")
            if not next_page_token:
                break
        return result

    def add_to_playlist(self, playlist_id: str, video_id: str):
        """
        Insert ``video_id`` into ``playlist_id`` and return the raw API response.

        https://developers.google.com/youtube/v3/docs/playlistItems/insert?hl=zh-cn
        """
        playlist_items: Resource = self.client.playlistItems()
        request_body = {
            'snippet': {
                'playlistId': playlist_id,
                'resourceId': {
                    'kind': 'youtube#video',
                    'videoId': video_id
                }
            }
        }

        response = playlist_items.insert(
            part='snippet',
            body=request_body
        ).execute()
        return response


def get_args():
    """Parse the command-line options shared by the ``main*`` entry points."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--client_secrets_file",
        default=(project_path / "dotenv/client_secret_2_407880303838-ke7g6d12sa3msca9prjl8dr12njen9r6.apps.googleusercontent.com.json").as_posix(),
        type=str
    )
    parser.add_argument(
        "--authorized_user_file",
        # default=(project_path / "dotenv/authorized_user_file.json").as_posix(),
        default=(project_path / "dotenv/chenjiesen_authorized_user_file.json").as_posix(),
        type=str
    )
    # parser.add_argument(
    #     "--api_service_name",
    #     default="youtube",
    #     type=str
    # )
    # parser.add_argument(
    #     "--api_version",
    #     default="v3",
    #     type=str
    # )
    parser.add_argument(
        "--video_file",
        default=(project_path / "data/video/douyin/老陈come back/[7518968327880871202][20250623_103744]合肥LV柜姐爸爸事件.mp4").as_posix(),
        type=str
    )
    args = parser.parse_args()
    return args


def main1():
    """Upload the video given by ``--video_file`` as a private video."""
    import log
    from project_settings import log_directory, time_zone_info

    log.setup_size_rotating(log_directory=log_directory, tz_info=time_zone_info)

    args = get_args()

    # Upload a video.
    manager = VideoManager(
        # client_secrets_file=args.client_secrets_file,
        # authorized_user_file=args.authorized_user_file,
        authorized_user_info=environment.get("youtube_chenjiesen_credentials", dtype=json.loads)
    )
    video_metadata = {
        "snippet": {
            "title": "合肥LV柜姐爸爸事件",
            "description": "",
            "tags": [],
            # NOTE(review): the original comment called category 22 "education";
            # it appears to be "People & Blogs" in the default category list.
            # Confirm with list_video_categories():
            # https://developers.google.com/youtube/v3/docs/videoCategories/list
            "categoryId": "22",
            # NOTE(review): playlistId does not look like a documented
            # videos.insert snippet field; confirm the API ignores it.
            "playlistId": None
        },
        "status": {
            "privacyStatus": "private",  # one of: public / private / unlisted
            "selfDeclaredMadeForKids": False
        }
    }
    manager.upload_video(
        filename=args.video_file,
        metadata=video_metadata,
    )
    return


def main2():
    """Print one summary row per video of the authenticated channel."""
    # Parsed for CLI validation only; the file-based options are currently disabled below.
    args = get_args()

    # List videos.
    manager = VideoManager(
        # client_secrets_file=args.client_secrets_file,
        # authorized_user_file=args.authorized_user_file,
        authorized_user_info=environment.get("youtube_xiaofeng_credentials", dtype=json.loads)
    )
    videos = manager.list_video()
    print(f"videos count: {len(videos)}")
    for video in videos:
        snippet = video["snippet"]
        row = {
            "video_id": video["id"]["videoId"],
            "published_at": snippet["publishedAt"],
            "channel_id": snippet["channelId"],
            "title": snippet["title"],
            "channel_title": snippet["channelTitle"],
            "publish_time": snippet["publishTime"],
        }
        print(row)
    return


def main3():
    """Print the authenticated user's playlists as JSON."""
    import log
    from project_settings import log_directory, time_zone_info

    log.setup_size_rotating(log_directory=log_directory, tz_info=time_zone_info)

    args = get_args()

    manager = VideoManager(
        # client_secrets_file=args.client_secrets_file,
        authorized_user_file=args.authorized_user_file,
        # authorized_user_info=environment.get("youtube_chenjiesen_credentials", dtype=json.loads)
    )
    # js = manager.list_video_categories()
    js = manager.list_playlists()
    # js = manager.add_to_playlist(
    #     playlist_id="PL1KtQ49rVMElugHudIdyKLAmgmMVdqoxQ",
    #     video_id="ZYmSi5wO2A0",
    # )
    print(json.dumps(js, ensure_ascii=False, indent=4))
    return


if __name__ == "__main__":
    main3()