# Download video and audio from YouTube

In [None]:
import yt_dlp
import os
import uuid
import json
from pathlib import Path
from typing import Dict, Any
from datetime import datetime


def download_youtube_media(url: str,
                           base_dir: str = "./downloads",
                           video_quality: int = 720) -> Dict[str, str]:
    """
    Download the video and audio of a YouTube URL into a unique GUID folder.

    A fresh ``<base_dir>/<uuid4>`` directory is created for every call. The
    video-only MP4 stream and an MP3 audio track are downloaded with yt-dlp,
    and a ``metadata.json`` describing the download (URL, GUID, local date,
    time and timezone, basic video/audio info) is written next to them.

    Args:
        url (str): YouTube video URL
        base_dir (str): Base download directory (default './downloads')
        video_quality (int): Preferred video height. Acceptable values are
            144, 240, 360, 480, 720, 1080, 1440, 2160. An exact match is
            preferred, then the smallest larger height, then the best
            smaller-or-equal height.

    Returns:
        dict: Dictionary with absolute file paths:
            {
                'data_path': str,  # Path to the download directory
                'video': str,      # Full path to video.mp4
                'audio': str,      # Full path to audio.mp3
                'metadata': str    # Full path to metadata.json
            }

    Raises:
        ValueError: If ``video_quality`` is not one of the allowed values
        RuntimeError: If the download or the metadata write fails
    """
    youtube_quality = [144, 240, 360, 480, 720, 1080, 1440, 2160]

    if video_quality not in youtube_quality:
        raise ValueError(
            f"Invalid video quality: '{video_quality}'. "
            f"Allowed qualities are: {', '.join(map(str, youtube_quality))}"
        )

    try:
        # Generate GUID and create folder
        guid = str(uuid.uuid4())
        download_dir = Path(base_dir) / guid
        download_dir.mkdir(parents=True, exist_ok=True)

        # File paths
        video_path = download_dir / "video.mp4"
        audio_path = download_dir / "audio.mp3"
        metadata_path = download_dir / "metadata.json"

        # Record download start time and the local timezone it refers to
        download_datetime = datetime.now()
        current_timezone = download_datetime.astimezone().tzinfo

        # 1. Download video (MP4, video stream only)
        video_opts = {
            'format': (
                f"bestvideo[height={video_quality}][ext=mp4]"
                f"/worstvideo[height>{video_quality}][ext=mp4]"
                f"/bestvideo[height<={video_quality}][ext=mp4]"
            ),
            'outtmpl': str(video_path),
            'quiet': True,
            'no_warnings': True,
            # yt-dlp spells this option without an underscore.
            'restrictfilenames': True,
        }

        with yt_dlp.YoutubeDL(video_opts) as ydl:
            video_info = ydl.extract_info(url, download=True)

        # 2. Download audio (MP3)
        audio_opts = {
            'format': 'bestaudio/best',
            # Let yt-dlp fill in the source extension. With a fixed
            # "audio.mp3" template the extractor appends a second ".mp3"
            # to the converted file, so the result would be "audio.mp3.mp3".
            'outtmpl': str(download_dir / "audio.%(ext)s"),
            'quiet': True,
            'no_warnings': True,
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '128',
            }],
        }

        with yt_dlp.YoutubeDL(audio_opts) as ydl:
            audio_info = ydl.extract_info(url, download=True)

        # Fail loudly if the expected files are missing instead of
        # returning paths that do not exist.
        for produced in (video_path, audio_path):
            if not produced.exists():
                raise FileNotFoundError(
                    f"Expected output file was not created: {produced}"
                )

        # 3. Save metadata to JSON
        metadata = {
            'original_url': url,
            'guid': guid,
            'download_info': {
                'date': download_datetime.strftime('%Y-%m-%d'),
                'time': download_datetime.strftime('%H:%M:%S'),
                'timezone': str(current_timezone),
                'datetime_iso': download_datetime.isoformat(),
            },
            'video': {
                'path': str(video_path),
                'title': video_info.get('title'),
                'duration': video_info.get('duration'),
                'resolution': video_info.get('resolution'),
                'upload_date': video_info.get('upload_date'),
            },
            'audio': {
                'path': str(audio_path),
                'bitrate': audio_info.get('abr'),
                'codec': 'mp3',
            },
        }

        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

        return {
            'data_path': str(download_dir.absolute()),
            'video': str(video_path.absolute()),
            'audio': str(audio_path.absolute()),
            'metadata': str(metadata_path.absolute()),
        }

    except Exception as e:
        # Keep the original exception attached as the cause for debugging.
        raise RuntimeError(f"Media download error: {str(e)}") from e

if __name__ == "__main__":
    # Demo run: fetch one sample video into ./temp with the default quality
    # and show the returned path dictionary. `downloaded_video` is reused by
    # later cells.
    video_url = "https://www.youtube.com/watch?v=FK3dav4bA4s"
    downloaded_video = download_youtube_media(url=video_url, base_dir="./temp")
    print(downloaded_video)

In [None]:
# Display the path dictionary produced by the download cell.
downloaded_video

In [None]:
import copy
# Work on an independent copy so experiments cannot mutate `downloaded_video`.
test = copy.deepcopy(downloaded_video)

print(test)

# Split video into JPEG frames

In [None]:
import os
from pathlib import Path
from typing import Dict
import av

def _format_timestamp(seconds: float) -> str:
    """Render a time offset in seconds as ``HH:MM:SS.mmm`` (negative values clamp to 0)."""
    # Work in whole milliseconds so rounding can never yield "60.000" seconds.
    total_ms = max(0, int(round(seconds * 1000)))
    hh, rest = divmod(total_ms, 3_600_000)
    mm, rest = divmod(rest, 60_000)
    ss, ms = divmod(rest, 1000)
    return f"{hh:02d}:{mm:02d}:{ss:02d}.{ms:03d}"


def extract_frames_with_timestamps(
    video_path: str,
    output_dir: str,
    time_step: float = 1.0,
    quality: int = 95,
    frame_prefix: str = "frame",
    use_hw_accel: bool = True,
    hw_device: str = "cuda"
) -> Dict[str, str]:
    """
    Extract JPEG frames from a video at a fixed time interval using PyAV.

    Frames are written to ``<output_dir>/frames`` and named
    ``<frame_prefix>_HH_MM_SS_mmm.jpg`` after their presentation time.
    Sampling is done by frame count: every ``round(fps * time_step)``-th
    decoded frame is saved (at least every frame).

    Args:
        video_path: Path to the video file
        output_dir: Directory under which the ``frames`` folder is created
        time_step: Interval between saved frames (seconds); values that
            round to less than one frame save every frame
        quality: JPEG quality passed to the image writer (1-100)
        frame_prefix: Prefix for saved frame file names
        use_hw_accel: Pass NVIDIA hardware-decoding options when opening the video
        hw_device: GPU device forwarded as ``hwaccel_device`` (e.g., 'cuda:0')

    Returns:
        Dict of {timestamp "HH:MM:SS.mmm": frame_path}

    Raises:
        RuntimeError: If the file is missing, has no usable video stream, or
            decoding/saving fails. Frames written before the failure are
            removed on a best-effort basis.
    """
    result = {}
    try:
        video_path = Path(video_path).absolute()
        output_dir = Path(output_dir).absolute()

        if not video_path.exists():
            raise ValueError(f"Video file not found: {video_path}")

        frames_dir = output_dir / "frames"
        frames_dir.mkdir(parents=True, exist_ok=True)

        # Configure hardware acceleration.
        # NOTE(review): these are FFmpeg command-line option names; whether
        # PyAV's `options` argument honours them has not been verified here —
        # confirm that decoding actually runs on the GPU.
        options = {}
        if use_hw_accel:
            options.update({
                'hwaccel': 'cuda',
                'hwaccel_device': hw_device,
                'hwaccel_output_format': 'cuda'
            })

        # The context manager closes the container even if decoding fails.
        with av.open(str(video_path), options=options) as container:
            video_stream = next(
                (s for s in container.streams if s.type == 'video'), None
            )
            if video_stream is None:
                raise RuntimeError("No video stream found")

            average_rate = video_stream.average_rate
            fps = float(average_rate) if average_rate is not None else 0.0
            if fps <= 0:
                raise RuntimeError("Invalid frame rate")

            frame_interval = max(1, int(round(fps * time_step)))

            for frame_count, frame in enumerate(container.decode(video_stream)):
                if frame_count % frame_interval != 0:
                    continue

                if frame.pts is not None:
                    current_time = float(frame.pts * video_stream.time_base)
                else:
                    # Some frames carry no PTS; estimate from the frame index.
                    current_time = frame_count / fps

                timestamp = _format_timestamp(current_time)
                safe_timestamp = timestamp.replace(':', '_').replace('.', '_')
                frame_path = frames_dir / f"{frame_prefix}_{safe_timestamp}.jpg"

                # to_image() already converts the frame to an RGB image, so no
                # intermediate ndarray round trip is needed.
                frame.to_image().save(str(frame_path), quality=quality)
                result[timestamp] = str(frame_path)

        return result

    except Exception as e:
        # Best-effort cleanup of frames written before the failure.
        for path in result.values():
            try:
                os.remove(path)
            except OSError:
                pass
        raise RuntimeError(f"Frame extraction failed: {str(e)}") from e

if __name__ == "__main__":
    # Demo run: sample one frame every two seconds from the video downloaded
    # above and store the frames inside the same download directory.
    frames = extract_frames_with_timestamps(
        video_path=downloaded_video['video'],
        output_dir=downloaded_video['data_path'],
        time_step=2,
    )
    print(frames)


# Video Analyzer

In [None]:
# pip install autoawq --upgrade

In [None]:
from transformers import Qwen2_5_VLForConditionalGeneration, AutoTokenizer, AutoProcessor
from qwen_vl_utils import process_vision_info
import torch

# Load the AWQ-quantized Qwen2.5-VL model in float16 and let `device_map="auto"`
# place it on the available device(s).
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
 "Qwen/Qwen2.5-VL-7B-Instruct-AWQ",
 torch_dtype=torch.float16,
 device_map="auto",
)

# Default processor for the same checkpoint
processor = AutoProcessor.from_pretrained("Qwen/Qwen2.5-VL-7B-Instruct-AWQ")

# Single-turn request: one local frame image plus a text instruction.
# NOTE(review): the image path is hard-coded to one specific download folder, and
# the prompt text "Describe this image in" looks truncated — confirm the intended wording.
messages = [
 {
 "role": "user",
 "content": [
 {"type": "image", "image": "file:///workspaces/Video_Analyser/temp/fcaaa3e8-d99d-47c5-b464-617e4c9a1b1a/frames/frame_00_02_51_171.jpg"},
 {"type": "text", "text": "Describe this image in"},
 ],
 }
]

# Preparation for inference: render the chat template to prompt text (not tokenized yet)
text = processor.apply_chat_template(
 messages, tokenize=False, add_generation_prompt=True
)

# presumably collects the image/video inputs referenced in `messages` — see qwen_vl_utils
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
 text=[text],
 images=image_inputs,
 videos=video_inputs,
 padding=True,
 return_tensors="pt",
)
# NOTE(review): the target device is hard-coded to "cuda" although the model uses
# device_map="auto" — confirm they always match.
inputs = inputs.to("cuda")

# Inference: Generation of the output (at most 128 new tokens)
generated_ids = model.generate(**inputs, max_new_tokens=128)
# Drop the prompt tokens so only the newly generated part is decoded.
generated_ids_trimmed = [
 out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
]
output_text = processor.batch_decode(
 generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)
print(output_text)


# Audio content

In [None]:
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
from datasets import load_dataset


# Use the first CUDA GPU with half precision when available; otherwise run on
# the CPU in float32.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

model_id = "openai/whisper-large-v3-turbo"

# NOTE: this rebinds the notebook-global `model` used by the video analyzer cell.
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
).to(device)

# The pipeline is given a model *object*, so it has no checkpoint name from
# which to infer the tokenizer and feature extractor; they must be passed
# explicitly. A separate name is used so the notebook-global `processor`
# of the video analyzer cell is left untouched.
asr_processor = AutoProcessor.from_pretrained(model_id)

pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=asr_processor.tokenizer,
    feature_extractor=asr_processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=device,
    return_timestamps=True,
)


# Transcribe one previously downloaded audio track (hard-coded sample path).
result = pipe("/workspaces/Video_Analyser/app_srv/temp/a6fba6eb-038e-4f4e-bcb7-f41d87ee1422/audio.mp3.mp3")

result

In [None]:
! pip install librosa

In [None]:
! pip install -U openai-whisper

In [None]:
import torch
from transformers import pipeline
from typing import Dict, Union

def _merge_chunks_by_start(chunks) -> Dict[float, str]:
    """Map each chunk's start time to its stripped text, joining texts that share a start."""
    segments: Dict[float, str] = {}
    for chunk in chunks:
        start = chunk["timestamp"][0]
        # A missing start time is treated as the beginning of the audio.
        key = float(start) if start is not None else 0.0
        text = chunk["text"].strip()
        previous = segments.get(key)
        # Append instead of overwriting so no transcribed text is lost when
        # several chunks report the same start time.
        segments[key] = f"{previous} {text}".strip() if previous else text
    return segments


def transcribe_with_timestamps_optimized(
    audio_path: str,
    model_name: str = "openai/whisper-small",
    language: str = "en",
    chunk_length_s: int = 5,
    stride_length_s: Union[int, tuple] = (2, 2)
) -> Dict[float, str]:
    """
    Transcribe an audio file with a Hugging Face ASR pipeline, keyed by start time.

    Progress and error messages are printed to stdout.

    Args:
        audio_path: Path to the audio file to transcribe
        model_name: Model identifier passed to ``pipeline`` (default Whisper-small)
        language: Language forwarded to generation; a falsy value omits it
        chunk_length_s: Chunk length in seconds used by the pipeline
        stride_length_s: Overlap (seconds) between chunks, single value or
            (left, right) tuple

    Returns:
        Dict of {segment start time in seconds: segment text}. Segments that
        share a start time are joined with a space. If the pipeline returns
        only a full text, it is stored under ``0.0``. An empty dict is
        returned when transcription itself fails.

    Raises:
        Exception: Whatever ``pipeline`` raises during initialisation is
            re-raised after being reported.
    """
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    print(f"Используемое устройство: {device}")

    try:
        transcriber = pipeline(
            "automatic-speech-recognition",
            model=model_name,
            chunk_length_s=chunk_length_s,
            stride_length_s=stride_length_s,
            device=device,
        )
    except Exception as e:
        print(f"Ошибка при инициализации pipeline: {e}")
        print("Убедитесь, что модель установлена или доступна на Hugging Face Hub.")
        raise

    print(f"Начало транскрипции файла: {audio_path}")
    try:
        result = transcriber(
            audio_path,
            # Must be the boolean True (or "word"); the string "True" only
            # worked because any non-empty string is truthy.
            return_timestamps=True,
            generate_kwargs={"language": language} if language else {}
        )
    except Exception as e:
        # Deliberate best-effort: report the failure and return no segments.
        print(f"Ошибка при транскрипции аудиофайла: {e}")
        return {}

    transcribed_segments = {}
    chunks = result.get("chunks")
    if chunks:
        transcribed_segments = _merge_chunks_by_start(chunks)
    elif "text" in result:
        transcribed_segments[0.0] = result["text"].strip()
        print("Предупреждение: получена только общая транскрипция, без посегментных временных меток.")
        print("Убедитесь, что 'return_timestamps=True' или 'return_timestamps=\"word\"' используется.")
    else:
        print("Не удалось получить транскрипцию или временные метки.")

    print("Транскрипция завершена.")
    return transcribed_segments


# Example usage
if __name__ == "__main__":
    # Transcribe one previously downloaded audio track with the default settings.
    result = transcribe_with_timestamps_optimized(
        "/workspaces/Video_Analyser/app_srv/temp/a6fba6eb-038e-4f4e-bcb7-f41d87ee1422/audio.mp3.mp3"
    )

    print(result)

In [None]:
from transformers import pipeline
import librosa

def transcribe_with_pipeline(audio_path):
    """Transcribe an audio file with Whisper-small and return only the text.

    The audio is processed by a chunked Hugging Face ASR pipeline; timestamps
    are requested from the pipeline but only the ``'text'`` field of its
    result is returned.
    """
    asr_settings = {
        "model": "openai/whisper-small",
        "chunk_length_s": 30,  # split the audio into 30-second chunks
        "stride_length_s": 2,  # overlap between chunks
    }
    recognizer = pipeline("automatic-speech-recognition", **asr_settings)

    transcription = recognizer(audio_path, return_timestamps=True)
    return transcription['text']

# Transcribe one previously downloaded audio track (hard-coded sample path).
result = transcribe_with_pipeline("/workspaces/Video_Analyser/app_srv/temp/a6fba6eb-038e-4f4e-bcb7-f41d87ee1422/audio.mp3.mp3")

In [None]:
# Display the transcription text returned by the previous cell.
result