# qgyd2021's picture
# first commit
# f176037
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
https://github.com/googleapis/google-api-python-client
https://console.cloud.google.com/apis/api/youtube.googleapis.com/quotas?inv=1&invt=Ab3P9A&project=youtube-manager-20250703
"""
import json
import os
import argparse
from pathlib import Path
import logging
from typing import List
import httplib2
from googleapiclient.discovery import build, Resource
from googleapiclient.http import MediaFileUpload, HttpRequest
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from tqdm import tqdm
from project_settings import project_path, environment
# Module logger, registered under the fixed name "toolbox" (not __name__).
logger = logging.getLogger("toolbox")
class VideoManager(object):
    """
    Small wrapper around a googleapiclient YouTube client.

    It obtains OAuth credentials (from an in-memory dict, a saved
    authorized-user file, or an interactive installed-app flow), builds the
    API client once in ``__init__`` and exposes helpers for uploading videos
    and listing videos, playlists and video categories.

    References:
    https://developers.google.com/youtube/v3/docs/videos?hl=zh-cn
    scopes:
    https://developers.google.com/identity/protocols/oauth2/scopes?hl=zh-cn#youtube
    https://developers.google.com/youtube/v3/guides/uploading_a_video?hl=zh-cn
    """

    def __init__(self,
                 client_secrets_file: str = None,
                 authorized_user_file: str = None,
                 authorized_user_info: dict = None,
                 scopes: List[str] = None,
                 api_service_name: str = "youtube",
                 api_version: str = "v3",
                 ):
        """
        :param client_secrets_file: OAuth client secrets JSON, used to run the
            interactive flow when no stored credentials are available.
        :param authorized_user_file: path of the stored authorized-user JSON;
            read when it exists and (re)written after login / token refresh.
        :param authorized_user_info: authorized-user data as a dict; takes
            precedence over ``authorized_user_file``.
        :param scopes: OAuth scopes requested by the interactive flow;
            defaults to the list below.
        :param api_service_name: service name passed to ``build``.
        :param api_version: service version passed to ``build``.
        :raises AssertionError: when no credential source is given at all.
        """
        if client_secrets_file is None and authorized_user_file is None and authorized_user_info is None:
            raise AssertionError(
                "one of client_secrets_file, authorized_user_file or "
                "authorized_user_info must be provided"
            )

        self.client_secrets_file = client_secrets_file
        self.authorized_user_info = authorized_user_info
        self.authorized_user_file = authorized_user_file

        # https://developers.google.com/identity/protocols/oauth2/scopes?hl=zh-cn#youtube
        self.scopes = scopes or [
            "https://www.googleapis.com/auth/youtube",
            # "https://www.googleapis.com/auth/youtube.force-ssl",
            "https://www.googleapis.com/auth/youtube.upload",
            "https://www.googleapis.com/auth/youtube.readonly",
            "https://www.googleapis.com/auth/youtube.channel-memberships.creator",
            # "https://www.googleapis.com/auth/youtubepartner",
            # "https://www.googleapis.com/auth/youtubepartner-channel-audit",
        ]
        self.api_service_name = api_service_name
        self.api_version = api_version

        # Building the client triggers credential loading (and possibly the
        # interactive login), so construction may perform I/O.
        self.client = self.get_client()

    @staticmethod
    def save_credentials(credentials: "Credentials", filename: str):
        """
        Write ``credentials`` to ``filename`` as indented JSON.

        Missing parent directories are created. Returns ``filename``.
        """
        # to_json() yields a JSON string; round-trip it so the file is
        # pretty-printed and non-ASCII characters are kept readable.
        state = json.loads(credentials.to_json())

        Path(filename).parent.mkdir(parents=True, exist_ok=True)
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2)
        return filename

    def get_credentials(self):
        """
        Load, or interactively obtain, the OAuth credentials.

        Resolution order: ``authorized_user_info`` dict, then an existing
        ``authorized_user_file``, then the installed-app flow driven by
        ``client_secrets_file``. Credentials that are not valid are refreshed.
        New or refreshed credentials are saved only when an
        ``authorized_user_file`` path was configured.

        :raises FileNotFoundError: when no stored credentials exist and no
            ``client_secrets_file`` is available to run the login flow.
        """
        user_file = self.authorized_user_file

        if self.authorized_user_info is not None:
            credentials = Credentials.from_authorized_user_info(self.authorized_user_info)
        elif user_file is not None and os.path.exists(user_file):
            credentials = Credentials.from_authorized_user_file(user_file)
        else:
            if self.client_secrets_file is None:
                raise FileNotFoundError(
                    f"authorized_user_file not found: {user_file}; "
                    f"client_secrets_file is required to run the OAuth flow"
                )
            flow = InstalledAppFlow.from_client_secrets_file(
                client_secrets_file=self.client_secrets_file,
                scopes=self.scopes,
            )
            credentials: Credentials = flow.run_local_server(port=0)
            # Without a target path there is nowhere to persist the result.
            if user_file is not None:
                self.save_credentials(credentials, user_file)

        # expired check
        if not credentials.valid:
            credentials.refresh(Request())
            if user_file is not None:
                self.save_credentials(credentials, user_file)
        return credentials

    def get_client(self) -> "Resource":
        """Authenticate via OAuth 2.0 and build the API client."""
        credentials = self.get_credentials()
        client: Resource = build(
            serviceName=self.api_service_name,
            version=self.api_version,
            credentials=credentials
        )
        return client

    def list_video_categories(self, region_code: str = "TW"):
        """
        Return the raw ``videoCategories.list`` response for a region.

        https://developers.google.com/youtube/v3/docs/videoCategories/list

        :param region_code: value sent as ``regionCode``; defaults to "TW".
        """
        video_categories: Resource = self.client.videoCategories()
        request: HttpRequest = video_categories.list(
            part="snippet",
            alt="json",
            regionCode=region_code,
        )
        return request.execute()

    def upload_video(self, filename: str, metadata: dict):
        """
        Upload ``filename`` with a resumable, chunked upload.

        https://developers.google.com/youtube/v3/docs/videos/insert?hl=zh-cn

        :param filename: path of the local video file.
        :param metadata: request body; its top-level keys are also sent as
            the ``part`` parameter.
        :return: the ``id`` field of the insert response.
        """
        logger.info(f"start to upload a video: {filename}")

        media = MediaFileUpload(
            filename,
            mimetype="video/*",
            resumable=True,
            chunksize=10 * 1024 * 1024  # upload in 10MB chunks
        )

        videos: Resource = self.client.videos()
        request: HttpRequest = videos.insert(
            part=",".join(metadata),
            body=metadata,
            media_body=media
        )

        progress_rate = 0
        response = None
        # The context manager guarantees the bar is closed even on errors.
        with tqdm(total=100, desc="Uploading") as progress_bar:
            while response is None:
                status, response = request.next_chunk()
                if status:
                    progress_rate_ = int(status.progress() * 100)
                    # tqdm.update takes an increment, not an absolute value.
                    progress_bar.update(progress_rate_ - progress_rate)
                    progress_rate = progress_rate_
                    logger.info(f"uploading: {progress_rate_}%, , filename: {filename}")
            # The final chunk yields the response without a status object,
            # so fill the bar up to 100% here.
            progress_bar.update(100 - progress_rate)

        video_id = response["id"]
        # Do not reuse double quotes inside the f-string: that is a
        # SyntaxError on Python versions before 3.12.
        logger.info(f"upload video finished: {video_id}, filename: {filename}")
        return video_id

    def list_video(self):
        """
        Return all ``search.list`` items of type "video" for the authorized
        user's own channel, following ``nextPageToken`` until exhausted.

        Returns an empty list when the account exposes no channel.
        """
        channels: Resource = self.client.channels()
        channels_response = channels.list(
            mine=True,
            part="contentDetails"
        ).execute()

        channel_items = channels_response.get("items") or []
        if not channel_items:
            logger.warning("no channel found for the authorized user")
            return []
        channel_id = channel_items[0]["id"]

        search_engine: Resource = self.client.search()
        videos = []
        next_page_token = None
        while True:
            search_response = search_engine.list(
                channelId=channel_id,
                type="video",
                part="id,snippet",
                maxResults=50,
                pageToken=next_page_token
            ).execute()

            videos.extend(search_response.get("items", []))
            next_page_token = search_response.get("nextPageToken")

            if not next_page_token:
                break

        return videos

    def list_playlists(self):
        """
        Return every playlist of the authorized user as a list of dicts with
        the keys ``playlist_id``, ``channel_id``, ``title`` and
        ``channel_title``. All result pages are followed.
        """
        playlists: Resource = self.client.playlists()

        result = list()
        next_page_token = None
        while True:
            response = playlists.list(
                part="snippet,contentDetails",
                maxResults=50,
                mine=True,
                pageToken=next_page_token
            ).execute()

            for item in response.get("items", []):
                snippet = item["snippet"]
                result.append({
                    "playlist_id": item["id"],
                    "channel_id": snippet["channelId"],
                    "title": snippet["title"],
                    "channel_title": snippet["channelTitle"],
                })

            next_page_token = response.get("nextPageToken")
            if not next_page_token:
                break
        return result

    def add_to_playlist(self, playlist_id: str, video_id: str):
        """
        Append a video to a playlist and return the raw insert response.

        https://developers.google.com/youtube/v3/docs/playlistItems/insert?hl=zh-cn
        """
        playlist_items: Resource = self.client.playlistItems()

        request_body = {
            'snippet': {
                'playlistId': playlist_id,
                'resourceId': {
                    'kind': 'youtube#video',
                    'videoId': video_id
                }
            }
        }

        response = playlist_items.insert(
            part='snippet',
            body=request_body
        ).execute()
        return response
def get_args():
    """
    Parse the command-line options used by the demo entry points.

    Every option is a string whose default is a path below ``project_path``.
    The API service name and version are not exposed on the command line.
    """
    # (flag, default path) pairs, registered in this order.
    path_options = (
        (
            "--client_secrets_file",
            project_path / "dotenv/client_secret_2_407880303838-ke7g6d12sa3msca9prjl8dr12njen9r6.apps.googleusercontent.com.json",
        ),
        (
            "--authorized_user_file",
            project_path / "dotenv/chenjiesen_authorized_user_file.json",
        ),
        (
            "--video_file",
            project_path / "data/video/douyin/老陈come back/[7518968327880871202][20250623_103744]合肥LV柜姐爸爸事件.mp4",
        ),
    )

    parser = argparse.ArgumentParser()
    for flag, default_path in path_options:
        parser.add_argument(flag, default=default_path.as_posix(), type=str)
    return parser.parse_args()
def main1():
    """
    Demo: upload the file given by ``--video_file`` as a private video,
    authenticating with the "youtube_chenjiesen_credentials" environment entry.
    """
    import log
    from project_settings import project_path, log_directory, time_zone_info, environment

    log.setup_size_rotating(log_directory=log_directory, tz_info=time_zone_info)

    cli_args = get_args()

    # Credentials come from the environment settings, not from the
    # --client_secrets_file / --authorized_user_file options.
    user_info = environment.get("youtube_chenjiesen_credentials", dtype=json.loads)
    manager = VideoManager(authorized_user_info=user_info)

    snippet = {
        "title": "合肥LV柜姐爸爸事件",
        "description": "",
        "tags": [],
        # See https://developers.google.com/youtube/v3/docs/videoCategories/list
        # NOTE(review): the original comment called "22" the education
        # category; in the commonly published list 22 is "People & Blogs"
        # and 27 is "Education" - confirm with list_video_categories().
        "categoryId": "22",
        # NOTE(review): playlistId is sent as null here; confirm the API
        # accepts/ignores it on a video snippet.
        "playlistId": None
    }
    status = {
        "privacyStatus": "private",  # one of: public / private / unlisted
        "selfDeclaredMadeForKids": False
    }

    manager.upload_video(
        filename=cli_args.video_file,
        metadata={"snippet": snippet, "status": status},
    )
def main2():
    """
    Demo: print the number of videos on the authorized channel, then one
    summary dict per video, authenticating with the
    "youtube_xiaofeng_credentials" environment entry.
    """
    # Parsed for its command-line handling only; no option is read below.
    get_args()

    user_info = environment.get("youtube_xiaofeng_credentials", dtype=json.loads)
    manager = VideoManager(authorized_user_info=user_info)

    videos = manager.list_video()
    print(f"videos count: {len(videos)}")

    for entry in videos:
        video_id = entry["id"]["videoId"]
        info = entry["snippet"]
        # Keep only the fields of interest from the search result.
        print({
            "video_id": video_id,
            "published_at": info["publishedAt"],
            "channel_id": info["channelId"],
            "title": info["title"],
            "channel_title": info["channelTitle"],
            "publish_time": info["publishTime"],
        })
def main3():
    """
    Demo: print the authorized user's playlists as indented JSON, using the
    credentials stored in ``--authorized_user_file``.
    """
    import log
    from project_settings import project_path, log_directory, time_zone_info, environment

    log.setup_size_rotating(log_directory=log_directory, tz_info=time_zone_info)

    cli_args = get_args()
    manager = VideoManager(authorized_user_file=cli_args.authorized_user_file)

    # Other manager calls (list_video_categories, add_to_playlist) can be
    # swapped in here; their results are JSON-serialisable as well.
    payload = manager.list_playlists()
    rendered = json.dumps(payload, ensure_ascii=False, indent=4)
    print(rendered)
# Script entry point: only the playlist demo runs when executed directly;
# main1 / main2 must be selected by editing this call.
if __name__ == "__main__":
    main3()