# Spaces: Sleeping  (hosting-page status banner captured with this file; not part of the program)
from datetime import datetime
from pathlib import Path
from urllib.parse import parse_qs, urlparse

from youtube_transcript_api import YouTubeTranscriptApi

from Data.get_video_link import video_links_main
# Resolve the project root dynamically instead of hard-coding a path.
# Two ``.parent`` hops: the first leaves this file's own folder (per the
# original note, /Data/), the second lands on the project root.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Folder where transcripts are written and later re-loaded as ``*.txt`` files.
TRANSCRIPTS_FOLDER = PROJECT_ROOT / "Data" / "transcripts"
def save_transcript(video_id, transcript_text, *, folder=None):
    """
    Save a transcript to a timestamped text file and return its path.

    Args:
        video_id: Identifier used as the filename prefix.
        transcript_text: Iterable of strings; written one per line.
        folder: Optional destination directory. Defaults to
            ``TRANSCRIPTS_FOLDER`` when omitted, which is the original
            behaviour.

    Returns:
        ``pathlib.Path`` of the file that was written, named
        ``<video_id>_<YYYYmmddHHMMSS>.txt``.

    Raises:
        OSError: If the directory cannot be created or the file written.
    """
    target_dir = TRANSCRIPTS_FOLDER if folder is None else Path(folder)
    # Ensure the destination exists; harmless when it already does.
    target_dir.mkdir(parents=True, exist_ok=True)
    # The timestamp suffix keeps repeated saves of one video from overwriting
    # each other (down to one-second resolution).
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    file_path = target_dir / f"{video_id}_{timestamp}.txt"
    file_path.write_text("\n".join(transcript_text), encoding="utf-8")
    return file_path
# Canonical watch-URL prefix; stripping it is the fallback for links that
# cannot be parsed any other way.
_WATCH_PREFIX = "https://www.youtube.com/watch?v="

# Path prefixes whose next path segment is the video ID.
_PATH_ID_PREFIXES = ("embed", "shorts", "live")


def _extract_video_id(link):
    """Return the video ID contained in one YouTube link."""
    try:
        parts = urlparse(link)
    except ValueError:
        # Malformed URL (e.g. unbalanced IPv6 brackets): use the fallback.
        parts = None

    if parts is not None and parts.netloc:
        # Watch URLs carry the ID in the ``v`` query parameter; reading it via
        # the parsed query drops trailing parameters such as ``&t=`` or
        # ``&list=`` and works whatever the host or parameter order.
        ids = parse_qs(parts.query).get("v")
        if ids:
            return ids[0]
        segments = [seg for seg in parts.path.split("/") if seg]
        host = (parts.hostname or "").lower()
        if host == "youtu.be" and segments:
            return segments[0]
        if len(segments) >= 2 and segments[0] in _PATH_ID_PREFIXES:
            return segments[1]

    # Anything else (including a bare ID) is returned with the canonical
    # prefix removed, if present.
    return link.replace(_WATCH_PREFIX, "")


def get_video_id(video_links_list):
    """
    Convert YouTube links into video IDs.

    Args:
        video_links_list: Iterable of link strings.

    Returns:
        List of video IDs, in the same order as the input. Links that are not
        recognised are returned with the canonical watch prefix removed.
    """
    return [_extract_video_id(link) for link in video_links_list]
def fetch_yt_transcript(video_ids):
    """
    Fetch YouTube transcripts for the given video IDs and save each to disk.

    Args:
        video_ids: Iterable of video ID strings.

    Returns:
        Dict keyed by video ID. Each value is
        ``{'text': [...], 'file_path': '<path>'}`` on success, or
        ``{'text': [], 'file_path': None}`` when fetching or saving failed.
        A failure for one video never aborts the remaining ones.
    """
    video_transcripts = {}
    for video_id in video_ids:
        print(f"Fetching transcript for: {video_id}")

        try:
            # NOTE(review): `get_transcript` is the static entry point of
            # older youtube-transcript-api releases; newer releases document
            # an instance-based `fetch`. Confirm against the pinned version.
            output = YouTubeTranscriptApi.get_transcript(video_id)
            transcript_text = [item['text'] for item in output]
        except Exception as e:
            # Broad on purpose: this is the boundary with a third-party
            # client. The real reason is now reported rather than discarded.
            print(f"Transcript not found for video: {video_id} ({type(e).__name__}: {e})")
            video_transcripts[video_id] = {
                'text': [],
                'file_path': None
            }
            continue

        try:
            # Save transcript and get file path
            file_path = save_transcript(video_id, transcript_text)
        except (OSError, TypeError, ValueError) as e:
            # A storage problem is not a missing transcript, so say so. The
            # record keeps the same shape callers already get for a failure.
            print(f"Could not save transcript for video: {video_id} ({type(e).__name__}: {e})")
            video_transcripts[video_id] = {
                'text': [],
                'file_path': None
            }
            continue

        video_transcripts[video_id] = {
            'text': transcript_text,
            'file_path': str(file_path)
        }
        print(f"Transcript saved to: {file_path}")
    return video_transcripts
def _load_saved_transcripts(folder):
    """Read every ``*.txt`` transcript in *folder* into a dict keyed by video ID."""
    loaded = {}
    # Files are named "<video_id>_<YYYYmmddHHMMSS>.txt". Sorting makes the
    # newest file of a video come last, so it is the one that is kept.
    existing_files = sorted(folder.glob("*.txt"))
    print(f"Found {len(existing_files)} transcript files.")
    for file in existing_files:
        # Split on the LAST underscore only: video IDs may contain "_"
        # themselves, and the timestamp suffix never does.
        video_id = file.stem.rsplit("_", 1)[0]
        try:
            transcript_text = file.read_text(encoding="utf-8").splitlines()
        except (OSError, UnicodeDecodeError) as e:
            print(f"Error loading transcript {file.name}: {e}")
            continue
        loaded[video_id] = {
            'text': transcript_text,
            'file_path': str(file)
        }
        print(f"Loaded transcript for video: {video_id}")
    return loaded


def all_video_transcript_pipeline():
    """
    Load stored transcripts and fetch transcripts for newly detected videos.

    Steps:
        1. Ask ``video_links_main()`` for the known links and any new ones.
        2. Load every transcript file in ``TRANSCRIPTS_FOLDER``, creating the
           folder if it is missing.
        3. If new videos were reported, fetch their transcripts and merge
           them into the loaded set.
    """
    print(f"Looking for transcripts in: {TRANSCRIPTS_FOLDER}")
    # Only the "new video" values are used below; the full link list is
    # unpacked because video_links_main() returns three values.
    video_links_list, new_video_added, new_videos_link = video_links_main()

    video_transcripts = {}
    # Always load existing transcripts
    if TRANSCRIPTS_FOLDER.exists():
        video_transcripts = _load_saved_transcripts(TRANSCRIPTS_FOLDER)
    else:
        print(f"Transcripts folder not found at: {TRANSCRIPTS_FOLDER}, creating it.")
        TRANSCRIPTS_FOLDER.mkdir(parents=True, exist_ok=True)

    # Fetch new transcripts if needed
    if new_video_added and new_videos_link:
        print("New videos detected... Fetching transcripts.")
        new_video_ids = []
        for url in new_videos_link:
            _, marker, tail = url.partition("v=")
            if not marker:
                # Previously this raised IndexError and aborted the run.
                print(f"Skipping link without a video ID: {url}")
                continue
            # Drop trailing query parameters such as "&t=42s".
            new_video_ids.append(tail.split("&", 1)[0])

        new_transcripts = fetch_yt_transcript(new_video_ids)
        for video_id, entry in new_transcripts.items():
            # Merge the fresh results, but never replace an already loaded
            # transcript with the empty record of a failed fetch.
            if entry['text'] or video_id not in video_transcripts:
                video_transcripts[video_id] = entry

    print(f"Total transcripts loaded: {len(video_transcripts)}")
    # NOTE(review): no `return video_transcripts` is visible at this point in
    # the file. Confirm whether callers are meant to receive the dict.