File size: 3,766 Bytes
a0f484b 69bba7a a0f484b 61a09f9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
import os
import cv2
import base64
import gradio as gr
from openai import OpenAI
# 1. Frame Extraction
def extract_frames(video_path: str, num_frames: int = 8, max_resolution: int = 720) -> list:
    """Sample evenly spaced frames from a video as base64 JPEG data URIs.

    Args:
        video_path: Path to a video file readable by OpenCV.
        num_frames: Number of frames to sample, spread evenly across the video.
        max_resolution: Cap (in pixels) for the frame's longest side; larger
            frames are downscaled to keep the request payload small.

    Returns:
        A list of "data:image/jpeg;base64,..." strings. May contain fewer
        than num_frames entries if some frames fail to decode.

    Raises:
        RuntimeError: If the video cannot be opened or reports no frames.
    """
    frames_base64 = []
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video file: {video_path}")
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total_frames <= 0:
            # Some containers report 0 frames; without this guard the index
            # list collapses to [-1, -1, ...] and every seek is bogus.
            raise RuntimeError(f"Video reports no frames: {video_path}")
        step = max(total_frames // num_frames, 1)
        # Clamp to the last valid index so short videos don't seek past the end.
        frame_indices = [min(i * step, total_frames - 1) for i in range(num_frames)]
        for index in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, index)
            ret, frame = cap.read()
            if not ret or frame is None:
                continue  # skip unreadable frames rather than failing the batch
            h, w, _ = frame.shape
            longest = max(h, w)
            if longest > max_resolution:
                # Downscale preserving aspect ratio so the longest side
                # equals max_resolution.
                scale = max_resolution / float(longest)
                frame = cv2.resize(frame, (int(w * scale), int(h * scale)))
            success, buffer = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
            if success:
                b64 = base64.b64encode(buffer).decode("utf-8")
                frames_base64.append(f"data:image/jpeg;base64,{b64}")
    finally:
        # Release the capture handle even if decoding/encoding raises
        # (the original leaked it on any exception inside the loop).
        cap.release()
    return frames_base64
# 2. Prompt Construction
def build_prompt(frames, question):
    """Build a multimodal chat `content` list for the vision model.

    The question text comes first, followed by one image_url part per
    frame data URI, in the order the frames were extracted.
    """
    image_parts = [
        {"type": "image_url", "image_url": {"url": uri}}
        for uri in frames
    ]
    return [{"type": "text", "text": question}] + image_parts
# 3. Nebius Inference Call
def query_qwen(prompt_content):
    """Send the multimodal prompt to Qwen2.5-VL via Nebius' OpenAI-compatible API.

    Args:
        prompt_content: `content` list produced by build_prompt.

    Returns:
        The raw chat-completion response object on success, or a
        {"error": str} dict on API failure (parse_response handles both).

    Raises:
        ValueError: If the NEBIUS_API_KEY environment variable is unset.
    """
    api_key = os.getenv("NEBIUS_API_KEY")
    # SECURITY: do not print/log the API key — the original debug print
    # leaked the secret into console output and any captured logs.
    if not api_key:
        raise ValueError("NEBIUS_API_KEY not found in environment variables.")
    client = OpenAI(api_key=api_key, base_url="https://api.studio.nebius.ai/v1/")
    try:
        response = client.chat.completions.create(
            model="Qwen/Qwen2.5-VL-72B-Instruct",
            messages=[{"role": "user", "content": prompt_content}],
            temperature=0.2,
            max_tokens=512
        )
        return response
    except Exception as e:
        # Best-effort: surface API failures as a dict instead of crashing
        # the Gradio callback; parse_response turns it into a message.
        return {"error": str(e)}
# 4. Parse Response
def parse_response(response):
    """Extract the assistant's text answer from a chat-completion result.

    Accepts either the SDK response object or the {"error": ...} dict
    produced by query_qwen; always returns a human-readable string.
    """
    # Error dicts from query_qwen take priority over any parsing attempt.
    if isinstance(response, dict) and "error" in response:
        return f"Error: {response['error']}"
    try:
        first_choice = response.choices[0]
        if hasattr(first_choice, "message"):
            return first_choice.message.content.strip()
        # Dict-shaped choice (no .message attribute): fall back to .get chain.
        return first_choice.get("message", {}).get("content", "No message received.")
    except Exception as exc:
        return f"Failed to parse response: {str(exc)}"
# MCP Core Function
def answer_question(video_path: str, question: str) -> str:
    """Run the full pipeline: sample frames, build the multimodal prompt,
    query the model, and return the parsed answer.

    Any exception from the pipeline (unreadable video, missing API key, ...)
    is caught and returned as a friendly message so the UI never crashes.
    """
    try:
        sampled = extract_frames(video_path)
        reply = query_qwen(build_prompt(sampled, question))
        return parse_response(reply)
    except Exception as exc:
        return f"Something went wrong: {str(exc)}"
# Gradio App UI
def gradio_interface(video, question):
    """Gradio click callback: forwards the uploaded video (a file path
    provided by gr.Video) and the question text to the QA pipeline."""
    return answer_question(video, question)
# Build the Gradio UI: video upload + question input on one row,
# answer box and submit button below.
with gr.Blocks(title="🎥 Video QA with Qwen2.5-VL") as demo:
    gr.Markdown("## 🎥 Interactive Video Question Answering\nUpload a video and ask a question about it.")
    with gr.Row():
        video_input = gr.Video(label="Upload Video")
        question_input = gr.Textbox(label="Your Question", placeholder="e.g., What color was the car in the first scene?")
    answer_output = gr.Textbox(label="Model Answer", lines=3)
    submit_btn = gr.Button("Get Answer")
    # Wire the button to the pipeline callback.
    submit_btn.click(fn=gradio_interface, inputs=[video_input, question_input], outputs=answer_output)
# Launch the interface and MCP server
if __name__ == "__main__":
    # mcp_server=True also exposes the app's functions over MCP.
    demo.launch(mcp_server=True)