File size: 2,488 Bytes
81e73d0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import ffmpeg

from app.config import Settings

# Module-level settings instance; `download_stream` reads `download_path` from it.
settings = Settings()


class Stream:
    """Attribute-style view over a single stream description dict.

    The constructor copies each entry named in ``_ATTR_TO_KEY`` from the
    incoming dict onto the instance under a snake_case attribute name
    (``url``, ``format``, ``quality``, ``mime_type``, ``codec``,
    ``audio_track_id``, ``audio_track_name``, ``audio_track_locale``,
    ``video_only``, ``itag``, ``bitrate``, ``init_start``, ``init_end``,
    ``index_start``, ``index_end``, ``width``, ``height``, ``fps``,
    ``content_length``). The dict itself is kept as ``stream_data``.

    Every listed key is required; a missing one raises ``KeyError``.
    """

    # (instance attribute, key in the incoming dict), in assignment order.
    _ATTR_TO_KEY = (
        ("url", "url"),
        ("format", "format"),
        ("quality", "quality"),
        ("mime_type", "mimeType"),
        ("codec", "codec"),
        ("audio_track_id", "audioTrackId"),
        ("audio_track_name", "audioTrackName"),
        ("audio_track_locale", "audioTrackLocale"),
        ("video_only", "videoOnly"),
        ("itag", "itag"),
        ("bitrate", "bitrate"),
        ("init_start", "initStart"),
        ("init_end", "initEnd"),
        ("index_start", "indexStart"),
        ("index_end", "indexEnd"),
        ("width", "width"),
        ("height", "height"),
        ("fps", "fps"),
        ("content_length", "contentLength"),
    )

    def __init__(self, stream_data: dict):
        # Keep a reference to the raw payload before unpacking it.
        self.stream_data = stream_data
        for attr_name, source_key in self._ATTR_TO_KEY:
            setattr(self, attr_name, stream_data[source_key])


def download_stream(
    stream: Stream,
    title: str,
    file_type: str,
) -> str:
    """Copy a remote stream into a local file via ffmpeg and return its path.

    The target is
    ``{settings.download_path}/{file_type}/{file_type}_{title}.{ext}``, where
    ``ext`` is ``get_file_ext_from_format(stream.format)``; the same value is
    passed to ffmpeg as the output format. The matching codec option
    (``vcodec`` for video, ``acodec`` for audio) is set to ``"copy"`` and an
    existing file at the target path is overwritten.

    Args:
        stream: Stream whose ``url`` is used as the ffmpeg input and whose
            ``format`` selects the extension/output format.
        title: Name fragment placed in the output file name.
        file_type: Either ``"video"`` or ``"audio"``.

    Returns:
        The path of the output file.

    Raises:
        ValueError: If ``file_type`` is neither ``"video"`` nor ``"audio"``.
            Previously such a value skipped the download and still returned
            a path to a file that had never been written.

    Any exception raised by the ffmpeg call propagates unchanged.
    """
    import os  # function-scope import so the block needs no file-level change

    codec_option = {"video": "vcodec", "audio": "acodec"}.get(file_type)
    if codec_option is None:
        raise ValueError(
            f"Unsupported file_type {file_type!r}; expected 'video' or 'audio'"
        )

    # Resolve the extension once; it doubles as the ffmpeg output format.
    extension = get_file_ext_from_format(stream.format)

    output_path = f"{settings.download_path}/{file_type}"
    # NOTE(review): `title` is interpolated as-is, so a title containing a
    # path separator changes the target directory — confirm callers sanitize.
    file_path = f"{output_path}/{file_type}_{title}.{extension}"

    # ffmpeg does not create missing parent directories for its output file.
    os.makedirs(output_path, exist_ok=True)

    output_options = {
        "format": extension,
        codec_option: "copy",
        "strict": "experimental",
    }
    ffmpeg.output(
        ffmpeg.input(stream.url),
        filename=file_path,
        **output_options,
    ).overwrite_output().run()

    return file_path


def get_file_ext_from_format(format: str) -> str:
    """Return the container file extension for a stream format identifier.

    The WebM family (``"WEBM"``, ``"WEBM_OPUS"``, ``"WEBM_VORBIS"``,
    ``"WEBMA_OPUS"``, ``"WEBMA_VORBIS"``) maps to ``"webm"``. Everything
    else, including ``"M4A"``, ``"MPEG_4"`` and unrecognised identifiers,
    maps to ``"mp4"``. Matching is exact and case-sensitive.
    """
    webm_formats = (
        "WEBMA_OPUS",
        "WEBMA_VORBIS",
        "WEBM",
        "WEBM_OPUS",
        "WEBM_VORBIS",
    )
    if format in webm_formats:
        return "webm"
    # mp4 is both the explicit answer for M4A/MPEG_4 and the fallback.
    return "mp4"