Spaces:
Running
Running
File size: 4,673 Bytes
3b13b0e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
"""
使用 moviepy 优化视频处理速度的示例
包含:视频加速、多核处理、预设参数优化等
"""
from moviepy.editor import VideoFileClip
from moviepy.video.fx.speedx import speedx
import multiprocessing as mp
import time
class VideoSpeedProcessor:
"""视频速度处理器"""
def __init__(self, input_path: str, output_path: str):
self.input_path = input_path
self.output_path = output_path
# 获取CPU核心数
self.cpu_cores = mp.cpu_count()
def process_with_optimization(self, speed_factor: float = 1.0) -> None:
"""
使用优化参数处理视频
参数:
speed_factor: 速度倍数 (1.0 为原速, 2.0 为双倍速)
"""
start_time = time.time()
# 加载视频时使用优化参数
video = VideoFileClip(
self.input_path,
audio=True, # 如果不需要音频可以设为False
target_resolution=(720, None), # 可以降低分辨率加快处理
resize_algorithm='fast_bilinear' # 使用快速的重置算法
)
# 应用速度变化
if speed_factor != 1.0:
video = speedx(video, factor=speed_factor)
# 使用优化参数导出视频
video.write_videofile(
self.output_path,
codec='libx264', # 使用h264编码
audio_codec='aac', # 音频编码
temp_audiofile='temp-audio.m4a', # 临时音频文件
remove_temp=True, # 处理完成后删除临时文件
write_logfile=False, # 关闭日志文件
threads=self.cpu_cores, # 使用多核处理
preset='ultrafast', # 使用最快的编码预设
ffmpeg_params=[
'-brand', 'mp42',
'-crf', '23', # 压缩率,范围0-51,数值越大压缩率越高
]
)
# 释放资源
video.close()
end_time = time.time()
print(f"处理完成!用时: {end_time - start_time:.2f} 秒")
def batch_process_segments(self, segment_times: list, speed_factor: float = 1.0) -> None:
"""
批量处理视频片段(并行处理)
参数:
segment_times: 列表,包含多个(start, end)时间元组
speed_factor: 速度倍数
"""
start_time = time.time()
# 创建进程池
with mp.Pool(processes=self.cpu_cores) as pool:
# 准备参数
args = [(self.input_path, start, end, speed_factor, i)
for i, (start, end) in enumerate(segment_times)]
# 并行处理片段
pool.starmap(self._process_segment, args)
end_time = time.time()
print(f"批量处理完成!总用时: {end_time - start_time:.2f} 秒")
@staticmethod
def _process_segment(video_path: str, start: str, end: str,
speed_factor: float, index: int) -> None:
"""处理单个视频片段"""
# 转换时间格式
start_sec = VideoSpeedProcessor._time_to_seconds(start)
end_sec = VideoSpeedProcessor._time_to_seconds(end)
# 加载并处理视频片段
video = VideoFileClip(
video_path,
audio=True,
target_resolution=(720, None)
).subclip(start_sec, end_sec)
# 应用速度变化
if speed_factor != 1.0:
video = speedx(video, factor=speed_factor)
# 保存处理后的片段
output_path = f"../../resource/videos/segment_{index}.mp4"
video.write_videofile(
output_path,
codec='libx264',
audio_codec='aac',
preset='ultrafast',
threads=2 # 每个进程使用的线程数
)
video.close()
@staticmethod
def _time_to_seconds(time_str: str) -> float:
"""将时间字符串(MM:SS)转换为秒数"""
minutes, seconds = map(int, time_str.split(':'))
return minutes * 60 + seconds
def test_video_speed():
"""测试视频加速处理"""
processor = VideoSpeedProcessor(
"../../resource/videos/best.mp4",
"../../resource/videos/speed_up.mp4"
)
# 测试1:简单加速
processor.process_with_optimization(speed_factor=1.5) # 1.5倍速
# 测试2:并行处理多个片段
segments = [
("00:00", "01:00"),
("01:00", "02:00"),
("02:00", "03:00")
]
processor.batch_process_segments(segments, speed_factor=2.0) # 2倍速
if __name__ == "__main__":
test_video_speed()
|