"""Fetch YouTube transcripts and cache them as text files on disk."""

import os
from datetime import datetime
from urllib.parse import parse_qs, urlparse

from youtube_transcript_api import YouTubeTranscriptApi

# Alternative import kept from the original file (presumably for running this
# module from inside the Data directory -- confirm before re-enabling):
# from get_video_link import video_links_main
from Data.get_video_link import video_links_main

# Module-level list; not referenced by any function below. Kept so that
# external code importing this name keeps working.
transcripts = []

# Canonical watch-URL prefix that get_video_id() originally stripped.
_WATCH_URL_PREFIX = "https://www.youtube.com/watch?v="


def save_transcript(video_id, transcript_text, folder_name="Data/transcripts"):
    """
    Save a transcript to a timestamped text file.

    The file is named ``<video_id>_<YYYYmmddHHMMSS>.txt`` and contains the
    entries of ``transcript_text`` joined with newlines.

    :param video_id: video identifier, used as the file name prefix
    :param transcript_text: iterable of transcript text lines
    :param folder_name: target folder; a relative value is resolved against
        the directory containing this module, an absolute value is used as is
        (``os.path.join`` discards the base when given an absolute component)
    :return: path of the file that was written
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))
    folder_path = os.path.join(base_dir, folder_name)
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(folder_path, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    filepath = os.path.join(folder_path, f"{video_id}_{timestamp}.txt")

    with open(filepath, "w", encoding="utf-8") as f:
        f.write('\n'.join(transcript_text))

    return filepath


def _extract_video_id(link):
    """Return the video id contained in a single YouTube link."""
    parsed = urlparse(link)

    # Standard watch URLs carry the id in the "v" query parameter; parsing the
    # query drops any additional parameters such as "&t=" or "&list=".
    query_ids = parse_qs(parsed.query).get("v")
    if query_ids:
        return query_ids[0]

    # Short links keep the id in the path component.
    if parsed.netloc.lower() in ("youtu.be", "www.youtu.be"):
        return parsed.path.strip("/")

    # Fallback: same result as the original plain prefix removal, so inputs
    # that are already bare ids are returned unchanged.
    return link.replace(_WATCH_URL_PREFIX, "")


def get_video_id(video_links_list):
    """
    Extract the video ids from a list of YouTube links.

    :param video_links_list: iterable of video URLs (or bare video ids)
    :return: list of video ids, in input order
    """
    return [_extract_video_id(link) for link in video_links_list]


def fetch_yt_transcript(video_ids, folder_name="Data/transcripts"):
    """
    Fetch YouTube transcripts for the given video ids and save them to disk.

    A failure for one video is reported and recorded as an empty entry; it
    does not stop the remaining videos from being processed.

    :param video_ids: iterable of video ids
    :param folder_name: folder passed through to ``save_transcript``
    :return: dict mapping each video id to ``{'text': [...], 'file_path': ...}``;
        failed videos map to ``{'text': [], 'file_path': None}``
    """
    video_transcripts = {}

    for video_id in video_ids:
        print(f"Fetching transcript for: {video_id}")
        try:
            # NOTE(review): get_transcript is the legacy static API; newer
            # youtube-transcript-api releases appear to expose an instance
            # `fetch` method instead -- confirm against the pinned version.
            output = YouTubeTranscriptApi.get_transcript(video_id)
            transcript_text = [item['text'] for item in output]
            file_path = save_transcript(video_id, transcript_text, folder_name)
        except Exception as exc:
            # Deliberately broad: this is a best-effort, per-video boundary
            # around a third-party call. The exception type is reported so a
            # network or disk error is not mistaken for a missing transcript.
            print(
                f"Transcript not found for video: {video_id} "
                f"({type(exc).__name__})"
            )
            video_transcripts[video_id] = {'text': [], 'file_path': None}
            continue

        video_transcripts[video_id] = {
            'text': transcript_text,
            'file_path': file_path
        }
        print(f"Transcript saved to: {file_path}")

    return video_transcripts


def _load_saved_transcripts(transcripts_folder):
    """Read every ``.txt`` transcript in the folder into a dict keyed by id."""
    # Sorted so that, when a video has several timestamped files, the newest
    # one is read last and wins instead of depending on listdir() order.
    existing_files = sorted(os.listdir(transcripts_folder))
    print(f"Found {len(existing_files)} files in transcripts folder")

    loaded = {}
    for file_name in existing_files:
        if not file_name.endswith('.txt'):  # only process text files
            continue

        # Files are named "<video_id>_<timestamp>.txt" and video ids may
        # themselves contain "_", so split on the LAST underscore only.
        video_id = file_name.rsplit("_", 1)[0]
        file_path = os.path.join(transcripts_folder, file_name)
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                transcript_text = f.read().splitlines()
        except (OSError, UnicodeError) as exc:
            print(f"Error loading transcript {file_name}: {exc}")
            continue

        loaded[video_id] = {
            'text': transcript_text,
            'file_path': file_path
        }
        print(f"Loaded transcript for video: {video_id}")

    return loaded


def all_video_transcript_pipeline():
    """
    Load locally saved transcripts and fetch transcripts for new videos.

    Looks for a ``transcripts`` folder next to this module. If it exists, the
    saved transcripts are loaded; otherwise the folder is created. When
    ``video_links_main()`` reports newly added videos, their transcripts are
    fetched and saved into that same folder.

    :return: None
    """
    current_dir = os.path.dirname(os.path.abspath(__file__))
    transcripts_folder = os.path.join(current_dir, "transcripts")
    print(f"Looking for transcripts in: {transcripts_folder}")

    # The full list of links is returned as well but is not needed here.
    _all_links, new_video_added, new_videos_link = video_links_main()

    video_transcripts = {}

    # Always load existing transcripts
    if os.path.exists(transcripts_folder):
        video_transcripts = _load_saved_transcripts(transcripts_folder)
    else:
        print(f"Transcripts folder not found at: {transcripts_folder}")
        os.makedirs(transcripts_folder)
        print(f"Created transcripts folder at: {transcripts_folder}")

    # Then fetch new transcripts if there are any
    if new_video_added and new_videos_link:
        print("New videos have been added... Fetching transcripts for new videos")
        new_video_ids = get_video_id(new_videos_link)
        # Save into the folder this pipeline reads from, so the new files are
        # found on the next run. The fetched transcripts are only persisted to
        # disk: merging them into video_transcripts was disabled in the
        # original code and is left that way.
        fetch_yt_transcript(new_video_ids, folder_name=transcripts_folder)

    print(f"Total transcripts loaded: {len(video_transcripts)}")