Spaces:
Running
Running
import gradio as gr | |
import torch | |
import os | |
import shutil | |
import json | |
import base64 | |
import tempfile | |
import time # To simulate delays and show progress more smoothly | |
from pathlib import Path | |
from downloader import download_youtube_video | |
from video_processing import extract_frames_with_timestamps, generate_frame_descriptions | |
from audio_processing import transcribe_audio | |
from model_api import get_device_and_dtype | |
# Resolve the compute device and dtype once at import time; both values are
# passed unchanged to generate_frame_descriptions() (as device / torch_dtype).
device, dtype = get_device_and_dtype()
# Markdown shown at the top of the Gradio page (rendered via gr.Markdown).
# Fix: the "Quick video tutorial" entry was missing the opening parenthesis of
# its Markdown link, so it rendered as plain text instead of a hyperlink.
gui_header_element = """# π Aura AI Scan: Deep Analysis of YouTube Videos
([alpha version](https://en.wikipedia.org/wiki/Software_release_life_cycle))
Aura AI Scan is an MCP tool designed to deeply analyse YouTube videos, providing a frame-by-frame description with timestamps and full transcription of the audio track.
The following technologies were used in the implementation:
- [VSCode devcontainer](https://code.visualstudio.com/docs/devcontainers/containers), to simplify the development process
- [Docker](https://huggingface.co/docs/hub/spaces-sdks-docker), to simplify local deployment. This space is also implemented as [HuggingFace Docker Space](https://huggingface.co/docs/hub/spaces-sdks-docker).
- You can view the final results on the server's complete GUI in three formats: visual, audio, and JSON.
- Flexible settings that can be changed (prompt used during analysis, quality of the analysed video, time interval between frames).
- ability to include audio and frames used in the analysis process in the resulting JSON.
[Quick video tutorial](https://drive.google.com/file/d/1LiZ9v5KsT3C_pJ8xLYmYkZDiM54vftjm/view?usp=drive_link)
[Chrome Cookies extraction plugin](https://chromewebstore.google.com/detail/get-cookiestxt-locally/cclelndahbckbenkjhflpdbgdldlbecc)
[FireFox Cookies extraction plugin](https://addons.mozilla.org/en-US/firefox/addon/cookies-txt/)
"""
# Default per-frame analysis prompt. It is passed as custom_prompt to
# generate_frame_descriptions() by analyze_video_data() and pre-fills the
# "Analysis Prompt" textbox in the UI.
# NOTE(review): the {timestamp} placeholders are not formatted in this file —
# presumably generate_frame_descriptions() substitutes them per frame; confirm.
DEFAULT_PROMPT = """You are an expert at analyzing video, so pay close attention. Your main goal is to analyze the frame and find information in it to answer the MAIN QUESTION. Pay attention to details.
Provide the analysis for each frame in the following format, focusing on the frame at timestamp {timestamp}:
FRAME: {timestamp}
OBJECTS: List of objects with their count, for example: Bengal tiger - 1, Volvo car - 1, Person - 2 (male, female). Mentioning an object in the text on the frame does not count as a separate object.
If there are no objects, the field is equal to NONE.
BACKGROUND: Description of background and surroundings, e.g.: Muddy brown water. A road in the distance. An abandoned building on the horizon.
ACTION: A detailed description of what is happening in the frame, for example: A Bengal tiger swimming in murky water, its head and part of its back visible above the surface.
The shot is taken from above, above the tiger. A blue Volvo car is driving along the road in the distance. A part of a tree is visible in the right part of the frame.
If there are no actions, the field is equal to NONE.
RECOGNIZED TEXT: Any text recognized in the frame, e.g.: "STOP", "VOLVO", "EXIT 25". Only the text that is present in the frame, if it is not present, this field is NONE.
OBJECTS:
BACKGROUND:
ACTION:
RECOGNIZED TEXT:
"""
def analyze_video_data(
    youtube_url: str,
    quality: str = '720',
    time_step: str = '5.0',
    include_audio_data: str = 'False',
    include_frame_data: str = 'False'
) -> str:
    """
    This tool returns a text description of the frames from a YouTube clip and a full transcription of the audio track of that clip.
    Analyzing clips can be time-consuming (depending on the specified quality). You should always wait for the process to complete.
    Args:
        youtube_url: (str) String containing the URL of the YouTube clip.
        quality: (str, optional) Desired video quality for uploading (e.g. '144', '240', '360', '480', '720', '1080', '1440', '2160').
            This parameter is optional, if it is not set, the default value of '720' will be used.
        time_step: (str, optional) The interval in seconds at which frames will be extracted
            from the video. This parameter is optional, if it is not set, the default value '5.0' will be used.
        include_audio_data: (str, optional) If 'True', the base64 encoded audio data (MP3) will be included in the JSON results. Defaults to 'False'.
        include_frame_data: (str, optional) If 'True', base64 encoded image data (JPG) for each extracted frame will be included in the JSON results. Defaults to 'False'.
    Returns:
        str: A JSON string containing the analysis results.
        On success, it includes 'status': 'success', 'frame_analysis' (list of dictionaries
        with 'timestamp', 'description', and optional 'image_base64'),
        'audio_transcription', and optional 'audio_base64'.
        On error, it includes 'status': 'error' and a 'message' detailing the error.
    """
    results = {
        "status": "success",
        "message": "",
        "frame_analysis": [],
        "audio_transcription": "",
        "audio_base64": ""
    }
    try:
        # DEBUG (f-prefix added: the message previously printed literal "{youtube_url}" etc.)
        print(f"Function analyze_video_data passed arguments: {youtube_url}, {quality}, {time_step}, {include_audio_data}, {include_frame_data}.")
        print("Arguments types: ", type(youtube_url), type(quality), type(time_step), type(include_audio_data), type(include_frame_data))
        # The tool call agent uses strings, so all function arguments are documented as strings.
        # int()/float() also accept real numbers; the flags are passed through str() first so
        # that a real bool (e.g. from a checkbox component) is handled instead of failing on .lower().
        quality_int = int(quality)
        time_step_float = float(time_step)
        include_frame_data_bool = str(include_frame_data).strip().lower() == 'true'
        include_audio_data_bool = str(include_audio_data).strip().lower() == 'true'
        # For debugging purpose
        print(f'Starting pprocessing tast with {youtube_url}, quality: {quality}, time step: {time_step}, include audio: {include_audio_data}, include frames {include_frame_data}.')
        video_data = download_youtube_video(
            url=youtube_url, video_quality=quality_int,  # youtube_cookies=cookies
        )
        # NOTE(review): hw_device is hard-coded to "cuda" while `device` comes from
        # get_device_and_dtype() — confirm this is intended on CPU-only hosts.
        frames_dict = extract_frames_with_timestamps(
            video_path=video_data["video_path"],
            output_dir=video_data["data_path"],
            time_step=time_step_float,
            hw_device="cuda",
        )
        descriptions = generate_frame_descriptions(
            frames_dict=frames_dict,
            custom_prompt=DEFAULT_PROMPT,
            device=device,
            torch_dtype=dtype,
        )
        transcription_text = transcribe_audio(video_data["audio_path"])
        # One entry per extracted frame; the image is embedded only on request.
        for timestamp, frame_path in frames_dict.items():
            description = descriptions.get(timestamp, "No description available")
            frame_entry = {"timestamp": timestamp, "description": description, "image_base64": ""}
            if include_frame_data_bool and os.path.exists(frame_path):
                with open(frame_path, "rb") as f:
                    frame_entry["image_base64"] = base64.b64encode(f.read()).decode("utf-8")
            results["frame_analysis"].append(frame_entry)
        results["audio_transcription"] = transcription_text
        if include_audio_data_bool and os.path.exists(video_data["audio_path"]):
            with open(video_data["audio_path"], "rb") as f:
                results["audio_base64"] = base64.b64encode(f.read()).decode("utf-8")
        return json.dumps(results, indent=2, ensure_ascii=False)
    except Exception as e:
        # Top-level boundary: every failure is reported to the caller as an error JSON payload.
        # DEBUG
        print(f"Function analyze_video_data passed arguments: {youtube_url}, {quality}, {time_step}, {include_audio_data}, {include_frame_data}.")
        print("Arguments types: ", type(youtube_url), type(quality), type(time_step), type(include_audio_data), type(include_frame_data))
        error_message = f"Processing error: {str(e)}"
        results["status"] = "error"
        results["message"] = error_message
        # Drop any partial output gathered before the failure.
        results["frame_analysis"] = []
        results["audio_transcription"] = ""
        results["audio_base64"] = ""
        return json.dumps(results, indent=2, ensure_ascii=False)
def get_video_html_from_json(json_string: str) -> str:
    """Render the analysis JSON as an HTML fragment with one card per frame.

    Args:
        json_string: JSON document with a 'status' key and, on success, a
            'frame_analysis' list of dicts ('timestamp', 'description',
            'image_base64').

    Returns:
        An HTML string. Error payloads, invalid JSON and unexpected failures
        are all reported as a red ``<p>`` element instead of raising.
    """
    # Local import keeps this block self-contained (no file-level import needed).
    from html import escape

    try:
        data = json.loads(json_string)
        if data["status"] == "error":
            return f"<p style='color:red;'>Error: {escape(str(data['message']))}</p>"
        frames = data["frame_analysis"]
        if not frames:
            return "<p>No frames analyzed or included.</p>"
        parts = []
        for frame in frames:
            # Descriptions are model output and may echo text recognised in the
            # video, so every interpolated value is escaped before it reaches HTML.
            timestamp = escape(str(frame.get("timestamp", "N/A")))
            description = escape(str(frame.get("description", "No description available")))
            image_base64 = frame.get("image_base64", "")
            parts.append("<div style='margin-bottom: 20px; border: 1px solid #eee; padding: 10px; border-radius: 8px;'>")
            parts.append(f"<h3>FRAME: {timestamp}</h3>")
            if image_base64:
                parts.append(f"<img src='data:image/jpeg;base64,{escape(str(image_base64))}' style='max-width: 100%; height: auto; border-radius: 4px; margin-bottom: 10px;'><br>")
            else:
                parts.append("<p>Image data not included for this frame (checkbox 'Include Frame Data' was not selected).</p>")
            parts.append(f"<p><strong>Description:</strong> {description}</p>")
            parts.append("</div>")
        return "".join(parts)
    except json.JSONDecodeError:
        return "<p style='color:red;'>Invalid JSON response.</p>"
    except Exception as e:
        # Covers missing keys, a None input (TypeError from json.loads), etc.
        return f"<p style='color:red;'>Error processing video data for display: {escape(str(e))}</p>"
def get_audio_data_from_json(json_string: str) -> tuple[str, str | None]: | |
try: | |
data = json.loads(json_string) | |
if data["status"] == "error": | |
return f"Error: {data['message']}", None | |
transcription = data.get("audio_transcription", "No transcription available.") | |
audio_base64 = data.get("audio_base64", "") | |
if audio_base64: | |
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_audio_file: | |
temp_audio_file.write(base64.b64decode(audio_base64)) | |
temp_audio_path = temp_audio_file.name | |
return transcription, temp_audio_path | |
else: | |
transcription += "\n\nAudio data not included (checkbox 'Include Audio Data' was not selected)." | |
return transcription, None | |
except json.JSONDecodeError: | |
return "Invalid JSON response for audio.", None | |
except Exception as e: | |
return f"Error processing audio data for display: {str(e)}", None | |
# Generator variant of the analysis pipeline that reports progress to the GUI.
def analyze_video_data_with_progress_wrapper(
    youtube_url: str,
    prompt: str,
    quality: int,
    time_step: float,
    include_audio_data: bool,
    include_frame_data: bool,
    progress=gr.Progress()
):
    """Run download, frame extraction, description and transcription with progress updates.

    Unlike analyze_video_data, the arguments arrive already typed (int, float,
    bool) and the analysis prompt is supplied by the caller. The function is a
    generator that yields exactly one JSON string: the success payload, or an
    error payload ('status': 'error' plus 'message') if any step raises.
    """
    report = {
        "status": "pending",
        "message": "Starting analysis...",
        "frame_analysis": [],
        "audio_transcription": "",
        "audio_base64": ""
    }
    try:
        # Step 1: download the YouTube video
        progress(0, desc="Downloading video...")
        downloaded = download_youtube_video(
            url=youtube_url, video_quality=quality,  # youtube_cookies=cookies
        )

        # Step 2: extract frames from the video
        progress(0.25, desc="Extracting frames...")
        frame_paths = extract_frames_with_timestamps(
            video_path=downloaded["video_path"],
            output_dir=downloaded["data_path"],
            time_step=time_step,
            hw_device="cuda",
        )

        # Step 3: describe every extracted frame
        progress(0.5, desc="Generating frame descriptions...")
        captions = generate_frame_descriptions(
            frames_dict=frame_paths,
            custom_prompt=prompt,
            device=device,
            torch_dtype=dtype,
        )

        # Step 4: transcribe the audio track
        progress(0.75, desc="Transcribing audio...")
        transcript = transcribe_audio(downloaded["audio_path"])

        # Step 5: assemble the final payload
        progress(0.9, desc="Consolidating results...")
        for stamp, image_path in frame_paths.items():
            entry = {
                "timestamp": stamp,
                "description": captions.get(stamp, "No description available"),
                "image_base64": "",
            }
            if include_frame_data and os.path.exists(image_path):
                with open(image_path, "rb") as image_file:
                    entry["image_base64"] = base64.b64encode(image_file.read()).decode("utf-8")
            report["frame_analysis"].append(entry)

        report["audio_transcription"] = transcript
        if include_audio_data and os.path.exists(downloaded["audio_path"]):
            with open(downloaded["audio_path"], "rb") as audio_file:
                report["audio_base64"] = base64.b64encode(audio_file.read()).decode("utf-8")

        report["status"] = "success"
        report["message"] = "Analysis complete!"
        progress(1.0, desc="Analysis complete!")
        yield json.dumps(report, indent=2, ensure_ascii=False)
    except Exception as exc:
        # Discard partial output and report the failure in the same JSON shape.
        report.update(
            status="error",
            message=f"Processing error: {str(exc)}",
            frame_analysis=[],
            audio_transcription="",
            audio_base64="",
        )
        progress(1.0, desc="Analysis failed!")
        yield json.dumps(report, indent=2, ensure_ascii=False)
# Fixed location where the uploaded cookies file is stored: written by
# upload_cookies_file() and removed by clear_cookies_files().
working_cookies_file_path = "/home/mcp_user/app_srv/cookies.txt"
# Path of the most recent Gradio temporary upload. None until a file has been
# uploaded; reset to None once clear_cookies_files() has attempted to delete it.
gradio_temp_cookies_file_path = None
def upload_cookies_file(file):
    """
    Store the uploaded cookies file at working_cookies_file_path.

    The location of the Gradio temporary upload (file.name) is remembered in the
    module-level gradio_temp_cookies_file_path so that it can be removed later.
    Returns a human-readable status message; copy failures are reported in the
    message rather than raised.
    """
    global gradio_temp_cookies_file_path  # Updated below, hence the global declaration

    if file is None:
        return "Please first select a cookie file to upload."

    # Remember the temporary path before copying, even if the copy then fails.
    gradio_temp_cookies_file_path = uploaded_path = file.name

    try:
        shutil.copy(uploaded_path, working_cookies_file_path)
    except Exception as copy_error:
        return f"Error occurred while file copying: {copy_error}"

    saved_line = f"File successfully copied and saved as: {working_cookies_file_path}."
    temp_line = f"Path to the Gradio temporary file: {uploaded_path}."
    return saved_line + "\n" + temp_line
def clear_cookies_files():
    """
    Remove the stored cookies file and the last Gradio temporary upload.

    Both deletions are best-effort: every outcome (deleted, missing, failed) is
    collected as a line of text and the lines are returned joined by newlines.
    The remembered temporary path is reset to None after a deletion attempt.
    """
    global gradio_temp_cookies_file_path  # Reset below, hence the global declaration
    report = []

    # 1) The working copy of the cookies file.
    if not os.path.exists(working_cookies_file_path):
        report.append(f"File {working_cookies_file_path} not found.")
    else:
        try:
            os.remove(working_cookies_file_path)
            report.append(f"File {working_cookies_file_path} successfully deleted.")
        except Exception as err:
            report.append(f"Error while deleting a file {working_cookies_file_path}: {err}.")

    # 2) The temporary file Gradio created for the upload.
    temp_path = gradio_temp_cookies_file_path
    if temp_path is None:
        report.append("Path to the Gradio temporary file was unknown.")
    elif not temp_path or not os.path.exists(temp_path):
        report.append(f"Temporary Gradio file ({os.path.basename(temp_path)}) no longer exists.")
    else:
        try:
            os.remove(temp_path)
            report.append(f"Temporary Gradio file ({os.path.basename(temp_path)}) was successfully deleted.")
        except Exception as err:
            report.append(f"Error deleting a temporary Gradio file ({os.path.basename(temp_path)}): {err}.")
        finally:
            # Forget the path whether or not the deletion succeeded.
            gradio_temp_cookies_file_path = None

    # Merge all status messages
    return "\n".join(report)
# Gradio UI definition: input controls, result tabs and the event wiring that
# connects them to the analysis functions defined in this file.
# NOTE(review): the nesting of rows/columns below was reconstructed from a
# flattened listing of this file — confirm the layout against the running UI.
with gr.Blocks(title="Video Analysis Tool",) as demo:
    gr.Markdown(gui_header_element)
    with gr.Row():
        youtube_url = gr.Textbox(
            label="YouTube Video URL",
            value="https://www.youtube.com/watch?v=FK3dav4bA4s&t=36s",
            lines=1,
            scale=5
        )
    with gr.Row():
        prompt = gr.Textbox(
            label="Analysis Prompt",
            value=DEFAULT_PROMPT,
            lines=3,
            scale=4
        )
        # Cookie file upload controls.
        with gr.Column(scale=2, min_width=200):
            file_input = gr.File(label="Select a cookie file to upload", file_count="single", height=263)
            output_message = gr.Textbox(label="Status of uploading file with cookies")
            upload_cookies_file_button = gr.Button("Save file with cookies")
        # Analysis settings.
        with gr.Column(scale=2, min_width=200):
            quality = gr.Dropdown(
                label="Video Quality",
                choices=[144, 240, 360, 480, 720, 1080, 1440, 2160],
                value=720
            )
            time_step = gr.Slider(
                label="Frame Interval (seconds)",
                minimum=0.5,
                maximum=30,
                step=0.5,
                value=30
            )
            include_audio_data = gr.Checkbox(
                label="Include Audio Data (MP3) in Results", value=False
            )
            include_frame_data = gr.Checkbox(
                label="Include Frame Data (JPG) in Results", value=False
            )
    # Read-only textbox used as the progress display target (show_progress_on=t1 below).
    t1 = gr.Textbox(value="Waiting for task...",label="Task Progress", show_label=True, lines=3, interactive=False)
    # Hidden button (visible=False); its click handler is what exposes the
    # "analyze_video_data" API endpoint used as the MCP server tool.
    submit_btn = gr.Button("Start Video Analysis (No Progress Bar, for MCP Server use)", variant="primary", visible=False)
    # Analyze button with progress bar
    submit_btn_with_progress = gr.Button("Analyze Video", variant="secondary")
    with gr.Tabs() as results_tabs:
        with gr.TabItem("Video"):
            video_output_html = gr.HTML(label="Video Frames Analysis", elem_id="video-output-html")
        with gr.TabItem("Audio"):
            audio_player_output = gr.Audio(label="Play Audio", type="filepath", render=True)
            audio_transcription_output = gr.Textbox(label="Audio Transcription", lines=10)
        with gr.TabItem("JSON"):
            results_json_viewer = gr.JSON(
                label="Raw Analysis Results (JSON)",
                elem_classes=["output-box", "results-output"],
            )
    # Holds the raw JSON string produced by the analysis step so the chained
    # .then() steps below can render it into the three tabs.
    raw_json_output = gr.State()
    # Logic for normal button (without progress bar), this button becomes the MCP server point.
    # NOTE(review): the dropdown/slider/checkbox components are wired to parameters that
    # analyze_video_data annotates as str — confirm which value types Gradio delivers here.
    submit_btn.click(
        fn=analyze_video_data,
        inputs=[youtube_url, quality, time_step, include_audio_data, include_frame_data],
        outputs=[raw_json_output],
        api_name="analyze_video_data",
        show_api=True
    ).then(
        fn=get_video_html_from_json,
        inputs=[raw_json_output],
        outputs=[video_output_html],
        show_api=False
    ).then(
        fn=get_audio_data_from_json,
        inputs=[raw_json_output],
        outputs=[audio_transcription_output, audio_player_output],
        show_api=False
    ).then(
        # Parse the JSON string so the JSON viewer receives a Python object.
        fn=lambda x: json.loads(x),
        inputs=[raw_json_output],
        outputs=[results_json_viewer],
        show_api=False
    )
    # Logic for button with progress bar: same rendering chain as above, but the
    # first step is the generator wrapper and also receives the editable prompt.
    submit_btn_with_progress.click(
        fn=analyze_video_data_with_progress_wrapper,
        inputs=[youtube_url, prompt, quality, time_step, include_audio_data, include_frame_data],
        outputs=[raw_json_output],
        api_name="analyze_video_data_with_progress_button",
        show_progress_on=t1,
        show_api=False
    ).then(
        fn=get_video_html_from_json,
        inputs=[raw_json_output],
        outputs=[video_output_html],
        show_api=False
    ).then(
        fn=get_audio_data_from_json,
        inputs=[raw_json_output],
        outputs=[audio_transcription_output, audio_player_output],
        show_api=False
    ).then(
        fn=lambda x: json.loads(x),
        inputs=[raw_json_output],
        outputs=[results_json_viewer],
        show_api=False
    )
    # Logic of processing cookies: the button saves the selected file, and
    # clearing the file input deletes the stored copies.
    upload_cookies_file_button.click(
        fn=upload_cookies_file,
        inputs=file_input,
        outputs=output_message,
        show_api=False
    )
    file_input.clear(
        fn=clear_cookies_files,
        inputs=None,
        outputs=output_message,
        show_api=False
    )
# Script entry point: start the Gradio app with its MCP server option enabled.
if __name__ == "__main__":
    demo.launch(mcp_server=True)