import os
import random
import joblib
import librosa
import numpy as np
import pandas as pd
from tqdm import tqdm
from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset, Audio
from sklearn.metrics import accuracy_score
from .utils.evaluation import AudioEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info
from dotenv import load_dotenv

load_dotenv()

router = APIRouter()

DESCRIPTION = "Random Baseline" | |
ROUTE = "/audio" | |
def is_valid_duration(example):
    """
    Filter function to remove samples with decoding errors.
    To be used with datasets.filter().
    """
    return len(example["audio"]["array"]) > 0

def enhanced_dsp_pipeline(y, sr, n_fft=80, hop_length=40):
    """Extract enhanced audio features from a single signal."""
    features = {}

    # Peak-normalize the audio (infinity norm)
    y = librosa.util.normalize(y, norm=np.inf)

    # Apply pre-emphasis to enhance high frequencies
    y_pre = librosa.effects.preemphasis(y, coef=0.97)

    # Compute spectrograms for both the original and pre-emphasized signals
    D = librosa.stft(y, n_fft=n_fft, hop_length=hop_length)
    D_pre = librosa.stft(y_pre, n_fft=n_fft, hop_length=hop_length)
    S = np.abs(D)
    S_pre = np.abs(D_pre)

    # Core spectral features from the original signal
    features['centroid'] = librosa.feature.spectral_centroid(S=S, sr=sr).ravel()
    features['roloff'] = librosa.feature.spectral_rolloff(S=S, sr=sr, roll_percent=0.85).ravel()
    features['zcr'] = librosa.feature.zero_crossing_rate(y, frame_length=n_fft, hop_length=hop_length).ravel()
    features['rmse'] = librosa.feature.rms(S=S, frame_length=n_fft).ravel()
    features['flux'] = librosa.onset.onset_strength(y=y, sr=sr).ravel()

    # Additional features from the pre-emphasized signal
    features['pre_centroid'] = librosa.feature.spectral_centroid(S=S_pre, sr=sr).ravel()
    features['pre_roloff'] = librosa.feature.spectral_rolloff(S=S_pre, sr=sr, roll_percent=0.85).ravel()
    features['pre_contrast'] = librosa.feature.spectral_contrast(S=S_pre, sr=sr, n_bands=2).ravel()

    # Spectral bandwidth with different norm exponents
    features['bandwidth_80'] = librosa.feature.spectral_bandwidth(S=S, sr=sr, p=0.8).ravel()
    features['bandwidth_90'] = librosa.feature.spectral_bandwidth(S=S, sr=sr, p=0.9).ravel()

    # Enhanced MFCC computation with more coefficients and a focused frequency band
    mfcc = librosa.feature.mfcc(
        y=y_pre,                # use the pre-emphasized signal
        sr=sr,
        n_fft=n_fft * 2,        # increased frequency resolution
        hop_length=hop_length,
        n_mfcc=20,              # increased from 13 to 20
        fmin=50,                # focus on the chainsaw frequency range
        fmax=2000,              # upper limit for chainsaw harmonics
        n_mels=40               # increased number of mel bands
    )

    # Compute deltas and double-deltas (acceleration coefficients)
    mfcc_delta = librosa.feature.delta(mfcc)
    mfcc_delta2 = librosa.feature.delta(mfcc, order=2)

    # Add static MFCC coefficients
    for idx, v_mfcc in enumerate(mfcc):
        features[f'mfcc_{idx}'] = v_mfcc.ravel()

    # Add delta coefficients
    for idx, v_delta in enumerate(mfcc_delta):
        features[f'mfcc_delta_{idx}'] = v_delta.ravel()

    # Add double-delta coefficients
    for idx, v_delta2 in enumerate(mfcc_delta2):
        features[f'mfcc_delta2_{idx}'] = v_delta2.ravel()

    # Covariance between consecutive MFCC coefficients
    for i in range(mfcc.shape[0] - 1):
        features[f'mfcc_cov_{i}_{i+1}'] = np.cov(mfcc[i], mfcc[i + 1])[0, 1]

    # Summary statistics for each feature
    stats_dict = {}
    for k, v in features.items():
        stats_dict[f'{k}_max'] = np.max(v)
        stats_dict[f'{k}_min'] = np.min(v)
        stats_dict[f'{k}_mean'] = np.mean(v)
        stats_dict[f'{k}_std'] = np.std(v)

    return stats_dict

def segment_features(y, sr, segment_duration=0.5):
    """Extract features from fixed-length audio segments and aggregate them."""
    segment_length = int(segment_duration * sr)
    segments = [y[i:i + segment_length] for i in range(0, len(y), segment_length)]

    all_features = []
    for segment in segments:
        # Skip trailing segments shorter than half a segment length
        if len(segment) >= segment_length // 2:
            features = enhanced_dsp_pipeline(segment, sr)
            all_features.append(features)

    # Fall back to whole-clip features if every segment was too short
    if not all_features:
        return enhanced_dsp_pipeline(y, sr)

    # Aggregate features across segments (mean and variance)
    aggregated_features = {}
    for key in all_features[0].keys():
        values = [f[key] for f in all_features]
        aggregated_features[key] = np.mean(values)
        aggregated_features[f"{key}_var"] = np.var(values)

    return aggregated_features

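# Illustrative usage sketch (not part of the request flow). The clip duration and the
# random placeholder signal are assumptions; only sr=4000 matches the rest of the pipeline.
#
#   clip = np.random.randn(3 * 4000)          # 3 s of 4 kHz audio
#   feats = segment_features(clip, sr=4000)   # flat dict of clip-level statistics
#   # keys look like "centroid_mean", "centroid_mean_var", "mfcc_0_std", ...
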
def process_dataset(dataset):
    """Process the dataset and prepare features."""
    features = []
    labels = []

    for d in tqdm(dataset):
        y = d["audio"]["array"]
        label = d["label"]

        # Process original audio
        segment_feats = segment_features(y, sr=4000)
        features.append(segment_feats)
        labels.append(label)

    X = pd.DataFrame(features)
    y = np.array(labels)
    return X, y

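# Sketch of how the persisted artifacts loaded in evaluate_audio() could be produced
# offline. This is an assumption for illustration only: the actual classifier and
# feature-selection procedure behind tasks/assets/*.joblib are not shown in this file.
def _train_and_export_sketch(train_dataset, output_dir="tasks/assets"):
    from sklearn.ensemble import RandomForestClassifier

    # Same feature extraction as at inference time
    X_train, y_train = process_dataset(train_dataset)

    # Placeholder feature selection: keep every column (the real list may be smaller)
    selected_features = list(X_train.columns)

    clf = RandomForestClassifier(n_estimators=300, random_state=0)
    clf.fit(X_train[selected_features], y_train)

    joblib.dump(clf, os.path.join(output_dir, "chainsaw_model.joblib"))
    joblib.dump(selected_features, os.path.join(output_dir, "selected_features.joblib"))
    return clf, selected_features
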
def evaluate_model(model, X_test, selected_features):
    """Run inference on the test set using only the selected feature columns."""
    X_test_selected = X_test[selected_features]
    return model.predict(X_test_selected)

@router.post(ROUTE)
async def evaluate_audio(request: AudioEvaluationRequest):
    """
    Evaluate audio classification for rainforest sound detection.

    Current model: segment-level DSP/MFCC features fed to a pre-trained classifier
    that distinguishes chainsaw sounds (0) from environment sounds (1).
    """
    # Get space info
    username, space_url = get_space_info()

    # Define the label mapping
    LABEL_MAPPING = {
        "chainsaw": 0,
        "environment": 1
    }

    # Load and prepare the dataset
    # Because the dataset is gated, we need to use the HF_TOKEN environment variable to authenticate
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Get the train and test splits
    train_test = dataset["train"]
    test_dataset = dataset["test"]

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    # --------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE CODE HERE
    # Update the code below to replace the random baseline with your model inference, inside the
    # tracked section where energy consumption and emissions are measured.
    # --------------------------------------------------------------------------------------------

    # Drop undecodable samples and resample everything to 4 kHz before feature extraction
    test_dataset = test_dataset.filter(is_valid_duration)
    test_dataset = test_dataset.cast_column("audio", Audio(sampling_rate=4000))
    X_test, true_labels = process_dataset(test_dataset)

    # Load the pre-trained model and its selected feature columns, then predict
    model = joblib.load('tasks/assets/chainsaw_model.joblib')
    selected_features = joblib.load('tasks/assets/selected_features.joblib')
    predictions = evaluate_model(model, X_test, selected_features)

    # --------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE STOPS HERE
    # --------------------------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate accuracy
    accuracy = accuracy_score(true_labels, predictions)

    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed
        }
    }

    return results
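

# Illustrative only: how this router might be mounted in the Space's FastAPI app.
# The module path ("tasks.audio") and the app file layout are assumptions.
#
#   from fastapi import FastAPI
#   from tasks.audio import router as audio_router
#
#   app = FastAPI()
#   app.include_router(audio_router)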