Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Audio Processor - Handles audio blending and processing | |
| """ | |
| import numpy as np | |
| import torch | |
| import torchaudio | |
| from scipy import signal | |
| import logging | |
| from typing import Optional, Tuple | |
| logger = logging.getLogger(__name__) | |
| class AudioProcessor: | |
| """Handles audio processing, blending, and effects.""" | |
| def __init__(self, config: dict): | |
| """ | |
| Initialize audio processor. | |
| Args: | |
| config: Configuration dictionary | |
| """ | |
| self.config = config | |
| self.sample_rate = config.get("sample_rate", 44100) | |
| def blend_clip( | |
| self, | |
| new_clip_path: str, | |
| previous_clip: Optional[np.ndarray], | |
| lead_in: float = 2.0, | |
| lead_out: float = 2.0 | |
| ) -> str: | |
| """ | |
| Blend new clip with previous clip using crossfades. | |
| Args: | |
| new_clip_path: Path to new audio clip | |
| previous_clip: Previous clip as numpy array | |
| lead_in: Lead-in duration in seconds for blending | |
| lead_out: Lead-out duration in seconds for blending | |
| Returns: | |
| Path to blended clip | |
| """ | |
| try: | |
| # Load new clip | |
| new_audio, sr = torchaudio.load(new_clip_path) | |
| if sr != self.sample_rate: | |
| resampler = torchaudio.transforms.Resample(sr, self.sample_rate) | |
| new_audio = resampler(new_audio) | |
| new_np = new_audio.numpy() | |
| # If no previous clip, return new clip as-is | |
| if previous_clip is None: | |
| return new_clip_path | |
| # Calculate blend samples | |
| lead_in_samples = int(lead_in * self.sample_rate) | |
| lead_out_samples = int(lead_out * self.sample_rate) | |
| # Ensure clips are compatible shape | |
| if previous_clip.shape[0] != new_np.shape[0]: | |
| # Match channels | |
| if previous_clip.shape[0] == 1 and new_np.shape[0] == 2: | |
| previous_clip = np.repeat(previous_clip, 2, axis=0) | |
| elif previous_clip.shape[0] == 2 and new_np.shape[0] == 1: | |
| new_np = np.repeat(new_np, 2, axis=0) | |
| # Blend lead-in with previous clip's lead-out | |
| if previous_clip.shape[1] >= lead_out_samples and new_np.shape[1] >= lead_in_samples: | |
| # Extract regions to blend | |
| prev_tail = previous_clip[:, -lead_out_samples:] | |
| new_head = new_np[:, :lead_in_samples] | |
| # Create crossfade | |
| # Use equal-power crossfade for smooth transition | |
| fade_out = np.cos(np.linspace(0, np.pi/2, lead_out_samples)) ** 2 | |
| fade_in = np.sin(np.linspace(0, np.pi/2, lead_in_samples)) ** 2 | |
| # Adjust lengths if different | |
| if lead_in_samples != lead_out_samples: | |
| # Use shorter length | |
| blend_length = min(lead_in_samples, lead_out_samples) | |
| prev_tail = prev_tail[:, -blend_length:] | |
| new_head = new_head[:, :blend_length] | |
| fade_out = fade_out[-blend_length:] | |
| fade_in = fade_in[:blend_length] | |
| # Apply crossfade | |
| blended_region = (prev_tail * fade_out + new_head * fade_in) | |
| # Reconstruct clip with blended region | |
| result = new_np.copy() | |
| result[:, :blended_region.shape[1]] = blended_region | |
| else: | |
| # Not enough audio to blend, return as-is | |
| result = new_np | |
| # Apply gentle compression to avoid clipping | |
| result = self._apply_compression(result) | |
| # Save blended clip | |
| from pathlib import Path | |
| from datetime import datetime | |
| output_dir = Path(self.config.get("output_dir", "outputs")) | |
| output_dir.mkdir(exist_ok=True) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| output_path = output_dir / f"blended_{timestamp}.wav" | |
| result_tensor = torch.from_numpy(result).float() | |
| torchaudio.save( | |
| str(output_path), | |
| result_tensor, | |
| self.sample_rate, | |
| encoding="PCM_S", | |
| bits_per_sample=16 | |
| ) | |
| logger.info(f"✅ Blended clip saved: {output_path}") | |
| return str(output_path) | |
| except Exception as e: | |
| logger.error(f"Blending failed: {e}") | |
| # Return original if blending fails | |
| return new_clip_path | |
| def crossfade( | |
| self, | |
| audio1: np.ndarray, | |
| audio2: np.ndarray, | |
| fade_duration: float = 2.0 | |
| ) -> np.ndarray: | |
| """ | |
| Create crossfade between two audio segments. | |
| Args: | |
| audio1: First audio segment | |
| audio2: Second audio segment | |
| fade_duration: Duration of crossfade in seconds | |
| Returns: | |
| Crossfaded audio | |
| """ | |
| fade_samples = int(fade_duration * self.sample_rate) | |
| # Ensure same number of channels | |
| if audio1.shape[0] != audio2.shape[0]: | |
| target_channels = max(audio1.shape[0], audio2.shape[0]) | |
| if audio1.shape[0] < target_channels: | |
| audio1 = np.repeat(audio1, target_channels // audio1.shape[0], axis=0) | |
| if audio2.shape[0] < target_channels: | |
| audio2 = np.repeat(audio2, target_channels // audio2.shape[0], axis=0) | |
| # Extract fade regions | |
| fade_out_region = audio1[:, -fade_samples:] | |
| fade_in_region = audio2[:, :fade_samples] | |
| # Create equal-power crossfade curves | |
| fade_out_curve = np.cos(np.linspace(0, np.pi/2, fade_samples)) ** 2 | |
| fade_in_curve = np.sin(np.linspace(0, np.pi/2, fade_samples)) ** 2 | |
| # Apply fades | |
| faded = fade_out_region * fade_out_curve + fade_in_region * fade_in_curve | |
| # Concatenate: audio1 (except fade region) + faded + audio2 (except fade region) | |
| result = np.concatenate([ | |
| audio1[:, :-fade_samples], | |
| faded, | |
| audio2[:, fade_samples:] | |
| ], axis=1) | |
| return result | |
| def _apply_compression(self, audio: np.ndarray, threshold: float = 0.8) -> np.ndarray: | |
| """ | |
| Apply gentle compression to prevent clipping. | |
| Args: | |
| audio: Input audio | |
| threshold: Compression threshold | |
| Returns: | |
| Compressed audio | |
| """ | |
| # Soft clipping using tanh | |
| peak = np.abs(audio).max() | |
| if peak > threshold: | |
| # Apply soft compression | |
| compressed = np.tanh(audio * (threshold / peak)) * threshold | |
| return compressed | |
| return audio | |
| def normalize_audio(self, audio: np.ndarray, target_db: float = -3.0) -> np.ndarray: | |
| """ | |
| Normalize audio to target dB level. | |
| Args: | |
| audio: Input audio | |
| target_db: Target level in dB | |
| Returns: | |
| Normalized audio | |
| """ | |
| # Calculate current peak in dB | |
| peak = np.abs(audio).max() | |
| if peak == 0: | |
| return audio | |
| current_db = 20 * np.log10(peak) | |
| # Calculate gain needed | |
| gain_db = target_db - current_db | |
| gain_linear = 10 ** (gain_db / 20) | |
| # Apply gain | |
| normalized = audio * gain_linear | |
| # Ensure no clipping | |
| normalized = np.clip(normalized, -1.0, 1.0) | |
| return normalized | |
| def remove_clicks_pops(self, audio: np.ndarray) -> np.ndarray: | |
| """ | |
| Remove clicks and pops from audio. | |
| Args: | |
| audio: Input audio | |
| Returns: | |
| Cleaned audio | |
| """ | |
| # Apply median filter to remove impulse noise | |
| from scipy.ndimage import median_filter | |
| cleaned = np.zeros_like(audio) | |
| for ch in range(audio.shape[0]): | |
| cleaned[ch] = median_filter(audio[ch], size=3) | |
| return cleaned | |
| def apply_fade( | |
| self, | |
| audio: np.ndarray, | |
| fade_in: float = 0.0, | |
| fade_out: float = 0.0 | |
| ) -> np.ndarray: | |
| """ | |
| Apply fade in/out to audio. | |
| Args: | |
| audio: Input audio | |
| fade_in: Fade in duration in seconds | |
| fade_out: Fade out duration in seconds | |
| Returns: | |
| Faded audio | |
| """ | |
| result = audio.copy() | |
| # Fade in | |
| if fade_in > 0: | |
| fade_in_samples = int(fade_in * self.sample_rate) | |
| fade_in_samples = min(fade_in_samples, audio.shape[1]) | |
| fade_curve = np.linspace(0, 1, fade_in_samples) ** 2 | |
| result[:, :fade_in_samples] *= fade_curve | |
| # Fade out | |
| if fade_out > 0: | |
| fade_out_samples = int(fade_out * self.sample_rate) | |
| fade_out_samples = min(fade_out_samples, audio.shape[1]) | |
| fade_curve = np.linspace(1, 0, fade_out_samples) ** 2 | |
| result[:, -fade_out_samples:] *= fade_curve | |
| return result | |
| def resample_audio( | |
| self, | |
| audio: np.ndarray, | |
| orig_sr: int, | |
| target_sr: int | |
| ) -> np.ndarray: | |
| """ | |
| Resample audio to target sample rate. | |
| Args: | |
| audio: Input audio | |
| orig_sr: Original sample rate | |
| target_sr: Target sample rate | |
| Returns: | |
| Resampled audio | |
| """ | |
| if orig_sr == target_sr: | |
| return audio | |
| # Use scipy's resample for high-quality resampling | |
| num_samples = int(audio.shape[1] * target_sr / orig_sr) | |
| resampled = signal.resample(audio, num_samples, axis=1) | |
| return resampled | |
| def match_loudness( | |
| self, | |
| audio1: np.ndarray, | |
| audio2: np.ndarray | |
| ) -> Tuple[np.ndarray, np.ndarray]: | |
| """ | |
| Match loudness between two audio segments. | |
| Args: | |
| audio1: First audio segment | |
| audio2: Second audio segment | |
| Returns: | |
| Tuple of loudness-matched audio segments | |
| """ | |
| # Calculate RMS for each | |
| rms1 = np.sqrt(np.mean(audio1 ** 2)) | |
| rms2 = np.sqrt(np.mean(audio2 ** 2)) | |
| if rms2 == 0: | |
| return audio1, audio2 | |
| # Calculate gain to match audio1 to audio2 | |
| gain = rms2 / rms1 | |
| # Apply gain to audio1 | |
| matched_audio1 = audio1 * gain | |
| # Prevent clipping | |
| peak = np.abs(matched_audio1).max() | |
| if peak > 1.0: | |
| matched_audio1 = matched_audio1 / peak | |
| return matched_audio1, audio2 | |