# -*- coding: utf-8 -*-
# Install required libraries if running outside Colab:
# !pip install gradio yt-dlp moviepy pillow speechrecognition llama-index lancedb google-generativeai
import gradio as gr
import yt_dlp
from moviepy import VideoFileClip
from pathlib import Path
import speech_recognition as sr
from PIL import Image
import os
import shutil
import json

# Pipeline helpers: download_video, video_to_images, video_to_audio,
# audio_to_text, prepare_video
def plot_images(image_paths):
    """Return up to 7 image paths that actually exist on disk."""
    img_files = []
    for img_path in image_paths:
        if os.path.isfile(img_path):
            img_files.append(img_path)
            if len(img_files) >= 7:
                break
    return img_files
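
# Note: plot_images() is a filtering helper and is not currently called by the
# Gradio UI; get_response() returns its own list of image paths instead.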
def download_video(video_url, output_video_path="./video_data/"):
    ydl_opts = {
        "format": "bestvideo+bestaudio/best",
        "merge_output_format": "mp4",
        "outtmpl": f"{output_video_path}/input_vid.mp4",
        "noplaylist": True,
        "quiet": False,
        # Uncomment and set your cookie file path if required
        # "cookiefile": "cookies.txt",
    }
    Path(output_video_path).mkdir(parents=True, exist_ok=True)
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=True)
        info = ydl.sanitize_info(info)
    return {
        "title": info.get("title"),
        "uploader": info.get("uploader"),
        "views": info.get("view_count"),
    }
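
# Example (placeholder URL, shown only to illustrate the call):
# meta = download_video("https://www.youtube.com/watch?v=<video-id>")
# print(meta["title"], meta["uploader"], meta["views"])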
def video_to_images(video_path, output_folder):
    """Extract frames from the video; fps=0.2 means one frame every 5 seconds."""
    Path(output_folder).mkdir(parents=True, exist_ok=True)
    clip = VideoFileClip(video_path)
    clip.write_images_sequence(
        os.path.join(output_folder, "frame%04d.png"), fps=0.2
    )
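
# Example: at fps=0.2, a 10-minute video yields roughly 120 frames
# (600 s * 0.2 fps), written as frame0000.png, frame0001.png, ...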
def video_to_audio(video_path, output_audio_path):
    clip = VideoFileClip(video_path)
    audio = clip.audio
    audio.write_audiofile(output_audio_path)
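
# Example:
# video_to_audio("./video_data/input_vid.mp4", "./mixed_data/output_audio.wav")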
def audio_to_text(audio_path):
    recognizer = sr.Recognizer()
    try:
        with sr.AudioFile(audio_path) as source:
            audio_data = recognizer.record(source)
            text = recognizer.recognize_google(audio_data)
            return text
    except sr.UnknownValueError:
        print("Google Speech Recognition could not understand the audio.")
    except sr.RequestError as e:
        print(f"Could not request results: {e}")
    return None
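
# Note: recognize_google() calls Google's free web speech API, so it needs
# network access and may fail on very long recordings. Example:
# transcript = audio_to_text("./mixed_data/output_audio.wav")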
def prepare_video(video_url,
                  output_video_path="./video_data/",
                  output_folder="./mixed_data/",
                  output_audio_path="./mixed_data/output_audio.wav"):
    filepath = os.path.join(output_video_path, "input_vid.mp4")
    # Downloading is disabled; the video is expected to already exist at `filepath`.
    # meta = download_video(video_url, output_video_path)
    video_to_images(filepath, output_folder)
    video_to_audio(filepath, output_audio_path)
    text_data = audio_to_text(output_audio_path)
    text_path = os.path.join(output_folder, "output_text.txt")
    with open(text_path, "w") as file:
        file.write(text_data if text_data else "")
    os.remove(output_audio_path)
    # Placeholder metadata while download_video() is disabled above.
    meta = {
        "title": "test",
        "uploader": "uploader",
        "views": "view_count",
    }
    return meta, text_data
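
# Example: place a clip at ./video_data/input_vid.mp4 first, then:
# meta, transcript = prepare_video("")
# The empty URL is ignored while the download step is commented out.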
from llama_index.core.indices import MultiModalVectorStoreIndex
from llama_index.core import SimpleDirectoryReader, StorageContext
from llama_index.vector_stores.lancedb import LanceDBVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import Settings
def create_vector_db(image_txt_folder_path: str):
    # Separate LanceDB tables for text chunks and image frames.
    text_store = LanceDBVectorStore(uri="lancedb", table_name="text_collection")
    image_store = LanceDBVectorStore(uri="lancedb", table_name="image_collection")
    storage_context = StorageContext.from_defaults(
        vector_store=text_store, image_store=image_store
    )
    # Use a local HuggingFace model for text embeddings.
    Settings.embed_model = HuggingFaceEmbedding(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    documents = SimpleDirectoryReader(image_txt_folder_path).load_data()
    index = MultiModalVectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
    )
    # Retrieve up to 2 text chunks and 3 images per query.
    retriever_engine = index.as_retriever(
        similarity_top_k=2, image_similarity_top_k=3
    )
    return retriever_engine
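
# Example: build the retriever over the folder produced by prepare_video():
# retriever_engine = create_vector_db("./mixed_data/")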
from llama_index.core.schema import ImageNode

def retrieve(retriever_engine, query_str):
    retrieval_results = retriever_engine.retrieve(query_str)
    retrieved_image = []
    retrieved_text = []
    for res_node in retrieval_results:
        if isinstance(res_node.node, ImageNode):
            retrieved_image.append(res_node.node.metadata["file_path"])
        else:
            retrieved_text.append(res_node.text)
    return retrieved_image, retrieved_text
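
# Example:
# imgs, txts = retrieve(retriever_engine, "What is the speaker demonstrating?")
# imgs holds frame file paths; txts holds matching transcript chunks.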
qa_tmpl_str = (
    "Given the provided information, including relevant images and retrieved "
    "context from the video, accurately and precisely answer the query "
    "without any additional prior knowledge.\n"
    "Please ensure honesty and responsibility, refraining from any racist or "
    "sexist remarks.\n"
    "---------------------\n"
    "Context: {context_str}\n"
    "Metadata for video: {metadata_str}\n"
    "---------------------\n"
    "Query: {query_str}\n"
    "Answer: "
)
import google.generativeai as genai

def get_response(retriever_engine, query_str, metadata_str, output_folder):
    img, txt = retrieve(retriever_engine=retriever_engine, query_str=query_str)
    context_str = "".join(txt)
    prompt = qa_tmpl_str.format(
        context_str=context_str, query_str=query_str, metadata_str=metadata_str
    )
    # Read the key from the environment; never hard-code API keys in source.
    genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
    gemini_model = genai.GenerativeModel("gemini-1.5-flash-latest")
    # Gemini accepts a mixed list of text and PIL images as content parts.
    content_parts = [prompt]
    image_paths = []
    for img_path in img:
        try:
            image = Image.open(img_path)
            content_parts.append(image)
            image_paths.append(img_path)
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
    response = gemini_model.generate_content(content_parts)
    result_text = response.text if hasattr(response, "text") else str(response)
    return result_text, image_paths
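
# Example (assumes the GOOGLE_API_KEY environment variable is set):
# answer, shown_images = get_response(
#     retriever_engine, "Summarize the video", json.dumps(meta), "./mixed_data/"
# )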
# Gradio interface function
def gradio_chat(query):
    output_video_path = "./video_data/"
    output_folder = "./mixed_data/"
    output_audio_path = "./mixed_data/output_audio.wav"
    # Empty URL: prepare_video() currently reads a local file rather than downloading.
    video_url = ""
    try:
        metadata_vid, text_data = prepare_video(
            video_url, output_video_path, output_folder, output_audio_path
        )
        metadata_str = json.dumps(metadata_vid)
        retriever_engine = create_vector_db(output_folder)
        result_text, image_paths = get_response(
            retriever_engine, query, metadata_str, output_folder
        )
        # Optional cleanup of intermediate files:
        # if os.path.exists(output_video_path):
        #     shutil.rmtree(output_video_path)
        # if os.path.exists(output_folder):
        #     shutil.rmtree(output_folder)
        # Gradio can return text plus images (as a list of file paths).
        return result_text, image_paths
    except Exception as e:
        return f"Error: {str(e)}", []
# Gradio UI
gradio_ui = gr.Interface(
    fn=gradio_chat,
    inputs=[
        gr.Textbox(label="Query"),
    ],
    outputs=[
        gr.Textbox(label="Chat Response"),
        gr.Gallery(label="Relevant Images", allow_preview=True),
    ],
    title="Multimodal Video RAG",
    description="Ask a question about the prepared video; answers draw on its transcript and retrieved frames.",
)

if __name__ == "__main__":
    # share=True creates a temporary public link; use share=False for local-only use.
    gradio_ui.launch(share=True)