import h5py
import numpy as np
from scipy.io import wavfile
from tensorflow import keras


class SoundDataGenerator(keras.utils.Sequence):
    """Generates data for Keras."""

    def __init__(
        self,
        data_file=None,
        batch_size=32,
        n_samps=16384,
        shuffle=True,
        last: float = 0.0,
        first: float = 0.0,
        channels_last=False,
        for_autoencoder=False,
    ):
        """Initialization."""
        self.dim = (1, n_samps)
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.data_file = data_file
        self.n_channels = 1
        self.for_autoencoder = for_autoencoder
        # The end-to-end model may need channels last; everything else expects channels first.
        if channels_last:
            self.expand_axis = 2
        else:
            self.expand_axis = 1
        database = h5py.File(data_file, "r")
        self.database = database
        # Infer the true sample count from the first audio file (overrides n_samps).
        self.n_samps = self.read_file(0).shape[0]
        print("N Samps in audio data: {}".format(self.n_samps))
        # Set up the list of example IDs from the data file.
        n_points = len(database["files"])
        self.list_IDs = range(n_points)
        print(f"Number of examples in dataset: {len(self.list_IDs)}")
        # Optionally keep only the first or last fraction of the dataset,
        # e.g. for a train/validation split.
        slice_point: int = 0
        if last > 0.0:
            slice_point = int(n_points * (1 - last))
            self.list_IDs = self.list_IDs[slice_point:]
            print(f"Taking last N points: {len(self.list_IDs)}")
        elif first > 0.0:
            slice_point = int(n_points * first)
            self.list_IDs = self.list_IDs[:slice_point]
            print(f"Taking first N points: {len(self.list_IDs)}")
        # Set up the label size from the data file.
        self.label_size = len(database["labels"][0])
        self.on_epoch_end()

    def get_audio_length(self):
        return self.n_samps

    def get_label_size(self):
        return self.label_size

    def __len__(self):
        """Denotes the number of batches per epoch."""
        return int(np.floor(len(self.list_IDs) / self.batch_size))
    def __getitem__(self, index):
        """Generates one batch of data."""
        # Positions of this batch within the (possibly shuffled) index array
        indexes = self.indexes[index * self.batch_size : (index + 1) * self.batch_size]
        # Map positions back to dataset IDs (matters when first/last slicing is used)
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        # Generate data
        X, y = self.__data_generation(list_IDs_temp)
        return X, y
    def on_epoch_end(self):
        """Updates indexes after each epoch."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    # Caching reads here (e.g. with functools.lru_cache) seems to make things
    # worse by filling up memory, so each file is re-read on every access.
    def read_file(self, index):
        filename = self.database["files"][index]
        fs, data = wavfile.read(filename)
        return data
    def __data_generation(self, list_IDs_temp):
        """Generates data containing batch_size samples."""
        # X : (n_samples, *dim, n_channels)
        X = []
        y = []
        for i in list_IDs_temp:
            # Read labels
            y.append(self.database["labels"][i])
            # Load soundfile data, truncating to n_samps if the file is longer
            data = self.read_file(i)
            if data.shape[0] > self.n_samps:
                print(
                    "Warning - too many samples: {} > {}".format(
                        data.shape[0], self.n_samps
                    )
                )
            X.append(data[: self.n_samps])
        # Stack into a batch and add the channel axis; expand_axis puts the
        # channel last for the end-to-end model, first otherwise.
        Xd = np.expand_dims(np.vstack(X), axis=self.expand_axis)
        yd = np.vstack(y)
        if self.for_autoencoder:
            # For the autoencoder, the label vector is both input and target.
            return yd, yd
        return Xd, yd
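
# Usage sketch (not part of the original class): one hedged way to feed this
# generator to keras.Model.fit(). The HDF5 path "dataset.h5", the 80/20 split,
# and the toy model below are illustrative assumptions only.
if __name__ == "__main__":
    train_gen = SoundDataGenerator(data_file="dataset.h5", batch_size=32, first=0.8)
    val_gen = SoundDataGenerator(
        data_file="dataset.h5", batch_size=32, last=0.2, shuffle=False
    )

    # Toy model: flatten the (1, n_samps) audio and regress the label vector.
    model = keras.Sequential(
        [
            keras.layers.Input(shape=(1, train_gen.get_audio_length())),
            keras.layers.Flatten(),
            keras.layers.Dense(train_gen.get_label_size()),
        ]
    )
    model.compile(optimizer="adam", loss="mse")
    model.fit(train_gen, validation_data=val_gen, epochs=10)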