File size: 7,629 Bytes
e29ca88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3dddfe4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import glob
import os.path
import subprocess

from yt_dlp import YoutubeDL
from pymediainfo import MediaInfo

from configs import settings


def download_video(
        url: str,
        output_dir: str = None,
        max_resolution: int = 1080,
        max_fps: float = 60,
        extension: str = 'mp4'
) -> tuple[str, dict]:
    """Download a video from YouTube or other supported sites. Returns the file path and video metadata.

    Args:
        url (str): The URL of the video.
        output_dir (str, optional): Directory to save the downloaded video. Defaults to current directory.
        max_resolution (int, optional): Maximum resolution of the video to download. Defaults to 1080.
        max_fps (float, optional): Maximum frames per second of the video to download. Defaults to 60.
        extension (str, optional): File extension for the downloaded video. Defaults to 'mp4'.

    Returns:
        tuple[str, dict]: A tuple containing the path to the downloaded video file and its metadata.
    """

    ydl_opts = {
        'format': f'bestvideo[height<={max_resolution}][fps<={max_fps}][ext={extension}]+'
                  f'bestaudio/best[height<={max_resolution}][fps<={max_fps}][ext={extension}]/best',
        'merge_output_format': extension,
        'outtmpl': f'{output_dir or "."}/%(title)s.%(ext)s',
        'noplaylist': True,
        'cookiefile': settings.COOKIES_FILE
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        ydl.download([url])
        if output_dir:
            output_path = os.path.join(output_dir, ydl.prepare_filename(info))
        else:
            output_path = ydl.prepare_filename(info)

    return output_path, info


def extract_video_frames(
    video_path: str,
    output_dir: str,
    frame_rate: float = 1,
    max_size: int = None,
    extension: str = 'jpg',
) -> list[str]:
    """Extract frames from a video file at a specified frame rate and maximum size.

    Args:
        video_path (str): Path to the video file.
        output_dir (str): Directory to save the extracted frames.
        frame_rate (float, optional): Frame rate for extraction. Defaults to 1 frame per second.
        max_size (int, optional): Maximum width or height for output images. Aspect ratio is preserved.
        extension (str, optional): File extension for the extracted frames. Defaults to 'jpg'.

    Returns:
        list[str]: A sorted list of paths to the extracted frame images.
    """
    os.makedirs(output_dir, exist_ok=True)

    vf_filters = [f'fps={frame_rate}']
    if max_size:
        vf_filters.append(
            f'scale=\'if(gt(iw,ih),min(iw,{max_size}),-1)\':\'if(gt(ih,iw),min(ih,{max_size}),-1)\''
        )

    vf_option = ','.join(vf_filters)

    subprocess.run(
        [
            settings.FFMPEG_PATH,
            '-i', video_path,
            '-vf', vf_option,
            '-y',
            f'{output_dir or "."}/%d.{extension}'
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # Get all extracted frames
    results = sorted(
        glob.glob(f'{output_dir or "."}/*.{extension}'),
        key=lambda x: int(os.path.splitext(os.path.basename(x))[0])
    )
    if not results:
        raise FileNotFoundError(f'No frames found in "{output_dir}" for video "{video_path}"')

    return results


def extract_audio(video_path: str, output_dir: str = None, extension: str = 'm4a') -> str:
    """Extract audio from a video file and save it as an M4A file.

    Args:
        video_path (str): Path to the video file.
        output_dir (str, optional): Directory to save the extracted audio. Defaults to the same directory as the video.
        extension (str, optional): File extension for the extracted audio. Defaults to 'm4a'.
    Returns:
        str: Path to the extracted audio file.
    """
    if output_dir is None:
        output_dir = os.path.dirname(video_path)

    audio_path = os.path.join(output_dir, f'{os.path.splitext(os.path.basename(video_path))[0]}.{extension}')

    subprocess.run(
        [
            settings.FFMPEG_PATH,
            '-i', video_path,
            '-q:a', '0',
            '-map', 'a',
            '-y',
            audio_path
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    if not os.path.exists(audio_path):
        raise FileNotFoundError(f'Audio extraction failed: "{audio_path}" does not exist.')

    return audio_path


def split_media_file(file_path: str, output_dir: str, segment_length: int = 60) -> list[str]:
    """Split a media file into segments of specified length in seconds.

    Args:
        file_path (str): Path to the media file to be split.
        output_dir (str): Directory to save the split segments.
        segment_length (int, optional): Length of each segment in seconds. Defaults to 60 seconds.

    Returns:
        list[str]: A sorted list of paths to the split media segments.
    """
    os.makedirs(output_dir, exist_ok=True)

    base_name = os.path.splitext(os.path.basename(file_path))[0]
    extension = os.path.splitext(file_path)[1]
    segment_pattern = os.path.join(output_dir, f'{base_name}_%03d.{extension}')

    subprocess.run(
        [
            settings.FFMPEG_PATH,
            '-i', file_path,
            '-c', 'copy',
            '-map', '0',
            '-segment_time', str(segment_length),
            '-f', 'segment',
            '-y',
            segment_pattern
        ],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    return sorted(glob.glob(f'{output_dir}/*{base_name}_*.{extension}'))


def get_media_duration(file_path: str) -> float:
    """Get the duration of a media file in seconds."""
    # use pymediainfo to get the duration
    media_info = MediaInfo.parse(file_path)
    for track in media_info.tracks:
        if track.track_type == 'General':
            return track.duration / 1000.0
    raise ValueError(f'Could not determine duration for file: {file_path}')


def span_iou(span1: tuple[float, float], span2: tuple[float, float]) -> float:
    """Calculate the Intersection over Union (IoU) of two spans."""
    start1, end1 = span1
    start2, end2 = span2

    intersection_start = max(start1, start2)
    intersection_end = min(end1, end2)

    if intersection_start >= intersection_end:
        return 0.0  # No overlap

    intersection_length = intersection_end - intersection_start
    union_length = (end1 - start1) + (end2 - start2) - intersection_length

    return intersection_length / union_length if union_length > 0 else 0.0


def seconds_to_hms(total_seconds: int, drop_hours: bool = False) -> str:
    """Convert a number of seconds to a string formatted as HH:MM:SS."""
    # Ensure we’re working with non-negative integers
    if total_seconds < 0:
        raise ValueError('total_seconds must be non-negative')

    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    if drop_hours and hours == 0:
        return f'{minutes:02d}:{seconds:02d}'

    return f'{hours:02d}:{minutes:02d}:{seconds:02d}'


def hms_to_seconds(hms: str) -> int:
    """Convert a string formatted as HH:MM:SS to total seconds."""
    parts = hms.split(':')
    if len(parts) == 2:  # MM:SS format
        minutes, seconds = map(int, parts)
        return minutes * 60 + seconds
    elif len(parts) == 3:  # HH:MM:SS format
        hours, minutes, seconds = map(int, parts)
        return hours * 3600 + minutes * 60 + seconds
    else:
        raise ValueError('Invalid time format. Use HH:MM:SS or MM:SS.')