"""
Audio Processor - Handles audio blending and processing
"""

import logging
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple

import numpy as np
import torch
import torchaudio
from scipy import signal
from scipy.ndimage import median_filter

logger = logging.getLogger(__name__)


class AudioProcessor:
    """Handles audio processing, blending, and effects."""
    
    def __init__(self, config: dict):
        """
        Initialize audio processor.
        
        Args:
            config: Configuration dictionary
        """
        self.config = config
        self.sample_rate = config.get("sample_rate", 44100)
    
    def blend_clip(
        self,
        new_clip_path: str,
        previous_clip: Optional[np.ndarray],
        lead_in: float = 2.0,
        lead_out: float = 2.0
    ) -> str:
        """
        Blend new clip with previous clip using crossfades.
        
        Args:
            new_clip_path: Path to new audio clip
            previous_clip: Previous clip as numpy array
            lead_in: Lead-in duration in seconds for blending
            lead_out: Lead-out duration in seconds for blending
            
        Returns:
            Path to blended clip
        """
        try:
            # Load new clip
            new_audio, sr = torchaudio.load(new_clip_path)
            if sr != self.sample_rate:
                resampler = torchaudio.transforms.Resample(sr, self.sample_rate)
                new_audio = resampler(new_audio)
            
            new_np = new_audio.numpy()
            
            # No previous clip: nothing to blend, so return the source path
            # unchanged (resampling above only affected the in-memory copy)
            if previous_clip is None:
                return new_clip_path
            
            # Calculate blend samples
            lead_in_samples = int(lead_in * self.sample_rate)
            lead_out_samples = int(lead_out * self.sample_rate)
            
            # Ensure clips are compatible shape
            if previous_clip.shape[0] != new_np.shape[0]:
                # Match channels
                if previous_clip.shape[0] == 1 and new_np.shape[0] == 2:
                    previous_clip = np.repeat(previous_clip, 2, axis=0)
                elif previous_clip.shape[0] == 2 and new_np.shape[0] == 1:
                    new_np = np.repeat(new_np, 2, axis=0)
            
            # Blend lead-in with previous clip's lead-out
            if previous_clip.shape[1] >= lead_out_samples and new_np.shape[1] >= lead_in_samples:
                # Extract regions to blend
                prev_tail = previous_clip[:, -lead_out_samples:]
                new_head = new_np[:, :lead_in_samples]
                
                # Equal-power crossfade: cos/sin gains satisfy
                # fade_out**2 + fade_in**2 == 1, keeping perceived loudness
                # roughly constant through the transition. Trim both regions
                # to the shorter length and build the curves at that length
                # so the fades still run from full gain to silence.
                blend_length = min(lead_in_samples, lead_out_samples)
                prev_tail = prev_tail[:, -blend_length:]
                new_head = new_head[:, :blend_length]
                fade_out = np.cos(np.linspace(0, np.pi / 2, blend_length))
                fade_in = np.sin(np.linspace(0, np.pi / 2, blend_length))
                
                # Apply crossfade
                blended_region = (prev_tail * fade_out + new_head * fade_in)
                
                # Reconstruct clip with blended region
                result = new_np.copy()
                result[:, :blended_region.shape[1]] = blended_region
                
            else:
                # Not enough audio to blend, return as-is
                result = new_np
            
            # Apply gentle compression to avoid clipping
            result = self._apply_compression(result)
            
            # Save blended clip
            output_dir = Path(self.config.get("output_dir", "outputs"))
            output_dir.mkdir(parents=True, exist_ok=True)
            
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_path = output_dir / f"blended_{timestamp}.wav"
            
            result_tensor = torch.from_numpy(result).float()
            torchaudio.save(
                str(output_path),
                result_tensor,
                self.sample_rate,
                encoding="PCM_S",
                bits_per_sample=16
            )
            
            logger.info(f"✅ Blended clip saved: {output_path}")
            return str(output_path)
            
        except Exception as e:
            logger.error(f"Blending failed: {e}")
            # Return original if blending fails
            return new_clip_path
    
    def crossfade(
        self,
        audio1: np.ndarray,
        audio2: np.ndarray,
        fade_duration: float = 2.0
    ) -> np.ndarray:
        """
        Create crossfade between two audio segments.
        
        Args:
            audio1: First audio segment
            audio2: Second audio segment
            fade_duration: Duration of crossfade in seconds
            
        Returns:
            Crossfaded audio
        """
        fade_samples = int(fade_duration * self.sample_rate)
        # Clamp so the slices below stay within both segments
        fade_samples = min(fade_samples, audio1.shape[1], audio2.shape[1])
        
        # Match channel counts (assumes one count evenly divides the other,
        # e.g. mono vs. stereo)
        if audio1.shape[0] != audio2.shape[0]:
            target_channels = max(audio1.shape[0], audio2.shape[0])
            if audio1.shape[0] < target_channels:
                audio1 = np.repeat(audio1, target_channels // audio1.shape[0], axis=0)
            if audio2.shape[0] < target_channels:
                audio2 = np.repeat(audio2, target_channels // audio2.shape[0], axis=0)
        
        # Extract fade regions
        fade_out_region = audio1[:, -fade_samples:]
        fade_in_region = audio2[:, :fade_samples]
        
        # Equal-power crossfade curves: cos/sin gains keep the summed power
        # constant across the transition
        fade_out_curve = np.cos(np.linspace(0, np.pi / 2, fade_samples))
        fade_in_curve = np.sin(np.linspace(0, np.pi / 2, fade_samples))
        
        # Apply fades
        faded = fade_out_region * fade_out_curve + fade_in_region * fade_in_curve
        
        # Concatenate: audio1 (except fade region) + faded + audio2 (except fade region)
        result = np.concatenate([
            audio1[:, :-fade_samples],
            faded,
            audio2[:, fade_samples:]
        ], axis=1)
        
        return result
    
    def _apply_compression(self, audio: np.ndarray, threshold: float = 0.8) -> np.ndarray:
        """
        Apply gentle compression to prevent clipping.
        
        Args:
            audio: Input audio
            threshold: Compression threshold
            
        Returns:
            Compressed audio
        """
        # Soft limiting via tanh saturation: pre-scale so the peak lands at
        # `threshold`, then squash through tanh, which bounds the output to
        # (-threshold, threshold) while staying near-linear at low amplitudes
        peak = np.abs(audio).max()
        
        if peak > threshold:
            compressed = np.tanh(audio * (threshold / peak)) * threshold
            return compressed
        
        return audio
    
    def normalize_audio(self, audio: np.ndarray, target_db: float = -3.0) -> np.ndarray:
        """
        Normalize audio to target dB level.
        
        Args:
            audio: Input audio
            target_db: Target level in dB
            
        Returns:
            Normalized audio
        """
        # Calculate current peak in dB
        peak = np.abs(audio).max()
        if peak == 0:
            return audio
        
        current_db = 20 * np.log10(peak)
        
        # Calculate gain needed
        gain_db = target_db - current_db
        gain_linear = 10 ** (gain_db / 20)
        
        # Apply gain
        normalized = audio * gain_linear
        
        # Ensure no clipping
        normalized = np.clip(normalized, -1.0, 1.0)
        
        return normalized
    
    def remove_clicks_pops(self, audio: np.ndarray) -> np.ndarray:
        """
        Remove clicks and pops from audio.
        
        Args:
            audio: Input audio
            
        Returns:
            Cleaned audio
        """
        # Apply median filter to remove impulse noise
        cleaned = np.zeros_like(audio)
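        # size=3 replaces each sample with the median of its 3-sample
        # neighborhood, suppressing single-sample impulses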
        for ch in range(audio.shape[0]):
            cleaned[ch] = median_filter(audio[ch], size=3)
        
        return cleaned
    
    def apply_fade(
        self,
        audio: np.ndarray,
        fade_in: float = 0.0,
        fade_out: float = 0.0
    ) -> np.ndarray:
        """
        Apply fade in/out to audio.
        
        Args:
            audio: Input audio
            fade_in: Fade in duration in seconds
            fade_out: Fade out duration in seconds
            
        Returns:
            Faded audio
        """
        result = audio.copy()
        
        # Fade in
        if fade_in > 0:
            fade_in_samples = int(fade_in * self.sample_rate)
            fade_in_samples = min(fade_in_samples, audio.shape[1])
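            # Quadratic ramp: starts more gently than a linear fade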
            fade_curve = np.linspace(0, 1, fade_in_samples) ** 2
            result[:, :fade_in_samples] *= fade_curve
        
        # Fade out
        if fade_out > 0:
            fade_out_samples = int(fade_out * self.sample_rate)
            fade_out_samples = min(fade_out_samples, audio.shape[1])
            fade_curve = np.linspace(1, 0, fade_out_samples) ** 2
            result[:, -fade_out_samples:] *= fade_curve
        
        return result
    
    def resample_audio(
        self,
        audio: np.ndarray,
        orig_sr: int,
        target_sr: int
    ) -> np.ndarray:
        """
        Resample audio to target sample rate.
        
        Args:
            audio: Input audio
            orig_sr: Original sample rate
            target_sr: Target sample rate
            
        Returns:
            Resampled audio
        """
        if orig_sr == target_sr:
            return audio
        
        # FFT-based resampling (scipy.signal.resample), which lands exactly
        # on the target sample count
        num_samples = int(audio.shape[1] * target_sr / orig_sr)
        resampled = signal.resample(audio, num_samples, axis=1)
        
        return resampled
    
    def match_loudness(
        self,
        audio1: np.ndarray,
        audio2: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Match loudness between two audio segments.
        
        Args:
            audio1: First audio segment
            audio2: Second audio segment
            
        Returns:
            Tuple of loudness-matched audio segments
        """
        # Calculate RMS for each
        rms1 = np.sqrt(np.mean(audio1 ** 2))
        rms2 = np.sqrt(np.mean(audio2 ** 2))
        
        # Guard against silent segments before dividing
        if rms1 == 0 or rms2 == 0:
            return audio1, audio2
        
        # Gain that brings audio1's RMS level to audio2's
        gain = rms2 / rms1
        
        # Apply gain to audio1
        matched_audio1 = audio1 * gain
        
        # Prevent clipping
        peak = np.abs(matched_audio1).max()
        if peak > 1.0:
            matched_audio1 = matched_audio1 / peak
        
        return matched_audio1, audio2
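

# Minimal usage sketch (illustration only, not part of the module's API).
# Assumptions: a config dict carrying the "sample_rate" and "output_dir"
# keys read above, and synthetic sine tones standing in for real clips.
if __name__ == "__main__":
    processor = AudioProcessor({"sample_rate": 44100, "output_dir": "outputs"})

    # Two one-second mono test tones, shaped (channels, samples)
    t = np.linspace(0, 1.0, processor.sample_rate, endpoint=False)
    tone_a = 0.5 * np.sin(2 * np.pi * 440.0 * t)[np.newaxis, :]
    tone_b = 0.5 * np.sin(2 * np.pi * 660.0 * t)[np.newaxis, :]

    # Match loudness, crossfade over 0.5 s, then normalize and fade the edges
    tone_a, tone_b = processor.match_loudness(tone_a, tone_b)
    mixed = processor.crossfade(tone_a, tone_b, fade_duration=0.5)
    mixed = processor.normalize_audio(mixed, target_db=-3.0)
    mixed = processor.apply_fade(mixed, fade_in=0.1, fade_out=0.1)
    print(f"Crossfaded result: {mixed.shape[1]} samples at {processor.sample_rate} Hz")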