File size: 1,461 Bytes
5ce022b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from pathlib import Path
from typing import Tuple, Union
import io
import numpy as np
from PIL import Image
import ffmpeg 
from pydub import AudioSegment

#  helpers 
def _to_path(p: Union[str, dict, Path]) -> str:
    if isinstance(p, dict):
        return p.get("name") or p.get("path") or p.get("data") or ""
    return str(p)

def _audiosegment_float32(seg: AudioSegment) -> np.ndarray:
    """Convert a pydub segment to float32 samples scaled by 1/32768.

    The segment is first set to a 16000 Hz frame rate, one channel and a
    2-byte sample width (in that order); the resulting int16 samples are
    then cast to float32 and divided by 32768.

    Args:
        seg: The audio segment to convert.

    Returns:
        A float32 NumPy array of the converted samples.
    """
    resampled = seg.set_frame_rate(16000)
    mono = resampled.set_channels(1)
    pcm16 = mono.set_sample_width(2)  # 2 bytes per sample -> 16-bit PCM
    raw = np.array(pcm16.get_array_of_samples(), dtype=np.int16)
    as_float = raw.astype(np.float32)
    return as_float / 32768.0

#  public API
def video_to_frame_audio(video_in) -> Tuple[Image.Image, np.ndarray]:
    """Extract a single frame and the audio track from a video file.

    Args:
        video_in: Anything ``_to_path`` accepts (path string, ``Path`` or a
            dict carrying the path).

    Returns:
        A ``(frame, audio)`` tuple: the single frame ffmpeg emits, decoded
        as an RGB PIL image, and the audio as returned by
        ``_audiosegment_float32``.

    Raises:
        ValueError: If no path can be resolved from ``video_in``.
        RuntimeError: If ffmpeg fails while extracting the frame; the
            original ``ffmpeg.Error`` is chained as ``__cause__``.
    """
    video_path = _to_path(video_in)
    if not video_path:
        raise ValueError("Empty video path")

    # Only the ffmpeg invocation can raise ffmpeg.Error, so keep the try
    # body limited to it.
    try:
        jpeg_bytes, _ = (
            ffmpeg
            .input(video_path)
            .output('pipe:', vframes=1, format='image2', vcodec='mjpeg')
            .run(capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        # Decode leniently: stderr may contain non-UTF-8 bytes (e.g. file
        # names), and a decode failure here would mask the real error.
        stderr_text = (e.stderr or b"").decode("utf-8", errors="replace")
        # ffmpeg prints its version banner first and the actual failure
        # last, so keep the tail of the output rather than the head.
        raise RuntimeError(
            f"ffmpeg frame extract failed: {stderr_text[-2000:]}"
        ) from e

    frame = Image.open(io.BytesIO(jpeg_bytes)).convert("RGB")

    seg = AudioSegment.from_file(video_path)
    audio16k = _audiosegment_float32(seg)
    return frame, audio16k

def load_audio_16k(audio_path_like) -> np.ndarray:
    """Load an audio file and return it as converted float32 samples.

    Args:
        audio_path_like: Anything ``_to_path`` accepts (path string,
            ``Path`` or a dict carrying the path).

    Returns:
        The samples as returned by ``_audiosegment_float32``.

    Raises:
        ValueError: If no path can be resolved from ``audio_path_like``.
    """
    path = _to_path(audio_path_like)
    if not path:
        # Fail early with a clear message, mirroring video_to_frame_audio,
        # instead of letting the decoder choke on an empty file name.
        raise ValueError("Empty audio path")
    seg = AudioSegment.from_file(path)
    return _audiosegment_float32(seg)