""" 使用 moviepy 优化视频处理速度的示例 包含:视频加速、多核处理、预设参数优化等 """ from moviepy.editor import VideoFileClip from moviepy.video.fx.speedx import speedx import multiprocessing as mp import time class VideoSpeedProcessor: """视频速度处理器""" def __init__(self, input_path: str, output_path: str): self.input_path = input_path self.output_path = output_path # 获取CPU核心数 self.cpu_cores = mp.cpu_count() def process_with_optimization(self, speed_factor: float = 1.0) -> None: """ 使用优化参数处理视频 参数: speed_factor: 速度倍数 (1.0 为原速, 2.0 为双倍速) """ start_time = time.time() # 加载视频时使用优化参数 video = VideoFileClip( self.input_path, audio=True, # 如果不需要音频可以设为False target_resolution=(720, None), # 可以降低分辨率加快处理 resize_algorithm='fast_bilinear' # 使用快速的重置算法 ) # 应用速度变化 if speed_factor != 1.0: video = speedx(video, factor=speed_factor) # 使用优化参数导出视频 video.write_videofile( self.output_path, codec='libx264', # 使用h264编码 audio_codec='aac', # 音频编码 temp_audiofile='temp-audio.m4a', # 临时音频文件 remove_temp=True, # 处理完成后删除临时文件 write_logfile=False, # 关闭日志文件 threads=self.cpu_cores, # 使用多核处理 preset='ultrafast', # 使用最快的编码预设 ffmpeg_params=[ '-brand', 'mp42', '-crf', '23', # 压缩率,范围0-51,数值越大压缩率越高 ] ) # 释放资源 video.close() end_time = time.time() print(f"处理完成!用时: {end_time - start_time:.2f} 秒") def batch_process_segments(self, segment_times: list, speed_factor: float = 1.0) -> None: """ 批量处理视频片段(并行处理) 参数: segment_times: 列表,包含多个(start, end)时间元组 speed_factor: 速度倍数 """ start_time = time.time() # 创建进程池 with mp.Pool(processes=self.cpu_cores) as pool: # 准备参数 args = [(self.input_path, start, end, speed_factor, i) for i, (start, end) in enumerate(segment_times)] # 并行处理片段 pool.starmap(self._process_segment, args) end_time = time.time() print(f"批量处理完成!总用时: {end_time - start_time:.2f} 秒") @staticmethod def _process_segment(video_path: str, start: str, end: str, speed_factor: float, index: int) -> None: """处理单个视频片段""" # 转换时间格式 start_sec = VideoSpeedProcessor._time_to_seconds(start) end_sec = VideoSpeedProcessor._time_to_seconds(end) # 加载并处理视频片段 video = VideoFileClip( video_path, audio=True, target_resolution=(720, None) ).subclip(start_sec, end_sec) # 应用速度变化 if speed_factor != 1.0: video = speedx(video, factor=speed_factor) # 保存处理后的片段 output_path = f"../../resource/videos/segment_{index}.mp4" video.write_videofile( output_path, codec='libx264', audio_codec='aac', preset='ultrafast', threads=2 # 每个进程使用的线程数 ) video.close() @staticmethod def _time_to_seconds(time_str: str) -> float: """将时间字符串(MM:SS)转换为秒数""" minutes, seconds = map(int, time_str.split(':')) return minutes * 60 + seconds def test_video_speed(): """测试视频加速处理""" processor = VideoSpeedProcessor( "../../resource/videos/best.mp4", "../../resource/videos/speed_up.mp4" ) # 测试1:简单加速 processor.process_with_optimization(speed_factor=1.5) # 1.5倍速 # 测试2:并行处理多个片段 segments = [ ("00:00", "01:00"), ("01:00", "02:00"), ("02:00", "03:00") ] processor.batch_process_segments(segments, speed_factor=2.0) # 2倍速 if __name__ == "__main__": test_video_speed()