Spaces:
Sleeping
Sleeping
""" | |
Code from: | |
https://github.com/DCASE-REPO/DESED_task | |
""" | |
from collections import OrderedDict | |
import numpy as np | |
import pandas as pd | |
from dcase_util.data import DecisionEncoder | |
class ManyHotEncoder: | |
"""" | |
Adapted after DecisionEncoder.find_contiguous_regions method in | |
https://github.com/DCASE-REPO/dcase_util/blob/master/dcase_util/data/decisions.py | |
Encode labels into numpy arrays where 1 correspond to presence of the class and 0 absence. | |
Multiple 1 can appear on the same line, it is for multi label problem. | |
Args: | |
labels: list, the classes which will be encoded | |
n_frames: int, (Default value = None) only useful for strong labels. The number of frames of a segment. | |
Attributes: | |
labels: list, the classes which will be encoded | |
n_frames: int, only useful for strong labels. The number of frames of a segment. | |
""" | |
def __init__( | |
self, labels, audio_len=10, frame_hop=160, net_pooling=4, fs=16000 | |
): | |
if type(labels) in [np.ndarray, np.array]: | |
labels = labels.tolist() | |
elif isinstance(labels, (dict, OrderedDict)): | |
labels = list(labels.keys()) | |
self.labels = labels | |
self.audio_len = audio_len | |
self.frame_hop = frame_hop | |
self.fs = fs | |
self.net_pooling = net_pooling | |
n_frames = self.audio_len * self.fs | |
self.n_frames = int(int((n_frames / self.frame_hop)) / self.net_pooling) | |
def encode_weak(self, labels): | |
""" Encode a list of weak labels into a numpy array | |
Args: | |
labels: list, list of labels to encode (to a vector of 0 and 1) | |
Returns: | |
numpy.array | |
A vector containing 1 for each label, and 0 everywhere else | |
""" | |
# useful for tensor empty labels | |
if type(labels) is str: | |
if labels == "empty": | |
y = np.zeros(len(self.labels)) - 1 | |
return y | |
else: | |
labels = labels.split(",") | |
if type(labels) is pd.DataFrame: | |
if labels.empty: | |
labels = [] | |
elif "event_label" in labels.columns: | |
labels = labels["event_label"] | |
y = np.zeros(len(self.labels)) | |
for label in labels: | |
if not pd.isna(label): | |
i = self.labels.index(label) | |
y[i] = 1 | |
return y | |
def _time_to_frame(self, time): | |
samples = time * self.fs | |
frame = (samples) / self.frame_hop | |
return np.clip(frame / self.net_pooling, a_min=0, a_max=self.n_frames) | |
def _frame_to_time(self, frame): | |
frame = frame * self.net_pooling / (self.fs / self.frame_hop) | |
return np.clip(frame, a_min=0, a_max=self.audio_len) | |
def encode_strong_df(self, label_df): | |
"""Encode a list (or pandas Dataframe or Serie) of strong labels, they correspond to a given filename | |
Args: | |
label_df: pandas DataFrame or Series, contains filename, onset (in frames) and offset (in frames) | |
If only filename (no onset offset) is specified, it will return the event on all the frames | |
onset and offset should be in frames | |
Returns: | |
numpy.array | |
Encoded labels, 1 where the label is present, 0 otherwise | |
""" | |
assert any( | |
[x is not None for x in [self.audio_len, self.frame_hop]] | |
) | |
samples_len = self.n_frames | |
if type(label_df) is str: | |
if label_df == "empty": | |
y = np.zeros((samples_len, len(self.labels))) - 1 | |
return y | |
y = np.zeros((samples_len, len(self.labels))) | |
if type(label_df) is pd.DataFrame: | |
if {"onset", "offset", "event_label"}.issubset(label_df.columns): | |
for _, row in label_df.iterrows(): | |
if not pd.isna(row["event_label"]): | |
i = self.labels.index(row["event_label"]) | |
onset = int(self._time_to_frame(row["onset"])) | |
offset = int(np.ceil(self._time_to_frame(row["offset"]))) | |
if "confidence" in label_df.columns: | |
y[onset:offset, i] = row["confidence"] # support confidence | |
else: | |
y[ | |
onset:offset, i | |
] = 1 # means offset not included (hypothesis of overlapping frames, so ok) | |
elif type(label_df) in [ | |
pd.Series, | |
list, | |
np.ndarray, | |
]: # list of list or list of strings | |
if type(label_df) is pd.Series: | |
if {"onset", "offset", "event_label"}.issubset( | |
label_df.index | |
): # means only one value | |
if not pd.isna(label_df["event_label"]): | |
i = self.labels.index(label_df["event_label"]) | |
onset = int(self._time_to_frame(label_df["onset"])) | |
offset = int(np.ceil(self._time_to_frame(label_df["offset"]))) | |
if "confidence" in label_df.columns: | |
y[onset:offset, i] = label_df["confidence"] | |
else: | |
y[onset:offset, i] = 1 | |
return y | |
for event_label in label_df: | |
# List of string, so weak labels to be encoded in strong | |
if type(event_label) is str: | |
if event_label != "": | |
i = self.labels.index(event_label) | |
y[:, i] = 1 | |
# List of list, with [label, onset, offset] | |
elif len(event_label) == 3: | |
if event_label[0] != "": | |
i = self.labels.index(event_label[0]) | |
onset = int(self._time_to_frame(event_label[1])) | |
offset = int(np.ceil(self._time_to_frame(event_label[2]))) | |
y[onset:offset, i] = 1 | |
# List of list, with [label, onset, offset, confidence] | |
elif len(event_label) == 4: | |
if event_label[0] != "": | |
i = self.labels.index(event_label[0]) | |
onset = int(self._time_to_frame(event_label[1])) | |
offset = int(np.ceil(self._time_to_frame(event_label[2]))) | |
y[onset:offset, i] = event_label[3] | |
else: | |
raise NotImplementedError( | |
"cannot encode strong, type mismatch: {}".format( | |
type(event_label) | |
) | |
) | |
else: | |
raise NotImplementedError( | |
"To encode_strong, type is pandas.Dataframe with onset, offset and event_label" | |
"columns, or it is a list or pandas Series of event labels, " | |
"type given: {}".format(type(label_df)) | |
) | |
return y | |
def decode_weak(self, labels): | |
""" Decode the encoded weak labels | |
Args: | |
labels: numpy.array, the encoded labels to be decoded | |
Returns: | |
list | |
Decoded labels, list of string | |
""" | |
result_labels = [] | |
for i, value in enumerate(labels): | |
if value == 1: | |
result_labels.append(self.labels[i]) | |
return result_labels | |
def decode_strong(self, labels): | |
""" Decode the encoded strong labels | |
Args: | |
labels: numpy.array, the encoded labels to be decoded | |
Returns: | |
list | |
Decoded labels, list of list: [[label, onset offset], ...] | |
""" | |
result_labels = [] | |
for i, label_column in enumerate(labels.T): | |
change_indices = DecisionEncoder().find_contiguous_regions(label_column) | |
# append [label, onset, offset] in the result list | |
for row in change_indices: | |
result_labels.append( | |
[ | |
self.labels[i], | |
self._frame_to_time(row[0]), | |
self._frame_to_time(row[1]), | |
] | |
) | |
return result_labels | |
def state_dict(self): | |
return { | |
"labels": self.labels, | |
"audio_len": self.audio_len, | |
"frame_hop": self.frame_hop, | |
"net_pooling": self.net_pooling, | |
"fs": self.fs, | |
} | |
def load_state_dict(cls, state_dict): | |
labels = state_dict["labels"] | |
audio_len = state_dict["audio_len"] | |
frame_hop = state_dict["frame_hop"] | |
net_pooling = state_dict["net_pooling"] | |
fs = state_dict["fs"] | |
return cls(labels, audio_len, frame_hop, net_pooling, fs) | |