{ "cells": [ { "cell_type": "markdown", "id": "7f265e58", "metadata": {}, "source": [ "# Download video and audio from YouTube" ] }, { "cell_type": "code", "execution_count": null, "id": "69ee0ec3", "metadata": {}, "outputs": [], "source": [ "import yt_dlp\n", "import os\n", "import uuid\n", "import json\n", "from pathlib import Path\n", "from typing import Dict, Any\n", "from datetime import datetime\n", "\n", "\n", "def download_youtube_media(url: str,\n", " base_dir: str = \"./downloads\",\n", " video_quality: int = 720) -> Dict[str, str]:\n", " \"\"\"\n", " Downloads video and audio from YouTube, saving them to a unique GUID folder.\n", " Metadata is saved in JSON format including download datetime and timezone.\n", " \n", " Args:\n", " url (str): YouTube video URL\n", " base_dir (str): Base download directory (default './downloads')\n", " video_quality (int): preferred quality of the downloaded video, acceptable values 144, 240, 360, 480, 720, 1080, 1440, 2160.\n", " \n", " Returns:\n", " dict: Dictionary with file paths and information:\n", " {\n", " 'data_path': str, # Path to download directory\n", " 'video_path': str, # Full path to video.mp4\n", " 'audio_path': str, # Full path to audio.mp3\n", " 'metadata_path': str # Full path to metadata.json\n", " }\n", " \n", " Raises:\n", " RuntimeError: If download fails\n", " \"\"\"\n", " \n", " youtube_quality = [144, 240, 360, 480, 720, 1080, 1440, 2160]\n", "\n", "\n", " if video_quality not in youtube_quality:\n", " raise ValueError(\n", " f\"Invalid video quality: '{video_quality}'. \"\n", " f\"Allowed qualities are: {', '.join(map(str, youtube_quality))}\"\n", " )\n", "\n", " try:\n", " # Generate GUID and create folder\n", " guid = str(uuid.uuid4())\n", " download_dir = Path(base_dir) / guid\n", " os.makedirs(download_dir, exist_ok=True)\n", " \n", " # File paths\n", " video_path = download_dir / \"video.mp4\"\n", " audio_path = download_dir / \"audio.mp3\"\n", " metadata_path = download_dir / \"metadata.json\"\n", "\n", " # Record exact download start time\n", " download_datetime = datetime.now()\n", " current_timezone = download_datetime.astimezone().tzinfo\n", " \n", " # 1. Download video (MP4)\n", " video_opts = {\n", " 'format': (\n", " f\"bestvideo[height={video_quality}][ext=mp4]\"\n", " f\"/worstvideo[height>{video_quality}][ext=mp4]\"\n", " f\"/bestvideo[height<={video_quality}][ext=mp4]\"\n", " ),\n", " 'outtmpl': str(video_path),\n", " 'quiet': True,\n", " 'no_warnings': True,\n", " 'restrict_filenames': True,\n", " }\n", " \n", " with yt_dlp.YoutubeDL(video_opts) as ydl:\n", " video_info = ydl.extract_info(url, download=True)\n", " \n", " # 2. Download audio (MP3)\n", " audio_opts = {\n", " 'format': 'bestaudio/best',\n", " 'outtmpl': str(audio_path),\n", " 'quiet': True,\n", " 'postprocessors': [{\n", " 'key': 'FFmpegExtractAudio',\n", " 'preferredcodec': 'mp3',\n", " 'preferredquality': '128',\n", " }],\n", " }\n", " \n", " with yt_dlp.YoutubeDL(audio_opts) as ydl:\n", " audio_info = ydl.extract_info(url, download=True)\n", " \n", " # Format date and time for storage\n", " formatted_date = download_datetime.strftime('%Y-%m-%d')\n", " formatted_time = download_datetime.strftime('%H:%M:%S')\n", " \n", " # 3. Save metadata to JSON\n", " metadata = {\n", " 'original_url': url,\n", " 'guid': guid,\n", " 'download_info': {\n", " 'date': formatted_date,\n", " 'time': formatted_time,\n", " 'timezone': str(current_timezone),\n", " 'datetime_iso': download_datetime.isoformat(),\n", " },\n", " 'video': {\n", " 'path': str(video_path),\n", " 'title': video_info.get('title'),\n", " 'duration': video_info.get('duration'),\n", " 'resolution': video_info.get('resolution'),\n", " 'upload_date': video_info.get('upload_date'),\n", " },\n", " 'audio': {\n", " 'path': str(audio_path),\n", " 'bitrate': audio_info.get('abr'),\n", " 'codec': 'mp3',\n", " },\n", " }\n", " \n", " with open(metadata_path, 'w', encoding='utf-8') as f:\n", " json.dump(metadata, f, indent=2, ensure_ascii=False)\n", " \n", " return {\n", " 'data_path': str(download_dir.absolute()),\n", " 'video': str(video_path.absolute()),\n", " 'audio': str(audio_path.absolute()) + \".mp3\",\n", " 'metadata': str(metadata_path),\n", " }\n", " \n", " except Exception as e:\n", " raise RuntimeError(f\"Media download error: {str(e)}\")\n", "\n", "if __name__ == \"__main__\":\n", " video_url = \"https://www.youtube.com/watch?v=FK3dav4bA4s\"\n", " downloaded_video = download_youtube_media(video_url, \"./temp\")\n", " print(downloaded_video)" ] }, { "cell_type": "code", "execution_count": null, "id": "e79c5071", "metadata": {}, "outputs": [], "source": [ "downloaded_video" ] }, { "cell_type": "code", "execution_count": null, "id": "745320a1", "metadata": {}, "outputs": [], "source": [ "import copy\n", "test = copy.deepcopy(downloaded_video)\n", "\n", "print(test)" ] }, { "cell_type": "markdown", "id": "f62e8b83", "metadata": {}, "source": [ "# Split video to frames in jpg" ] }, { "cell_type": "code", "execution_count": null, "id": "5461045d", "metadata": {}, "outputs": [], "source": [ "import os\n", "from pathlib import Path\n", "from typing import Dict\n", "import av\n", "\n", "def extract_frames_with_timestamps(\n", " video_path: str,\n", " output_dir: str,\n", " time_step: float = 1.0,\n", " quality: int = 95,\n", " frame_prefix: str = \"frame\",\n", " use_hw_accel: bool = True,\n", " hw_device: str = \"cuda\"\n", ") -> Dict[str, str]:\n", " \"\"\"\n", " Extracts frames from video with NVIDIA hardware acceleration (NVDEC/CUDA).\n", " \n", " Args:\n", " video_path: Path to the video file\n", " output_dir: Directory to save frames\n", " time_step: Interval between frames (seconds)\n", " quality: JPEG quality (1-100)\n", " frame_prefix: Prefix for saved frames\n", " use_hw_accel: Enable NVIDIA hardware decoding\n", " hw_device: GPU device (e.g., 'cuda:0')\n", "\n", " Returns:\n", " Dict of {timestamp: frame_path}\n", " \"\"\"\n", " result = {}\n", " try:\n", " video_path = Path(video_path).absolute()\n", " output_dir = Path(output_dir).absolute()\n", " \n", " if not video_path.exists():\n", " raise ValueError(f\"Video file not found: {video_path}\")\n", "\n", " frames_dir = output_dir / \"frames\"\n", " frames_dir.mkdir(parents=True, exist_ok=True)\n", "\n", " # Configure hardware acceleration\n", " options = {}\n", " if use_hw_accel:\n", " options.update({\n", " 'hwaccel': 'cuda',\n", " 'hwaccel_device': hw_device,\n", " 'hwaccel_output_format': 'cuda' # Keep frames in GPU memory\n", " })\n", "\n", " # Open video with hardware acceleration\n", " container = av.open(str(video_path), options=options)\n", " video_stream = next(s for s in container.streams if s.type == 'video')\n", "\n", " fps = float(video_stream.average_rate)\n", " if fps <= 0:\n", " raise RuntimeError(\"Invalid frame rate\")\n", "\n", " frame_interval = max(1, int(round(fps * time_step)))\n", " frame_count = 0\n", "\n", " for frame in container.decode(video_stream):\n", " if frame_count % frame_interval == 0:\n", " current_time = float(frame.pts * video_stream.time_base)\n", " hh = int(current_time // 3600)\n", " mm = int((current_time % 3600) // 60)\n", " ss = current_time % 60\n", " \n", " timestamp = f\"{hh:02d}:{mm:02d}:{ss:06.3f}\"\n", " safe_timestamp = timestamp.replace(':', '_').replace('.', '_')\n", " frame_path = frames_dir / f\"{frame_prefix}_{safe_timestamp}.jpg\"\n", "\n", " # Convert GPU frame to CPU if needed\n", " if hasattr(frame, 'to_ndarray'): # CUDA frame\n", " img = frame.to_ndarray(format='rgb24')\n", " img = av.VideoFrame.from_ndarray(img, format='rgb24')\n", " else:\n", " img = frame\n", "\n", " img.to_image().save(str(frame_path), quality=quality)\n", " result[timestamp] = str(frame_path)\n", "\n", " frame_count += 1\n", "\n", " return result\n", "\n", " except Exception as e:\n", " for path in result.values():\n", " try: os.remove(path)\n", " except: pass\n", " raise RuntimeError(f\"Frame extraction failed: {str(e)}\")\n", "\n", "if __name__ == \"__main__\":\n", " frames = extract_frames_with_timestamps(downloaded_video['video'], downloaded_video['data_path'], time_step=2)\n", " print(frames)\n" ] }, { "cell_type": "markdown", "id": "ba7b44d6", "metadata": {}, "source": [ "# Video Analyzer" ] }, { "cell_type": "code", "execution_count": null, "id": "418ae84e", "metadata": {}, "outputs": [], "source": [ "# pip install autoawq --upgrade" ] }, { "cell_type": "code", "execution_count": null, "id": "fe840ffa", "metadata": {}, "outputs": [], "source": [ "from transformers import Qwen2_5_VLForConditionalGeneration, AutoTokenizer, AutoProcessor\n", "from qwen_vl_utils import process_vision_info\n", "import torch\n", "\n", "# default: Load the model on the available device(s)\n", "model = Qwen2_5_VLForConditionalGeneration.from_pretrained(\n", " \"Qwen/Qwen2.5-VL-7B-Instruct-AWQ\",\n", " torch_dtype=torch.float16,\n", " device_map=\"auto\",\n", ")\n", "\n", "# default processer\n", "processor = AutoProcessor.from_pretrained(\"Qwen/Qwen2.5-VL-7B-Instruct-AWQ\")\n", "\n", "messages = [\n", " {\n", " \"role\": \"user\",\n", " \"content\": [\n", " {\"type\": \"image\", \"image\": \"file:///workspaces/Video_Analyser/temp/fcaaa3e8-d99d-47c5-b464-617e4c9a1b1a/frames/frame_00_02_51_171.jpg\"},\n", " {\"type\": \"text\", \"text\": \"Describe this image in\"},\n", " ],\n", " }\n", "]\n", "\n", "# Preparation for inference\n", "text = processor.apply_chat_template(\n", " messages, tokenize=False, add_generation_prompt=True\n", ")\n", "\n", "image_inputs, video_inputs = process_vision_info(messages)\n", "inputs = processor(\n", " text=[text],\n", " images=image_inputs,\n", " videos=video_inputs,\n", " padding=True,\n", " return_tensors=\"pt\",\n", ")\n", "inputs = inputs.to(\"cuda\")\n", "\n", "# Inference: Generation of the output\n", "generated_ids = model.generate(**inputs, max_new_tokens=128)\n", "generated_ids_trimmed = [\n", " out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)\n", "]\n", "output_text = processor.batch_decode(\n", " generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False\n", ")\n", "print(output_text)\n" ] }, { "cell_type": "markdown", "id": "3ba7ef97", "metadata": {}, "source": [ "# Audio content" ] }, { "cell_type": "code", "execution_count": null, "id": "bf798dcf", "metadata": {}, "outputs": [], "source": [ "import torch\n", "from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline\n", "from datasets import load_dataset\n", "\n", "\n", "device = \"cuda:0\" if torch.cuda.is_available() else \"cpu\"\n", "torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32\n", "\n", "model_id = \"openai/whisper-large-v3-turbo\"\n", "\n", "model = AutoModelForSpeechSeq2Seq.from_pretrained(\n", " model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True\n", ").to(device)\n", "\n", "pipe = pipeline(\n", " \"automatic-speech-recognition\",\n", " model=model,\n", " torch_dtype=torch_dtype,\n", " device=device,\n", " return_timestamps=True\n", ")\n", "\n", "\n", "result = pipe(\"/workspaces/Video_Analyser/app_srv/temp/a6fba6eb-038e-4f4e-bcb7-f41d87ee1422/audio.mp3.mp3\")\n", "\n", "result" ] }, { "cell_type": "code", "execution_count": null, "id": "980e9742", "metadata": {}, "outputs": [], "source": [ "! pip install librosa" ] }, { "cell_type": "code", "execution_count": null, "id": "a66eabd3", "metadata": {}, "outputs": [], "source": [ "! pip install -U openai-whisper" ] }, { "cell_type": "code", "execution_count": null, "id": "e6d0e5fd", "metadata": {}, "outputs": [], "source": [ "import torch\n", "from transformers import pipeline\n", "from typing import Dict, Union\n", "\n", "def transcribe_with_timestamps_optimized(\n", " audio_path: str,\n", " model_name: str = \"openai/whisper-small\",\n", " language: str = \"en\",\n", " chunk_length_s: int = 5,\n", " stride_length_s: Union[int, tuple] = (2, 2)\n", ") -> Dict[float, str]:\n", " device = \"cuda:0\" if torch.cuda.is_available() else \"cpu\"\n", " print(f\"Используемое устройство: {device}\")\n", "\n", " try:\n", " transcriber = pipeline(\n", " \"automatic-speech-recognition\",\n", " model=model_name,\n", " chunk_length_s=chunk_length_s,\n", " stride_length_s=stride_length_s,\n", " device=device,\n", " )\n", " except Exception as e:\n", " print(f\"Ошибка при инициализации pipeline: {e}\")\n", " print(\"Убедитесь, что модель установлена или доступна на Hugging Face Hub.\")\n", " raise\n", "\n", " print(f\"Начало транскрипции файла: {audio_path}\")\n", " try:\n", " result = transcriber(\n", " audio_path,\n", " return_timestamps=\"True\",\n", " generate_kwargs={\"language\": language} if language else {}\n", " )\n", " except Exception as e:\n", " print(f\"Ошибка при транскрипции аудиофайла: {e}\")\n", " return {}\n", "\n", " transcribed_segments = {}\n", " if \"chunks\" in result and result[\"chunks\"]:\n", " for chunk in result[\"chunks\"]:\n", " start_time = chunk[\"timestamp\"][0] if chunk[\"timestamp\"][0] is not None else 0.0\n", " text = chunk[\"text\"].strip()\n", " transcribed_segments[float(start_time)] = text\n", " else:\n", " if \"text\" in result:\n", " transcribed_segments[0.0] = result[\"text\"].strip()\n", " print(\"Предупреждение: получена только общая транскрипция, без посегментных временных меток.\")\n", " print(\"Убедитесь, что 'return_timestamps=\\\"True\\\"' или 'return_timestamps=\\\"word\\\"' используется.\")\n", " else:\n", " print(\"Не удалось получить транскрипцию или временные метки.\")\n", "\n", " print(\"Транскрипция завершена.\")\n", " return transcribed_segments\n", "\n", "\n", "# Пример использования\n", "if __name__ == \"__main__\":\n", " \n", " result = transcribe_with_timestamps_optimized(\n", " audio_path=\"/workspaces/Video_Analyser/app_srv/temp/a6fba6eb-038e-4f4e-bcb7-f41d87ee1422/audio.mp3.mp3\",\n", " )\n", "\n", " print(result)" ] }, { "cell_type": "code", "execution_count": null, "id": "ca9a4832", "metadata": {}, "outputs": [], "source": [ "from transformers import pipeline\n", "import librosa\n", "\n", "def transcribe_with_pipeline(audio_path):\n", " pipe = pipeline(\n", " \"automatic-speech-recognition\",\n", " model=\"openai/whisper-small\",\n", " chunk_length_s=30, # разбивает на чанки по 30 секунд\n", " stride_length_s=2, # перекрытие между чанками\n", " )\n", " \n", " result = pipe(audio_path, return_timestamps=True)\n", " return result['text']\n", "\n", "result = transcribe_with_pipeline(\"/workspaces/Video_Analyser/app_srv/temp/a6fba6eb-038e-4f4e-bcb7-f41d87ee1422/audio.mp3.mp3\")" ] }, { "cell_type": "code", "execution_count": null, "id": "7cd4e28e", "metadata": {}, "outputs": [], "source": [ "result" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" } }, "nbformat": 4, "nbformat_minor": 5 }