File size: 5,540 Bytes
af1516d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import logging
from xml.etree.ElementTree import ParseError

from typing import Any, Dict, Generator, List, Optional, Sequence, Union
from urllib.parse import parse_qs, urlparse
from langchain_core.documents import Document
from open_webui.env import SRC_LOG_LEVELS

log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])

ALLOWED_SCHEMES = {"http", "https"}
ALLOWED_NETLOCS = {
    "youtu.be",
    "m.youtube.com",
    "youtube.com",
    "www.youtube.com",
    "www.youtube-nocookie.com",
    "vid.plus",
}


def _parse_video_id(url: str) -> Optional[str]:
    """Parse a YouTube URL and return the video ID if valid, otherwise None."""
    parsed_url = urlparse(url)

    if parsed_url.scheme not in ALLOWED_SCHEMES:
        return None

    if parsed_url.netloc not in ALLOWED_NETLOCS:
        return None

    path = parsed_url.path

    if path.endswith("/watch"):
        query = parsed_url.query
        parsed_query = parse_qs(query)
        if "v" in parsed_query:
            ids = parsed_query["v"]
            video_id = ids if isinstance(ids, str) else ids[0]
        else:
            return None
    else:
        path = parsed_url.path.lstrip("/")
        video_id = path.split("/")[-1]

    if len(video_id) != 11:  # Video IDs are 11 characters long
        return None

    return video_id


class YoutubeLoader:
    """Load `YouTube` video transcripts."""

    def __init__(
        self,
        video_id: str,
        language: Union[str, Sequence[str]] = "en",
        proxy_url: Optional[str] = None,
    ):
        """Initialize with YouTube video ID."""
        _video_id = _parse_video_id(video_id)
        self.video_id = _video_id if _video_id is not None else video_id
        self._metadata = {"source": video_id}
        self.proxy_url = proxy_url

        # Ensure language is a list
        if isinstance(language, str):
            self.language = [language]
        else:
            self.language = list(language)

        # Add English as fallback if not already in the list
        if "en" not in self.language:
            self.language.append("en")

    def load(self) -> List[Document]:
        """Load YouTube transcripts into `Document` objects."""
        try:
            from youtube_transcript_api import (
                NoTranscriptFound,
                TranscriptsDisabled,
                YouTubeTranscriptApi,
            )
        except ImportError:
            raise ImportError(
                'Could not import "youtube_transcript_api" Python package. '
                "Please install it with `pip install youtube-transcript-api`."
            )

        if self.proxy_url:
            youtube_proxies = {
                "http": self.proxy_url,
                "https": self.proxy_url,
            }
            log.debug(f"Using proxy URL: {self.proxy_url[:14]}...")
        else:
            youtube_proxies = None

        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(
                self.video_id, proxies=youtube_proxies
            )
        except Exception as e:
            log.exception("Loading YouTube transcript failed")
            return []

        # Try each language in order of priority
        for lang in self.language:
            try:
                transcript = transcript_list.find_transcript([lang])
                if transcript.is_generated:
                    log.debug(f"Found generated transcript for language '{lang}'")
                    try:
                        transcript = transcript_list.find_manually_created_transcript(
                            [lang]
                        )
                        log.debug(f"Found manual transcript for language '{lang}'")
                    except NoTranscriptFound:
                        log.debug(
                            f"No manual transcript found for language '{lang}', using generated"
                        )
                        pass

                log.debug(f"Found transcript for language '{lang}'")
                try:
                    transcript_pieces: List[Dict[str, Any]] = transcript.fetch()
                except ParseError:
                    log.debug(f"Empty or invalid transcript for language '{lang}'")
                    continue

                if not transcript_pieces:
                    log.debug(f"Empty transcript for language '{lang}'")
                    continue

                transcript_text = " ".join(
                    map(
                        lambda transcript_piece: (
                            transcript_piece.text.strip(" ")
                            if hasattr(transcript_piece, "text")
                            else ""
                        ),
                        transcript_pieces,
                    )
                )
                return [Document(page_content=transcript_text, metadata=self._metadata)]
            except NoTranscriptFound:
                log.debug(f"No transcript found for language '{lang}'")
                continue
            except Exception as e:
                log.info(f"Error finding transcript for language '{lang}'")
                raise e

        # If we get here, all languages failed
        languages_tried = ", ".join(self.language)
        log.warning(
            f"No transcript found for any of the specified languages: {languages_tried}. Verify if the video has transcripts, add more languages if needed."
        )
        raise NoTranscriptFound(self.video_id, self.language, list(transcript_list))