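"""Keras Sequence that streams batches of audio and labels from an HDF5 index.

Expected layout of the HDF5 file (inferred from how this module reads it,
not from a formal schema):
    files  : dataset of paths to WAV files, one entry per example
    labels : dataset of label vectors, aligned one-to-one with ``files``
"""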
import h5py
import numpy as np
from scipy.io import wavfile
from tensorflow import keras


class SoundDataGenerator(keras.utils.Sequence):
    "Generates data for Keras"

    def __init__(
        self,
        data_file=None,
        batch_size=32,
        n_samps=16384,
        shuffle=True,
        last: float = 0.0,
        first: float = 0.0,
        channels_last=False,
        for_autoencoder=False,
    ):
        "Initialization"
        self.dim = (1, n_samps)
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.data_file = data_file
        self.n_channels = 1
        self.for_autoencoder = for_autoencoder
        # For the end-to-end (E2E) model the data may need to be channels-last;
        # choose which axis gains the channel dimension when batches are built.
        if channels_last:
            self.expand_axis = 2
        else:
            self.expand_axis = 1

        # Keep the HDF5 file open for the lifetime of the generator
        database = h5py.File(data_file, "r")
        self.database = database

        # The actual sample count comes from the first audio file and
        # overrides the n_samps argument used for self.dim above.
        self.n_samps = self.read_file(0).shape[0]
        print(f"N samps in audio data: {self.n_samps}")

        # Set up the list of example IDs from the data file
        n_points = len(database["files"])
        self.list_IDs = range(n_points)

        print(f"Number of examples in dataset: {len(self.list_IDs)}")
        # Optionally keep only the last or first fraction of the dataset
        split: int = 0
        if last > 0.0:
            split = int(n_points * (1 - last))
            self.list_IDs = self.list_IDs[split:]
            print(f"Taking last N points: {len(self.list_IDs)}")
        elif first > 0.0:
            split = int(n_points * first)
            self.list_IDs = self.list_IDs[:split]
            print(f"Taking first N points: {len(self.list_IDs)}")

        # Set up label size from the data file
        self.label_size = len(database["labels"][0])
        self.on_epoch_end()

    def get_audio_length(self):
        return self.n_samps

    def get_label_size(self):
        return self.label_size

    def __len__(self):
        "Denotes the number of batches per epoch"
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        "Generate one batch of data"
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size : (index + 1) * self.batch_size]

        # Map batch positions back to dataset IDs (this matters when only
        # the first/last fraction of the dataset is in use)
        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        # Generate data
        X, y = self.__data_generation(list_IDs_temp)

        # print("Returning data! Got X: {}, y: {}".format(X.shape,y.shape))
        return X, y

    def on_epoch_end(self):
        "Updates indexes after each epoch"
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    # Caching reads here (e.g. with functools.lru_cache) seems to make
    # things worse - it fills up memory.
    # @lru_cache(maxsize=150000)
    def read_file(self, index):
        "Reads the audio samples for one entry of the dataset"
        filename = self.database["files"][index]
        fs, data = wavfile.read(filename)
        return data

    def __data_generation(self, list_IDs_temp):
        "Generates data containing batch_size samples"
        # X : (n_samples, 1, n_samps) channels-first,
        #     (n_samples, n_samps, 1) channels-last
        X = []
        y = []
        for i in list_IDs_temp:
            # Read labels
            y.append(self.database["labels"][i])
            # Load soundfile data, truncated to n_samps samples
            data = self.read_file(i)
            if data.shape[0] > self.n_samps:
                print(
                    f"Warning - too many samples: {data.shape[0]} > {self.n_samps}"
                )
            # Note: files shorter than n_samps are not padded, so the vstack
            # below assumes every file has at least n_samps samples.
            X.append(data[: self.n_samps])
        # Stack into a batch and add the channel dimension on the axis
        # selected in __init__ (see channels_last)
        Xd = np.expand_dims(np.vstack(X), axis=self.expand_axis)
        yd = np.vstack(y)

        if self.for_autoencoder:
            # Autoencoder mode: reconstruct the label vectors themselves
            return yd, yd
        return Xd, yd
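

# Illustrative usage sketch: the dataset path "sound_data.hdf5", the split
# fractions and the tiny Dense model below are placeholder assumptions that
# only demonstrate how the generator plugs into keras.Model.fit.
if __name__ == "__main__":
    train_gen = SoundDataGenerator(
        data_file="sound_data.hdf5",  # hypothetical dataset path
        batch_size=32,
        first=0.8,  # first 80% of examples for training
    )
    val_gen = SoundDataGenerator(
        data_file="sound_data.hdf5",  # hypothetical dataset path
        batch_size=32,
        last=0.2,  # last 20% of examples for validation
    )

    # Toy model mapping channels-first raw audio to the label vector.
    model = keras.Sequential(
        [
            keras.layers.Flatten(input_shape=(1, train_gen.get_audio_length())),
            keras.layers.Dense(256, activation="relu"),
            keras.layers.Dense(train_gen.get_label_size()),
        ]
    )
    model.compile(optimizer="adam", loss="mse")
    model.fit(train_gen, validation_data=val_gen, epochs=1)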