File size: 3,397 Bytes
1804d45
7f23ebc
a9725a0
ca6f751
 
a9725a0
 
 
ca6f751
a9725a0
147ae7a
a9725a0
147ae7a
a9725a0
 
1804d45
1047115
 
a9725a0
ca6f751
a9725a0
 
1047115
 
1804d45
a9725a0
1804d45
 
2d4a391
147ae7a
a9725a0
147ae7a
 
1047115
 
 
4a2aa81
1047115
 
 
 
 
 
 
a9725a0
1047115
 
 
4a2aa81
1047115
 
 
 
 
 
 
2d4a391
 
 
147ae7a
a9725a0
147ae7a
a9725a0
927f731
 
 
6671679
a9725a0
 
 
6671679
927f731
a9725a0
 
 
 
 
 
 
 
 
 
6671679
a9725a0
 
927f731
a9725a0
6671679
a9725a0
6671679
 
927f731
6671679
147ae7a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from youtube_transcript_api import YouTubeTranscriptApi
from Data.get_video_link import video_links_main
from pathlib import Path
from datetime import datetime

# Derive the project root from this module's own location instead of the
# current working directory: the root is the directory two levels above this
# file, and transcripts live under <root>/Data/transcripts.
_MODULE_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = _MODULE_DIR.parent  # Moves up from /Data/
TRANSCRIPTS_FOLDER = PROJECT_ROOT.joinpath("Data", "transcripts")

def save_transcript(video_id, transcript_text):
    """
    Write a transcript to a timestamped text file in the transcripts folder.

    The file is named ``<video_id>_<YYYYmmddHHMMSS>.txt`` and contains the
    entries of ``transcript_text`` joined with newlines, encoded as UTF-8.

    Args:
        video_id: Identifier used as the file name prefix.
        transcript_text: Iterable of strings, one per transcript segment.

    Returns:
        Path of the file that was written.
    """
    stamp = datetime.now().strftime("%Y%m%d%H%M%S")

    # Create the destination folder on demand so a fresh checkout works.
    TRANSCRIPTS_FOLDER.mkdir(parents=True, exist_ok=True)

    destination = TRANSCRIPTS_FOLDER / f"{video_id}_{stamp}.txt"
    body = '\n'.join(transcript_text)
    destination.write_text(body, encoding="utf-8")
    return destination


def get_video_id(video_links_list):
    """
    Extract the YouTube video ID from each link in ``video_links_list``.

    Recognised forms:
      * ``http(s)://[<sub>.]youtube.com/watch?v=<id>[&other=params]``
      * ``http(s)://youtu.be/<id>[?other=params]``

    Extra query parameters (``&t=42s``, ``&list=...``) are dropped so only the
    ID itself is returned. Anything that is not recognised falls back to the
    historical behaviour of stripping the literal
    ``https://www.youtube.com/watch?v=`` prefix, so bare IDs pass through
    unchanged.

    Args:
        video_links_list: Iterable of video URL strings.

    Returns:
        List of video IDs, in the same order as the input links.
    """
    # Local import keeps this block self-contained; the module is stdlib.
    from urllib.parse import parse_qs, urlparse

    def _extract(link):
        try:
            parsed = urlparse(link)
            host = (parsed.hostname or "").lower()
        except ValueError:
            # Malformed URL (e.g. broken IPv6 literal): use the legacy path.
            host = ""
            parsed = None

        if parsed is not None:
            if host == "youtu.be":
                # Short links carry the ID as the first path segment.
                short_id = parsed.path.lstrip("/").split("/")[0]
                if short_id:
                    return short_id
            elif host == "youtube.com" or host.endswith(".youtube.com"):
                ids = parse_qs(parsed.query).get("v")
                if ids:
                    return ids[0]

        # Legacy behaviour for inputs the parser does not recognise.
        return link.replace("https://www.youtube.com/watch?v=", "")

    return [_extract(link) for link in video_links_list]


def fetch_yt_transcript(video_ids):
    """
    Fetches YouTube transcripts using video IDs and saves each one to disk.

    Every requested ID gets an entry in the result, whether or not the fetch
    succeeded, so callers can tell which videos were attempted.

    Args:
        video_ids: Iterable of YouTube video ID strings.

    Returns:
        Dict mapping each video ID to ``{'text': [...], 'file_path': ...}``:
          * fetch and save succeeded: the transcript lines and the saved
            file's path as a string;
          * fetch failed: ``'text'`` is ``[]`` and ``'file_path'`` is ``None``;
          * fetch succeeded but saving failed: the transcript lines are kept
            and ``'file_path'`` is ``None``.
    """
    video_transcripts = {}

    for video_id in video_ids:
        print(f"Fetching transcript for: {video_id}")

        # Step 1: fetch. This stays best-effort (one bad video must not abort
        # the batch), but the real reason is reported rather than swallowed.
        # NOTE(review): newer youtube-transcript-api releases appear to replace
        # the static get_transcript() with an instance fetch() method --
        # confirm against the pinned library version.
        try:
            output = YouTubeTranscriptApi.get_transcript(video_id)
            transcript_text = [item['text'] for item in output]
        except Exception as exc:
            print(
                f"Transcript not found for video: {video_id} "
                f"({type(exc).__name__}: {exc})"
            )
            video_transcripts[video_id] = {
                'text': [],
                'file_path': None
            }
            continue

        # Step 2: persist. A disk problem is a different failure from a
        # missing transcript, so it is reported separately and the text that
        # was already fetched is kept.
        try:
            file_path = save_transcript(video_id, transcript_text)
        except (OSError, UnicodeError) as exc:
            print(f"Could not save transcript for video: {video_id} ({exc})")
            video_transcripts[video_id] = {
                'text': transcript_text,
                'file_path': None
            }
            continue

        video_transcripts[video_id] = {
            'text': transcript_text,
            'file_path': str(file_path)
        }
        print(f"Transcript saved to: {file_path}")

    return video_transcripts


def all_video_transcript_pipeline():
    """
    Handles fetching and storing transcripts, checking for new videos.
    """
    print(f"Looking for transcripts in: {TRANSCRIPTS_FOLDER}")
    video_links_list, new_video_added, new_videos_link = video_links_main()
    video_transcripts = {}

    # Always load existing transcripts
    if TRANSCRIPTS_FOLDER.exists():
        existing_files = list(TRANSCRIPTS_FOLDER.glob("*.txt"))
        print(f"Found {len(existing_files)} transcript files.")

        for file in existing_files:
            video_id = file.stem.split("_")[0]  # Extract video ID
            try:
                transcript_text = file.read_text(encoding="utf-8").splitlines()
                video_transcripts[video_id] = {
                    'text': transcript_text,
                    'file_path': str(file)
                }
                print(f"Loaded transcript for video: {video_id}")
            except Exception as e:
                print(f"Error loading transcript {file.name}: {e}")
    else:
        print(f"Transcripts folder not found at: {TRANSCRIPTS_FOLDER}, creating it.")
        TRANSCRIPTS_FOLDER.mkdir(parents=True, exist_ok=True)

    # Fetch new transcripts if needed
    if new_video_added and new_videos_link:
        print("New videos detected... Fetching transcripts.")
        new_video_ids = [url.split("v=")[1] for url in new_videos_link]  # Extract video IDs
        new_transcripts = fetch_yt_transcript(new_video_ids)

    print(f"Total transcripts loaded: {len(video_transcripts)}")