# Hugging Face Space status: Running on Zero
import gradio as gr | |
import torch | |
from PIL import Image | |
from transformers import AutoModel, AutoTokenizer | |
from decord import VideoReader, cpu | |
from scipy.spatial import cKDTree | |
import numpy as np | |
import math | |
import time | |
import spaces | |
# Model initialization
# Both handles start empty; load_model() fills them on first use.
model = None
tokenizer = None

# Frame-sampling limits consumed by encode_video().
MAX_NUM_FRAMES = 180   # sampled-frame budget per unit of packing
MAX_NUM_PACKING = 3    # upper bound on the packing number
TIME_SCALE = 0.1       # seconds represented by one temporal-id step
def load_model():
    """Load the MiniCPM-V model and tokenizer once and cache them globally.

    The first call loads both objects and stores them in the module-level
    ``model`` / ``tokenizer`` globals; later calls return the cached pair.

    Returns:
        tuple: ``(model, tokenizer)``.
    """
    global model, tokenizer
    # Check both handles so a previously failed load is retried.
    if model is None or tokenizer is None:
        model_id = 'openbmb/MiniCPM-V-4_5'
        gr.Info("Loading model... This may take a moment.")
        loaded_model = AutoModel.from_pretrained(
            model_id,
            trust_remote_code=True,
            attn_implementation='sdpa',
            torch_dtype=torch.bfloat16,
        ).eval()
        loaded_tokenizer = AutoTokenizer.from_pretrained(
            model_id,
            trust_remote_code=True,
        )
        # Publish both together: if the tokenizer load raises, the globals
        # stay untouched instead of caching a model paired with tokenizer=None.
        model, tokenizer = loaded_model, loaded_tokenizer
        gr.Success("Model loaded successfully!")
    return model, tokenizer
def map_to_nearest_scale(values, scale):
    """Snap every entry of ``values`` to its nearest entry in ``scale``.

    Args:
        values: 1-D sequence of numbers to snap.
        scale: 1-D sequence of candidate grid values.

    Returns:
        numpy.ndarray: for each value, the closest element of ``scale``.
    """
    grid = np.asarray(scale)
    points = np.asarray(values)
    # cKDTree works on 2-D data, so each scalar becomes a one-coordinate point.
    lookup = cKDTree(grid[:, np.newaxis])
    nearest = lookup.query(points[:, np.newaxis])[1]
    return grid[nearest]
def group_array(arr, size):
    """Split ``arr`` into consecutive slices of length ``size``.

    The final slice is shorter when ``len(arr)`` is not a multiple of ``size``.
    """
    chunks = []
    for start in range(0, len(arr), size):
        stop = start + size
        chunks.append(arr[start:stop])
    return chunks
def encode_video(video_path, choose_fps=3, force_packing=None):
    """Sample frames from a video and compute their temporal ids.

    Frames are sampled uniformly across the whole clip. The number of frames
    and the packing number are derived from the clip duration, ``choose_fps``
    and the module limits ``MAX_NUM_FRAMES`` / ``MAX_NUM_PACKING``.

    Args:
        video_path: Path of the video file to decode.
        choose_fps: Target sampling rate in frames per second.
        force_packing: Optional packing number overriding the automatic
            choice; it is capped at ``MAX_NUM_PACKING``. Falsy means "auto".

    Returns:
        tuple: ``(frames, frame_ts_id_group, video_duration, num_frames,
        packing_nums)`` where ``frames`` is a list of RGB PIL images,
        ``frame_ts_id_group`` is the list of temporal-id arrays produced by
        ``group_array`` (``packing_nums`` ids per group), ``video_duration``
        is in seconds and ``num_frames`` is the number of sampled frames.

    Raises:
        ValueError: If the video reports no frames or a non-positive fps.
    """
    def uniform_sample(l, n):
        # Take the midpoint of each of n equal-width bins over l.
        gap = len(l) / n
        idxs = [int(i * gap + gap / 2) for i in range(n)]
        return [l[i] for i in idxs]

    vr = VideoReader(video_path, ctx=cpu(0))
    fps = vr.get_avg_fps()
    total_frames = len(vr)
    # Fail with a clear message instead of a ZeroDivisionError further down.
    if total_frames == 0 or fps <= 0:
        raise ValueError("Video contains no decodable frames.")
    video_duration = total_frames / fps

    if choose_fps * int(video_duration) <= MAX_NUM_FRAMES:
        packing_nums = 1
        choose_frames = round(min(choose_fps, round(fps)) * min(MAX_NUM_FRAMES, video_duration))
    else:
        packing_nums = math.ceil(video_duration * choose_fps / MAX_NUM_FRAMES)
        if packing_nums <= MAX_NUM_PACKING:
            choose_frames = round(video_duration * choose_fps)
        else:
            choose_frames = round(MAX_NUM_FRAMES * MAX_NUM_PACKING)
            packing_nums = MAX_NUM_PACKING
    # Very short clips can round down to zero frames, which would divide by
    # zero inside uniform_sample; always keep at least one frame.
    choose_frames = max(1, choose_frames)

    frame_idx = np.array(uniform_sample(range(total_frames), choose_frames))

    if force_packing:
        # UI widgets may hand over a float; group_array needs a positive int.
        packing_nums = max(1, int(min(force_packing, MAX_NUM_PACKING)))

    frames = vr.get_batch(frame_idx).asnumpy()

    # Convert frame indices to seconds, snap them to the TIME_SCALE grid and
    # express each snapped timestamp as an integer number of grid steps.
    frame_idx_ts = frame_idx / fps
    scale = np.arange(0, video_duration, TIME_SCALE)
    frame_ts_id = map_to_nearest_scale(frame_idx_ts, scale) / TIME_SCALE
    # Round before casting: the quotient is nominally an integer, but float
    # error could otherwise let truncation land one step low.
    frame_ts_id = np.rint(frame_ts_id).astype(np.int32)
    assert len(frames) == len(frame_ts_id)

    frames = [Image.fromarray(v.astype('uint8')).convert('RGB') for v in frames]
    frame_ts_id_group = group_array(frame_ts_id, packing_nums)
    return frames, frame_ts_id_group, video_duration, len(frame_idx), packing_nums
def process_video_and_question(video, question, fps, force_packing, history):
    """Answer a question about the uploaded video and extend the chat history.

    Args:
        video: Video path supplied by the video component, or ``None``.
        question: The user's question text.
        fps: Sampling rate passed to ``encode_video``.
        force_packing: Packing override; values ``<= 0`` (or ``None``) mean
            "let ``encode_video`` decide".
        history: Chat history as a list of ``{"role", "content"}`` dicts.

    Returns:
        tuple: ``(history, "")``; the empty string is the second output value.

    Raises:
        gr.Error: If loading the model, decoding the video or generating the
            answer fails.
    """
    # NOTE(review): `spaces` is imported at file level but no `@spaces.GPU`
    # decorator is applied here; confirm whether the deployment needs one.
    if history is None:
        history = []
    if video is None:
        gr.Warning("Please upload a video first.")
        return history, ""
    if not question:
        gr.Warning("Please enter a question.")
        return history, ""
    try:
        # Load model if not already loaded
        model, tokenizer = load_model()
        model = model.cuda()
        # Encode video
        gr.Info(f"Processing video with {fps} FPS...")
        # The value may arrive as a float; normalise it to an int or None.
        packing_override = int(force_packing) if force_packing and force_packing > 0 else None
        frames, frame_ts_id_group, duration, num_frames, packing_nums = encode_video(
            video,
            fps,
            force_packing=packing_override,
        )
        # Prepare messages
        msgs = [
            {'role': 'user', 'content': frames + [question]},
        ]
        # Get model response
        gr.Info("Generating response...")
        answer = model.chat(
            msgs=msgs,
            tokenizer=tokenizer,
            use_image_id=False,
            max_slice_nums=1,
            temporal_ids=frame_ts_id_group,
        )
        # Update chat history
        history.append({
            "role": "user",
            "content": f"📹 [Video: {duration:.1f}s, {num_frames} frames, packing: {packing_nums}]\n{question}"
        })
        history.append({
            "role": "assistant",
            "content": answer
        })
        return history, ""
    except Exception as e:
        # gr.Error is an exception class: it only reaches the user when it is
        # raised. Merely constructing it discards the failure silently.
        raise gr.Error(f"Error processing video: {str(e)}") from e
def clear_chat():
    """Return reset values for the five outputs wired to the clear button.

    In output order: chat history, video, question text, FPS slider value and
    force-packing slider value.
    """
    empty_history = []
    no_video = None
    blank_question = ""
    default_fps = 3
    auto_packing = 0
    return empty_history, no_video, blank_question, default_fps, auto_packing
# Create Gradio interface with theme
# Soft base theme with blue/gray hues, then explicit overrides for the body,
# primary button, block, input and slider styling (light and dark variants).
theme = gr.themes.Soft(
    primary_hue=gr.themes.colors.blue,
    secondary_hue=gr.themes.colors.gray,
    neutral_hue=gr.themes.colors.gray,
    spacing_size="md",
    radius_size="md",
    text_size="md",
    font=[gr.themes.GoogleFont("Inter"), "SF Pro Display", "-apple-system", "BlinkMacSystemFont", "sans-serif"],
    # "SF Mono" is an Apple system font, not a Google Fonts family, so it is
    # listed as a plain font-family name rather than wrapped in GoogleFont.
    font_mono=["SF Mono", "Monaco", "Menlo", "monospace"],
).set(
    body_background_fill="*neutral_50",
    body_background_fill_dark="*neutral_950",
    button_primary_background_fill="*primary_500",
    button_primary_background_fill_hover="*primary_600",
    button_primary_text_color="white",
    button_primary_border_color="*primary_500",
    block_background_fill="white",
    block_background_fill_dark="*neutral_900",
    block_border_width="1px",
    block_border_color="*neutral_200",
    block_border_color_dark="*neutral_800",
    block_radius="*radius_lg",
    block_shadow="0px 1px 3px 0px rgba(0, 0, 0, 0.02), 0px 0px 0px 1px rgba(0, 0, 0, 0.05)",
    block_shadow_dark="0px 1px 3px 0px rgba(0, 0, 0, 0.1), 0px 0px 0px 1px rgba(255, 255, 255, 0.05)",
    input_background_fill="*neutral_50",
    input_background_fill_dark="*neutral_900",
    input_border_color="*neutral_300",
    input_border_color_dark="*neutral_700",
    input_border_width="1px",
    input_radius="*radius_md",
    slider_color="*primary_500",
)
with gr.Blocks(theme=theme, title="Video Chat with MiniCPM-V") as demo:
    gr.Markdown(
        """
        # 🎥 Video Chat with MiniCPM-V-4.5
        Upload a video and ask questions about it! The model uses advanced 3D-resampler compression
        to process multiple frames efficiently.
        **Note:** First run will download the model (~8GB), which may take a few minutes.
        """
    )

    with gr.Row():
        # Main video area (takes most of the space)
        with gr.Column(scale=3):
            video_input = gr.Video(label="Upload Video", height=600)

        # Sidebar with all controls
        with gr.Column(scale=1):
            chatbot = gr.Chatbot(label="Chat", height=300, type="messages")

            with gr.Row():
                question_input = gr.Textbox(
                    label="Ask about the video",
                    placeholder="e.g., Describe what happens in this video...",
                    lines=2,
                    scale=4,
                )
                submit_btn = gr.Button("Send", variant="primary", scale=1)

            with gr.Row():
                clear_btn = gr.Button("Clear Chat", variant="secondary", size="sm")
                example_btn1 = gr.Button("Describe", size="sm")
                example_btn2 = gr.Button("Action", size="sm")
                example_btn3 = gr.Button("People", size="sm")

            with gr.Accordion("Advanced Settings", open=False):
                fps_slider = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=3,
                    step=1,
                    label="FPS for frame extraction",
                    info="Higher FPS captures more detail but uses more memory",
                )
                force_packing_slider = gr.Slider(
                    minimum=0,
                    maximum=MAX_NUM_PACKING,
                    value=0,
                    step=1,
                    label="Force Packing",
                    info=f"0 = auto, 1-{MAX_NUM_PACKING} = force specific packing number",
                )

            with gr.Accordion("ℹ️ Video Info", open=False):
                gr.Markdown(
                    """
                    - **Max frames:** 180 × 3 packing = 540 frames
                    - **Temporal compression:** 64 tokens per video
                    - **Supported formats:** MP4, AVI, MOV, etc.
                    """
                )

    # Example questions: each button drops a canned prompt into the textbox.
    def _preset_question(text):
        # A factory gives every button its own bound prompt.
        return lambda: text

    preset_prompts = (
        (example_btn1, "Describe this video in detail."),
        (example_btn2, "What actions or events occur in this video?"),
        (example_btn3, "Are there any people in this video? If so, what are they doing?"),
    )
    for preset_button, preset_text in preset_prompts:
        preset_button.click(_preset_question(preset_text), outputs=question_input)

    # Event handlers: the Send button and Enter in the textbox share one wiring.
    chat_inputs = [video_input, question_input, fps_slider, force_packing_slider, chatbot]
    chat_outputs = [chatbot, question_input]
    for register in (submit_btn.click, question_input.submit):
        register(
            fn=process_video_and_question,
            inputs=chat_inputs,
            outputs=chat_outputs,
        )

    clear_btn.click(
        fn=clear_chat,
        outputs=[chatbot, video_input, question_input, fps_slider, force_packing_slider],
    )

    # Examples
    example_questions = [
        "Describe what happens in this video",
        "What is the main subject of this video?",
        "Count the number of objects or people in the video",
        "What emotions or mood does this video convey?",
        "Summarize the key moments in this video",
    ]
    gr.Examples(
        examples=[[text] for text in example_questions],
        inputs=question_input,
        label="Example Questions",
    )
# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()