File size: 4,673 Bytes
3b13b0e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""
使用 moviepy 优化视频处理速度的示例
包含:视频加速、多核处理、预设参数优化等
"""

from moviepy.editor import VideoFileClip
from moviepy.video.fx.speedx import speedx
import multiprocessing as mp
import time


class VideoSpeedProcessor:
    """视频速度处理器"""

    def __init__(self, input_path: str, output_path: str):
        self.input_path = input_path
        self.output_path = output_path
        # 获取CPU核心数
        self.cpu_cores = mp.cpu_count()

    def process_with_optimization(self, speed_factor: float = 1.0) -> None:
        """
        使用优化参数处理视频
        参数:
            speed_factor: 速度倍数 (1.0 为原速, 2.0 为双倍速)
        """
        start_time = time.time()
        
        # 加载视频时使用优化参数
        video = VideoFileClip(
            self.input_path,
            audio=True,  # 如果不需要音频可以设为False
            target_resolution=(720, None),  # 可以降低分辨率加快处理
            resize_algorithm='fast_bilinear'  # 使用快速的重置算法
        )

        # 应用速度变化
        if speed_factor != 1.0:
            video = speedx(video, factor=speed_factor)

        # 使用优化参数导出视频
        video.write_videofile(
            self.output_path,
            codec='libx264',  # 使用h264编码
            audio_codec='aac',  # 音频编码
            temp_audiofile='temp-audio.m4a',  # 临时音频文件
            remove_temp=True,  # 处理完成后删除临时文件
            write_logfile=False,  # 关闭日志文件
            threads=self.cpu_cores,  # 使用多核处理
            preset='ultrafast',  # 使用最快的编码预设
            ffmpeg_params=[
                '-brand', 'mp42',
                '-crf', '23',  # 压缩率,范围0-51,数值越大压缩率越高
            ]
        )

        # 释放资源
        video.close()

        end_time = time.time()
        print(f"处理完成!用时: {end_time - start_time:.2f} 秒")

    def batch_process_segments(self, segment_times: list, speed_factor: float = 1.0) -> None:
        """
        批量处理视频片段(并行处理)
        参数:
            segment_times: 列表,包含多个(start, end)时间元组
            speed_factor: 速度倍数
        """
        start_time = time.time()
        
        # 创建进程池
        with mp.Pool(processes=self.cpu_cores) as pool:
            # 准备参数
            args = [(self.input_path, start, end, speed_factor, i) 
                   for i, (start, end) in enumerate(segment_times)]
            
            # 并行处理片段
            pool.starmap(self._process_segment, args)

        end_time = time.time()
        print(f"批量处理完成!总用时: {end_time - start_time:.2f} 秒")

    @staticmethod
    def _process_segment(video_path: str, start: str, end: str, 
                        speed_factor: float, index: int) -> None:
        """处理单个视频片段"""
        # 转换时间格式
        start_sec = VideoSpeedProcessor._time_to_seconds(start)
        end_sec = VideoSpeedProcessor._time_to_seconds(end)
        
        # 加载并处理视频片段
        video = VideoFileClip(
            video_path,
            audio=True,
            target_resolution=(720, None)
        ).subclip(start_sec, end_sec)

        # 应用速度变化
        if speed_factor != 1.0:
            video = speedx(video, factor=speed_factor)

        # 保存处理后的片段
        output_path = f"../../resource/videos/segment_{index}.mp4"
        video.write_videofile(
            output_path,
            codec='libx264',
            audio_codec='aac',
            preset='ultrafast',
            threads=2  # 每个进程使用的线程数
        )
        
        video.close()

    @staticmethod
    def _time_to_seconds(time_str: str) -> float:
        """将时间字符串(MM:SS)转换为秒数"""
        minutes, seconds = map(int, time_str.split(':'))
        return minutes * 60 + seconds


def test_video_speed():
    """测试视频加速处理"""
    processor = VideoSpeedProcessor(
        "../../resource/videos/best.mp4",
        "../../resource/videos/speed_up.mp4"
    )
    
    # 测试1:简单加速
    processor.process_with_optimization(speed_factor=1.5)  # 1.5倍速
    
    # 测试2:并行处理多个片段
    segments = [
        ("00:00", "01:00"),
        ("01:00", "02:00"),
        ("02:00", "03:00")
    ]
    processor.batch_process_segments(segments, speed_factor=2.0)  # 2倍速


if __name__ == "__main__":
    test_video_speed()