Spaces:
Running
Running
| """Utility functions for pandas operations""" | |
| from typing import List | |
| import numpy as np | |
| import pandas as pd | |
| def apply_filters(df: pd.DataFrame, filters: dict, reset_index=False): | |
| """ | |
| Filters df based on given filters (key-values pairs). | |
| """ | |
| import omegaconf | |
| X = df.copy() | |
| all_indices = [] | |
| for col, values in filters.items(): | |
| if isinstance(values, (list, tuple, np.ndarray, omegaconf.listconfig.ListConfig)): | |
| indices = X[col].isin(list(values)) | |
| else: | |
| indices = X[col] == values | |
| all_indices.append(indices) | |
| # print(col, values, len(indices), sum(indices)) | |
| # X = X[indices] | |
| if len(all_indices): | |
| all_indices = np.array(all_indices) | |
| indices = np.all(all_indices, axis=0) | |
| X = X[indices] | |
| if reset_index: | |
| X = X.reset_index(drop=True) | |
| return X | |
| def apply_antifilters(df: pd.DataFrame, filters: dict, reset_index=False): | |
| """ | |
| Filters df removing rows for given filters (key-values pairs). | |
| """ | |
| X = df.copy() | |
| for col, values in filters.items(): | |
| if isinstance(values, (list, tuple, np.ndarray)): | |
| indices = X[col].isin(list(values)) | |
| else: | |
| indices = X[col] == values | |
| X = X[~indices] | |
| if reset_index: | |
| X = X.reset_index(drop=True) | |
| return X | |
| def custom_eval(x): | |
| """Splits string '["a", "b", "c"]' into ["a", "b", "c"].""" | |
| if isinstance(x, str): | |
| x = x.replace('[', '') | |
| x = x.replace(']', '') | |
| x = x.split(',') | |
| x = [y.rstrip().lstrip() for y in x] | |
| return x | |
| else: | |
| return ['NA'] | |
| def split_column_into_columns(df, column): | |
| """ | |
| For given df, splits `column` containing values like '["a", "b"]' | |
| into one-hot subcolumns like a. b with `Yes`/`No` values. | |
| """ | |
| df[column] = df[column].apply(custom_eval) | |
| unique_values = [] | |
| for i in range(len(df)): | |
| index = df.index[i] | |
| list_of_values = df.loc[index, column] | |
| for x in list_of_values: | |
| if (x != 'NA') and (x != ''): | |
| df.at[index, x] = 'Yes' | |
| if x not in unique_values: | |
| unique_values.append(x) | |
| df[unique_values] = df[unique_values].fillna('No') | |
| df[f'any_{column}'] = df[unique_values].apply( | |
| lambda x: 'Yes' if 'Yes' in list(x) else 'No', axis=1 | |
| ) | |
| return df | |
| def custom_read_csv(path: str, columns_to_onehot: List) -> pd.DataFrame: | |
| """Custom CSV reader | |
| Args: | |
| path (str): path to .csv file | |
| columns_to_onehot (List): list of columns to one-hotify | |
| Returns: | |
| pd.DataFrame: loaded df | |
| """ | |
| df = pd.read_csv(path) | |
| for column in columns_to_onehot: | |
| df = split_column_into_columns(df, column) | |
| return df | |
| def split_df(df, test_size=0.2): | |
| from sklearn.model_selection import train_test_split | |
| # split the dataframe into train and test sets | |
| train_df, test_df = train_test_split(df, test_size=test_size, random_state=42) | |
| # split the train set into train and validation sets | |
| train_df, val_df = train_test_split(train_df, test_size=test_size, random_state=42) | |
| return train_df, val_df, test_df | |