# Author: maxenceory
# Commit message: Update tasks/audio.py
# Commit: b215aca (verified)
from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
from sklearn.metrics import accuracy_score
import random
import os
import librosa
import pickle
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from .utils.evaluation import AudioEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info
from dotenv import load_dotenv
# Load variables from a .env file into the process environment; evaluate_audio
# later reads HF_TOKEN through os.getenv.
load_dotenv()
# Router on which this module registers its endpoint (see the @router.post below).
router = APIRouter()
# Passed as the route description and reported as "model_description" in the results.
DESCRIPTION = "Not Neural Network"
# URL path of the endpoint; also reported as "api_route" in the results.
ROUTE = "/audio"
def extract_mfcc_features(
    signal,
    sr=16000,
    n_mfcc=13,
    duration=3.0
):
    """
    Compute MFCC features (base, delta, delta-delta) from a 1D audio signal.

    The signal is first forced to exactly ``int(sr * duration)`` samples:
    longer signals are truncated, shorter ones are zero-padded at the end.

    Returns a tuple ``(features_vector, mfcc_combined)`` where:
    - features_vector: 1D vector holding the per-coefficient mean followed by
      the per-coefficient standard deviation of the combined MFCCs
      (length ``6 * n_mfcc``),
    - mfcc_combined: 2D matrix of shape ``(3 * n_mfcc, n_frames)`` stacking
      the base MFCCs, their deltas and their delta-deltas.
    """
    # Bring the signal to the fixed target length (truncate or right-pad).
    n_target = int(sr * duration)
    missing = n_target - len(signal)
    if missing < 0:
        signal = signal[:n_target]
    elif missing > 0:
        signal = np.pad(signal, (0, missing), mode='constant')

    # Base coefficients, then first- and second-order derivatives stacked
    # underneath them along the coefficient axis.
    base = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=n_mfcc)
    stacked = np.vstack(
        [base] + [librosa.feature.delta(base, order=k) for k in (1, 2)]
    )

    # Summarise each row over time: all means first, then all std devs.
    summary = np.concatenate([stacked.mean(axis=1), stacked.std(axis=1)])
    return summary, stacked
def transform_data(df, sr=12000, duration=3.0):
    """
    Build the feature matrix and label vector for a DataFrame of audio clips.

    For every row, the MFCC + delta + delta-delta features are extracted with
    ``extract_mfcc_features`` and the summary vector (mean/std) is concatenated
    with the flattened 2D MFCC matrix to form one feature vector per clip.

    This function only extracts features; it does not train or evaluate a model.

    Args:
        df: DataFrame with an 'audio' column (1D signal per row) and a
            'label' column. Any other column is ignored.
        sr: Sampling rate forwarded to ``extract_mfcc_features``. Note that the
            default here (12000) differs from that function's own default.
        duration: Clip duration in seconds forwarded to
            ``extract_mfcc_features``.

    Returns:
        Tuple ``(X, Y)``: ``X`` is the array of per-clip feature vectors and
        ``Y`` the array of all labels, in row order. Both are empty arrays
        when ``df`` has no rows.
    """
    print("Extraction des features MFCC (base + delta + delta-delta)...")

    features = []
    for signal in df["audio"]:
        # Summary vector (mean/std) plus the full 2D MFCC matrix.
        features_vector, mfcc_matrix = extract_mfcc_features(
            signal=signal,
            sr=sr,
            duration=duration
        )
        # One flat vector per clip: summary followed by the flattened matrix.
        features.append(np.concatenate([features_vector, mfcc_matrix.flatten()]))

    X = np.array(features)
    # Return every label (the original returned only the last row's label).
    Y = np.array(df["label"].tolist())
    return X, Y
@router.post(ROUTE, tags=["Audio Task"],
             description=DESCRIPTION)
async def evaluate_audio(request: AudioEvaluationRequest):
    """
    Evaluate audio classification for rainforest sound detection.

    Current Model: pre-trained estimator unpickled from "models/mon_modele.pkl"
    - Features are MFCC-based vectors built by ``transform_data``
    - Only the ``model.predict`` call is wrapped in emissions tracking

    Args:
        request: Evaluation request; ``dataset_name``, ``test_size`` and
            ``test_seed`` are read from it.

    Returns:
        dict with the accuracy, the tracked energy/emissions figures, space
        information and the dataset configuration used.
    """
    # Get space info
    username, space_url = get_space_info()

    # Label encoding of the task: "chainsaw" -> 0, "environment" -> 1.

    # Load and prepare the dataset
    # Because the dataset is gated, we need to use the HF_TOKEN environment variable to authenticate
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Split dataset
    train_test = dataset["train"].train_test_split(test_size=request.test_size, seed=request.test_seed)
    test_dataset = train_test["test"]

    df_test = pd.DataFrame([
        {
            "label": elmt["label"],
            "audio": elmt["audio"]["array"],
            "sampling_rate": elmt["audio"]["sampling_rate"]
        } for elmt in test_dataset
    ])

    # Get the model.
    # NOTE(security): pickle.load can execute arbitrary code; this is only
    # acceptable because the file is a trusted artifact shipped with the app.
    with open("models/mon_modele.pkl", "rb") as f:
        model = pickle.load(f)

    # Feature extraction. The labels returned here are deliberately ignored:
    # ground truth is taken straight from the dataset below.
    X_test, _ = transform_data(df_test)
    true_labels = test_dataset["label"]

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    #--------------------------------------------------------------------------------------------
    # MODEL INFERENCE (tracked for energy consumption and emissions)
    #--------------------------------------------------------------------------------------------
    print("Évaluation sur le test set...")
    # Predict once, on the extracted features (not on the raw DataFrame, which
    # also contains the label column and object-typed audio arrays).
    predictions = model.predict(X_test)
    #--------------------------------------------------------------------------------------------
    # MODEL INFERENCE STOPS HERE
    #--------------------------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate accuracy
    accuracy = accuracy_score(true_labels, predictions)

    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        # Scaled by 1000 to match the "_wh" / "_gco2eq" key names
        # (presumably tracker values are in kWh / kg — TODO confirm).
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed
        }
    }
    return results