import datetime
import json
import os
from pickle import load
from typing import Callable, List

import dawdreamer as daw
import librosa
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from keras import backend as K
from kapre.time_frequency import Spectrogram
from scipy.io import wavfile

from models.convert_to_preset import convert_csv_to_preset
from models.importer_audio import audio_importer
from generators.parameters import ParameterSet, ParamValue
weight_var = K.variable(0.0)


class Weight_trans(keras.callbacks.Callback):
    """Schedules the value of `weight_var` (the spectral-loss mixing weight) after epoch 680."""

    def __init__(self, weight_var, transition, epochs):
        self.alpha = weight_var
        self.transition = transition
        self.epochs = epochs

    def on_epoch_end(self, epoch, logs=None):
        if epoch > 680:
            if self.transition == "linear":
                K.set_value(self.alpha, (epoch / self.epochs - 0.617) * 0.00001)
                tf.print(f"new weight {weight_var.numpy()}")
            if self.transition == "linear2":
                K.set_value(self.alpha, (1.5625 * epoch - 1.0625) * 0.00001)
                tf.print(f"new weight {weight_var.numpy()}")
            if self.transition == "log":
                K.set_value(
                    self.alpha,
                    (
                        1
                        - (tf.math.log(epoch * 0.001 - 0.67285) / tf.math.log(0.0005))
                        - 0.35
                    )
                    * 0.00001,
                )
                tf.print("log")
            if self.transition == "log2":
                K.set_value(
                    self.alpha,
                    (
                        1
                        - (tf.math.log(epoch * 0.001 - 0.6575) / tf.math.log(0.0005))
                        - 0.5
                    )
                    * 0.00001,
                )
                tf.print("log2")
            if self.transition == "log3":
                K.set_value(
                    self.alpha,
                    (
                        1
                        - (
                            tf.math.log(epoch * 0.001 - 0.67978)
                            / tf.math.log(0.00000005)
                        )
                        - 0.5
                    )
                    * 0.00001,
                )
                tf.print("log3")
            if self.transition == "square":
                K.set_value(self.alpha, 4.1 * tf.pow(epoch * 0.001 - 0.65, 2) + 0.002)
                print("square")
            if self.transition == "quad":
                K.set_value(self.alpha, 33 * tf.pow(epoch * 0.001 - 0.65, 4) + 0.002)
                print("quad")
| """Model Utils""" | |
| def mean_percentile_rank(y_true, y_pred, k=5): | |
| """ | |
| @paper | |
| The first evaluation measure is the Mean Percentile Rank | |
| (MPR) which is computed per synthesizer parameter. | |
| """ | |
| # TODO | |
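# A minimal sketch of an MPR-style computation, added for illustration only (it
# is not part of the original module and may differ from the paper's exact
# formulation): rank the classes of each one-hot parameter block by predicted
# score and report the mean percentile of the true class.
def _mpr_sketch(y_true, y_pred):
    """Hypothetical helper: mean percentile rank over the last axis."""
    n_classes = tf.shape(y_pred)[-1]
    true_class = tf.argmax(y_true, axis=-1, output_type=tf.int32)
    # Position of the true class in the descending ordering of predicted scores.
    order = tf.argsort(y_pred, axis=-1, direction="DESCENDING")
    hits = tf.cast(tf.equal(order, tf.expand_dims(true_class, -1)), tf.int32)
    ranks = tf.argmax(hits, axis=-1, output_type=tf.int32)  # 0 = best rank
    percentile = 1.0 - tf.cast(ranks, tf.float32) / tf.cast(n_classes - 1, tf.float32)
    return tf.reduce_mean(percentile)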
def top_k_mean_accuracy(y_true, y_pred, k=5):
    """
    @paper
    The top-k mean accuracy is obtained by computing the top-k
    accuracy for each test example and then taking the mean across
    all examples. In the same manner as done in the MPR analysis,
    we compute the top-k mean accuracy per synthesizer
    parameter for k = 1, ..., 5.
    """
    # TODO: per parameter?
    original_shape = tf.shape(y_true)
    y_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    y_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))
    top_k = K.in_top_k(y_pred, tf.cast(tf.argmax(y_true, axis=-1), "int32"), k)
    correct_pred = tf.reshape(top_k, original_shape[:-1])
    return tf.reduce_mean(tf.cast(correct_pred, tf.float32))
def CustomLoss(y_true, y_pred):
    bce = tf.keras.losses.BinaryCrossentropy()
    weights = custom_spectral_loss(y_true, y_pred)
    weight_shift = (1 - weight_var.numpy()) + (weight_var.numpy() * weights.numpy())
    # tf.print(f"New weight is {weight_shift}")
    loss = bce(y_true, y_pred, sample_weight=weight_shift)
    return loss
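# A minimal usage sketch, not part of the original module: wiring CustomLoss,
# the top-k metric and the Weight_trans schedule together. The optimizer,
# learning rate, data and epoch count are hypothetical placeholders.
def _example_compile_and_fit(model: keras.Model, train_data, epochs: int = 1000):
    """Hypothetical helper: compile with the custom loss and train with the weight schedule."""
    model.compile(
        optimizer=keras.optimizers.Adam(1e-4),
        loss=CustomLoss,
        metrics=[top_k_mean_accuracy],
        run_eagerly=True,  # CustomLoss calls .numpy(), so eager execution is required
    )
    return model.fit(
        train_data,
        epochs=epochs,
        callbacks=[Weight_trans(weight_var, transition="linear", epochs=epochs)],
    )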
def custom_spectral_loss(y_true, y_pred):
    # tf.print("After compiling model :", tf.executing_eagerly())
    y_true = tf.reshape(y_true, (-1, tf.shape(y_true)[-1]))
    y_pred = tf.reshape(y_pred, (-1, tf.shape(y_pred)[-1]))
    # y_true and y_pred contain encoded synthesizer parameters:
    # decode both and render audio for each.
    with open("test_datasets/InverSynth_params.pckl", "rb") as f:
        parameters: ParameterSet = load(f)
    predlist_true: List[ParamValue] = parameters.decode(y_true[0])
    predlist_pred: List[ParamValue] = parameters.decode(y_pred[0])
    # Generate audio from parameters
    audio_true, penalty = generate_audio(predlist_true)
    audio_pred, penalty = generate_audio(predlist_pred)
    # Compute spectrograms
    if SPECTRO_TYPE == "spectro":
        spectrogram_true = tf.math.abs(
            tf.signal.stft(audio_true, frame_length=1024, frame_step=512)
        )
        spectrogram_pred = tf.math.abs(
            tf.signal.stft(audio_pred, frame_length=1024, frame_step=512)
        )
    elif SPECTRO_TYPE == "qtrans":
        spectrogram_true = librosa.amplitude_to_db(
            np.abs(librosa.cqt(audio_true, sr=SAMPLE_RATE, hop_length=128)), ref=np.max
        )
        spectrogram_pred = librosa.amplitude_to_db(
            np.abs(librosa.cqt(audio_pred, sr=SAMPLE_RATE, hop_length=128)), ref=np.max
        )
    elif SPECTRO_TYPE == "mel":
        mel_spect = librosa.feature.melspectrogram(
            y=audio_true, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024
        )
        spectrogram_true = librosa.power_to_db(mel_spect, ref=np.max)
        mel_spect = librosa.feature.melspectrogram(
            y=audio_pred, sr=SAMPLE_RATE, n_fft=2048, hop_length=1024
        )
        spectrogram_pred = librosa.power_to_db(mel_spect, ref=np.max)
    # L1 loss
    if LOSS_TYPE == "L1":
        spectral_loss = penalty * tf.reduce_mean(
            tf.abs(spectrogram_true - spectrogram_pred)
        )
    # L2 loss
    elif LOSS_TYPE == "L2":
        spectral_loss = penalty * tf.reduce_mean(
            (spectrogram_true - spectrogram_pred) ** 2
        )
    # Cosine loss (tf.keras.losses.cosine_similarity returns the negative
    # cosine similarity, so adding 1 gives the cosine distance)
    elif LOSS_TYPE == "COSINE":
        spectral_loss = tf.reduce_mean(
            1.0
            + tf.keras.losses.cosine_similarity(
                spectrogram_true, spectrogram_pred, axis=-1
            )
        )
    return spectral_loss
def compare(target, prediction, params, precision=1, print_output=False):
    if print_output and len(prediction) < 10:
        print(prediction)
        print("Pred: {}".format(np.round(prediction, decimals=2)))
        print("PRnd: {}".format(np.round(prediction)))
        print("Act : {}".format(target))
        print("+" * 5)
    pred: List[ParamValue] = params.decode(prediction)
    act: List[ParamValue] = params.decode(target)
    pred_index: List[int] = [np.array(p.encoding).argmax() for p in pred]
    act_index: List[int] = [np.array(p.encoding).argmax() for p in act]
    width = 8
    names = "Parameter: "
    act_s = "Actual: "
    pred_s = "Predicted: "
    pred_i = "Pred. Indx:"
    act_i = "Act. Index:"
    diff_i = "Index Diff:"
    for p in act:
        names += p.name.rjust(width)[:width]
        act_s += f"{p.value:>8.2f}"
    for p in pred:
        pred_s += f"{p.value:>8.2f}"
    for p in pred_index:
        pred_i += f"{p:>8}"
    for p in act_index:
        act_i += f"{p:>8}"
    for i in range(len(act_index)):
        diff = pred_index[i] - act_index[i]
        diff_i += f"{diff:>8}"
    exact = 0.0
    close = 0.0
    n_params = len(pred_index)
    for i in range(n_params):
        if pred_index[i] == act_index[i]:
            exact = exact + 1.0
        if abs(pred_index[i] - act_index[i]) <= precision:
            close = close + 1.0
    exact_ratio = exact / n_params
    close_ratio = close / n_params
    if print_output:
        print(names)
        print(act_s)
        print(pred_s)
        print(act_i)
        print(pred_i)
        print(diff_i)
        print("-" * 30)
    return exact_ratio, close_ratio
def evaluate(
    prediction: np.ndarray,
    x: np.ndarray,
    y: np.ndarray,
    params: ParameterSet,
):
    print("Prediction Shape: {}".format(prediction.shape))
    num: int = x.shape[0]
    correct: int = 0
    correct_r: float = 0.0
    close_r: float = 0.0
    for i in range(num):
        should_print = i < 5
        exact, close = compare(
            target=y[i],
            prediction=prediction[i],
            params=params,
            print_output=should_print,
        )
        if exact == 1.0:
            correct = correct + 1
        correct_r += exact
        close_r += close
    summary = params.explain()
    print(
        "{} Parameters with {} levels (fixed: {})".format(
            summary["n_variable"], summary["levels"], summary["n_fixed"]
        )
    )
    print(
        "Got {} out of {} ({:.1f}% perfect); Exact params: {:.1f}%, Close params: {:.1f}%".format(
            correct,
            num,
            correct / num * 100,
            correct_r / num * 100,
            close_r / num * 100,
        )
    )
| """ | |
| Wrap up the whole training process in a standard function. Gets a callback | |
| to actually make the model, to keep it as flexible as possible. | |
| # Params: | |
| # - dataset_name (dataset name) | |
| # - model_name: (C1..C6,e2e) | |
| # - model_callback: function taking name,inputs,outputs,data_format and returning a Keras model | |
| # - epochs: int | |
| # - dataset_dir: place to find input data | |
| # - output_dir: place to put outputs | |
| # - parameters_file (override parameters filename) | |
| # - dataset_file (override dataset filename) | |
| # - data_format (channels_first or channels_last) | |
| # - run_name: to save this run as | |
| """ | |
# LOSS TYPE FOR CUSTOM LOSS FUNCTION
LOSS_TYPE = "L1"
SPECTRO_TYPE = "spectro"
PRINT = 1

# DAWDREAMER EXPORT SETTINGS
SAMPLE_RATE = 16384
BUFFER_SIZE = 1024
SYNTH_PLUGIN = "libTAL-NoiseMaker.so"
# SYNTH_PLUGIN = "TAL-NoiseMaker.vst3"
ENGINE = daw.RenderEngine(SAMPLE_RATE, BUFFER_SIZE)
SYNTH = ENGINE.make_plugin_processor("my_synth", SYNTH_PLUGIN)
# (MIDI note, velocity, start time, duration)
SYNTH.add_midi_note(40, 127, 0, 0.8)

# Build the list of plugin parameter ids, in the order used when setting
# decoded parameter values on the synth.
with open("plugin_config/TAL-NoiseMaker-config.json") as f:
    data = json.load(f)
DICO = [param["id"] for param in data["parameters"]]
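# A small sanity-check sketch, not part of the original module: it assumes each
# entry in the plugin config's "parameters" list has an "id" field (and possibly
# a "name"), and prints how DICO positions map onto plugin parameter ids.
def _print_parameter_mapping(limit: int = 10):
    """Hypothetical helper: show the first few DICO index -> parameter id pairs."""
    for i, param in enumerate(data["parameters"][:limit]):
        print(i, param["id"], param.get("name", ""))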
def train_model(
    # Main options
    dataset_name: str,
    model_name: str,
    epochs: int,
    model_callback: Callable[[str, int, int, str], keras.Model],
    dataset_dir: str,
    output_dir: str,  # Directory names
    dataset_file: str = None,
    parameters_file: str = None,
    run_name: str = None,
    data_format: str = "channels_last",
    save_best: bool = True,
    resume: bool = False,
    checkpoint: bool = True,
    model_type: str = "STFT",
):
    tf.config.run_functions_eagerly(True)
    # tf.data.experimental.enable_debug_mode()
    time_generated = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    if not dataset_file:
        dataset_file = (
            os.getcwd() + "/" + dataset_dir + "/" + dataset_name + "_data.hdf5"
        )
    if not parameters_file:
        parameters_file = (
            os.getcwd() + "/" + dataset_dir + "/" + dataset_name + "_params.pckl"
        )
    if not run_name:
        run_name = dataset_name + "_" + model_name
    model_file = f"{output_dir}/model/{run_name}_{time_generated}"
    if not os.path.exists(model_file):
        os.makedirs(model_file)
    best_model_file = f"{output_dir}/best_checkpoint/{run_name}_best_{time_generated}"
    if not os.path.exists(best_model_file):
        os.makedirs(best_model_file)
    # Checkpoint and history paths are needed both for resuming and for logging,
    # so define them unconditionally.
    # checkpoint_model_file = f"{output_dir}/{run_name}_checkpoint_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"
    # history_file = f"{output_dir}/{run_name}_{datetime.datetime.now().strftime('%Y%m%d-%H%M%S')}"
    checkpoint_model_file = f"{output_dir}/checkpoints/model"
    history_file = f"{output_dir}/history/model"
    print(tf.config.list_physical_devices("GPU"))
    gpu_avail = len(tf.config.list_physical_devices("GPU"))  # number of visible GPUs
    cuda_gpu_avail = len(tf.config.list_physical_devices("GPU"))  # number of visible GPUs
    print("+" * 30)
    print(f"++ {run_name}")
    print(
        f"Running model: {model_name} on dataset {dataset_file} (parameters {parameters_file}) for {epochs} epochs"
    )
    print(f"Saving model in {output_dir} as {model_file}")
    print(f"Saving history as {history_file}")
    print(f"GPU: {gpu_avail}, with CUDA: {cuda_gpu_avail}")
    print("+" * 30)
    os.makedirs(output_dir, exist_ok=True)
    # Get training and validation generators
    params = {"data_file": dataset_file, "batch_size": 64, "shuffle": True}
    model: keras.Model = None
    if resume and os.path.exists(checkpoint_model_file):
        history = pd.read_csv(history_file)
        # Note - it's zero-indexed in the file, but 1-indexed in the display
        initial_epoch: int = max(history.iloc[:, 0]) + 1
        # epochs: int = initial_epoch
        print(
            f"Resuming from model file: {checkpoint_model_file} after epoch {initial_epoch}"
        )
        model = keras.models.load_model(
            checkpoint_model_file,
            custom_objects={
                "top_k_mean_accuracy": top_k_mean_accuracy,
                "Spectrogram": Spectrogram,
                "custom_spectral_loss": custom_spectral_loss,
                "CustomLoss": CustomLoss,
            },
        )
    return model, parameters_file
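# A minimal end-to-end sketch, not part of the original module: resume a trained
# model from its checkpoint and run inference on a single audio file. The dataset
# name, model name, directories and audio path are hypothetical placeholders.
def _example_run(audio_path: str = "temp/example_input.wav"):
    """Hypothetical driver: load the checkpointed model and infer a preset for one file."""
    model, params_file = train_model(
        dataset_name="InverSynth",
        model_name="C6",
        epochs=100,
        model_callback=None,  # assumed unused when resuming from a checkpoint
        dataset_dir="test_datasets",
        output_dir="output",
        resume=True,
    )
    return inference(model, params_file, audio_path, file_id="example")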
def inference(model: keras.Model, parameters_file: str, file_path: str, file_id: str):
    # Start inference
    with open(parameters_file, "rb") as f:
        parameters: ParameterSet = load(f)
    print("++++" * 5)
    print("Pushing to trained model")
    print("++++" * 5)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File path invalid: {file_path}")
    try:
        newpred = model.predict(audio_importer(str(file_path)))
    except Exception as e:
        raise RuntimeError("Prediction crashed") from e
    predlist: List[ParamValue] = parameters.decode(newpred[0])
    df = pd.DataFrame(predlist)
    print(df)
    df = df.drop(["encoding"], axis=1)
    # Save the predicted parameters as CSV and as a NoiseMaker preset
    print("Outputting CSV config in temp/")
    csv_path = f"temp/{file_id}_config.csv"
    preset_path = f"temp/{file_id}_config.noisemakerpreset"
    df.to_csv(csv_path)
    xml_path = convert_csv_to_preset(csv_path, preset_path)
    # export(prediction, X, y, parameters)
    # Set synth parameters from the predicted values
    for i, value in enumerate(df["value"].values):
        SYNTH.set_parameter(DICO[i], value)
    # Set the volume parameter to 0.9
    SYNTH.set_parameter(1, 0.9)
    # Set up the processing graph
    graph = [
        # The synth takes no inputs, so we give an empty list.
        (SYNTH, []),
    ]
    ENGINE.load_graph(graph)
    ENGINE.render(1)
    data = ENGINE.get_audio()
    try:
        data = librosa.to_mono(data).transpose()
    except Exception:
        tf.print("ERROR" * 100)
        df = df.fillna(0)
        data = df.to_numpy()
        data = librosa.to_mono(data).transpose()
        tf.print("crashed, nan in generation")
    synth_params = dict(SYNTH.get_patch())
    print(synth_params)
    df = pd.DataFrame(data)
    # penalty = 1000000
    # df = pd.DataFrame(data)
    # df = df.fillna(0)
    # data = df.to_numpy()
    output_file_path = f"temp/{file_id}_generated.wav"
    wavfile.write(output_file_path, SAMPLE_RATE, data)
    return file_path, xml_path, output_file_path
def generate_audio(df_params):
    penalty = 1
    # Set synth parameters from the decoded parameter values
    for i, param in enumerate(df_params):
        SYNTH.set_parameter(DICO[i], param.value)
    # Set up the processing graph
    graph = [
        # The synth takes no inputs, so we give an empty list.
        (SYNTH, []),
    ]
    ENGINE.load_graph(graph)
    ENGINE.render(1)
    data = ENGINE.get_audio()
    try:
        data = librosa.to_mono(data).transpose()
    except Exception:
        print("ERROR" * 100)
        df = pd.DataFrame(data)
        df = df.fillna(0)
        data = df.to_numpy()
        data = librosa.to_mono(data).transpose()
    result = np.array(data)
    return result, penalty
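# A minimal usage sketch, not part of the original module: decode one encoded
# parameter vector with the same pickled ParameterSet used in
# custom_spectral_loss(), render it with generate_audio(), and write the result
# to a wav file. The output path is a hypothetical placeholder.
def _render_encoded_params(encoded, out_path: str = "temp/example_render.wav"):
    """Hypothetical helper: render one encoded parameter vector to audio."""
    with open("test_datasets/InverSynth_params.pckl", "rb") as f:
        parameters: ParameterSet = load(f)
    values: List[ParamValue] = parameters.decode(encoded)
    audio, _penalty = generate_audio(values)
    wavfile.write(out_path, SAMPLE_RATE, audio)
    return audio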