# main.py
# Main entry point: builds the Gradio UI and launches it.
import os
import time
from datetime import datetime

import gradio as gr

from config import SCENE_CONFIGS, MODEL_CHOICES, MODE_CHOICES
from backend_api import submit_to_backend, get_task_status, get_task_result
from logging_utils import log_access, log_submission, is_request_allowed
from simulation import stream_simulation_results, convert_to_h264, create_final_video_from_oss_images
from ui_components import update_history_display, update_scene_display, update_log_display, get_scene_instruction
from oss_utils import download_oss_file, get_user_tmp_dir, cleanup_user_tmp_dir, oss_file_exists, clean_oss_result_path

# Maps a Gradio session hash to the task id most recently submitted by that
# session, so the task can be terminated when the session is unloaded.
SESSION_TASKS = {}

# Number of history entries kept in state and number of UI slots created.
# NOTE(review): update_history_display presumably expects the same slot
# count -- verify against ui_components before changing this value.
MAX_HISTORY_ENTRIES = 10

# Frame rate passed to create_final_video_from_oss_images.
FINAL_VIDEO_FPS = 6

# Seconds to wait after submission before the first status query.
BACKEND_STARTUP_DELAY_S = 5

# Scene selected when the page is first loaded.
DEFAULT_SCENE = "demo1"


def run_simulation(scene, model, mode, prompt, history, request: gr.Request):
    """Submit a navigation task and stream its results to the UI.

    This is a generator used as a Gradio event handler. Each yielded value is
    a ``(video_path, history)`` pair matching the ``video_output`` and
    ``history_state`` outputs.

    Args:
        scene: Scene key selected in the UI.
        model: Model name selected in the UI.
        mode: Mode name selected in the UI.
        prompt: Navigation instruction text.
        history: List of previous history entries (dicts) for this session.
        request: Gradio request object; may be falsy, in which case the user
            IP is reported as ``"unknown"`` and the task is not tracked per
            session.

    Yields:
        ``(video_path, history)`` while streaming, then ``(None, history)``
        with the updated history once the task has completed, or with the
        unchanged history if the task was terminated.

    Raises:
        gr.Error: If the IP is rate limited, the submission is rejected, the
            submission result cannot be parsed, streaming fails, the backend
            reports ``failed``, or the final status is unrecognised.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    user_ip = request.client.host if request else "unknown"
    session_id = request.session_hash if request else None

    if not is_request_allowed(user_ip):
        log_submission(scene, prompt, model, user_ip, "IP blocked temporarily")
        raise gr.Error("Too many requests from this IP. Please wait and try again one minute later.")

    # Submit the task to the backend.
    submission_result = submit_to_backend(scene, prompt, mode, model, user_ip)
    if submission_result.get("status") != "pending":
        log_submission(scene, prompt, model, user_ip, "Submission failed")
        raise gr.Error(f"Submission failed: {submission_result.get('message', 'unknown issue')}")

    try:
        task_id = submission_result["task_id"]
        # Only track the task when there is a real session to key it on;
        # a None key would be shared by every session-less caller.
        if session_id is not None:
            SESSION_TASKS[session_id] = task_id
        gr.Info(f"Simulation started, task_id: {task_id}")
        time.sleep(BACKEND_STARTUP_DELAY_S)
        status = get_task_status(task_id)
        # Result folder path on OSS; the local path is no longer checked.
        result_folder = clean_oss_result_path(
            status.get("result", f"gradio_demo/tasks/{task_id}"), task_id
        )
    except Exception as e:
        log_submission(scene, prompt, model, user_ip, str(e))
        raise gr.Error(
            f"error occurred when parsing submission result from backend: {str(e)}"
        ) from e

    # Stream video segments (read from OSS).
    try:
        for video_path in stream_simulation_results(result_folder, task_id, request):
            if video_path:
                yield video_path, history
    except Exception as e:
        log_submission(scene, prompt, model, user_ip, str(e))
        raise gr.Error(f"流式输出过程中出错: {str(e)}") from e

    # Fetch the final task status.
    status = get_task_status(task_id)
    final_state = status.get("status")

    if final_state == "completed":
        try:
            # Stitch all images on OSS into the final video.
            gr.Info("Creating final video from all OSS images...")
            video_path = create_final_video_from_oss_images(
                result_folder, task_id, request, fps=FINAL_VIDEO_FPS
            )
            gr.Info(f"Final video created successfully with {FINAL_VIDEO_FPS} fps!")
        except Exception as e:
            print(f"Error creating final video from OSS images: {e}")
            log_submission(scene, prompt, model, user_ip, f"Final video creation failed: {str(e)}")
            video_path = None

        new_entry = {
            "timestamp": timestamp,
            "scene": scene,
            "model": model,
            "mode": mode,
            "prompt": prompt,
            "video_path": video_path,
        }
        # The new entry is appended at the end, so keep the *last* N entries;
        # slicing from the front would discard the run that just finished.
        updated_history = (history + [new_entry])[-MAX_HISTORY_ENTRIES:]

        log_submission(scene, prompt, model, user_ip, "success")
        gr.Info("Simulation completed successfully!")
        yield None, updated_history

    elif final_state == "failed":
        log_submission(scene, prompt, model, user_ip, status.get('result', 'backend error'))
        raise gr.Error(f"任务执行失败: {status.get('result', 'backend 未知错误')}")

    elif final_state == "terminated":
        log_submission(scene, prompt, model, user_ip, "terminated")
        # For terminated tasks, local files are no longer checked.
        yield None, history

    else:
        log_submission(scene, prompt, model, user_ip, "missing task's status from backend")
        raise gr.Error("missing task's status from backend")


def cleanup_session(request: gr.Request):
    """Terminate the session's tracked task and remove its temp directory.

    Registered as the ``demo.unload`` handler. The terminate request is
    best-effort: any failure is reported on stdout and otherwise ignored so
    that the temp-directory cleanup still runs.

    Args:
        request: Gradio request object for the session being unloaded.
    """
    from config import BACKEND_URL
    import requests

    session_id = request.session_hash if request else None
    task_id = SESSION_TASKS.pop(session_id, None)

    if task_id:
        try:
            requests.post(f"{BACKEND_URL}/predict/terminate/{task_id}", timeout=3)
        except Exception as e:
            # Best-effort: never let a failed terminate call block cleanup.
            print(f"Failed to terminate task {task_id}: {e}")

    # Clean up the user's temporary directory.
    cleanup_user_tmp_dir(session_id)


def record_access(request: gr.Request):
    """Log the visitor's IP and user agent, then return the log display.

    Args:
        request: Gradio request object; when falsy both fields are logged as
            ``"unknown"``.

    Returns:
        Whatever ``update_log_display()`` returns.
    """
    user_ip = request.client.host if request else "unknown"
    user_agent = request.headers.get("user-agent", "unknown") if request else "unknown"
    log_access(user_ip, user_agent)
    return update_log_display()


def _on_scene_change(scene):
    """Return the description, preview and default prompt for ``scene``.

    Calls ``update_scene_display`` once and reuses its first two items.
    """
    display = update_scene_display(scene)
    return [display[0], display[1], get_scene_instruction(scene)]


custom_css = """
#simulation-panel {
    border-radius: 8px;
    padding: 20px;
    background: #f9f9f9;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
#result-panel {
    border-radius: 8px;
    padding: 20px;
    background: #f0f8ff;
}
.dark #simulation-panel {
    background: #2a2a2a;
}
.dark #result-panel {
    background: #1a2a3a;
}
.history-container {
    max-height: 600px;
    overflow-y: auto;
    margin-top: 20px;
}
.history-accordion {
    margin-bottom: 10px;
}
"""

header_html = """
"""

# NOTE(review): the widget nesting below was reconstructed from a source whose
# indentation had been lost -- confirm the layout against the running app.
with gr.Blocks(title="InternNav Model Inference Demo", css=custom_css) as demo:
    gr.HTML(header_html)

    history_state = gr.State([])

    with gr.Row():
        with gr.Column(elem_id="simulation-panel"):
            gr.Markdown("### Simulation Settings")

            scene_dropdown = gr.Dropdown(
                label="Choose a scene",
                choices=list(SCENE_CONFIGS.keys()),
                value=DEFAULT_SCENE,
                interactive=True,
            )

            scene_description = gr.Markdown("")
            scene_preview = gr.Image(
                label="Scene Preview",
                elem_classes=["scene-preview"],
                interactive=False,
            )

            prompt_input = gr.Textbox(
                label="Navigation Prompt",
                value="Walk past the left side of the bed and stop in the doorway.",
                placeholder="e.g.: 'Walk past the left side of the bed and stop in the doorway.'",
                lines=2,
                max_lines=4,
            )

            model_dropdown = gr.Dropdown(
                label="Chose a pretrained model",
                choices=MODEL_CHOICES,
                value=MODEL_CHOICES[0],
                interactive=True,
            )

            mode_dropdown = gr.Dropdown(
                label="Select Mode",
                choices=MODE_CHOICES,
                value=MODE_CHOICES[0],
                interactive=True,
            )

            scene_dropdown.change(
                fn=_on_scene_change,
                inputs=scene_dropdown,
                outputs=[scene_description, scene_preview, prompt_input],
            )

            submit_btn = gr.Button("Start Navigation Simulation", variant="primary")

        with gr.Column(elem_id="result-panel"):
            gr.Markdown("### Latest Simulation Result")

            video_output = gr.Video(
                label="Live",
                interactive=False,
                format="mp4",
                autoplay=True,
                streaming=True,
            )

            with gr.Column() as history_container:
                gr.Markdown("### History")
                gr.Markdown("#### History will be reset after refresh")

                # Pre-create hidden slots; each is a (column, accordion,
                # video, markdown) tuple passed to update_history_display.
                history_slots = []
                for _ in range(MAX_HISTORY_ENTRIES):
                    with gr.Column(visible=False) as slot:
                        with gr.Accordion(visible=False, open=False) as accordion:
                            video = gr.Video(interactive=False)
                            detail_md = gr.Markdown()
                    history_slots.append((slot, accordion, video, detail_md))

    gr.Examples(
        examples=[
            ["demo1", "rdp", "vlnPE", "Walk past the left side of the bed and stop in the doorway."],
            ["demo2", "rdp", "vlnPE", "Walk through the bathroom, past the sink and toilet. Stop in front of the counter with the two suitcase."],
            ["demo3", "rdp", "vlnPE", "Do a U-turn. Walk forward through the kitchen, heading to the black door. Walk out of the door and take a right onto the deck. Walk out on to the deck and stop."],
            ["demo4", "rdp", "vlnPE", "Walk out of bathroom and stand on white bath mat."],
            ["demo5", "rdp", "vlnPE", "Walk straight through the double wood doors, follow the red carpet straight to the next doorway and stop where the carpet splits off."],
        ],
        inputs=[scene_dropdown, model_dropdown, mode_dropdown, prompt_input],
        label="Navigation Task Examples",
    )

    # Run the simulation, then refresh the history slots from the new state.
    submit_btn.click(
        fn=run_simulation,
        inputs=[scene_dropdown, model_dropdown, mode_dropdown, prompt_input, history_state],
        outputs=[video_output, history_state],
        queue=True,
        api_name="run_simulation",
    ).then(
        fn=update_history_display,
        inputs=history_state,
        outputs=[comp for slot_group in history_slots for comp in slot_group],
        queue=True,
    )

    # Populate the scene description and preview for the default scene.
    demo.load(
        fn=lambda: update_scene_display(DEFAULT_SCENE),
        outputs=[scene_description, scene_preview],
    )

    # Record each page load in the access log.
    demo.load(
        fn=record_access,
        inputs=None,
        outputs=None,
        queue=False,
    )

    demo.queue(default_concurrency_limit=8)

    demo.unload(fn=cleanup_session)

if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,  # Default port for Hugging Face Spaces
        share=False,
        debug=False,  # Keep debug off in production
        allowed_paths=["./assets", "./logs", "./tmp"],  # Include the temp directory in allowed paths
    )