File size: 5,540 Bytes
af1516d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
import logging
from xml.etree.ElementTree import ParseError
from typing import Any, Dict, Generator, List, Optional, Sequence, Union
from urllib.parse import parse_qs, urlparse
from langchain_core.documents import Document
from open_webui.env import SRC_LOG_LEVELS
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])
ALLOWED_SCHEMES = {"http", "https"}
ALLOWED_NETLOCS = {
"youtu.be",
"m.youtube.com",
"youtube.com",
"www.youtube.com",
"www.youtube-nocookie.com",
"vid.plus",
}
def _parse_video_id(url: str) -> Optional[str]:
"""Parse a YouTube URL and return the video ID if valid, otherwise None."""
parsed_url = urlparse(url)
if parsed_url.scheme not in ALLOWED_SCHEMES:
return None
if parsed_url.netloc not in ALLOWED_NETLOCS:
return None
path = parsed_url.path
if path.endswith("/watch"):
query = parsed_url.query
parsed_query = parse_qs(query)
if "v" in parsed_query:
ids = parsed_query["v"]
video_id = ids if isinstance(ids, str) else ids[0]
else:
return None
else:
path = parsed_url.path.lstrip("/")
video_id = path.split("/")[-1]
if len(video_id) != 11: # Video IDs are 11 characters long
return None
return video_id
class YoutubeLoader:
"""Load `YouTube` video transcripts."""
def __init__(
self,
video_id: str,
language: Union[str, Sequence[str]] = "en",
proxy_url: Optional[str] = None,
):
"""Initialize with YouTube video ID."""
_video_id = _parse_video_id(video_id)
self.video_id = _video_id if _video_id is not None else video_id
self._metadata = {"source": video_id}
self.proxy_url = proxy_url
# Ensure language is a list
if isinstance(language, str):
self.language = [language]
else:
self.language = list(language)
# Add English as fallback if not already in the list
if "en" not in self.language:
self.language.append("en")
def load(self) -> List[Document]:
"""Load YouTube transcripts into `Document` objects."""
try:
from youtube_transcript_api import (
NoTranscriptFound,
TranscriptsDisabled,
YouTubeTranscriptApi,
)
except ImportError:
raise ImportError(
'Could not import "youtube_transcript_api" Python package. '
"Please install it with `pip install youtube-transcript-api`."
)
if self.proxy_url:
youtube_proxies = {
"http": self.proxy_url,
"https": self.proxy_url,
}
log.debug(f"Using proxy URL: {self.proxy_url[:14]}...")
else:
youtube_proxies = None
try:
transcript_list = YouTubeTranscriptApi.list_transcripts(
self.video_id, proxies=youtube_proxies
)
except Exception as e:
log.exception("Loading YouTube transcript failed")
return []
# Try each language in order of priority
for lang in self.language:
try:
transcript = transcript_list.find_transcript([lang])
if transcript.is_generated:
log.debug(f"Found generated transcript for language '{lang}'")
try:
transcript = transcript_list.find_manually_created_transcript(
[lang]
)
log.debug(f"Found manual transcript for language '{lang}'")
except NoTranscriptFound:
log.debug(
f"No manual transcript found for language '{lang}', using generated"
)
pass
log.debug(f"Found transcript for language '{lang}'")
try:
transcript_pieces: List[Dict[str, Any]] = transcript.fetch()
except ParseError:
log.debug(f"Empty or invalid transcript for language '{lang}'")
continue
if not transcript_pieces:
log.debug(f"Empty transcript for language '{lang}'")
continue
transcript_text = " ".join(
map(
lambda transcript_piece: (
transcript_piece.text.strip(" ")
if hasattr(transcript_piece, "text")
else ""
),
transcript_pieces,
)
)
return [Document(page_content=transcript_text, metadata=self._metadata)]
except NoTranscriptFound:
log.debug(f"No transcript found for language '{lang}'")
continue
except Exception as e:
log.info(f"Error finding transcript for language '{lang}'")
raise e
# If we get here, all languages failed
languages_tried = ", ".join(self.language)
log.warning(
f"No transcript found for any of the specified languages: {languages_tried}. Verify if the video has transcripts, add more languages if needed."
)
raise NoTranscriptFound(self.video_id, self.language, list(transcript_list))
|