Spaces:
Sleeping
Sleeping
| from youtube_transcript_api import YouTubeTranscriptApi | |
| # from get_video_link import video_links_main | |
| from Data.get_video_link import video_links_main | |
| import os | |
| from datetime import datetime | |
| transcripts = [] | |
| import os | |
| from datetime import datetime | |
def save_transcript(video_id, transcript_text, folder_name="Data/transcripts"):
    """
    Write a transcript to a timestamped text file and return the file's path.

    The file is named ``<video_id>_<YYYYmmddHHMMSS>.txt`` and holds the
    transcript segments joined with newlines, encoded as UTF-8.

    :param video_id: identifier used as the filename prefix
    :param transcript_text: iterable of strings, one per transcript segment
    :param folder_name: target folder, resolved relative to the directory that
        contains this module (an absolute path is used as-is)
    :return: full path of the file that was written
    """
    # Anchor on this module's location so the output folder does not depend
    # on the current working directory of the process.
    base_dir = os.path.dirname(os.path.abspath(__file__))
    folder_path = os.path.join(base_dir, folder_name)

    # exist_ok=True replaces the former exists()/makedirs() pair, which could
    # fail if the folder appeared between the check and the creation.
    os.makedirs(folder_path, exist_ok=True)

    # Second-resolution timestamp; it also keeps repeated saves of the same
    # video in separate files (unless they happen within the same second).
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    filename = f"{video_id}_{timestamp}.txt"
    filepath = os.path.join(folder_path, filename)

    with open(filepath, "w", encoding="utf-8") as f:
        f.write('\n'.join(transcript_text))
    return filepath
def get_video_id(video_links_list):
    """
    Convert a list of YouTube links into a list of video ids.

    :param video_links_list: iterable of link strings
    :return: list with one entry per input link, in the same order; links
        that are not recognised are passed through unchanged
    """
    return [_extract_video_id(link) for link in video_links_list]


def _extract_video_id(link):
    """Return the video id found in one YouTube link, or the link unchanged."""
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    try:
        parts = urlsplit(link)
    except ValueError:
        # Unparseable input: fall back to the plain prefix stripping.
        return link.replace("https://www.youtube.com/watch?v=", "")

    # "watch?v=<id>&t=10s" style links: take only the value of the "v"
    # parameter so trailing parameters do not leak into the id.
    for param in parts.query.split("&"):
        key, sep, value = param.partition("=")
        if key == "v" and sep:
            return value

    # Short links carry the id as the path: https://youtu.be/<id>
    if parts.netloc in ("youtu.be", "www.youtu.be"):
        return parts.path.strip("/")

    # Anything else (e.g. a bare id) is returned as given.
    return link
def fetch_yt_transcript(video_ids):
    """
    Fetch the YouTube transcript for each video id and save it locally.

    Each transcript is requested through ``YouTubeTranscriptApi.get_transcript``
    and written to disk with ``save_transcript``. A failure for one video is
    reported and recorded as an empty entry; the remaining videos are still
    processed.

    :param video_ids: iterable of YouTube video ids
    :return: dict mapping every video id to ``{'text': [...], 'file_path': ...}``;
        for a failed video 'text' is an empty list and 'file_path' is None
    """
    video_transcripts = {}
    for video_id in video_ids:
        print(f"Fetching transcript for: {video_id}")
        try:
            output = YouTubeTranscriptApi.get_transcript(video_id)
            transcript_text = [item['text'] for item in output]
            # Save transcript and get file path
            file_path = save_transcript(video_id, transcript_text)
        except Exception as e:
            # Deliberately broad: this is a best-effort batch and one bad
            # video must not abort the rest. The actual error is included in
            # the message so that network or disk failures are not mistaken
            # for a missing transcript.
            print(
                f"Transcript not found for video: {video_id} "
                f"({type(e).__name__}: {e})"
            )
            video_transcripts[video_id] = {
                'text': [],
                'file_path': None
            }
            continue

        video_transcripts[video_id] = {
            'text': transcript_text,
            'file_path': file_path
        }
        print(f"Transcript saved to: {file_path}")
    return video_transcripts
def all_video_transcript_pipeline():
    """
    Load locally stored transcripts and fetch transcripts for new videos.

    Steps:
      1. Look for a ``transcripts`` folder next to this module; create it if
         it does not exist, otherwise read every ``.txt`` file in it.
      2. Ask ``video_links_main()`` whether new videos were added and, if so,
         fetch and save their transcripts via ``fetch_yt_transcript``.

    Progress is reported with ``print``.

    NOTE(review): this function reads from ``<module dir>/transcripts`` while
    ``save_transcript`` defaults to ``<module dir>/Data/transcripts`` - confirm
    that both resolve to the folder you expect.

    :return: None
    """
    current_dir = os.path.dirname(os.path.abspath(__file__))
    transcripts_folder = os.path.join(current_dir, "transcripts")
    print(f"Looking for transcripts in: {transcripts_folder}")

    # The full link list (first element) is not needed by this function.
    _all_links, new_video_added, new_videos_link = video_links_main()
    video_transcripts = {}

    # Always load existing transcripts
    if os.path.exists(transcripts_folder):
        # Sorted so that, when a video has several files, the one with the
        # latest timestamp is processed last and wins deterministically.
        existing_files = sorted(os.listdir(transcripts_folder))
        print(f"Found {len(existing_files)} files in transcripts folder")
        for file_name in existing_files:
            if not file_name.endswith('.txt'):  # only process text files
                continue
            # Files are named "<video_id>_<timestamp>.txt". Video ids may
            # contain "_" themselves, so drop only the trailing timestamp.
            video_id = file_name[:-len('.txt')].rsplit("_", 1)[0]
            file_path = os.path.join(transcripts_folder, file_name)
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    transcript_text = f.read().splitlines()
            except (OSError, UnicodeError) as e:
                print(f"Error loading transcript {file_name}: {e}")
                continue
            video_transcripts[video_id] = {
                'text': transcript_text,
                'file_path': file_path
            }
            print(f"Loaded transcript for video: {video_id}")
    else:
        print(f"Transcripts folder not found at: {transcripts_folder}")
        os.makedirs(transcripts_folder, exist_ok=True)
        print(f"Created transcripts folder at: {transcripts_folder}")

    # Then fetch new transcripts if there are any
    if new_video_added and new_videos_link:
        print("New videos have been added... Fetching transcripts for new videos")
        new_video_ids = []
        for url in new_videos_link:
            _, marker, tail = url.partition("v=")
            if not marker:
                # No "v=" parameter: nothing to extract, skip instead of
                # failing the whole run.
                print(f"Skipping link without a video id: {url}")
                continue
            # Drop any parameters that follow the id (e.g. "&t=10s").
            new_video_ids.append(tail.split("&", 1)[0])
        # NOTE(review): the fetched transcripts are saved by
        # fetch_yt_transcript but are not merged into video_transcripts (the
        # merge was commented out in the original), so the total printed
        # below only counts transcripts loaded from disk.
        fetch_yt_transcript(new_video_ids)

    print(f"Total transcripts loaded: {len(video_transcripts)}")