Spaces:
Sleeping
Sleeping
from fastapi import APIRouter | |
from datetime import datetime | |
from datasets import load_dataset | |
from sklearn.metrics import accuracy_score | |
import random | |
import os | |
import librosa | |
import pickle | |
import pandas as pd | |
import numpy as np | |
from sklearn.ensemble import RandomForestClassifier | |
from .utils.evaluation import AudioEvaluationRequest | |
from .utils.emissions import tracker, clean_emissions_data, get_space_info | |
from dotenv import load_dotenv | |
# Load variables from a .env file into the process environment
# (HF_TOKEN is read later with os.getenv when the dataset is loaded).
load_dotenv()

# Router object for the endpoints of this module.
router = APIRouter()

# Reported in the "model_description" field of the evaluation results.
DESCRIPTION = "Not Neural Network"
# Reported in the "api_route" field of the evaluation results.
ROUTE = "/audio"
def extract_mfcc_features(
    signal,
    sr=16000,
    n_mfcc=13,
    duration=3.0
):
    """
    Compute MFCC features (base, delta, delta-delta) from a 1D audio signal.

    The signal is first brought to exactly ``int(sr * duration)`` samples:
    longer inputs are cut at the end, shorter ones are zero-padded at the end.

    Args:
        signal: 1D sequence of audio samples.
        sr: sampling rate handed to librosa for the MFCC computation.
        n_mfcc: number of base MFCC coefficients.
        duration: target clip length in seconds.

    Returns:
        A tuple ``(features_vector, mfcc_combined)`` where
        - ``features_vector`` is a 1D vector holding the per-coefficient mean
          followed by the per-coefficient standard deviation of
          ``mfcc_combined`` (length ``6 * n_mfcc``),
        - ``mfcc_combined`` is the 2D matrix of shape
          ``(3 * n_mfcc, n_frames)``.
    """
    # Fix the clip length so every signal yields the same number of frames.
    expected_samples = int(sr * duration)
    n_samples = len(signal)
    if n_samples < expected_samples:
        signal = np.pad(signal, (0, expected_samples - n_samples), mode='constant')
    elif n_samples > expected_samples:
        signal = signal[:expected_samples]

    # Base coefficients plus their first and second order derivatives.
    base = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=n_mfcc)
    first_order = librosa.feature.delta(base, order=1)
    second_order = librosa.feature.delta(base, order=2)

    # Stack along the coefficient axis -> (3 * n_mfcc, n_frames).
    stacked = np.concatenate([base, first_order, second_order], axis=0)

    # Summarise each coefficient over time: mean block, then std block.
    summary = np.concatenate([stacked.mean(axis=1), stacked.std(axis=1)])

    return summary, stacked
def transform_data(df, sr=12000, duration=3.0):
    """
    Build a feature matrix and a label vector from a DataFrame of audio clips.

    For every row, the MFCC summary vector (mean/std) and the full 2D MFCC
    matrix returned by ``extract_mfcc_features`` are combined into a single
    1D feature vector: the summary followed by the flattened matrix.

    Args:
        df: DataFrame with an 'audio' column (one signal per row) and a
            'label' column.
        sr: sampling rate forwarded to ``extract_mfcc_features``.
        duration: clip duration in seconds forwarded to
            ``extract_mfcc_features``.

    Returns:
        A tuple ``(X, Y)`` of numpy arrays: ``X`` holds one feature vector
        per row of ``df`` and ``Y`` holds the labels in the same order.
        Both are empty when ``df`` has no rows.
    """
    # NOTE(review): the default sr here (12000) differs from the default of
    # extract_mfcc_features (16000), and any per-row sampling rate stored in
    # df is not used -- confirm this matches how the model was trained.
    X = []
    Y = []
    print("Extraction des features MFCC (base + delta + delta-delta)...")
    for signal, label in zip(df["audio"], df["label"]):
        features_vector, mfcc_matrix = extract_mfcc_features(
            signal=signal,
            sr=sr,
            duration=duration
        )
        # One flat vector per clip: summary statistics + every MFCC frame.
        X.append(np.concatenate([features_vector, mfcc_matrix.flatten()]))
        Y.append(label)

    # Return the full label vector; the previous code returned only the
    # label of the last row (and failed on an empty DataFrame).
    return np.array(X), np.array(Y)
async def evaluate_audio(request: AudioEvaluationRequest):
    """
    Evaluate audio classification for rainforest sound detection.

    Current model: a pickled classifier loaded from ``models/mon_modele.pkl``
    - Input features are produced by ``transform_data`` (MFCC based).
    - Label space: 0 = chainsaw, 1 = environment.

    Args:
        request: evaluation request; ``dataset_name``, ``test_size`` and
            ``test_seed`` are read from it.

    Returns:
        A dict with the submission metadata, the accuracy on the test split,
        the energy/emissions figures of the tracked inference task and the
        dataset configuration that was used.

    NOTE(review): no ``@router.post(...)`` decorator is visible on this
    function -- confirm the route is registered elsewhere.
    """
    # Get space info
    username, space_url = get_space_info()

    # Load and prepare the dataset.
    # Because the dataset is gated, the HF_TOKEN environment variable is
    # used to authenticate.
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Split dataset and keep the test part only.
    train_test = dataset["train"].train_test_split(
        test_size=request.test_size,
        seed=request.test_seed
    )
    test_dataset = train_test["test"]

    df_test = pd.DataFrame(
        [
            {
                "label": elmt["label"],
                "audio": elmt["audio"]["array"],
                "sampling_rate": elmt["audio"]["sampling_rate"]
            }
            for elmt in test_dataset
        ]
    )

    # Get the model.
    # SECURITY: pickle.load executes arbitrary code from the file; only load
    # a model file that is shipped with and trusted by this project.
    with open("models/mon_modele.pkl", "rb") as f:
        model = pickle.load(f)

    # Feature extraction; the labels are taken from the dataset directly
    # below, so the second return value is not needed here.
    X_test, _ = transform_data(df_test)

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    #--------------------------------------------------------------------------------------------
    # MODEL INFERENCE (tracked for energy consumption and emissions)
    #--------------------------------------------------------------------------------------------

    print("Évaluation sur le test set...")
    true_labels = test_dataset["label"]
    # Predict once, on the extracted features (not on the raw DataFrame).
    predictions = model.predict(X_test)

    #--------------------------------------------------------------------------------------------
    # MODEL INFERENCE STOPS HERE
    #--------------------------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate accuracy
    accuracy = accuracy_score(true_labels, predictions)

    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed
        }
    }

    return results