aegwe4 / app /services /merger_video.py
chaowenguo's picture
Upload 121 files
3b13b0e verified
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
'''
@Project: NarratoAI
@File : merger_video
@Author : 小林同学
@Date : 2025/5/6 下午7:38
'''
import os
import shutil
import subprocess
from enum import Enum
from typing import List, Optional, Tuple
from loguru import logger
from app.utils import ffmpeg_utils
class VideoAspect(Enum):
"""视频宽高比枚举"""
landscape = "16:9" # 横屏 16:9
landscape_2 = "4:3"
portrait = "9:16" # 竖屏 9:16
portrait_2 = "3:4"
square = "1:1" # 方形 1:1
def to_resolution(self) -> Tuple[int, int]:
"""根据宽高比返回标准分辨率"""
if self == VideoAspect.portrait:
return 1080, 1920 # 竖屏 9:16
elif self == VideoAspect.portrait_2:
return 720, 1280 # 竖屏 4:3
elif self == VideoAspect.landscape:
return 1920, 1080 # 横屏 16:9
elif self == VideoAspect.landscape_2:
return 1280, 720 # 横屏 4:3
elif self == VideoAspect.square:
return 1080, 1080 # 方形 1:1
else:
return 1080, 1920 # 默认竖屏
def check_ffmpeg_installation() -> bool:
"""
检查ffmpeg是否已安装
Returns:
bool: 如果安装则返回True,否则返回False
"""
try:
subprocess.run(['ffmpeg', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
return True
except (subprocess.SubprocessError, FileNotFoundError):
logger.error("ffmpeg未安装或不在系统PATH中,请安装ffmpeg")
return False
def get_hardware_acceleration_option() -> Optional[str]:
"""
根据系统环境选择合适的硬件加速选项
Returns:
Optional[str]: 硬件加速参数,如果不支持则返回None
"""
# 使用集中式硬件加速检测
return ffmpeg_utils.get_ffmpeg_hwaccel_type()
def check_video_has_audio(video_path: str) -> bool:
"""
检查视频是否包含音频流
Args:
video_path: 视频文件路径
Returns:
bool: 如果视频包含音频流则返回True,否则返回False
"""
if not os.path.exists(video_path):
logger.warning(f"视频文件不存在: {video_path}")
return False
probe_cmd = [
'ffprobe', '-v', 'error',
'-select_streams', 'a:0',
'-show_entries', 'stream=codec_type',
'-of', 'csv=p=0',
video_path
]
try:
result = subprocess.run(probe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
return result.stdout.strip() == 'audio'
except Exception as e:
logger.warning(f"检测视频音频流时出错: {str(e)}")
return False
def create_ffmpeg_concat_file(video_paths: List[str], concat_file_path: str) -> str:
"""
创建ffmpeg合并所需的concat文件
Args:
video_paths: 需要合并的视频文件路径列表
concat_file_path: concat文件的输出路径
Returns:
str: concat文件的路径
"""
with open(concat_file_path, 'w', encoding='utf-8') as f:
for video_path in video_paths:
# 获取绝对路径
abs_path = os.path.abspath(video_path)
# 在Windows上将反斜杠替换为正斜杠
if os.name == 'nt': # Windows系统
abs_path = abs_path.replace('\\', '/')
else: # Unix/Mac系统
# 转义特殊字符
abs_path = abs_path.replace('\\', '\\\\').replace(':', '\\:')
# 处理路径中的单引号 (如果有)
abs_path = abs_path.replace("'", "\\'")
f.write(f"file '{abs_path}'\n")
return concat_file_path
def process_single_video(
input_path: str,
output_path: str,
target_width: int,
target_height: int,
keep_audio: bool = True,
hwaccel: Optional[str] = None
) -> str:
"""
处理单个视频:调整分辨率、帧率等
Args:
input_path: 输入视频路径
output_path: 输出视频路径
target_width: 目标宽度
target_height: 目标高度
keep_audio: 是否保留音频
hwaccel: 硬件加速选项
Returns:
str: 处理后的视频路径
"""
if not os.path.exists(input_path):
raise FileNotFoundError(f"找不到视频文件: {input_path}")
# 构建基本命令
command = ['ffmpeg', '-y']
# 安全检查:如果在Windows上,则慎用硬件加速
is_windows = os.name == 'nt'
if is_windows and hwaccel:
logger.info("在Windows系统上检测到硬件加速请求,将进行额外的兼容性检查")
try:
# 对视频进行快速探测,检测其基本信息
probe_cmd = [
'ffprobe', '-v', 'error',
'-select_streams', 'v:0',
'-show_entries', 'stream=codec_name,width,height',
'-of', 'csv=p=0',
input_path
]
result = subprocess.run(probe_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
# 如果探测成功,使用硬件加速;否则降级到软件编码
if result.returncode != 0:
logger.warning(f"视频探测失败,为安全起见,禁用硬件加速: {result.stderr}")
hwaccel = None
except Exception as e:
logger.warning(f"视频探测出错,禁用硬件加速: {str(e)}")
hwaccel = None
# 添加硬件加速参数(根据前面的安全检查可能已经被禁用)
if hwaccel:
try:
# 使用集中式硬件加速参数
hwaccel_args = ffmpeg_utils.get_ffmpeg_hwaccel_args()
command.extend(hwaccel_args)
except Exception as e:
logger.warning(f"应用硬件加速参数时出错: {str(e)},将使用软件编码")
# 重置命令,移除可能添加了一半的硬件加速参数
command = ['ffmpeg', '-y']
# 输入文件
command.extend(['-i', input_path])
# 处理音频
if not keep_audio:
command.extend(['-an']) # 移除音频
else:
# 检查输入视频是否有音频流
has_audio = check_video_has_audio(input_path)
if has_audio:
command.extend(['-c:a', 'aac', '-b:a', '128k']) # 音频编码为AAC
else:
logger.warning(f"视频 {input_path} 没有音频流,将会忽略音频设置")
command.extend(['-an']) # 没有音频流时移除音频设置
# 视频处理参数:缩放并添加填充以保持比例
scale_filter = f"scale={target_width}:{target_height}:force_original_aspect_ratio=decrease"
pad_filter = f"pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2"
command.extend([
'-vf', f"{scale_filter},{pad_filter}",
'-r', '30', # 设置帧率为30fps
])
# 选择编码器 - 考虑到Windows和特定硬件的兼容性
use_software_encoder = True
if hwaccel:
# 获取硬件加速类型和编码器信息
hwaccel_type = ffmpeg_utils.get_ffmpeg_hwaccel_type()
hwaccel_encoder = ffmpeg_utils.get_ffmpeg_hwaccel_encoder()
if hwaccel_type == 'cuda' or hwaccel_type == 'nvenc':
try:
# 检查NVENC编码器是否可用
encoders_cmd = subprocess.run(
["ffmpeg", "-hide_banner", "-encoders"],
stderr=subprocess.PIPE, stdout=subprocess.PIPE, text=True, check=False
)
if "h264_nvenc" in encoders_cmd.stdout.lower():
command.extend(['-c:v', 'h264_nvenc', '-preset', 'p4', '-profile:v', 'high'])
use_software_encoder = False
else:
logger.warning("NVENC编码器不可用,将使用软件编码")
except Exception as e:
logger.warning(f"NVENC编码器检测失败: {str(e)},将使用软件编码")
elif hwaccel_type == 'qsv':
command.extend(['-c:v', 'h264_qsv', '-preset', 'medium'])
use_software_encoder = False
elif hwaccel_type == 'videotoolbox': # macOS
command.extend(['-c:v', 'h264_videotoolbox', '-profile:v', 'high'])
use_software_encoder = False
elif hwaccel_type == 'vaapi': # Linux VA-API
command.extend(['-c:v', 'h264_vaapi', '-profile', '100'])
use_software_encoder = False
# 如果前面的条件未能应用硬件编码器,使用软件编码
if use_software_encoder:
logger.info("使用软件编码器(libx264)")
command.extend(['-c:v', 'libx264', '-preset', 'medium', '-profile:v', 'high'])
# 设置视频比特率和其他参数
command.extend([
'-b:v', '5M',
'-maxrate', '8M',
'-bufsize', '10M',
'-pix_fmt', 'yuv420p', # 兼容性更好的颜色格式
])
# 输出文件
command.append(output_path)
# 执行命令
try:
# logger.info(f"执行FFmpeg命令: {' '.join(command)}")
process = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logger.info(f"视频处理成功: {output_path}")
return output_path
except subprocess.CalledProcessError as e:
error_msg = e.stderr.decode() if e.stderr else str(e)
logger.error(f"处理视频失败: {error_msg}")
# 如果使用硬件加速失败,尝试使用软件编码
if hwaccel:
logger.info("尝试使用软件编码作为备选方案")
try:
# 构建新的命令,使用软件编码
fallback_cmd = ['ffmpeg', '-y', '-i', input_path]
# 保持原有的音频设置
if not keep_audio:
fallback_cmd.extend(['-an'])
else:
has_audio = check_video_has_audio(input_path)
if has_audio:
fallback_cmd.extend(['-c:a', 'aac', '-b:a', '128k'])
else:
fallback_cmd.extend(['-an'])
# 保持原有的视频过滤器
fallback_cmd.extend([
'-vf', f"{scale_filter},{pad_filter}",
'-r', '30',
'-c:v', 'libx264',
'-preset', 'medium',
'-profile:v', 'high',
'-b:v', '5M',
'-maxrate', '8M',
'-bufsize', '10M',
'-pix_fmt', 'yuv420p',
output_path
])
logger.info(f"执行备选FFmpeg命令: {' '.join(fallback_cmd)}")
subprocess.run(fallback_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logger.info(f"使用软件编码成功处理视频: {output_path}")
return output_path
except subprocess.CalledProcessError as fallback_error:
fallback_error_msg = fallback_error.stderr.decode() if fallback_error.stderr else str(fallback_error)
logger.error(f"备选软件编码也失败: {fallback_error_msg}")
raise RuntimeError(f"无法处理视频 {input_path}: 硬件加速和软件编码都失败")
# 如果不是硬件加速导致的问题,或者备选方案也失败了,抛出原始错误
raise RuntimeError(f"处理视频失败: {error_msg}")
def combine_clip_videos(
output_video_path: str,
video_paths: List[str],
video_ost_list: List[int],
video_aspect: VideoAspect = VideoAspect.portrait,
threads: int = 4,
force_software_encoding: bool = False, # 新参数,强制使用软件编码
) -> str:
"""
合并子视频
Args:
output_video_path: 合并后的存储路径
video_paths: 子视频路径列表
video_ost_list: 原声播放列表 (0: 不保留原声, 1: 只保留原声, 2: 保留原声并保留解说)
video_aspect: 屏幕比例
threads: 线程数
force_software_encoding: 是否强制使用软件编码(忽略硬件加速检测)
Returns:
str: 合并后的视频路径
"""
# 检查ffmpeg是否安装
if not check_ffmpeg_installation():
raise RuntimeError("未找到ffmpeg,请先安装")
# 准备输出目录
output_dir = os.path.dirname(output_video_path)
os.makedirs(output_dir, exist_ok=True)
# 获取目标分辨率
aspect = VideoAspect(video_aspect)
video_width, video_height = aspect.to_resolution()
# 检测可用的硬件加速选项
hwaccel = None if force_software_encoding else get_hardware_acceleration_option()
if hwaccel:
logger.info(f"将使用 {hwaccel} 硬件加速")
elif force_software_encoding:
logger.info("已强制使用软件编码,跳过硬件加速检测")
else:
logger.info("未检测到兼容的硬件加速,将使用软件编码")
# Windows系统上,默认使用软件编码以提高兼容性
if os.name == 'nt' and hwaccel:
logger.warning("在Windows系统上检测到硬件加速,但为了提高兼容性,建议使用软件编码")
# 不强制禁用hwaccel,而是在process_single_video中进行额外安全检查
# 重组视频路径和原声设置为一个字典列表结构
video_segments = []
# 检查视频路径和原声设置列表长度是否匹配
if len(video_paths) != len(video_ost_list):
logger.warning(f"视频路径列表({len(video_paths)})和原声设置列表({len(video_ost_list)})长度不匹配")
# 调整长度以匹配较短的列表
min_length = min(len(video_paths), len(video_ost_list))
video_paths = video_paths[:min_length]
video_ost_list = video_ost_list[:min_length]
# 创建视频处理配置字典列表
for i, (video_path, video_ost) in enumerate(zip(video_paths, video_ost_list)):
if not os.path.exists(video_path):
logger.warning(f"视频不存在,跳过: {video_path}")
continue
# 检查是否有音频流
has_audio = check_video_has_audio(video_path)
# 构建视频片段配置
segment = {
"index": i,
"path": video_path,
"ost": video_ost,
"has_audio": has_audio,
"keep_audio": video_ost > 0 and has_audio # 只有当ost>0且实际有音频时才保留
}
# 记录日志
if video_ost > 0 and not has_audio:
logger.warning(f"视频 {video_path} 设置为保留原声(ost={video_ost}),但该视频没有音频流")
video_segments.append(segment)
# 处理每个视频片段
processed_videos = []
temp_dir = os.path.join(output_dir, "temp_videos")
os.makedirs(temp_dir, exist_ok=True)
try:
# 第一阶段:处理所有视频片段到中间文件
for segment in video_segments:
# 处理单个视频,去除或保留音频
temp_output = os.path.join(temp_dir, f"processed_{segment['index']}.mp4")
try:
process_single_video(
input_path=segment['path'],
output_path=temp_output,
target_width=video_width,
target_height=video_height,
keep_audio=segment['keep_audio'],
hwaccel=hwaccel
)
processed_videos.append({
"index": segment["index"],
"path": temp_output,
"keep_audio": segment["keep_audio"]
})
logger.info(f"视频 {segment['index'] + 1}/{len(video_segments)} 处理完成")
except Exception as e:
logger.error(f"处理视频 {segment['path']} 时出错: {str(e)}")
# 如果使用硬件加速失败,尝试使用软件编码
if hwaccel and not force_software_encoding:
logger.info(f"尝试使用软件编码处理视频 {segment['path']}")
try:
process_single_video(
input_path=segment['path'],
output_path=temp_output,
target_width=video_width,
target_height=video_height,
keep_audio=segment['keep_audio'],
hwaccel=None # 使用软件编码
)
processed_videos.append({
"index": segment["index"],
"path": temp_output,
"keep_audio": segment["keep_audio"]
})
logger.info(f"使用软件编码成功处理视频 {segment['index'] + 1}/{len(video_segments)}")
except Exception as fallback_error:
logger.error(f"使用软件编码处理视频 {segment['path']} 也失败: {str(fallback_error)}")
continue
else:
continue
if not processed_videos:
raise ValueError("没有有效的视频片段可以合并")
# 按原始索引排序处理后的视频
processed_videos.sort(key=lambda x: x["index"])
# 第二阶段:分步骤合并视频 - 避免复杂的filter_complex滤镜
try:
# 1. 首先,将所有没有音频的视频或音频被禁用的视频合并到一个临时文件中
video_paths_only = [video["path"] for video in processed_videos]
video_concat_path = os.path.join(temp_dir, "video_concat.mp4")
# 创建concat文件,用于合并视频流
concat_file = os.path.join(temp_dir, "concat_list.txt")
create_ffmpeg_concat_file(video_paths_only, concat_file)
# 合并所有视频流,但不包含音频
concat_cmd = [
'ffmpeg', '-y',
'-f', 'concat',
'-safe', '0',
'-i', concat_file,
'-c:v', 'libx264',
'-preset', 'medium',
'-profile:v', 'high',
'-an', # 不包含音频
'-threads', str(threads),
video_concat_path
]
subprocess.run(concat_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logger.info("视频流合并完成")
# 2. 提取并合并有音频的片段
audio_segments = [video for video in processed_videos if video["keep_audio"]]
if not audio_segments:
# 如果没有音频片段,直接使用无音频的合并视频作为最终结果
shutil.copy(video_concat_path, output_video_path)
logger.info("无音频视频合并完成")
return output_video_path
# 创建音频中间文件
audio_files = []
for i, segment in enumerate(audio_segments):
# 提取音频
audio_file = os.path.join(temp_dir, f"audio_{i}.aac")
extract_audio_cmd = [
'ffmpeg', '-y',
'-i', segment["path"],
'-vn', # 不包含视频
'-c:a', 'aac',
'-b:a', '128k',
audio_file
]
subprocess.run(extract_audio_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
audio_files.append({
"index": segment["index"],
"path": audio_file
})
logger.info(f"提取音频 {i+1}/{len(audio_segments)} 完成")
# 3. 计算每个音频片段的时间位置
audio_timings = []
current_time = 0.0
# 获取每个视频片段的时长
for i, video in enumerate(processed_videos):
duration_cmd = [
'ffprobe', '-v', 'error',
'-show_entries', 'format=duration',
'-of', 'csv=p=0',
video["path"]
]
result = subprocess.run(duration_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
duration = float(result.stdout.strip())
# 如果当前片段需要保留音频,记录时间位置
if video["keep_audio"]:
for audio in audio_files:
if audio["index"] == video["index"]:
audio_timings.append({
"file": audio["path"],
"start": current_time,
"index": video["index"]
})
break
current_time += duration
# 4. 创建静音音频轨道作为基础
silence_audio = os.path.join(temp_dir, "silence.aac")
create_silence_cmd = [
'ffmpeg', '-y',
'-f', 'lavfi',
'-i', f'anullsrc=r=44100:cl=stereo',
'-t', str(current_time), # 总时长
'-c:a', 'aac',
'-b:a', '128k',
silence_audio
]
subprocess.run(create_silence_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 5. 创建复杂滤镜命令以混合音频
filter_script = os.path.join(temp_dir, "filter_script.txt")
with open(filter_script, 'w') as f:
f.write(f"[0:a]volume=0.0[silence];\n") # 首先静音背景轨道
# 添加每个音频文件
for i, timing in enumerate(audio_timings):
f.write(f"[{i+1}:a]adelay={int(timing['start']*1000)}|{int(timing['start']*1000)}[a{i}];\n")
# 混合所有音频
mix_str = "[silence]"
for i in range(len(audio_timings)):
mix_str += f"[a{i}]"
mix_str += f"amix=inputs={len(audio_timings)+1}:duration=longest[aout]"
f.write(mix_str)
# 6. 构建音频合并命令
audio_inputs = ['-i', silence_audio]
for timing in audio_timings:
audio_inputs.extend(['-i', timing["file"]])
mixed_audio = os.path.join(temp_dir, "mixed_audio.aac")
audio_mix_cmd = [
'ffmpeg', '-y'
] + audio_inputs + [
'-filter_complex_script', filter_script,
'-map', '[aout]',
'-c:a', 'aac',
'-b:a', '128k',
mixed_audio
]
subprocess.run(audio_mix_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logger.info("音频混合完成")
# 7. 将合并的视频和混合的音频组合在一起
final_cmd = [
'ffmpeg', '-y',
'-i', video_concat_path,
'-i', mixed_audio,
'-c:v', 'copy',
'-c:a', 'aac',
'-map', '0:v:0',
'-map', '1:a:0',
'-shortest',
output_video_path
]
subprocess.run(final_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logger.info("视频最终合并完成")
return output_video_path
except subprocess.CalledProcessError as e:
logger.error(f"合并视频过程中出错: {e.stderr.decode() if e.stderr else str(e)}")
# 尝试备用合并方法 - 最简单的无音频合并
logger.info("尝试备用合并方法 - 无音频合并")
try:
concat_file = os.path.join(temp_dir, "concat_list.txt")
video_paths_only = [video["path"] for video in processed_videos]
create_ffmpeg_concat_file(video_paths_only, concat_file)
backup_cmd = [
'ffmpeg', '-y',
'-f', 'concat',
'-safe', '0',
'-i', concat_file,
'-c:v', 'copy',
'-an', # 无音频
output_video_path
]
subprocess.run(backup_cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logger.warning("使用备用方法(无音频)成功合并视频")
return output_video_path
except Exception as backup_error:
logger.error(f"备用合并方法也失败: {str(backup_error)}")
raise RuntimeError(f"无法合并视频: {str(backup_error)}")
except Exception as e:
logger.error(f"合并视频时出错: {str(e)}")
raise
finally:
# 清理临时文件
try:
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
logger.info("已清理临时文件")
except Exception as e:
logger.warning(f"清理临时文件时出错: {str(e)}")
if __name__ == '__main__':
video_paths = [
'/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E02_00_14_09_440.mp4',
'/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E08_00_27_11_110.mp4',
'/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E08_00_34_44_480.mp4',
'/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E08_00_42_47_630.mp4',
'/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/S01E09_00_29_48_160.mp4'
]
combine_clip_videos(
output_video_path="/Users/apple/Desktop/home/NarratoAI/storage/temp/merge/merged_123.mp4",
video_paths=video_paths,
video_ost_list=[1, 1, 1,1,1],
video_aspect=VideoAspect.portrait,
force_software_encoding=False # 默认不强制使用软件编码,让系统自动决定
)