Hubermanbot2 / Data /yt_transcript.py
Nightwing11's picture
Path issue resolved
a9725a0
raw
history blame
3.4 kB
from youtube_transcript_api import YouTubeTranscriptApi
from Data.get_video_link import video_links_main
from pathlib import Path
from datetime import datetime
# Locate the project root relative to this file instead of the current
# working directory: this module sits in <root>/Data/, so the root is the
# parent of the directory that contains it.
_MODULE_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = _MODULE_DIR.parent
# All transcript text files are stored under <root>/Data/transcripts.
TRANSCRIPTS_FOLDER = PROJECT_ROOT.joinpath("Data", "transcripts")
def save_transcript(video_id, transcript_text):
    """
    Write one transcript to the local transcripts folder.

    The file is named ``<video_id>_<YYYYmmddHHMMSS>.txt`` and holds the
    transcript segments joined with newlines, encoded as UTF-8.

    Args:
        video_id: YouTube video ID used as the filename prefix.
        transcript_text: Iterable of transcript segment strings.

    Returns:
        Path of the file that was written.
    """
    # Create the target directory on first use.
    TRANSCRIPTS_FOLDER.mkdir(parents=True, exist_ok=True)

    stamp = f"{datetime.now():%Y%m%d%H%M%S}"
    destination = TRANSCRIPTS_FOLDER / f"{video_id}_{stamp}.txt"

    # Build the full text before opening the file so a bad segment cannot
    # leave an empty file behind.
    body = '\n'.join(transcript_text)
    with destination.open("w", encoding="utf-8") as handle:
        handle.write(body)
    return destination
def get_video_id(video_links_list):
    """
    Extract the YouTube video ID from each link.

    The ID is taken from the ``v`` query parameter, so trailing parameters
    such as ``&t=10s`` or ``&list=...`` do not end up in the ID. Links that
    carry no ``v`` parameter fall back to stripping the standard
    ``https://www.youtube.com/watch?v=`` prefix, which leaves bare IDs and
    unrecognised strings unchanged.

    Args:
        video_links_list: Iterable of YouTube watch URLs (or bare IDs).

    Returns:
        List of video ID strings, in the same order as the input.
    """
    from urllib.parse import parse_qs, urlparse

    video_ids = []
    for link in video_links_list:
        query = parse_qs(urlparse(link).query)
        if "v" in query:
            video_ids.append(query["v"][0])
        else:
            # No usable ``v`` parameter: keep the legacy prefix-stripping result.
            video_ids.append(link.replace("https://www.youtube.com/watch?v=", ""))
    return video_ids
def fetch_yt_transcript(video_ids):
    """
    Fetches YouTube transcripts using video IDs.

    Each transcript is fetched with ``YouTubeTranscriptApi.get_transcript``
    and written to disk with ``save_transcript``. The operation is
    best-effort: if fetching or saving fails for a video, the reason is
    printed and an empty placeholder entry is recorded so the remaining
    videos are still processed.

    Args:
        video_ids: Iterable of YouTube video ID strings.

    Returns:
        Dict mapping every requested video ID to
        ``{'text': <list of segment strings>, 'file_path': <str or None>}``.
        Failed videos have ``'text': []`` and ``'file_path': None``.
    """
    video_transcripts = {}
    for video_id in video_ids:
        print(f"Fetching transcript for: {video_id}")
        try:
            # NOTE(review): ``get_transcript`` is the legacy static entry point
            # of youtube-transcript-api; confirm it exists in the pinned version.
            output = YouTubeTranscriptApi.get_transcript(video_id)
            transcript_text = [item['text'] for item in output]
            # Save transcript and get file path
            file_path = save_transcript(video_id, transcript_text)
        except Exception as e:
            # Report the real cause: this branch also covers network errors and
            # disk-write failures, not only missing transcripts.
            print(
                f"Transcript not found for video: {video_id} "
                f"({type(e).__name__}: {e})"
            )
            video_transcripts[video_id] = {
                'text': [],
                'file_path': None,
            }
        else:
            video_transcripts[video_id] = {
                'text': transcript_text,
                'file_path': str(file_path),
            }
            print(f"Transcript saved to: {file_path}")
    return video_transcripts
def all_video_transcript_pipeline():
    """
    Handles fetching and storing transcripts, checking for new videos.

    Steps:
      1. Ask ``video_links_main()`` for the known links and any new ones.
      2. Load every ``*.txt`` transcript already in ``TRANSCRIPTS_FOLDER``
         (the folder is created when it does not exist yet).
      3. When new videos are reported, fetch their transcripts and merge
         them into the loaded set.

    Returns:
        Dict mapping video ID to
        ``{'text': <list of lines>, 'file_path': <str or None>}``.
    """
    print(f"Looking for transcripts in: {TRANSCRIPTS_FOLDER}")
    _all_links, new_video_added, new_videos_link = video_links_main()
    video_transcripts = {}

    # Always load existing transcripts
    if TRANSCRIPTS_FOLDER.exists():
        # Sorted so that, when one video has several timestamped files, the
        # newest is read last and is the one kept in the dict.
        existing_files = sorted(TRANSCRIPTS_FOLDER.glob("*.txt"))
        print(f"Found {len(existing_files)} transcript files.")
        for file in existing_files:
            # Files are named "<video_id>_<timestamp>.txt". YouTube IDs may
            # contain "_", so only the last underscore separates the timestamp.
            video_id = file.stem.rsplit("_", 1)[0]
            try:
                transcript_text = file.read_text(encoding="utf-8").splitlines()
            except Exception as e:
                print(f"Error loading transcript {file.name}: {e}")
                continue
            video_transcripts[video_id] = {
                'text': transcript_text,
                'file_path': str(file),
            }
            print(f"Loaded transcript for video: {video_id}")
    else:
        print(f"Transcripts folder not found at: {TRANSCRIPTS_FOLDER}, creating it.")
        TRANSCRIPTS_FOLDER.mkdir(parents=True, exist_ok=True)

    # Fetch new transcripts if needed
    if new_video_added and new_videos_link:
        print("New videos detected... Fetching transcripts.")
        # Take what follows "v=" and drop any further "&..." query parameters.
        new_video_ids = [
            url.split("v=", 1)[1].split("&", 1)[0] for url in new_videos_link
        ]
        new_transcripts = fetch_yt_transcript(new_video_ids)
        for video_id, entry in new_transcripts.items():
            # A failed fetch yields an empty placeholder; never let it replace
            # a transcript that was already loaded from disk.
            if entry['text'] or video_id not in video_transcripts:
                video_transcripts[video_id] = entry

    print(f"Total transcripts loaded: {len(video_transcripts)}")
    return video_transcripts