# coding: utf-8 import argparse import glob import os import shutil from pathlib import Path import tempfile import gradio as gr from cloud_task_executor import CloudTaskExecutor from elevenlabs_helper import ElevenLabsHelper from common_util import CommonUtil # --- talk_key = "talk" valid_talking_expressions = [ f"{talk_key}-head", f"{talk_key}-neutral", ] valid_nontalking_expressions = [ "smile", "approve", "disapprove", "confused", "sad", "surprised", ] # --- def get_sorted_filenames_in_dir(dir_path: str, ext: str = ".jpg", throw_if_empty: bool = True) -> list: """Return the sorted filenames in the spedified directory.""" p = Path(dir_path) if not p.exists() and not p.is_dir(): raise RuntimeError(f"The path: {dir_path} does not exist") if not os.listdir(dir_path): message = f"The path: {dir_path} is empty" if throw_if_empty: raise RuntimeError(message) else: return [] search_string = str(dir_path) + "/*" + ext return sorted(glob.glob(search_string)) # --- # Core constants tmp_dir = "/tmp/gradio" data_dir = "./data" male_key = "male" female_key = "female" unknown_key = "unknown" media_height = 512 # Global variables temp_video_files = set() # Male/Female female_terms = ["Female", "Lady", "Woman"] male_terms = ["Male", "Lad", "Man"] # Elevenlabs Voices # all_voices = ElevenLabsHelper.get_voices() voices_ = [voice for voice in all_voices.voices if len(voice.name.split(" ")) < 2 and len(voice.name) < 10] female_voice_names = ElevenLabsHelper.select_voices(voices_, labels={"gender": female_key, "age": "young"}) male_voice_names = ElevenLabsHelper.select_voices(voices_, labels={"gender": male_key, "age": "young"}) male_voice_names.remove("Priya") voices = { female_key: female_voice_names, male_key: male_voice_names, unknown_key: female_voice_names + male_voice_names, } # Examples # Base Images example_base_image_dir = os.path.join(data_dir, "input_image_bases") example_base_images = { female_key: get_sorted_filenames_in_dir(os.path.join(example_base_image_dir, female_key), ext=".jpg"), male_key: get_sorted_filenames_in_dir(os.path.join(example_base_image_dir, male_key), ext=".jpg"), } # Base Videos example_base_video_dir = os.path.join(data_dir, "input_video_bases") example_source_videos = { female_key: get_sorted_filenames_in_dir(os.path.join(example_base_video_dir, female_key), ext=".mp4"), male_key: get_sorted_filenames_in_dir(os.path.join(example_base_video_dir, male_key), ext=".mp4"), } # Driving Audio example_driving_audio_dir = os.path.join(data_dir, "input_audio/gradio") example_driving_audios_male = get_sorted_filenames_in_dir(os.path.join(example_driving_audio_dir, male_key), ext=".mp3") example_driving_audios_female = get_sorted_filenames_in_dir( os.path.join(example_driving_audio_dir, female_key), ext=".mp3" ) example_driving_audios = {female_key: example_driving_audios_female, male_key: example_driving_audios_male} def get_audio_dropdown_choices(audio_paths, base_dir): return [ (path.replace(base_dir, "").lstrip("/"), path) for path in audio_paths ] example_driving_audio_base_dir = os.path.join("./data/input_audio/gradio/") example_driving_audio_dropdown_choices = ( get_audio_dropdown_choices(example_driving_audios[female_key], example_driving_audio_base_dir) + get_audio_dropdown_choices(example_driving_audios[male_key], example_driving_audio_base_dir) ) example_driving_audio_texts = [ "The 2026 World Cup final match is in New York.", "Enhance efficiency and cut costs with AI.", "A bee's wings beat more than 200 times per second.", "2026년 월드컵 결승전은 뉴욕에서 열립니다.", "AI로 효율성을 높이고 비용을 절감하세요.", "벌은 초당 200회 이상의 날개짓을 합니다.", "2026 विश्व कप फाइनल मैच न्यूयॉर्क में होगा।", "AI के साथ दक्षता बढ़ाएं और लागत कम करें।", "मधुमक्खी के पंख सेकंड में 200 बार से अधिक फड़फड़ाते हैं।", "Welcome to our kiosk, where you can easily purchase tickets, or access various services by simply tapping the display!", "Catch all the drama, emotion, and energy in my new film, now available on Netflix—it's a must-watch!", "This season of IPL is full of surprises, and I’d love to see you supporting us as we fight for victory on the ground.", "Transform your health with our latest fitness programs! Join us today and take the first step toward a stronger, energized you.", "A big black bug bit a big black dog on his big black nose.", "Fuzzy Wuzzy was a bear. Fuzzy Wuzzy had no hair. Fuzzy Wuzzy wasn't very fuzzy, was he?", ] example_showcase_dir = os.path.join(data_dir, "showcase_examples") examples_showcase = { "make_image_talk_multilingual": get_sorted_filenames_in_dir( os.path.join(example_showcase_dir, "make_image_talk_multilingual"), ext=".mp4" ), "make_image_talk_cartoon": get_sorted_filenames_in_dir( os.path.join(example_showcase_dir, "make_image_talk_cartoon"), ext=".mp4" ), "make_image_talk_diff_angles": get_sorted_filenames_in_dir( os.path.join(example_showcase_dir, "make_image_talk_diff_angles"), ext=".mp4" ), "make_image_talk_hb": get_sorted_filenames_in_dir( os.path.join(example_showcase_dir, "make_image_talk_hb"), ext=".mp4" ), "make_video_talk_multilingual": get_sorted_filenames_in_dir( os.path.join(example_showcase_dir, "make_video_talk_multilingual"), ext=".mp4" ), "make_video_talk_corp_msg": get_sorted_filenames_in_dir( os.path.join(example_showcase_dir, "make_video_talk_corp_msg"), ext=".mp4" ), "make_video_talk_rap_multii": get_sorted_filenames_in_dir( os.path.join(example_showcase_dir, "make_video_talk_rap_multii"), ext=".mp4" ), "dubbing_superpowerman": get_sorted_filenames_in_dir(os.path.join(example_showcase_dir, "dubbing_superpowerman"), ext=".mp4"), "make_image_talk_selfie": get_sorted_filenames_in_dir(os.path.join(example_showcase_dir, "make_image_talk_selfie"), ext=".mp4"), "dubbing_coffee": get_sorted_filenames_in_dir(os.path.join(example_showcase_dir, "dubbing_coffee"), ext=".mp4"), } def update_voices(media_path): def get_category(media_path): if media_path: for fterm in female_terms: if fterm in media_path or fterm.lower() in media_path: return female_key for mterm in male_terms: if mterm in media_path or mterm.lower() in media_path: return male_key return unknown_key category = get_category(media_path) driving_input_voice = gr.Dropdown( choices=voices[category], value=voices[category][0], interactive=True, ) return driving_input_voice def update_audio_tabs_visibility(motion_type): if motion_type == "talking": return gr.update(visible=True), gr.update(visible=True) else: return gr.update(visible=False), gr.update(visible=False) def task_executor_fn( input_base_path, base_motion_expression, input_driving_audio_path, driving_text_input, driving_voice_input ): return task_executor.execute_task( input_base_path, base_motion_expression, input_driving_audio_path, driving_text_input, driving_voice_input ) def check_and_convert_audio_sample_rate(audio_path): if not audio_path: return None try: _, _, is_audio, _, _, duration, sample_rate = CommonUtil.get_media_properties(audio_path) if not is_audio: raise gr.Error("⚠️ Not an audio file. Please upload a valid audio file (.mp3, .wav)") if not CommonUtil.check_sample_rate(sample_rate): try: temp_dir = tempfile.mkdtemp() base_name = os.path.splitext(os.path.basename(audio_path))[0] converted_path = os.path.join(temp_dir, f"{base_name}_22050Hz.mp3") CommonUtil.change_audio_sample_rate(audio_path, converted_path, target_sample_rate=22050) shutil.move(converted_path, audio_path) os.rmdir(temp_dir) except Exception as e: print(f"Error changing audio sample rate: {e}") raise gr.Error(f"Error changing audio sample rate: {str(e)}") if not CommonUtil.check_duration(duration): min_duration = CommonUtil.valid_min_media_duration max_duration = CommonUtil.valid_max_media_duration raise gr.Error(f"⚠️ Audio duration must be between {min_duration}-{max_duration} seconds.\n\nCurrent duration: {duration}s") return audio_path except gr.Error: # Re-raise gr.Error to show notification raise except Exception as e: print(f"Error processing audio sample rate: {e}") raise gr.Error(f"Error processing audio: {str(e)}") def check_and_convert_video_fps(video_path): if not video_path: return None try: _, is_video, _, width, height, duration, fps = CommonUtil.get_media_properties(video_path) if not is_video: raise gr.Error("Not a video file") if not CommonUtil.check_dim(width, height): min_dim = CommonUtil.valid_min_media_dim max_dim = CommonUtil.valid_max_media_dim raise gr.Error(f"⚠️ Video dimensions must be between {min_dim}-{max_dim} pixels.\n\nCurrent size: {width}(w)x{height}(h)") if not CommonUtil.check_duration(duration): min_duration = CommonUtil.valid_min_media_duration max_duration = CommonUtil.valid_max_media_duration raise gr.Error(f"⚠️ Video duration must be between {min_duration}-{max_duration} seconds.\n\nCurrent duration: {duration}s") if CommonUtil.check_fps(fps): return video_path target_fps = CommonUtil.valid_video_fps print(f"Converting video from {fps}fps to {target_fps}fps: {video_path}") temp_dir = tempfile.mkdtemp() base_name = os.path.splitext(os.path.basename(video_path))[0] converted_path = os.path.join(temp_dir, f"{base_name}_{target_fps}fps.mp4") CommonUtil.change_video_fps(video_path, converted_path, fps=target_fps) temp_video_files.add(converted_path) return converted_path except gr.Error: # Re-raise gr.Error to show notification raise except Exception as e: print(f"Error processing video FPS: {e}") raise gr.Error(f"Error processing video: {str(e)}") def check_and_validate_image(image_path): """Check and validate image properties""" if not image_path: return None try: is_image, _, _, width, height, _, _ = CommonUtil.get_media_properties(image_path) if not is_image: raise gr.Error("⚠️ Not an image file. Please upload a valid image file.") if not CommonUtil.check_dim(width, height): min_dim = CommonUtil.valid_min_media_dim max_dim = CommonUtil.valid_max_media_dim raise gr.Error(f"⚠️ Image dimensions must be between {min_dim}-{max_dim} pixels.\n\nCurrent size: {width}(w)x{height}(h)") return image_path except gr.Error: # Re-raise gr.Error to show notification raise except Exception as e: print(f"Error validating image: {e}") raise gr.Error(f"❌ Error processing image: {str(e)}") def process_video_input(video_path): if not video_path: return None converted_path = check_and_convert_video_fps(video_path) print(f"Video processing result: {converted_path}") return converted_path def process_audio_input(audio_path): if not audio_path: return None validated_audio = check_and_convert_audio_sample_rate(audio_path) return validated_audio def cleanup_temp_video_files(): global temp_video_files for temp_file in temp_video_files: try: if os.path.exists(temp_file): os.remove(temp_file) print(f"Cleaned up temporary file: {temp_file}") except Exception as e: print(f"Error cleaning up {temp_file}: {e}") # Clear the set temp_video_files.clear() with gr.Blocks(theme=gr.themes.Soft(font=[gr.themes.GoogleFont("Plus Jakarta Sans")])) as demo_image: with gr.Row(): # Step 1: Choose Image with gr.Column(scale=4): gr.Markdown("### STEP 1 - SELECT IMAGE") base_image_input = gr.Image(label="IMAGE", type="filepath", sources="upload", height=media_height, interactive=True) gr.Examples( examples=[[example] for example in example_base_images[female_key]], inputs=[base_image_input], fn=lambda x: x, outputs=[base_image_input], cache_examples=False, label="Female", ) gr.Examples( examples=[[example] for example in example_base_images[male_key]], inputs=[base_image_input], fn=lambda x: x, outputs=[base_image_input], cache_examples=False, label="Male", ) # Step 2: Motion and Audio/TTS with gr.Column(scale=4): gr.Markdown("### STEP 2 - SELECT MOTION & AUDIO") base_motion_expression = gr.Radio( choices=valid_talking_expressions, value=valid_talking_expressions[0], visible=False, ) with gr.Tabs(): with gr.TabItem("TALKING MOTION") as tab_talking_motion: base_talking_expression = gr.Radio( choices=valid_talking_expressions, label="STEP 2.1 - TALKING MOTION", value=valid_talking_expressions[0], ) with gr.TabItem("EXPRESSION MOTION") as tab_expression_motion: base_expression_expression = gr.Radio( choices=valid_nontalking_expressions, label="STEP 2 - EXPRESSION MOTION", value=None, ) with gr.Tabs(): with gr.TabItem("DRIVING AUDIO: FILE") as tab_audio_file: driving_audio_input = gr.File(label="STEP 2.2 - AUDIO FILE", file_types=[".mp3", ".wav"], type="filepath", height=287) example_driving_audio_dropdown = gr.Dropdown( choices=example_driving_audio_dropdown_choices, value=None, label="OR SELECT FROM EXAMPLES", interactive=True, allow_custom_value=False ) def update_audio_input(selected_audio): return selected_audio if selected_audio else None def process_audio_and_update(audio_path): processed_audio = process_audio_input(audio_path) return processed_audio example_driving_audio_dropdown.change( fn=update_audio_input, inputs=[example_driving_audio_dropdown], outputs=[driving_audio_input] ) driving_audio_input.change( fn=process_audio_and_update, inputs=[driving_audio_input], outputs=[driving_audio_input] ) with gr.TabItem("DRIVING AUDIO: TTS") as tab_audio_tts: driving_input_voice = gr.Dropdown( choices=voices[unknown_key], value=voices[unknown_key][0], label="STEP 2.2 - VOICE" ) driving_text_input = gr.Textbox( label="INPUT TEXT (300 characters max)", lines=2, ) example_text_dropdown = gr.Dropdown( choices=example_driving_audio_texts, value=None, label="OR SELECT FROM EXAMPLES", interactive=True, allow_custom_value=False ) def update_text_input(selected_text): return selected_text if selected_text else "" example_text_dropdown.change( fn=update_text_input, inputs=[example_text_dropdown], outputs=[driving_text_input] ) process_button_animation = gr.Button("🌟 Generate", variant="primary", elem_classes=["generate-button"]) # Step 3: Result with gr.Column(scale=4): gr.Markdown("### RESULT") output_video_i2v = gr.Video(autoplay=True, label="OUTPUT VIDEO", height=512, show_download_button=True) message = gr.Textbox(label="INFO", max_lines=8) process_button_reset = gr.ClearButton( [ base_image_input, base_motion_expression, base_talking_expression, base_expression_expression, driving_audio_input, driving_text_input, driving_input_voice, example_text_dropdown, example_driving_audio_dropdown, output_video_i2v, ], value="🧹 Clear", variant="secondary", ) def process_image_and_update_voices(image_path): validated_image = check_and_validate_image(image_path) voice_dropdown = update_voices(validated_image) return validated_image, voice_dropdown base_image_input.change( fn=process_image_and_update_voices, inputs=[base_image_input], outputs=[base_image_input, driving_input_voice] ) base_talking_expression.change( fn=lambda x: x, inputs=[base_talking_expression], outputs=[base_motion_expression], ) base_expression_expression.change( fn=lambda x: gr.update(value=x), inputs=[base_expression_expression], outputs=[base_motion_expression], ) def update_talking_tab(): audio_visibility = update_audio_tabs_visibility("talking") return audio_visibility[0], audio_visibility[1], gr.update(choices=valid_talking_expressions, value=valid_talking_expressions[0]) def update_expression_tab(): audio_visibility = update_audio_tabs_visibility("expression") return audio_visibility[1], audio_visibility[0], gr.update(choices=valid_nontalking_expressions, value=valid_nontalking_expressions[0]) tab_talking_motion.select( fn=update_talking_tab, inputs=[], outputs=[tab_audio_file, tab_audio_tts, base_motion_expression], ) tab_expression_motion.select( fn=update_expression_tab, inputs=[], outputs=[tab_audio_file, tab_audio_tts, base_motion_expression], ) process_button_animation.click( fn=task_executor_fn, inputs=[ base_image_input, base_motion_expression, driving_audio_input, driving_text_input, driving_input_voice, ], outputs=[output_video_i2v, output_video_i2v, message], show_progress=True, ) with gr.Blocks(theme=gr.themes.Soft(font=[gr.themes.GoogleFont("Plus Jakarta Sans")])) as demo_video: with gr.Row(): # Step 1: Choose Video with gr.Column(scale=4): gr.Markdown("### STEP 1 - SELECT VIDEO") base_video_input = gr.Video(label="VIDEO", sources="upload", height=media_height, interactive=True) gr.Examples( examples=[[example] for example in example_source_videos[female_key]], inputs=[base_video_input], fn=lambda x: x, outputs=[base_video_input], cache_examples=False, label="Female", elem_id="female-video-examples" ) gr.Examples( examples=[[example] for example in example_source_videos[male_key]], inputs=[base_video_input], fn=lambda x: x, outputs=[base_video_input], cache_examples=False, label="Male", elem_id="male-video-examples" ) # Step 2: Audio/TTS with gr.Column(scale=4): gr.Markdown("### STEP 2 - SELECT AUDIO") with gr.Tabs(): with gr.TabItem("DRIVING AUDIO: FILE") as tab_audio_file: driving_audio_input = gr.File(label="AUDIO", file_types=[".mp3", ".wav"], type="filepath", height=454) example_driving_audio_dropdown = gr.Dropdown( choices=example_driving_audio_dropdown_choices, value=None, label="OR SELECT FROM EXAMPLES", interactive=True, allow_custom_value=False ) def update_audio_input(selected_audio): return selected_audio if selected_audio else None def process_audio_and_update(audio_path): processed_audio = process_audio_input(audio_path) return processed_audio example_driving_audio_dropdown.change( fn=update_audio_input, inputs=[example_driving_audio_dropdown], outputs=[driving_audio_input] ) driving_audio_input.change( fn=process_audio_and_update, inputs=[driving_audio_input], outputs=[driving_audio_input] ) with gr.TabItem("DRIVING AUDIO: TTS") as tab_audio_tts: driving_input_voice = gr.Dropdown( choices=voices[unknown_key], value=voices[unknown_key][0], label="VOICE" ) driving_text_input = gr.Textbox( label="INPUT TEXT (300 characters max)", lines=5, ) example_text_dropdown = gr.Dropdown( choices=example_driving_audio_texts, value=None, label="OR SELECT FROM EXAMPLES", interactive=True, allow_custom_value=False ) def update_text_input(selected_text): return selected_text if selected_text else "" example_text_dropdown.change( fn=update_text_input, inputs=[example_text_dropdown], outputs=[driving_text_input] ) process_button_animation = gr.Button("🌟 Generate", variant="primary", elem_classes=["generate-button"]) # Step 3: Result with gr.Column(scale=4): gr.Markdown("### RESULT") output_video_i2v = gr.Video(autoplay=True, label="OUTPUT VIDEO", height=512, show_download_button=True) message = gr.Textbox(label="INFO", max_lines=8) process_button_reset = gr.Button("🧹 Clear", variant="secondary") def clear_all(): cleanup_temp_video_files() return None, None, None, None, None process_button_reset.click( fn=clear_all, inputs=[], outputs=[base_video_input, driving_audio_input, driving_text_input, driving_input_voice, output_video_i2v] ) def process_video_and_update_voices(video_path): processed_video = process_video_input(video_path) voice_dropdown = update_voices(processed_video) return processed_video, voice_dropdown base_video_input.change( fn=process_video_and_update_voices, inputs=[base_video_input], outputs=[base_video_input, driving_input_voice] ) base_motion_expression = gr.Radio(value=None, visible=False) process_button_animation.click( fn=task_executor_fn, inputs=[ base_video_input, base_motion_expression, driving_audio_input, driving_text_input, driving_input_voice, ], outputs=[output_video_i2v, output_video_i2v, message], show_progress=True, ) with gr.Blocks() as showcase_examples: gr.Markdown("# IMAGE TO AVATAR") with gr.Row(): with gr.Column(scale=7): for path in examples_showcase["make_image_talk_multilingual"]: gr.Video(value=path, label=os.path.basename(path), height=300) with gr.Column(scale=3): for path in examples_showcase["make_image_talk_cartoon"]: gr.Video(value=path, label=os.path.basename(path), height=616) with gr.Row(): with gr.Column(scale=7): for path in examples_showcase["make_image_talk_diff_angles"]: gr.Video(value=path, label=os.path.basename(path), height=350) with gr.Column(scale=3): for path in examples_showcase["make_image_talk_hb"]: gr.Video(value=path, label=os.path.basename(path), height=350) with gr.Row(): for path in examples_showcase['make_image_talk_selfie']: gr.Video(value=path, label=os.path.basename(path), height=430) gr.Markdown("# VIDEO TO AVATAR") with gr.Row(): with gr.Column(scale=7): for path in examples_showcase["make_video_talk_multilingual"]: gr.Video(value=path, label=os.path.basename(path), height=300) with gr.Column(scale=3): for path in examples_showcase["make_video_talk_corp_msg"]: gr.Video(value=path, label=os.path.basename(path), height=616) with gr.Row(): for path in examples_showcase["make_video_talk_rap_multii"]: gr.Video(value=path, label=os.path.basename(path), height=500) gr.Markdown("# VIDEO TO AVATAR: DUBBING") with gr.Row(): for path in examples_showcase["dubbing_superpowerman"]: gr.Video(value=path, label=os.path.basename(path), height=320) with gr.Row(): for path in examples_showcase["dubbing_coffee"]: gr.Video(value=path, label=os.path.basename(path), height=440) with gr.Blocks(analytics_enabled=False, css="footer{display:none !important} .generate-button{margin-top:-10px !important;} #female-video-examples .gallery *, #male-video-examples .gallery *{height:142.1px !important; min-height:142.1px !important; max-height:142.1px !important;} #female-video-examples .gallery img, #male-video-examples .gallery img, #female-video-examples .gallery video, #male-video-examples .gallery video{width:80px !important; height:142.1px !important; object-fit:cover !important; min-height:142.1px !important; max-height:142.1px !important;} #female-video-examples .gallery > div, #male-video-examples .gallery > div{width:80px !important; height:142.1px !important; min-height:142.1px !important; max-height:142.1px !important; margin:2px !important;} .logo-left{text-align:left !important; margin:0 !important; padding:0 !important; border:none !important; outline:none !important; box-shadow:none !important; min-height:auto !important; height:auto !important; overflow:visible !important;} .logo-left > div{text-align:left !important; margin:0 !important; padding:0 !important; overflow:visible !important;} .logo-left img{height:45px !important; min-height:45px !important; max-height:45px !important;} .logo-right{text-align:right !important; margin:0 !important; padding:0 !important; border:none !important; outline:none !important; box-shadow:none !important; min-height:auto !important; height:auto !important; overflow:visible !important; display:flex !important; justify-content:flex-end !important; align-items:center !important;} .logo-right > div{text-align:right !important; margin:0 !important; padding:0 !important; overflow:visible !important; width:100% !important; display:flex !important; justify-content:flex-end !important;} .logo-right img{height:70px !important; min-height:70px !important; max-height:70px !important;}", title="SUTRA Avatar v2") as demo: with gr.Row(): with gr.Column(scale=10): gr.HTML(value="", elem_classes=["logo-left"]) with gr.Column(scale=2): gr.HTML(value="", elem_classes=["logo-right"]) gr.TabbedInterface( interface_list=[demo_image, demo_video, showcase_examples], tab_names=["IMAGE to AVATAR", "VIDEO to AVATAR", "SHOWCASE"], ) if __name__ == "__main__": parser = argparse.ArgumentParser(description="SUTRA AVATAR CLIENT") args = parser.parse_args() task_executor = CloudTaskExecutor() demo.queue(default_concurrency_limit=10).launch( server_name="0.0.0.0", allowed_paths=["/"], )