dyryu1208
commit
920dfd0
import gradio as gr
from PIL import Image
import threading
import time
import os
import shutil
from pathlib import Path
from huggingface_hub import hf_hub_download
from run_backend import main as run_backend_main, analysis_results, search_results
# Files Prequisites
def prepare_dataset_files():
# Create local directory
os.makedirs("data", exist_ok=True)
# Check current directory
print(f"Current working directory: {os.getcwd()}")
# Define list of required files
files_to_download = [
"aws.mp4", "aws.png", "aws.wav",
"aws_bundesliga.mp4", "aws_bundesliga.png", "aws_bundesliga.wav",
"summit_sungwoo.mp4", "summit_sungwoo.png", "summit_sungwoo.wav"
]
repo_id = "cloudplayer/hackathon_data"
repo_type = "dataset"
try:
for file_name in files_to_download:
local_path = os.path.join("data", file_name)
# Skip if file already exists
if os.path.exists(local_path):
print(f"File already exists, skipping download: {local_path}")
continue
# Download file from Hub
downloaded_path = hf_hub_download(
repo_id=repo_id,
filename=file_name,
repo_type=repo_type,
local_dir="data",
local_dir_use_symlinks=False # Download actual file
)
print(f"Downloaded file: {downloaded_path}")
# Check downloaded files
print(f"Files in data directory: {os.listdir('data')}")
return True
except Exception as e:
print(f"Error downloading files: {e}")
import traceback
traceback.print_exc()
return False
prepare_dataset_files()
# Set the directory path based on the current file
BASE_DIR = "."
DATA_DIR = "./data"
TRANSCRIPT_FILE = "./transcribe_texts"
# Analysis Status Management
analysis_running = False
analysis_thread = None
last_update_time = 0
def read_transcript():
"""Read Transcript File"""
try:
with open(TRANSCRIPT_FILE, "r", encoding="utf-8") as f:
return f.read()
except Exception as e:
return "Loading Script..."
def get_current_content():
"""Get Current Content"""
global last_update_time
if not analysis_running:
return "", "", "", "", ""
try:
current_time = time.time()
if current_time - last_update_time < 1.0: # If less than 1 second, do not update
return None
last_update_time = current_time
transcript = read_transcript()
current_analysis = analysis_results[-1] if analysis_results else ""
current_search = search_results[-1] if search_results else ""
# 이전 결과 업데이트
if len(analysis_results) > 1:
prev_analysis_text = "\n\n".join([
f"#### Summary #{i+1}\n{result}"
for i, result in enumerate(analysis_results[:-1])
])
else:
prev_analysis_text = "No previous analysis results."
if len(search_results) > 1:
prev_search_text = "\n\n".join([
f"#### Search Result #{i+1}\n{result}"
for i, result in enumerate(search_results[:-1])
])
else:
prev_search_text = "No previous search results."
return transcript, current_analysis, current_search, prev_analysis_text, prev_search_text
except Exception as e:
print(f"Error occurred while updating content: {e}")
return None
def start_analysis(party, video_path):
"""Start Analysis"""
global analysis_running, analysis_thread, last_update_time
if not analysis_running:
analysis_running = True
last_update_time = time.time()
# Initialize Transcript File
try:
with open(TRANSCRIPT_FILE, "w", encoding="utf-8") as f:
f.write("")
except Exception as e:
pass
# Start Analysis Thread
analysis_thread = threading.Thread(target=run_backend_main, args=(party,))
analysis_thread.daemon = True
analysis_thread.start()
return gr.Markdown(f"# {party} Analysis"), gr.update(value=video_path)
def create_ui():
"""Create UI"""
with gr.Blocks(title="Real-Time AI Video Summarization Service", theme=gr.themes.Soft()) as demo:
# State Variables
party = gr.State("")
container_visible = gr.State(True)
selection_visible = gr.State(True)
auto_update = gr.State(False) # Auto Update State
update_trigger = gr.State(0) # Update Trigger
# Add Timer Component (10 second interval)
timer = gr.Timer(10.0, active=False)
# Add User Guide at the top
gr.Markdown("""
## How to Use:
1. Wait until the thumbnail images for each video fully appear.
2. Select the video title located just below the thumbnail image, then click the video play button in "Sample Video".
(You can select any video, but we recommend choosing "Data, AI & Soccer How Bundesliga is transforming the fan experience" due to language considerations.)
3. When you press the Auto Update button at the bottom, the Real-Time Script, AI Summary Result, and Keyword Search Result will be updated every 10 seconds in real-time according to the agent workflow.
* The Real-Time Script is the execution result of the Speech Recognition Agent that converts video content to text using AWS Transcribe.
* The AI Summary Result is the execution result of the Summarization Agent.
* The Keyword Search Result is the execution result of the Knowledge Retrieval Agent.
4. By pressing the Refresh button, you can immediately check the results up to that point.
""")
with gr.Column(visible=lambda: container_visible.value) as aws_container:
gr.Markdown("### AWS Lecture - Select the video to perform AI summarization")
with gr.Row(equal_height=True):
with gr.Column(scale=1, min_width=400):
aws_image_2 = gr.Image(
value=str(DATA_DIR + "/aws_bundesliga.png"),
label="aws_bundesliga",
show_label=True,
height=300, # Fixed Image Height
width=400, # Fixed Image Width
elem_id="aws_bundesliga"
)
aws_button_2 = gr.Button(
"Data, AI & Soccer How Bundesliga is transforming the fan experience",
variant="primary",
size="lg",
elem_id="aws_button_2"
)
with gr.Column(scale=1, min_width=400):
aws_image_1 = gr.Image(
value=str(DATA_DIR + "/summit_sungwoo.png"),
label="summit_sungwoo",
show_label=True,
height=300, # Fixed Image Height
width=400, # Fixed Image Width
elem_id="summit_sungwoo"
)
aws_button_1 = gr.Button(
"The Future of AI is Here! Agents for Amazon Bedrock",
variant="primary",
size="lg",
elem_id="aws_button_1"
)
with gr.Column(scale=1, min_width=400):
aws_image_3 = gr.Image(
value=str(DATA_DIR + "/aws.png"),
label="aws",
show_label=True,
height=300,
width=400,
elem_id="aws"
)
aws_button_3 = gr.Button(
"Discover the New AWS Services with AWS Heroes in 2024",
variant="primary",
size="lg",
elem_id="aws_button_3"
)
# Add CSS Style
gr.Markdown("""
<style>
#summit_sungwoo, #aws_bundesliga, #aws{
object-fit: contain !important;
background-color: #f8f9fa;
border-radius: 10px;
padding: 10px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
#aws_button_1, #aws_button_2, #aws_button_3 {
margin-top: 20px;
width: 100%;
height: 50px;
font-size: 1.2em;
font-weight: bold;
border-radius: 8px;
transition: all 0.3s ease;
}
#aws_button_1:hover, #aws_button_2:hover, #aws_button_3:hover {
transform: translateY(-2px);
box-shadow: 0 4px 8px rgba(0,0,0,0.2);
}
</style>
""")
# Analysis Container (Initially Hidden)
with gr.Column(visible=lambda: selection_visible.value) as analysis_container:
title = gr.Markdown("# Video Analysis")
with gr.Row():
# Left: Video
with gr.Column(scale=3):
video = gr.Video(
label="Sample Video",
show_label=True,
interactive=False,
value=str(DATA_DIR + "/summit_sungwoo.mp4"), # Default Value
elem_id="debate_video"
)
# Right: Analysis Results Tabs
with gr.Column(scale=2):
with gr.Tabs() as tabs:
with gr.TabItem("Real-Time Script"):
transcript = gr.Textbox(
label="Real-Time Script",
show_label=True,
lines=20,
interactive=False,
value="Loading Script...", # Initial Value
elem_id="transcript_box"
)
with gr.TabItem("AI Summary Result"):
analysis = gr.Markdown(
value="Loading Analysis Result...", # Initial Value
elem_id="analysis_result"
)
with gr.Accordion("View Previous Analysis Results", open=False):
prev_analysis = gr.Markdown(
value="No previous analysis results.", # Initial Value
elem_id="prev_analysis"
)
with gr.TabItem("Keyword Search Result"):
search = gr.Markdown(
value="Loading Search Result...", # Initial Value
elem_id="search_result"
)
with gr.Accordion("View Previous Keyword Search Results", open=False):
prev_search = gr.Markdown(
value="No previous search results.", # Initial Value
elem_id="prev_search"
)
# Show Status
status = gr.Markdown("Analysis is in progress...")
# Update Button
with gr.Row():
update_button = gr.Button(
"Refresh",
variant="secondary",
size="lg",
elem_id="update_button"
)
auto_update_button = gr.Button(
"Auto Update",
variant="secondary",
size="lg",
elem_id="auto_update_button"
)
# Add CSS Style for Analysis Page
gr.Markdown("""
<style>
#debate_video {
border-radius: 10px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
#transcript_box {
font-family: 'Noto Sans KR', sans-serif;
line-height: 1.6;
}
#analysis_result, #search_result, #prev_analysis, #prev_search {
font-family: 'Noto Sans KR', sans-serif;
line-height: 1.8;
padding: 15px;
background-color: #f8f9fa;
border-radius: 8px;
}
#update_button, #auto_update_button {
margin: 10px;
transition: all 0.3s ease;
}
#update_button:hover, #auto_update_button:hover {
transform: translateY(-2px);
box-shadow: 0 4px 8px rgba(0,0,0,0.2);
}
</style>
""")
def on_aws_select(content_name, video_file):
"""AWS Lecture Selection Processing"""
party.value = content_name
video_path = str(DATA_DIR + f"/{video_file}")
container_visible.value = False
selection_visible.value = True
return start_analysis(content_name, video_path)
def trigger_update():
"""Increase Update Trigger"""
update_trigger.value += 1
return update_trigger.value
def update_content(trigger):
"""Update Content"""
if not analysis_running:
return (
"Analysis has not started.",
"No analysis results.",
"No search results.",
"No previous analysis results.",
"No previous search results.",
trigger
)
result = get_current_content()
if result is None:
return (
transcript.value,
analysis.value,
search.value,
prev_analysis.value,
prev_search.value,
trigger
)
return (*result, trigger)
def toggle_auto_update():
"""Toggle Auto Update"""
auto_update.value = not auto_update.value
if auto_update.value:
# Start Auto Update - Increase Trigger
trigger_update()
return "Auto Update has started. It will be updated every 10 seconds.", gr.Timer(active=True)
else:
return "Auto Update has stopped.", gr.Timer(active=False)
aws_button_1.click(
fn=lambda: on_aws_select("Agents for Amazon Bedrock", "summit_sungwoo.mp4"),
outputs=[title, video]
)
aws_button_2.click(
fn=lambda: on_aws_select("Bundesliga Fan Experience", "aws_bundesliga.mp4"),
outputs=[title, video]
)
aws_button_3.click(
fn=lambda: on_aws_select("AWS_2024_recap", "aws.mp4"),
outputs=[title, video]
)
# Update Button Click Event
update_button.click(
fn=trigger_update,
outputs=[update_trigger]
)
# Auto Update Button Click Event
auto_update_button.click(
fn=toggle_auto_update,
outputs=[status, timer]
)
# Timer Tick Event
timer.tick(
fn=lambda: trigger_update() if auto_update.value else None,
outputs=[update_trigger]
)
# Update Trigger Change Event
update_trigger.change(
fn=update_content,
inputs=[update_trigger],
outputs=[transcript, analysis, search, prev_analysis, prev_search, update_trigger]
)
# Initial Load - Set Update Trigger
demo.load(
fn=lambda: (update_trigger.value + 1),
outputs=[update_trigger]
)
return demo
if __name__ == "__main__":
demo = create_ui()
demo.queue() # Enable Queue
demo.launch(share=True)