Spaces:
Sleeping
Sleeping
File size: 3,397 Bytes
1804d45 7f23ebc a9725a0 ca6f751 a9725a0 ca6f751 a9725a0 147ae7a a9725a0 147ae7a a9725a0 1804d45 1047115 a9725a0 ca6f751 a9725a0 1047115 1804d45 a9725a0 1804d45 2d4a391 147ae7a a9725a0 147ae7a 1047115 4a2aa81 1047115 a9725a0 1047115 4a2aa81 1047115 2d4a391 147ae7a a9725a0 147ae7a a9725a0 927f731 6671679 a9725a0 6671679 927f731 a9725a0 6671679 a9725a0 927f731 a9725a0 6671679 a9725a0 6671679 927f731 6671679 147ae7a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
from youtube_transcript_api import YouTubeTranscriptApi
from Data.get_video_link import video_links_main
from pathlib import Path
from datetime import datetime
# Resolve the project root relative to this file rather than the current
# working directory: two levels up from the resolved path of this module
# (presumably <root>/Data/<this file> -- confirm against the repo layout).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Folder under the project root where transcript text files are kept.
TRANSCRIPTS_FOLDER = PROJECT_ROOT.joinpath("Data", "transcripts")
def save_transcript(video_id, transcript_text):
    """
    Write a transcript to a timestamped text file inside TRANSCRIPTS_FOLDER.

    Args:
        video_id: Identifier used as the first part of the file name.
        transcript_text: Iterable of strings; written one per line.

    Returns:
        Path of the file that was written, named
        ``<video_id>_<YYYYmmddHHMMSS>.txt``.
    """
    # Create the destination folder (and any missing parents) on first use.
    TRANSCRIPTS_FOLDER.mkdir(parents=True, exist_ok=True)

    # Local-time stamp with second resolution keeps repeated saves apart.
    stamp = f"{datetime.now():%Y%m%d%H%M%S}"
    destination = TRANSCRIPTS_FOLDER.joinpath(f"{video_id}_{stamp}.txt")

    body = '\n'.join(transcript_text)
    destination.write_text(body, encoding="utf-8")
    return destination
def get_video_id(video_links_list):
    """
    Extract the video ID from each YouTube link.

    The ID is taken from the ``v`` query parameter when the link has one, so
    extra parameters (``&t=42s``, ``&list=...``), a different parameter order,
    or a different host (``m.youtube.com``) do not leak into the result.
    ``youtu.be/<id>`` short links are resolved from the URL path. Anything
    else falls back to stripping the standard watch-URL prefix, which leaves
    unrecognised input (including bare IDs) unchanged.

    Args:
        video_links_list: Iterable of link strings.

    Returns:
        list[str]: One extracted ID per input link, in the same order.
    """
    # Function-scope import so this block does not depend on the file header.
    from urllib.parse import parse_qs, urlparse

    watch_prefix = "https://www.youtube.com/watch?v="
    short_hosts = ("youtu.be", "www.youtu.be")

    video_ids = []
    for link in video_links_list:
        parsed = urlparse(link)
        candidate = ""

        query_values = parse_qs(parsed.query).get("v")
        if query_values:
            candidate = query_values[0]
        elif parsed.netloc.lower() in short_hosts:
            # Short links carry the ID as the first path segment.
            candidate = parsed.path.strip("/").split("/")[0]

        if not candidate:
            # Fallback: plain prefix removal for anything not recognised.
            candidate = link.replace(watch_prefix, "")
        video_ids.append(candidate)
    return video_ids
def fetch_yt_transcript(video_ids):
    """
    Fetches YouTube transcripts using video IDs.

    Each transcript is fetched with ``YouTubeTranscriptApi.get_transcript``,
    reduced to its ``'text'`` fields and written to disk via
    ``save_transcript``. The operation is best-effort: a failure for one
    video is reported and recorded as an empty entry, and the remaining
    videos are still processed.

    Args:
        video_ids: Iterable of video ID strings.

    Returns:
        dict: Maps each video ID to ``{'text': list[str], 'file_path': str}``
        on success, or ``{'text': [], 'file_path': None}`` on failure.
    """
    video_transcripts = {}
    for video_id in video_ids:
        print(f"Fetching transcript for: {video_id}")
        try:
            # NOTE(review): newer youtube_transcript_api releases appear to
            # replace this static call with an instance-level fetch API --
            # confirm against the installed version.
            output = YouTubeTranscriptApi.get_transcript(video_id)
            transcript_text = [item['text'] for item in output]
            # Save transcript and get file path
            file_path = save_transcript(video_id, transcript_text)
        except Exception as e:
            # Deliberately broad so one bad video never aborts the batch, but
            # surface the cause: it may be a network, API or disk-write error
            # rather than a missing transcript.
            print(
                f"Transcript not found for video: {video_id} "
                f"({type(e).__name__}: {e})"
            )
            video_transcripts[video_id] = {
                'text': [],
                'file_path': None,
            }
        else:
            video_transcripts[video_id] = {
                'text': transcript_text,
                'file_path': str(file_path),
            }
            print(f"Transcript saved to: {file_path}")
    return video_transcripts
def all_video_transcript_pipeline():
    """
    Handles fetching and storing transcripts, checking for new videos.

    Steps:
      1. Call ``video_links_main()`` to learn whether new videos were added
         and which links they have.
      2. Load every ``*.txt`` file already present in ``TRANSCRIPTS_FOLDER``
         (creating the folder when it does not exist).
      3. When new videos are reported, fetch their transcripts with
         ``fetch_yt_transcript`` and merge them into the loaded set.

    Returns:
        dict: Maps video ID to ``{'text': list[str], 'file_path': str | None}``.
        Entries with an empty ``'text'`` list are new videos whose fetch
        failed.
    """
    print(f"Looking for transcripts in: {TRANSCRIPTS_FOLDER}")
    # The full list of links (first element) is not needed here.
    _, new_video_added, new_videos_link = video_links_main()
    video_transcripts = {}

    # Always load existing transcripts
    if TRANSCRIPTS_FOLDER.exists():
        # Sorted so that, for several files of one video, the file with the
        # latest timestamp suffix is processed last and therefore wins.
        existing_files = sorted(TRANSCRIPTS_FOLDER.glob("*.txt"))
        print(f"Found {len(existing_files)} transcript files.")
        for file in existing_files:
            # File names are "<video_id>_<timestamp>"; video IDs may contain
            # "_" themselves, so only the last underscore is a separator.
            video_id = file.stem.rsplit("_", 1)[0]
            try:
                transcript_text = file.read_text(encoding="utf-8").splitlines()
            except (OSError, UnicodeError) as e:
                print(f"Error loading transcript {file.name}: {e}")
                continue
            video_transcripts[video_id] = {
                'text': transcript_text,
                'file_path': str(file),
            }
            print(f"Loaded transcript for video: {video_id}")
    else:
        print(f"Transcripts folder not found at: {TRANSCRIPTS_FOLDER}, creating it.")
        TRANSCRIPTS_FOLDER.mkdir(parents=True, exist_ok=True)

    # Fetch new transcripts if needed
    if new_video_added and new_videos_link:
        print("New videos detected... Fetching transcripts.")
        new_video_ids = []
        for url in new_videos_link:
            # Take what follows the first "v=" and drop any further "&..."
            # parameters; a link without "v=" is skipped instead of raising.
            _, marker, remainder = url.partition("v=")
            candidate = remainder.split("&", 1)[0]
            if marker and candidate:
                new_video_ids.append(candidate)
            else:
                print(f"Skipping link without a video ID: {url}")

        new_transcripts = fetch_yt_transcript(new_video_ids)
        for video_id, entry in new_transcripts.items():
            # Never replace a transcript already loaded from disk with the
            # empty placeholder that marks a failed fetch.
            if entry['text'] or video_id not in video_transcripts:
                video_transcripts[video_id] = entry

    print(f"Total transcripts loaded: {len(video_transcripts)}")
    return video_transcripts
|