# -*- coding: utf-8 -*-
# Install required libraries if running outside Colab
# !pip install gradio yt-dlp moviepy pillow SpeechRecognition llama-index \
#     llama-index-vector-stores-lancedb llama-index-embeddings-huggingface \
#     lancedb google-generativeai
import gradio as gr
import yt_dlp
from moviepy import VideoFileClip
from pathlib import Path
import speech_recognition as sr
from PIL import Image
import os
import shutil
import json


# Pipeline helpers (download_video, video_to_images, video_to_audio,
# audio_to_text, prepare_video) are defined below.
def plot_images(image_paths):
    """Collect up to seven existing image files for display in the gallery."""
    img_files = []
    for img_path in image_paths:
        if os.path.isfile(img_path):
            img_files.append(img_path)
            if len(img_files) >= 7:
                break
    return img_files
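
# A small usage sketch (hypothetical paths; assumes frames were already
# extracted to ./mixed_data/):
#   frames = plot_images([f"./mixed_data/frame{i:04d}.png" for i in range(20)])
#   # `frames` holds at most seven existing files, ready to hand to gr.Gallery.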


def download_video(video_url, output_video_path="./video_data/"):
    ydl_opts = {
        "format": "bestvideo+bestaudio/best",
        "merge_output_format": "mp4",
        "outtmpl": f"{output_video_path}/input_vid.mp4",
        "noplaylist": True,
        "quiet": False,
        # Uncomment and set your cookie file path if required
        # "cookiefile": "cookies.txt",
    }
    Path(output_video_path).mkdir(parents=True, exist_ok=True)
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=True)
        info = ydl.sanitize_info(info)
    return {
        "title": info.get("title"),
        "uploader": info.get("uploader"),
        "views": info.get("view_count"),
    }
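
# Example usage (hypothetical URL; network access and a current yt-dlp are assumed):
#   meta = download_video("https://www.youtube.com/watch?v=<VIDEO_ID>")
#   print(meta)  # {'title': ..., 'uploader': ..., 'views': ...}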


def video_to_images(video_path, output_folder):
    """Sample frames from the video at 0.2 fps (one frame every five seconds)."""
    Path(output_folder).mkdir(parents=True, exist_ok=True)
    clip = VideoFileClip(video_path)
    clip.write_images_sequence(
        os.path.join(output_folder, "frame%04d.png"), fps=0.2
    )


def video_to_audio(video_path, output_audio_path):
    """Extract the audio track from the video into a standalone file."""
    clip = VideoFileClip(video_path)
    audio = clip.audio
    audio.write_audiofile(output_audio_path)
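
# The two extractors are typically run back to back on the downloaded file,
# e.g. (assuming download_video produced ./video_data/input_vid.mp4):
#   video_to_images("./video_data/input_vid.mp4", "./mixed_data/")
#   video_to_audio("./video_data/input_vid.mp4", "./mixed_data/output_audio.wav")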


def audio_to_text(audio_path):
    """Transcribe audio with the free Google Web Speech API; return None on failure."""
    recognizer = sr.Recognizer()
    try:
        with sr.AudioFile(audio_path) as source:
            audio_data = recognizer.record(source)
        text = recognizer.recognize_google(audio_data)
        return text
    except sr.UnknownValueError:
        print("Google Speech Recognition could not understand the audio.")
    except sr.RequestError as e:
        print(f"Could not request results: {e}")
    return None
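
# recognize_google is intended for short clips; long videos can exceed request
# limits. A minimal chunked sketch (60-second windows; an assumption, not part
# of the original flow):
#   with sr.AudioFile(audio_path) as source:
#       parts = []
#       while True:
#           chunk = recognizer.record(source, duration=60)
#           if not chunk.frame_data:
#               break
#           parts.append(recognizer.recognize_google(chunk))
#   text = " ".join(parts)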


def prepare_video(video_url,
                  output_video_path="./video_data/",
                  output_folder="./mixed_data/",
                  output_audio_path="./mixed_data/output_audio.wav"):
    """Turn the local video into frames plus a transcript; return (metadata, transcript)."""
    filepath = os.path.join(output_video_path, "input_vid.mp4")
    # Download is currently disabled; the video is expected at ./video_data/input_vid.mp4.
    # meta = download_video(video_url, output_video_path)
    video_to_images(filepath, output_folder)
    video_to_audio(filepath, output_audio_path)
    text_data = audio_to_text(output_audio_path)
    text_path = os.path.join(output_folder, "output_text.txt")
    with open(text_path, "w") as file:
        file.write(text_data if text_data else "")
    os.remove(output_audio_path)
    # Placeholder metadata while download_video is disabled.
    meta = {
        "title": "test",
        "uploader": "uploader",
        "views": "view_count",
    }
    return meta, text_data
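
# Example (a sketch; assumes ./video_data/input_vid.mp4 is already in place,
# since the download step above is commented out):
#   meta, transcript = prepare_video("")  # the URL is unused while download is disabled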


from llama_index.core.indices import MultiModalVectorStoreIndex
from llama_index.core import SimpleDirectoryReader, StorageContext, Settings
from llama_index.vector_stores.lancedb import LanceDBVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding


def create_vector_db(image_txt_folder_path: str):
    """Index the extracted frames and transcript into LanceDB; return a multimodal retriever."""
    text_store = LanceDBVectorStore(uri="lancedb", table_name="text_collection")
    image_store = LanceDBVectorStore(uri="lancedb", table_name="image_collection")
    storage_context = StorageContext.from_defaults(
        vector_store=text_store, image_store=image_store
    )
    Settings.embed_model = HuggingFaceEmbedding(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    documents = SimpleDirectoryReader(image_txt_folder_path).load_data()
    index = MultiModalVectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
    )
    retriever_engine = index.as_retriever(
        similarity_top_k=2, image_similarity_top_k=3
    )
    return retriever_engine
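
# A usage sketch (assumes prepare_video has populated ./mixed_data/ with frames
# and output_text.txt; images are embedded with LlamaIndex's default CLIP model):
#   retriever_engine = create_vector_db("./mixed_data/")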


from llama_index.core.schema import ImageNode


def retrieve(retriever_engine, query_str):
    """Split retrieval results into image file paths and text chunks."""
    retrieval_results = retriever_engine.retrieve(query_str)
    retrieved_image = []
    retrieved_text = []
    for res_node in retrieval_results:
        if isinstance(res_node.node, ImageNode):
            retrieved_image.append(res_node.node.metadata["file_path"])
        else:
            retrieved_text.append(res_node.text)
    return retrieved_image, retrieved_text
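
# Example (assumes a retriever from create_vector_db):
#   imgs, texts = retrieve(retriever_engine, "What is the video about?")
#   # imgs: up to 3 frame paths (image_similarity_top_k); texts: up to 2 chunks.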


qa_tmpl_str = (
    "Given the provided information, including relevant images and retrieved context "
    "from the video, accurately and precisely answer the query without any additional "
    "prior knowledge.\n"
    "Please ensure honesty and responsibility, refraining from any racist or sexist remarks.\n"
    "---------------------\n"
    "Context: {context_str}\n"
    "Metadata for video: {metadata_str}\n"
    "---------------------\n"
    "Query: {query_str}\n"
    "Answer: "
)


import google.generativeai as genai


def get_response(retriever_engine, query_str, metadata_str, output_folder):
    """Retrieve relevant context, then ask Gemini to answer over text and frames."""
    img, txt = retrieve(retriever_engine=retriever_engine, query_str=query_str)
    context_str = "".join(txt)
    prompt = qa_tmpl_str.format(
        context_str=context_str, query_str=query_str, metadata_str=metadata_str
    )
    # Read the key from the environment; never hard-code API keys in source.
    genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
    gemini_model = genai.GenerativeModel("gemini-1.5-flash-latest")
    content_parts = [prompt]
    image_paths = []
    for img_path in img:
        try:
            image = Image.open(img_path)
            content_parts.append(image)
            image_paths.append(img_path)
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
    response = gemini_model.generate_content(content_parts)
    result_text = response.text if hasattr(response, "text") else str(response)
    return result_text, image_paths
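
# Example (a sketch; assumes GOOGLE_API_KEY is set in the environment):
#   answer, shown = get_response(
#       retriever_engine, "Summarize the video", json.dumps(meta), "./mixed_data/"
#   )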


# Gradio interface function
def gradio_chat(query):
    output_video_path = "./video_data/"
    output_folder = "./mixed_data/"
    output_audio_path = "./mixed_data/output_audio.wav"
    video_url = ""  # unused while download_video is disabled in prepare_video
    try:
        metadata_vid, text_data = prepare_video(
            video_url, output_video_path, output_folder, output_audio_path
        )
        metadata_str = json.dumps(metadata_vid)
        retriever_engine = create_vector_db(output_folder)
        result_text, image_paths = get_response(
            retriever_engine, query, metadata_str, output_folder
        )
        # Cleanup (disabled so the extracted data can be inspected between runs)
        # if os.path.exists(output_video_path):
        #     shutil.rmtree(output_video_path)
        # if os.path.exists(output_folder):
        #     shutil.rmtree(output_folder)
        # Gradio can return text plus images (as a list of file paths)
        return result_text, image_paths
    except Exception as e:
        return f"Error: {str(e)}", []


# Gradio UI
gradio_ui = gr.Interface(
    fn=gradio_chat,
    inputs=[
        gr.Textbox(label="Query"),
    ],
    outputs=[
        gr.Textbox(label="Chat Response"),
        gr.Gallery(label="Relevant Images", allow_preview=True),
    ],
    title="Multimodal Video RAG",
    description="Ask questions about the prepared video; answers draw on its transcript and frames.",
)

if __name__ == "__main__":
    gradio_ui.launch(share=True)