from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
from sklearn.metrics import accuracy_score
import os
import librosa
import pickle
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier

from .utils.evaluation import AudioEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info

from dotenv import load_dotenv
load_dotenv()

router = APIRouter()

DESCRIPTION = "Random Forest on MFCC features (no neural network)"
ROUTE = "/audio"


def extract_mfcc_features(signal, sr=16000, n_mfcc=13, duration=3.0):
    """
    Extract MFCCs (base, delta, delta-delta) from a 1D audio signal.

    Returns a tuple (features_vector, mfcc_combined) where:
    - features_vector: 1D vector (mean + std of the combined MFCCs),
    - mfcc_combined: 2D matrix of shape (3 * n_mfcc, n_frames).
    """
    # 1) Target duration in samples
    target_length = int(sr * duration)

    # 2) Truncate or pad the signal to the target duration
    if len(signal) > target_length:
        signal = signal[:target_length]
    elif len(signal) < target_length:
        signal = np.pad(signal, (0, target_length - len(signal)), mode='constant')

    # 3) Base MFCC extraction
    mfcc = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=n_mfcc)

    # 4) First (delta) and second (delta-delta) derivatives
    mfcc_delta = librosa.feature.delta(mfcc, order=1)
    mfcc_delta2 = librosa.feature.delta(mfcc, order=2)

    # 5) Stack into (3 * n_mfcc, n_frames)
    mfcc_combined = np.vstack([mfcc, mfcc_delta, mfcc_delta2])

    # 6) Mean and standard deviation over the time axis
    #    => vector of size 6 * n_mfcc (3 * n_mfcc coefficients x mean/std)
    mfcc_mean = np.mean(mfcc_combined, axis=1)
    mfcc_std = np.std(mfcc_combined, axis=1)

    # 7) Global feature vector
    features_vector = np.concatenate([mfcc_mean, mfcc_std])

    return features_vector, mfcc_combined


def transform_data(df, sr=12000, duration=3.0):
    """
    Take a DataFrame df with 'audio' and 'label' columns and extract
    MFCC + delta + delta-delta features for each signal: a global
    (mean/std) vector concatenated with the flattened 2D matrix.

    NOTE: sr must match the sampling rate of the signals in df (the HF
    dataset exposes it per example as 'sampling_rate').

    Returns (X, Y) as NumPy arrays ready for a scikit-learn classifier.
    """
    X = []
    Y = []

    print("Extracting MFCC features (base + delta + delta-delta)...")
    for i, row in df.iterrows():
        signal = row["audio"]
        label = row["label"]

        # Get (global vector, 2D matrix)
        features_vector, mfcc_matrix = extract_mfcc_features(
            signal=signal, sr=sr, duration=duration
        )

        # Concatenate (mean + std) with the flattened matrix
        mfcc_matrix_flat = mfcc_matrix.flatten()
        big_features = np.concatenate([features_vector, mfcc_matrix_flat])

        X.append(big_features)
        Y.append(label)

    X = np.array(X)
    Y = np.array(Y)
    return X, Y
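
# Illustrative sketch (not executed by the API): expected shapes from
# extract_mfcc_features for a 3-second signal, assuming librosa's defaults
# (hop_length=512, center=True). Exact frame counts vary with those settings.
#
#     sig = np.random.randn(int(16000 * 3.0))
#     vec, mat = extract_mfcc_features(sig, sr=16000, n_mfcc=13)
#     vec.shape  # (78,)    -> mean + std of 3 * n_mfcc = 39 coefficients
#     mat.shape  # (39, 94) -> (3 * n_mfcc, n_frames)
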
@router.post(ROUTE, tags=["Audio Task"], description=DESCRIPTION)
async def evaluate_audio(request: AudioEvaluationRequest):
    """
    Evaluate audio classification for rainforest sound detection.

    Current model: Random Forest trained on MFCC (+ delta, delta-delta)
    features, loaded from a pickle file. Labels: chainsaw (0) vs
    environment (1).
    """
    # Get space info
    username, space_url = get_space_info()

    # Define the label mapping
    LABEL_MAPPING = {
        "chainsaw": 0,
        "environment": 1
    }

    # Load and prepare the dataset.
    # Because the dataset is gated, we need to use the HF_TOKEN environment
    # variable to authenticate.
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Split dataset
    train_test = dataset["train"].train_test_split(
        test_size=request.test_size, seed=request.test_seed
    )
    test_dataset = train_test["test"]

    dict_test = [
        {
            "label": elmt["label"],
            "audio": elmt["audio"]["array"],
            "sampling_rate": elmt["audio"]["sampling_rate"]
        }
        for elmt in test_dataset
    ]
    df_test = pd.DataFrame(dict_test)

    # Load the pre-trained model
    with open("models/mon_modele.pkl", "rb") as f:
        model = pickle.load(f)

    # Extract features for the test set
    X_test, y_test = transform_data(df_test)

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    #--------------------------------------------------------------------------
    # YOUR MODEL INFERENCE CODE HERE
    # Model inference runs inside this block so that its energy consumption
    # and emissions are tracked.
    #--------------------------------------------------------------------------

    print("Evaluating on the test set...")
    true_labels = test_dataset["label"]
    predictions = model.predict(X_test)

    #--------------------------------------------------------------------------
    # YOUR MODEL INFERENCE STOPS HERE
    #--------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate accuracy
    accuracy = accuracy_score(true_labels, predictions)

    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed
        }
    }

    return results
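

# Hypothetical training sketch for producing "models/mon_modele.pkl". This is
# a minimal, self-contained illustration using synthetic stand-in audio; the
# real model is assumed to have been trained on features from the gated HF
# dataset, with a DataFrame built exactly like df_test in evaluate_audio().
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    df_train = pd.DataFrame({
        # 3-second synthetic signals at 12 kHz (transform_data's default sr)
        "audio": [rng.standard_normal(int(12000 * 3.0)) for _ in range(20)],
        "label": rng.integers(0, 2, size=20),
    })

    X_train, y_train = transform_data(df_train)

    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)

    os.makedirs("models", exist_ok=True)
    with open("models/mon_modele.pkl", "wb") as f:
        pickle.dump(clf, f)
    print("Saved model to models/mon_modele.pkl")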