|
|
|
|
|
import os
|
|
import time
|
|
import uuid
|
|
import cv2
|
|
import numpy as np
|
|
from typing import List
|
|
import gradio as gr
|
|
from backend_api import get_task_status
|
|
from oss_utils import list_oss_files, download_oss_file, get_user_tmp_dir, test_oss_access, clean_oss_result_path
|
|
|
|
def stream_simulation_results(result_folder: str, task_id: str, request: gr.Request, fps: int = 6):
    """Stream simulation results as short video segments built from OSS images.

    Polls the ``images`` folder under ``result_folder`` on OSS about once per
    second, downloads every image that has not been handled yet, and yields a
    video segment each time ``fps * 2`` frames (two seconds of video) have
    accumulated. The backend task status is queried every few seconds; when
    the task reports ``completed`` the remaining images are flushed into one
    last segment and the generator finishes.

    Args:
        result_folder: OSS folder path that contains the generated images.
        task_id: Backend task id used for status queries.
        request: Gradio request object (its ``session_hash`` selects the
            per-user temp directory).
        fps: Frame rate of the produced video segments.

    Yields:
        Path of each generated video segment file.

    Raises:
        gr.Error: If the task reports ``failed``, or if the polling budget is
            exhausted before the task completes or terminates.
    """
    # OSS keys use forward slashes; normalise in case os.path.join produced
    # backslashes (same normalisation as create_final_video_from_oss_images).
    image_folder = os.path.join(result_folder, "images").replace('\\', '/')
    frame_buffer: List[np.ndarray] = []
    # At least one frame per segment so the drain loop below always progresses.
    frames_per_segment = max(1, fps * 2)
    processed_files = set()
    width, height = 0, 0
    last_status_check = 0  # 0 forces a status query on the very first poll
    status_check_interval = 5  # seconds between backend status queries
    # Budget is a number of polls; each poll sleeps 1s, so the real wall time
    # is at least this many seconds plus download/encode time.
    max_polls = 240

    user_dir = get_user_tmp_dir(request.session_hash)
    local_image_dir = os.path.join(user_dir, task_id, "images")
    os.makedirs(local_image_dir, exist_ok=True)

    for _ in range(max_polls):
        current_time = time.time()

        if current_time - last_status_check > status_check_interval:
            status = get_task_status(task_id)
            print(f"Session {request.session_hash}, status: {status}")
            state = status.get("status")
            if state == "completed":
                # Pick up whatever was uploaded since the previous poll.
                process_remaining_oss_images(image_folder, local_image_dir, processed_files, frame_buffer)
                if frame_buffer:
                    if width == 0:
                        # No frame was seen by the polling loop, so the size
                        # has to come from the flushed frames.
                        height, width = frame_buffer[0].shape[:2]
                    yield create_video_segment(frame_buffer, fps, width, height, request)
                # Return (not break) so finishing on the last poll is never
                # reported as a timeout.
                return
            if state == "failed":
                raise gr.Error(f"任务执行失败: {status.get('result', '未知错误')}")
            if state == "terminated":
                return
            last_status_check = current_time

        try:
            oss_files = list_oss_files(image_folder)
            new_files = [f for f in oss_files if f not in processed_files]

            for oss_path in new_files:
                try:
                    filename = os.path.basename(oss_path)
                    local_path = os.path.join(local_image_dir, filename)
                    download_oss_file(oss_path, local_path)

                    frame = cv2.imread(local_path)
                    # Files that fail to decode are not marked as processed,
                    # so they are downloaded again on the next poll.
                    if frame is not None:
                        if width == 0:
                            height, width = frame.shape[:2]
                        frame_buffer.append(frame)
                        processed_files.add(oss_path)
                except Exception as e:
                    print(f"Error processing {oss_path}: {e}")

            # Emit every complete segment currently buffered, not just one,
            # so a burst of uploads does not leave a growing backlog.
            while len(frame_buffer) >= frames_per_segment:
                segment_frames = frame_buffer[:frames_per_segment]
                del frame_buffer[:frames_per_segment]
                yield create_video_segment(segment_frames, fps, width, height, request)

        except Exception as e:
            print(f"Error accessing OSS: {e}")

        time.sleep(1)

    # Only reachable when every poll was used without a terminal status.
    raise gr.Error("timeout 240s")
|
|
|
|
def create_video_segment(frames: List[np.ndarray], fps: int, width: int, height: int, request: gr.Request) -> str:
    """Write ``frames`` to a new MP4 segment in the user's temp directory.

    The segment is stored under ``<user tmp dir>/video_chunks`` with a random
    UUID-based file name and encoded with the ``mp4v`` fourcc.

    Args:
        frames: Decoded images to write, in playback order.
        fps: Frame rate of the segment.
        width: Frame width in pixels. When not positive, the size of the
            first frame is used instead.
        height: Frame height in pixels. When not positive, the size of the
            first frame is used instead.
        request: Gradio request object (its ``session_hash`` selects the
            per-user temp directory).

    Returns:
        Path of the written segment file.
    """
    user_dir = get_user_tmp_dir(request.session_hash)
    video_chunk_dir = os.path.join(user_dir, "video_chunks")
    os.makedirs(video_chunk_dir, exist_ok=True)

    # A caller that never observed a frame may still pass 0x0; take the real
    # size from the data instead of opening the writer with an empty size.
    if (width <= 0 or height <= 0) and len(frames) > 0:
        height, width = frames[0].shape[:2]

    segment_name = os.path.join(video_chunk_dir, f"output_{uuid.uuid4()}.mp4")
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(segment_name, fourcc, fps, (width, height))

    try:
        for frame in frames:
            out.write(frame)
    finally:
        # Always release so the container is finalised and the handle freed,
        # even if writing a frame raises.
        out.release()

    return segment_name
|
|
|
|
def process_remaining_oss_images(oss_folder: str, local_dir: str, processed_files: set, frame_buffer: List[np.ndarray]):
    """Download and decode the OSS images that have not been handled yet.

    Lists ``oss_folder``, skips every path already present in
    ``processed_files``, downloads the rest into ``local_dir`` and decodes
    them. Each successfully decoded image is appended to ``frame_buffer`` and
    its OSS path is added to ``processed_files`` (both are mutated in place).
    Errors are printed and swallowed, so the function is best-effort.

    Args:
        oss_folder: OSS folder to list.
        local_dir: Local directory that receives the downloaded files.
        processed_files: Set of OSS paths that were already handled.
        frame_buffer: List that receives the newly decoded frames.
    """
    try:
        listed = list_oss_files(oss_folder)
        # Snapshot the pending paths before the loop mutates processed_files.
        pending = [candidate for candidate in listed if candidate not in processed_files]

        for remote_path in pending:
            try:
                target = os.path.join(local_dir, os.path.basename(remote_path))
                download_oss_file(remote_path, target)

                image = cv2.imread(target)
                if image is None:
                    # Not decodable as an image: leave it unmarked.
                    continue

                frame_buffer.append(image)
                processed_files.add(remote_path)
            except Exception as err:
                print(f"Error processing remaining {remote_path}: {err}")
    except Exception as err:
        print(f"Error accessing OSS for remaining files: {err}")
|
|
|
|
def convert_to_h264(video_path):
    """Re-encode a video as an H.264 MP4 using ffmpeg.

    The output is written next to the input, with ``_h264`` appended to the
    file name (extension replaced by ``.mp4``). An existing output file is
    overwritten.

    Args:
        video_path: Path of the video to convert.

    Returns:
        Path of the H.264 encoded file.

    Raises:
        RuntimeError: If no ffmpeg executable can be found.
        gr.Error: If ffmpeg exits with an error, or the output file is
            missing after the conversion.
    """
    import shutil
    import subprocess

    base, _ = os.path.splitext(video_path)
    video_path_h264 = f"{base}_h264.mp4"

    # Prefer ffmpeg from PATH, then fall back to a few known install locations.
    ffmpeg_bin = shutil.which("ffmpeg")
    if ffmpeg_bin is None:
        possible_paths = (
            "/root/anaconda3/envs/gradio/bin/ffmpeg",
            "/usr/bin/ffmpeg",
            "/usr/local/bin/ffmpeg",
        )
        ffmpeg_bin = next((path for path in possible_paths if os.path.exists(path)), None)

    if ffmpeg_bin is None:
        raise RuntimeError("❌ 找不到 ffmpeg,请确保其已安装并在 PATH 中")

    ffmpeg_cmd = [
        ffmpeg_bin,
        # Overwrite without asking: the output name is derived from the input,
        # so converting the same video twice would otherwise fail or block on
        # ffmpeg's interactive overwrite prompt.
        "-y",
        "-i", video_path,
        "-c:v", "libx264",
        "-preset", "slow",
        "-crf", "23",
        "-c:a", "aac",
        "-movflags", "+faststart",
        video_path_h264,
    ]

    try:
        subprocess.run(
            ffmpeg_cmd,
            check=True,
            stdin=subprocess.DEVNULL,  # never let ffmpeg wait for console input
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if not os.path.exists(video_path_h264):
            raise FileNotFoundError(f"⚠️ H.264 文件未生成: {video_path_h264}")
        return video_path_h264
    except subprocess.CalledProcessError as e:
        # stderr is captured as bytes; decode it so the message is readable
        # instead of a b'...' repr.
        stderr_text = e.stderr.decode("utf-8", errors="replace") if e.stderr else ""
        raise gr.Error(f"FFmpeg 转换失败: {stderr_text}") from e
    except Exception as e:
        raise gr.Error(f"转换过程中发生错误: {str(e)}") from e
|
|
|
|
def _filter_image_files(paths):
    """Return the entries of ``paths`` that end with .png, .jpg or .jpeg (case-insensitive)."""
    return [p for p in paths if p.lower().endswith(('.png', '.jpg', '.jpeg'))]


def _natural_sort_key(path):
    """Sort key that orders digit runs in the file stem numerically (frame_2 before frame_10)."""
    import re

    stem = os.path.splitext(os.path.basename(path))[0]
    # re.split with a capturing group alternates text / digits / text ...,
    # so odd positions are always digit runs and keys stay comparable.
    tokens = re.split(r'(\d+)', stem)
    return [int(tok) if idx % 2 else tok for idx, tok in enumerate(tokens)], stem


def _find_fallback_images(cleaned_result_folder, task_id):
    """Look for images directly in the result folder and in known alternative OSS prefixes."""
    oss_files_direct = list_oss_files(cleaned_result_folder)
    print(f"🔍 DEBUG: Files in result_folder: {len(oss_files_direct)}")
    print(f"🔍 DEBUG: Direct files: {oss_files_direct[:10]}")

    all_image_files = _filter_image_files(oss_files_direct)
    print(f"🔍 DEBUG: All image files in result_folder: {len(all_image_files)}")

    alternative_paths = [
        f"gradio_demo/tasks/{task_id}/images",
        f"gradio_demo/tasks/{task_id}",
        f"tasks/{task_id}/images",
        f"tasks/{task_id}",
        f"gradio_demo/tasks/{task_id}/image",
        f"gradio_demo/tasks/{task_id}/imgs",
    ]

    # NOTE(review): the alternative prefixes are probed even when the direct
    # listing already found images, and the first non-empty alternative
    # replaces the direct result. Kept as-is; confirm this precedence is wanted.
    for alt_path in alternative_paths:
        print(f"🔍 DEBUG: Trying alternative path: {alt_path}")
        alt_images = _filter_image_files(list_oss_files(alt_path))
        print(f"🔍 DEBUG: Found {len(alt_images)} images in {alt_path}")
        if alt_images:
            all_image_files = alt_images
            print(f"🔍 DEBUG: Using images from alternative path: {alt_path}")
            break

    return all_image_files


def create_final_video_from_oss_images(result_folder: str, task_id: str, request: gr.Request, fps: int = 6) -> str:
    """Build the final, complete video from all images stored on OSS.

    Lists the ``images`` folder of the cleaned result path (falling back to
    the result folder itself and a few alternative prefixes when it holds no
    images), downloads the images in natural file-name order, writes them to
    an ``mp4v`` video and converts that video to H.264.

    Frames are written as they are decoded rather than collected first, so
    memory use does not grow with the number of images.

    Args:
        result_folder: OSS result folder path.
        task_id: Task id.
        request: Gradio request object (its ``session_hash`` selects the
            per-user temp directory).
        fps: Frame rate of the video (frames per second).

    Returns:
        Path of the final H.264 video file.

    Raises:
        gr.Error: If no image can be found or decoded, or any step of the
            video creation fails.
    """
    cleaned_result_folder = clean_oss_result_path(result_folder, task_id)

    # OSS keys use forward slashes regardless of the local OS.
    image_folder = os.path.join(cleaned_result_folder, "images").replace('\\', '/')
    user_dir = get_user_tmp_dir(request.session_hash)
    local_image_dir = os.path.join(user_dir, task_id, "final_images")
    os.makedirs(local_image_dir, exist_ok=True)

    print(f"🔍 DEBUG: original result_folder = {result_folder}")
    print(f"🔍 DEBUG: cleaned_result_folder = {cleaned_result_folder}")
    print(f"🔍 DEBUG: task_id = {task_id}")
    print(f"🔍 DEBUG: image_folder = {image_folder}")
    print(f"🔍 DEBUG: user_dir = {user_dir}")
    print(f"🔍 DEBUG: local_image_dir = {local_image_dir}")

    writer = None
    try:
        print(f"🔍 DEBUG: Testing OSS access for task {task_id}")
        test_oss_access(task_id)

        print(f"🔍 DEBUG: Listing files in OSS folder: {image_folder}")
        oss_files = list_oss_files(image_folder)
        print(f"🔍 DEBUG: Total files found in OSS: {len(oss_files)}")
        print(f"🔍 DEBUG: First 10 files: {oss_files[:10]}")

        image_files = _filter_image_files(oss_files)
        print(f"🔍 DEBUG: Image files found: {len(image_files)}")
        print(f"🔍 DEBUG: First 5 image files: {image_files[:5]}")

        if not image_files:
            print(f"🔍 DEBUG: No images in {image_folder}, trying {cleaned_result_folder}")
            image_files = _find_fallback_images(cleaned_result_folder, task_id)
            if image_files:
                print(f"🔍 DEBUG: Using image files from alternative search: {len(image_files)}")
            else:
                raise gr.Error("No images found in OSS for final video creation")

        # Natural order keeps frame_2 ahead of frame_10 when names are not
        # zero-padded; zero-padded names sort exactly as before.
        image_files.sort(key=_natural_sort_key)
        print(f"🔍 DEBUG: Sorted image files (first 5): {[os.path.basename(f) for f in image_files[:5]]}")

        final_video_dir = os.path.join(user_dir, task_id, "final")
        os.makedirs(final_video_dir, exist_ok=True)
        final_video_path = os.path.join(final_video_dir, "final_video.mp4")

        width, height = 0, 0
        frame_count = 0

        print(f"🔍 DEBUG: Starting to download {len(image_files)} images...")
        for index, oss_path in enumerate(image_files, start=1):
            try:
                filename = os.path.basename(oss_path)
                local_path = os.path.join(local_image_dir, filename)
                download_oss_file(oss_path, local_path)
                frame = cv2.imread(local_path)
            except Exception as e:
                # A single bad image must not abort the whole video.
                print(f"❌ Error processing {oss_path}: {e}")
                continue

            if frame is not None:
                if writer is None:
                    # The first decodable frame fixes the video size.
                    height, width = frame.shape[:2]
                    print(f"🔍 DEBUG: Image dimensions: {width}x{height}")
                    print(f"🔍 DEBUG: Creating video at: {final_video_path}")
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    writer = cv2.VideoWriter(final_video_path, fourcc, fps, (width, height))
                    if not writer.isOpened():
                        raise RuntimeError(f"Failed to open video writer for {final_video_path}")
                writer.write(frame)
                frame_count += 1

            if index % 10 == 0:
                print(f"🔍 DEBUG: Downloaded {index}/{len(image_files)} images")

        print(f"🔍 DEBUG: Successfully loaded {frame_count} frames")

        if writer is None:
            raise gr.Error("No valid images could be processed for final video")

        print(f"🔍 DEBUG: Video parameters: {frame_count} frames, {fps} fps, {width}x{height}")

        # Finalise the file before ffmpeg reads it.
        writer.release()
        writer = None

        print(f"🔍 DEBUG: Converting to H.264...")
        h264_video_path = convert_to_h264(final_video_path)

        print(f"✅ Final video created: {h264_video_path} with {frame_count} frames at {fps} fps")
        return h264_video_path

    except Exception as e:
        print(f"❌ Error creating final video from OSS images: {e}")
        print(f"❌ Exception type: {type(e).__name__}")
        import traceback
        print(f"❌ Traceback: {traceback.format_exc()}")
        raise gr.Error(f"Failed to create final video: {str(e)}") from e
    finally:
        # Release the writer if an error interrupted the normal path.
        if writer is not None:
            writer.release()
|
|
|