import gradio as gr from PIL import Image import threading import time import os import shutil from pathlib import Path from huggingface_hub import hf_hub_download from run_backend import main as run_backend_main, analysis_results, search_results # Files Prequisites def prepare_dataset_files(): # Create local directory os.makedirs("data", exist_ok=True) # Check current directory print(f"Current working directory: {os.getcwd()}") # Define list of required files files_to_download = [ "aws.mp4", "aws.png", "aws.wav", "aws_bundesliga.mp4", "aws_bundesliga.png", "aws_bundesliga.wav", "summit_sungwoo.mp4", "summit_sungwoo.png", "summit_sungwoo.wav" ] repo_id = "cloudplayer/hackathon_data" repo_type = "dataset" try: for file_name in files_to_download: local_path = os.path.join("data", file_name) # Skip if file already exists if os.path.exists(local_path): print(f"File already exists, skipping download: {local_path}") continue # Download file from Hub downloaded_path = hf_hub_download( repo_id=repo_id, filename=file_name, repo_type=repo_type, local_dir="data", local_dir_use_symlinks=False # Download actual file ) print(f"Downloaded file: {downloaded_path}") # Check downloaded files print(f"Files in data directory: {os.listdir('data')}") return True except Exception as e: print(f"Error downloading files: {e}") import traceback traceback.print_exc() return False prepare_dataset_files() # Set the directory path based on the current file BASE_DIR = "." DATA_DIR = "./data" TRANSCRIPT_FILE = "./transcribe_texts" # Analysis Status Management analysis_running = False analysis_thread = None last_update_time = 0 def read_transcript(): """Read Transcript File""" try: with open(TRANSCRIPT_FILE, "r", encoding="utf-8") as f: return f.read() except Exception as e: return "Loading Script..." def get_current_content(): """Get Current Content""" global last_update_time if not analysis_running: return "", "", "", "", "" try: current_time = time.time() if current_time - last_update_time < 1.0: # If less than 1 second, do not update return None last_update_time = current_time transcript = read_transcript() current_analysis = analysis_results[-1] if analysis_results else "" current_search = search_results[-1] if search_results else "" # 이전 결과 업데이트 if len(analysis_results) > 1: prev_analysis_text = "\n\n".join([ f"#### Summary #{i+1}\n{result}" for i, result in enumerate(analysis_results[:-1]) ]) else: prev_analysis_text = "No previous analysis results." if len(search_results) > 1: prev_search_text = "\n\n".join([ f"#### Search Result #{i+1}\n{result}" for i, result in enumerate(search_results[:-1]) ]) else: prev_search_text = "No previous search results." return transcript, current_analysis, current_search, prev_analysis_text, prev_search_text except Exception as e: print(f"Error occurred while updating content: {e}") return None def start_analysis(party, video_path): """Start Analysis""" global analysis_running, analysis_thread, last_update_time if not analysis_running: analysis_running = True last_update_time = time.time() # Initialize Transcript File try: with open(TRANSCRIPT_FILE, "w", encoding="utf-8") as f: f.write("") except Exception as e: pass # Start Analysis Thread analysis_thread = threading.Thread(target=run_backend_main, args=(party,)) analysis_thread.daemon = True analysis_thread.start() return gr.Markdown(f"# {party} Analysis"), gr.update(value=video_path) def create_ui(): """Create UI""" with gr.Blocks(title="Real-Time AI Video Summarization Service", theme=gr.themes.Soft()) as demo: # State Variables party = gr.State("") container_visible = gr.State(True) selection_visible = gr.State(True) auto_update = gr.State(False) # Auto Update State update_trigger = gr.State(0) # Update Trigger # Add Timer Component (10 second interval) timer = gr.Timer(10.0, active=False) # Add User Guide at the top gr.Markdown(""" ## How to Use: 1. Wait until the thumbnail images for each video fully appear. 2. Select the video title located just below the thumbnail image, then click the video play button in "Sample Video". (You can select any video, but we recommend choosing "Data, AI & Soccer How Bundesliga is transforming the fan experience" due to language considerations.) 3. When you press the Auto Update button at the bottom, the Real-Time Script, AI Summary Result, and Keyword Search Result will be updated every 10 seconds in real-time according to the agent workflow. * The Real-Time Script is the execution result of the Speech Recognition Agent that converts video content to text using AWS Transcribe. * The AI Summary Result is the execution result of the Summarization Agent. * The Keyword Search Result is the execution result of the Knowledge Retrieval Agent. 4. By pressing the Refresh button, you can immediately check the results up to that point. """) with gr.Column(visible=lambda: container_visible.value) as aws_container: gr.Markdown("### AWS Lecture - Select the video to perform AI summarization") with gr.Row(equal_height=True): with gr.Column(scale=1, min_width=400): aws_image_2 = gr.Image( value=str(DATA_DIR + "/aws_bundesliga.png"), label="aws_bundesliga", show_label=True, height=300, # Fixed Image Height width=400, # Fixed Image Width elem_id="aws_bundesliga" ) aws_button_2 = gr.Button( "Data, AI & Soccer How Bundesliga is transforming the fan experience", variant="primary", size="lg", elem_id="aws_button_2" ) with gr.Column(scale=1, min_width=400): aws_image_1 = gr.Image( value=str(DATA_DIR + "/summit_sungwoo.png"), label="summit_sungwoo", show_label=True, height=300, # Fixed Image Height width=400, # Fixed Image Width elem_id="summit_sungwoo" ) aws_button_1 = gr.Button( "The Future of AI is Here! Agents for Amazon Bedrock", variant="primary", size="lg", elem_id="aws_button_1" ) with gr.Column(scale=1, min_width=400): aws_image_3 = gr.Image( value=str(DATA_DIR + "/aws.png"), label="aws", show_label=True, height=300, width=400, elem_id="aws" ) aws_button_3 = gr.Button( "Discover the New AWS Services with AWS Heroes in 2024", variant="primary", size="lg", elem_id="aws_button_3" ) # Add CSS Style gr.Markdown(""" """) # Analysis Container (Initially Hidden) with gr.Column(visible=lambda: selection_visible.value) as analysis_container: title = gr.Markdown("# Video Analysis") with gr.Row(): # Left: Video with gr.Column(scale=3): video = gr.Video( label="Sample Video", show_label=True, interactive=False, value=str(DATA_DIR + "/summit_sungwoo.mp4"), # Default Value elem_id="debate_video" ) # Right: Analysis Results Tabs with gr.Column(scale=2): with gr.Tabs() as tabs: with gr.TabItem("Real-Time Script"): transcript = gr.Textbox( label="Real-Time Script", show_label=True, lines=20, interactive=False, value="Loading Script...", # Initial Value elem_id="transcript_box" ) with gr.TabItem("AI Summary Result"): analysis = gr.Markdown( value="Loading Analysis Result...", # Initial Value elem_id="analysis_result" ) with gr.Accordion("View Previous Analysis Results", open=False): prev_analysis = gr.Markdown( value="No previous analysis results.", # Initial Value elem_id="prev_analysis" ) with gr.TabItem("Keyword Search Result"): search = gr.Markdown( value="Loading Search Result...", # Initial Value elem_id="search_result" ) with gr.Accordion("View Previous Keyword Search Results", open=False): prev_search = gr.Markdown( value="No previous search results.", # Initial Value elem_id="prev_search" ) # Show Status status = gr.Markdown("Analysis is in progress...") # Update Button with gr.Row(): update_button = gr.Button( "Refresh", variant="secondary", size="lg", elem_id="update_button" ) auto_update_button = gr.Button( "Auto Update", variant="secondary", size="lg", elem_id="auto_update_button" ) # Add CSS Style for Analysis Page gr.Markdown(""" """) def on_aws_select(content_name, video_file): """AWS Lecture Selection Processing""" party.value = content_name video_path = str(DATA_DIR + f"/{video_file}") container_visible.value = False selection_visible.value = True return start_analysis(content_name, video_path) def trigger_update(): """Increase Update Trigger""" update_trigger.value += 1 return update_trigger.value def update_content(trigger): """Update Content""" if not analysis_running: return ( "Analysis has not started.", "No analysis results.", "No search results.", "No previous analysis results.", "No previous search results.", trigger ) result = get_current_content() if result is None: return ( transcript.value, analysis.value, search.value, prev_analysis.value, prev_search.value, trigger ) return (*result, trigger) def toggle_auto_update(): """Toggle Auto Update""" auto_update.value = not auto_update.value if auto_update.value: # Start Auto Update - Increase Trigger trigger_update() return "Auto Update has started. It will be updated every 10 seconds.", gr.Timer(active=True) else: return "Auto Update has stopped.", gr.Timer(active=False) aws_button_1.click( fn=lambda: on_aws_select("Agents for Amazon Bedrock", "summit_sungwoo.mp4"), outputs=[title, video] ) aws_button_2.click( fn=lambda: on_aws_select("Bundesliga Fan Experience", "aws_bundesliga.mp4"), outputs=[title, video] ) aws_button_3.click( fn=lambda: on_aws_select("AWS_2024_recap", "aws.mp4"), outputs=[title, video] ) # Update Button Click Event update_button.click( fn=trigger_update, outputs=[update_trigger] ) # Auto Update Button Click Event auto_update_button.click( fn=toggle_auto_update, outputs=[status, timer] ) # Timer Tick Event timer.tick( fn=lambda: trigger_update() if auto_update.value else None, outputs=[update_trigger] ) # Update Trigger Change Event update_trigger.change( fn=update_content, inputs=[update_trigger], outputs=[transcript, analysis, search, prev_analysis, prev_search, update_trigger] ) # Initial Load - Set Update Trigger demo.load( fn=lambda: (update_trigger.value + 1), outputs=[update_trigger] ) return demo if __name__ == "__main__": demo = create_ui() demo.queue() # Enable Queue demo.launch(share=True)