import json
import os
import random
from collections import Counter, defaultdict
from pathlib import Path

import numpy as np
import pandas as pd
import rootutils
from omegaconf import DictConfig
from ortools.linear_solver import pywraplp
from sklearn.model_selection import train_test_split

from dpacman.data_tasks.fimo.post_fimo import get_reverse_complement
from dpacman.utils import pylogger


root = rootutils.setup_root(__file__, indicator=".project-root", pythonpath=True)
logger = pylogger.RankedLogger(__name__, rank_zero_only=True)


def split_with_predefined_test(
    full_df=pd.DataFrame(),
    split_names=("train", "val", "test"),
    test_trs=None,
    test_dnas=None,
    ratios=(0.8, 0.1, 0.1),
):
    """
    Split into train and val around a predefined test set.
    The proteins in the test set, and the DNA clusters of the DNAs they're
    associated with, must be excluded from train and val; rows dropped this
    way are returned under "leaky_test". The remaining rows are split to
    preserve the target ratios (default 80/10/10) as closely as possible.
    """
    test = full_df.copy(deep=True)
    if test_trs is not None:
        test = test.loc[test["tr_seqid"].isin(test_trs)].reset_index(drop=True)
    if test_dnas is not None:
        test = test.loc[test["dna_seqid"].isin(test_dnas)].reset_index(drop=True)

    tr_clusters_to_exclude = test["tr_cluster_rep"].unique().tolist()
    dna_clusters_to_exclude = test["dna_cluster_rep"].unique().tolist()

    remaining = full_df.loc[
        (~full_df["tr_cluster_rep"].isin(tr_clusters_to_exclude)) &
        (~full_df["dna_cluster_rep"].isin(dna_clusters_to_exclude))
    ].reset_index(drop=True)

    test_ids = test["ID"].unique().tolist()
    remaining_ids = remaining["ID"].unique().tolist()
    remaining_clusters = remaining["dna_cluster_rep"].unique().tolist()
    lost_rows = full_df.loc[
        (~full_df["ID"].isin(test_ids)) &
        (~full_df["ID"].isin(remaining_ids))
    ]

    logger.info(f"Rows in test: {len(test)}")
    logger.info(f"Rows to be split between train and val: {len(remaining)}")
    total_rows = len(test) + len(remaining)
    logger.info(f"Total rows: {total_rows}. Test percentage: {100*len(test)/total_rows:.2f}%")
    logger.info(f"Lost rows: {len(lost_rows)}")

    train_ratio_from_remaining = round((ratios[0] * total_rows) / len(remaining), 2)

    test_size_1 = 1 - train_ratio_from_remaining
    logger.info(
        f"\tPerforming first split: non-test clusters -> train clusters ({round(1-test_size_1,3)}) and val ({test_size_1})"
    )
    X = remaining_clusters
    y = [0] * len(remaining_clusters)
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=test_size_1, random_state=0
    )

    train = remaining.loc[remaining["dna_cluster_rep"].isin(X_train)]
    val = remaining.loc[remaining["dna_cluster_rep"].isin(X_val)]
    leaky_test = lost_rows

    splits = {
        "train": train,
        "val": val,
        "test": test,
        "leaky_test": leaky_test,
    }
    return splits
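

# Hedged usage sketch (toy frame, not part of the pipeline): the column names
# follow what split_with_predefined_test expects; the IDs and cluster reps are
# made up for illustration.
def _example_split_with_predefined_test():
    toy = pd.DataFrame(
        {
            "ID": [f"p{i}_d{i}" for i in range(10)],
            "tr_seqid": [f"p{i}" for i in range(10)],
            "dna_seqid": [f"d{i}" for i in range(10)],
            "tr_cluster_rep": [f"p{i}" for i in range(10)],
            "dna_cluster_rep": [f"d{i % 5}" for i in range(10)],
        }
    )
    splits = split_with_predefined_test(toy, test_trs=["p0"])
    assert set(splits) == {"train", "val", "test", "leaky_test"}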


def split_bipartite_fast(
    dna_clusters,
    split_names=("train", "val", "test"),
    ratios=(0.8, 0.1, 0.1),
):
    # Derive the chained split sizes from the requested ratios: first carve
    # off (val + test), then split that remainder into val and test.
    test_size_1 = ratios[1] + ratios[2]
    test_size_2 = ratios[2] / (ratios[1] + ratios[2])
    logger.info(
        f"\tPerforming first split: all clusters -> train clusters ({round(1-test_size_1,3)}) and other ({test_size_1})"
    )
    X = dna_clusters
    y = [0] * len(dna_clusters)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size_1, random_state=0
    )
    logger.info(
        f"\tPerforming second split: other -> val clusters ({round(1-test_size_2,3)}) and test clusters ({test_size_2})"
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_test, y_test, test_size=test_size_2, random_state=0
    )

    dna_assign = {}
    for x in X_train:
        dna_assign[x] = split_names[0]
    for x in X_val:
        dna_assign[x] = split_names[1]
    for x in X_test:
        dna_assign[x] = split_names[2]

    kept_by_split = {
        split_names[0]: len(X_train),
        split_names[1]: len(X_val),
        split_names[2]: len(X_test),
    }
    return dna_assign, kept_by_split
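

# Hedged usage sketch (toy cluster IDs, not part of the pipeline): with the
# default ratios this assigns 80/10/10 of the clusters to train/val/test.
def _example_split_bipartite_fast():
    clusters = [f"dna_{i}" for i in range(100)]
    dna_assign, kept_by_split = split_bipartite_fast(clusters)
    assert sum(kept_by_split.values()) == len(clusters)
    assert set(dna_assign.values()) == {"train", "val", "test"}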


def convert_scores(scores):
    """Binarize a comma-separated score string: 1 where the max occurs, else 0."""
    svec = [int(x) for x in scores.split(",")]
    max_score = max(svec)
    binary_svec = [0 if x < max_score else 1 for x in svec]
    assert svec.count(max_score) == binary_svec.count(1)
    return ",".join(str(x) for x in binary_svec)
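

# Hedged example (not part of the pipeline): positions tied for the maximum
# all map to 1, every other position maps to 0.
def _example_convert_scores():
    assert convert_scores("1,3,3,2") == "0,1,1,0"
    assert convert_scores("5") == "1"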


def split_bipartite_with_ratios_and_leaky(
    edges,
    split_names=("train", "val", "test"),
    ratios=(0.8, 0.1, 0.1),
    require_nonempty=False,
    ratio_tolerance=None,
    bigM=None,
    shuffle_within_pair=False,
    seed=0,
    test_edges_must=None,
):
    """
    edges: list of (tf_cluster_id, dna_cluster_id). Duplicates allowed (-> weights).
    test_edges_must: None, list of pairs, or dict {(tf,dna): required_count}.
      - If a pair appears with required_count > 0, at least that many examples MUST be kept in TEST.
      - This implicitly pins both clusters of that pair to TEST (cluster exclusivity).

    Returns:
        tf_assign: {tf_cluster -> split}
        dna_assign: {dna_cluster -> split}
        kept_by_split: {split -> kept_count} (train/val/test only)
        total_kept: int
        split_to_indices: {split -> [input indices]} including 'leaky_test'
        split_to_edges: {split -> [(tf,dna), ...]} including 'leaky_test'
    """
    w = Counter(edges)
    tfs = {t for (t, _) in w}
    dnas = {d for (_, d) in w}
    S = list(split_names)
    rs = dict(zip(S, ratios))
    N = sum(w.values())
    if bigM is None:
        bigM = 1000 * max(1, N)

    pair_to_indices = defaultdict(list)
    for idx, (c, d) in enumerate(edges):
        pair_to_indices[(c, d)].append(idx)

    if shuffle_within_pair:
        rng = random.Random(seed)
        for key in pair_to_indices:
            rng.shuffle(pair_to_indices[key])

    # Normalize the required-test-edge spec into a Counter of minimum counts.
    req_test = Counter()
    if test_edges_must:
        if isinstance(test_edges_must, dict):
            for k, v in test_edges_must.items():
                if not isinstance(k, tuple) or len(k) != 2:
                    raise ValueError(
                        "test_edges_must dict keys must be (tf_cluster, dna_cluster)"
                    )
                if v < 0:
                    raise ValueError("required_count must be non-negative")
                if v:
                    req_test[k] += int(v)
        else:
            # A list of (tf, dna) pairs: multiplicity gives the required count.
            req_test = Counter(test_edges_must)

    for pair, req in req_test.items():
        if pair not in w:
            raise ValueError(f"Required test pair {pair} not present in edges.")
        if req > w[pair]:
            raise ValueError(
                f"Required count {req} for {pair} exceeds available {w[pair]}."
            )

    solver = pywraplp.Solver.CreateSolver("CBC")
    if solver is None:
        raise RuntimeError("Could not create CBC solver.")

    # x/y: indicators assigning each TF / DNA cluster to a split.
    x = {(c, s): solver.BoolVar(f"x[{c},{s}]") for c in tfs for s in S}
    y = {(d, s): solver.BoolVar(f"y[{d},{s}]") for d in dnas for s in S}

    # Each cluster belongs to exactly one split.
    for c in tfs:
        solver.Add(sum(x[c, s] for s in S) == 1)
    for d in dnas:
        solver.Add(sum(y[d, s] for s in S) == 1)

    # k: how many examples of each (tf, dna) pair are kept in each split.
    k = {
        ((c, d), s): solver.IntVar(0, w[(c, d)], f"k[{c},{d},{s}]")
        for (c, d) in w
        for s in S
    }

    # An example can only be kept in a split that holds both of its clusters.
    for (c, d), wt in w.items():
        for s in S:
            solver.Add(k[((c, d), s)] <= wt * x[c, s])
            solver.Add(k[((c, d), s)] <= wt * y[d, s])

    # Enforce required minimum test counts.
    for (c, d), req in req_test.items():
        solver.Add(k[((c, d), "test")] >= req)

    if require_nonempty:
        for s in S:
            solver.Add(sum(x[c, s] for c in tfs) + sum(y[d, s] for d in dnas) >= 1)

    # K[s]: kept examples per split; T: total kept overall.
    K = {s: solver.IntVar(0, N, f"K[{s}]") for s in S}
    for s in S:
        solver.Add(K[s] == sum(k[((c, d), s)] for (c, d) in w))
    T = solver.IntVar(0, N, "T")
    solver.Add(T == sum(K[s] for s in S))

    # dpos/dneg: deviation of each split's size from its ratio target.
    dpos = {s: solver.NumVar(0, solver.infinity(), f"dpos[{s}]") for s in S}
    dneg = {s: solver.NumVar(0, solver.infinity(), f"dneg[{s}]") for s in S}
    for s in S:
        solver.Add(K[s] - rs[s] * T == dpos[s] - dneg[s])

    if ratio_tolerance is not None:
        eps = float(ratio_tolerance)
        for s in S:
            solver.Add(K[s] >= (rs[s] - eps) * T)
            solver.Add(K[s] <= (rs[s] + eps) * T)

    # Maximize kept examples first (bigM weight), then minimize ratio deviation.
    obj = solver.Objective()
    obj.SetMaximization()
    obj.SetCoefficient(T, float(bigM))
    for s in S:
        obj.SetCoefficient(dpos[s], -1.0)
        obj.SetCoefficient(dneg[s], -1.0)

    status = solver.Solve()
    if status not in (pywraplp.Solver.OPTIMAL, pywraplp.Solver.FEASIBLE):
        raise RuntimeError(
            "No feasible solution (check ratio_tolerance vs. required test edges)."
        )

    tf_assign = {c: next(s for s in S if x[c, s].solution_value() > 0.5) for c in tfs}
    dna_assign = {d: next(s for s in S if y[d, s].solution_value() > 0.5) for d in dnas}

    kept_by_split = {s: int(round(K[s].solution_value())) for s in S}
    total_kept = int(round(T.solution_value()))

    # Materialize the kept counts back into concrete input indices.
    split_to_indices = {s: [] for s in S}
    remaining_indices = {pair: list(pair_to_indices[pair]) for pair in pair_to_indices}

    for (c, d) in w:
        for s in S:
            cnt = int(round(k[((c, d), s)].solution_value()))
            if cnt > 0:
                take = remaining_indices[(c, d)][:cnt]
                split_to_indices[s].extend(take)
                remaining_indices[(c, d)] = remaining_indices[(c, d)][cnt:]

    # Whatever the solver dropped becomes the leaky test bucket.
    leaky_indices = []
    for pair, idxs in remaining_indices.items():
        if idxs:
            leaky_indices.extend(idxs)

    split_to_indices["leaky_test"] = leaky_indices
    split_to_edges = {
        s: [edges[i] for i in split_to_indices[s]] for s in split_to_indices
    }

    return (
        tf_assign,
        dna_assign,
        kept_by_split,
        total_kept,
        split_to_indices,
        split_to_edges,
    )
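

# Hedged usage sketch (toy edges, not part of the pipeline; requires OR-Tools'
# CBC backend to be available at runtime). By construction, every kept edge
# lands in the split that owns both of its endpoint clusters.
def _example_split_bipartite_with_ratios_and_leaky():
    toy_edges = [
        ("tf_a", "dna_1"), ("tf_a", "dna_2"), ("tf_b", "dna_2"),
        ("tf_c", "dna_3"), ("tf_c", "dna_3"), ("tf_d", "dna_4"),
    ]
    tf_assign, dna_assign, kept, total, _, split_edges = (
        split_bipartite_with_ratios_and_leaky(toy_edges)
    )
    for s in ("train", "val", "test"):
        for tf, dna in split_edges[s]:
            assert tf_assign[tf] == s and dna_assign[dna] == s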


class DSU:
    """Union-find with path halving, used to find connected components."""

    def __init__(self):
        self.p = {}

    def find(self, x):
        if x not in self.p:
            self.p[x] = x
        while self.p[x] != x:
            self.p[x] = self.p[self.p[x]]  # path halving
            x = self.p[x]
        return x

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            self.p[rb] = ra
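
# Hedged DSU sketch (not part of the pipeline): nodes are tagged ("T", tf)
# and ("D", dna) so TF and DNA IDs can never collide; union-ing every edge's
# endpoints yields the connected components of the bipartite graph.
def _example_dsu():
    dsu = DSU()
    dsu.union(("T", "tf_a"), ("D", "dna_1"))
    dsu.union(("T", "tf_b"), ("D", "dna_1"))
    assert dsu.find(("T", "tf_a")) == dsu.find(("T", "tf_b"))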


def split_bipartite_by_components(
    edges,
    split_names=("train", "val", "test"),
    ratios=(0.8, 0.1, 0.1),
    seed=0,
    require_nonempty=False,
    test_edges_must=None,
):
    """
    Guarantees exclusivity: each TF cluster and DNA cluster appears in at most one split.
    Strategy: find connected components in the TF–DNA bipartite graph and assign components wholesale.
    """
    rng = random.Random(seed)
    w = Counter(edges)
    if not w:
        raise ValueError("No edges.")

    # Union the endpoints of every edge to get the bipartite connected components.
    dsu = DSU()
    for tf, dna in w:
        dsu.union(("T", tf), ("D", dna))
    comp_pairs = defaultdict(list)
    comp_weight = defaultdict(int)
    for (tf, dna), cnt in w.items():
        comp_root = dsu.find(("T", tf))
        comp_pairs[comp_root].append((tf, dna))
        comp_weight[comp_root] += cnt

    comps = list(comp_pairs.keys())
    S = list(split_names)
    rs = dict(zip(S, ratios))
    N = sum(comp_weight[c] for c in comps)
    target = {s: int(round(rs[s] * N)) for s in S}

    # Pin to test every component that contains a required test edge.
    pinned = {}
    if test_edges_must:
        # Counter handles both forms: a list of pairs (counts occurrences)
        # and a {pair: count} dict (uses the counts directly).
        req = Counter(test_edges_must)
        for (tf, dna), r in req.items():
            if (tf, dna) not in w:
                raise ValueError(f"Required pair {(tf,dna)} not present.")
            if r > w[(tf, dna)]:
                raise ValueError(
                    f"Required count {r} for {(tf,dna)} exceeds available {w[(tf,dna)]}."
                )
            comp = dsu.find(("T", tf))
            if comp in pinned and pinned[comp] != "test":
                raise ValueError(
                    f"Component conflict: already pinned to {pinned[comp]}, but {(tf,dna)} demands test."
                )
            pinned[comp] = "test"

    kept_by_split = {s: 0 for s in S}
    comp_assign = {}

    for comp, split in pinned.items():
        comp_assign[comp] = split
        kept_by_split[split] += comp_weight[comp]

    # Assign the remaining components greedily, largest first.
    remaining = [c for c in comps if c not in comp_assign]
    remaining.sort(key=lambda c: comp_weight[c], reverse=True)

    # Optionally seed each split with one component so none ends up empty.
    if require_nonempty:
        seeds = remaining[: min(len(S), len(remaining))]
        for comp, s in zip(seeds, S):
            comp_assign[comp] = s
            kept_by_split[s] += comp_weight[comp]
        remaining = [c for c in remaining if c not in comp_assign]

    for comp in remaining:
        # Give the component to the split furthest below its target size.
        deficits = {s: target[s] - kept_by_split[s] for s in S}
        best = max(deficits, key=lambda s: deficits[s])
        comp_assign[comp] = best
        kept_by_split[best] += comp_weight[comp]

    total_kept = sum(kept_by_split.values())

    pair_to_indices = defaultdict(list)
    for idx, pair in enumerate(edges):
        pair_to_indices[pair].append(idx)

    split_to_indices = {s: [] for s in S}
    for comp, s in comp_assign.items():
        for pair in comp_pairs[comp]:
            split_to_indices[s].extend(pair_to_indices[pair])

    split_to_edges = {
        s: [edges[i] for i in split_to_indices[s]] for s in split_to_indices
    }
    tf_assign, dna_assign = {}, {}
    for comp, s in comp_assign.items():
        for tf, dna in comp_pairs[comp]:
            tf_assign[tf] = s
            dna_assign[dna] = s

    # Sanity check: no TF or DNA cluster may appear in more than one split.
    tf_in_split = defaultdict(set)
    dna_in_split = defaultdict(set)
    for s, elist in split_to_edges.items():
        for tf, dna in elist:
            tf_in_split[tf].add(s)
            dna_in_split[dna].add(s)
    dup_tf = {tf: ss for tf, ss in tf_in_split.items() if len(ss) > 1}
    dup_dna = {dn: ss for dn, ss in dna_in_split.items() if len(ss) > 1}
    assert not dup_tf and not dup_dna, f"Exclusivity violated: {dup_tf} {dup_dna}"

    return (
        tf_assign,
        dna_assign,
        kept_by_split,
        total_kept,
        split_to_indices,
        split_to_edges,
    )
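

# Hedged usage sketch (toy edges, not part of the pipeline): components are
# assigned wholesale, so cluster exclusivity holds by construction and no
# leaky_test bucket is produced.
def _example_split_bipartite_by_components():
    toy_edges = [(f"tf_{i}", f"dna_{i}") for i in range(20)]
    tf_assign, dna_assign, kept, total, _, _ = split_bipartite_by_components(
        toy_edges, require_nonempty=True
    )
    assert total == len(toy_edges)
    assert set(kept) == {"train", "val", "test"}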


def print_split_ratios(kept_by_split):
    total = sum(kept_by_split.values())
    train_pcnt = 100 * kept_by_split["train"] / total
    val_pcnt = 100 * kept_by_split["val"] / total
    test_pcnt = 100 * kept_by_split["test"] / total
    logger.info(
        f"Cluster distribution - Train: {train_pcnt:.2f}%, Val: {val_pcnt:.2f}%, Test: {test_pcnt:.2f}%"
    )
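

# For example, print_split_ratios({"train": 80, "val": 10, "test": 10}) logs:
# "Cluster distribution - Train: 80.00%, Val: 10.00%, Test: 10.00%"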


def make_edges(
    processed_fimo_path: str, protein_cluster_path: str, dna_cluster_path: str
):
    """
    Make edges for input to the splitting algorithm. Each edge is the tuple
    (tr_cluster_rep, dna_cluster_rep), where each cluster rep is a sequence ID.
    """
    protein_clusters = pd.read_csv(protein_cluster_path, header=None, sep="\t")
    protein_clusters.columns = ["tr_cluster_rep", "tr_seqid"]

    dna_clusters = pd.read_csv(dna_cluster_path, header=None, sep="\t")
    dna_clusters.columns = ["dna_cluster_rep", "dna_seqid"]

    # Attach cluster reps to every FIMO row, then pair them up as edges.
    edges = pd.read_parquet(processed_fimo_path)
    edges = pd.merge(edges, dna_clusters, on="dna_seqid", how="left")
    edges = pd.merge(edges, protein_clusters, on="tr_seqid", how="left")
    edges["edge"] = list(zip(edges["tr_cluster_rep"], edges["dna_cluster_rep"]))

    logger.info(f"Total unique edges: {edges['edge'].nunique()}")
    dup_edges = edges.loc[edges.duplicated("edge")]["edge"].unique().tolist()
    logger.info(f"Total edges with >1 datapoint: {len(dup_edges)}")
    logger.info(
        f"Total datapoints belonging to a duplicate edge: {len(edges.loc[edges['edge'].isin(dup_edges)])}"
    )
    return edges
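

# Hedged call sketch (the paths below are hypothetical placeholders; the real
# ones come from the Hydra config in main()):
# edge_df = make_edges(
#     processed_fimo_path="data/fimo/processed.parquet",
#     protein_cluster_path="data/clusters/protein_clusters.tsv",
#     dna_cluster_path="data/clusters/dna_clusters.tsv",
# )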


def check_validity(train, val, test, split_by="both"):
    """
    Rigorously check that train/val/test share no IDs, sequences, or cluster reps.
    Columns = ["ID","dna_sequence","tr_sequence","tr_cluster_rep","dna_cluster_rep", "scores","split"]
    """
    train_ids = set(train["ID"].unique().tolist())
    val_ids = set(val["ID"].unique().tolist())
    test_ids = set(test["ID"].unique().tolist())

    assert len(train_ids.intersection(val_ids)) == 0
    assert len(train_ids.intersection(test_ids)) == 0
    assert len(val_ids.intersection(test_ids)) == 0
    logger.info("Pass! No overlap in IDs")

    if split_by != "dna":
        train_tr_seqs = set(train["tr_sequence"].unique().tolist())
        val_tr_seqs = set(val["tr_sequence"].unique().tolist())
        test_tr_seqs = set(test["tr_sequence"].unique().tolist())

        assert len(train_tr_seqs.intersection(val_tr_seqs)) == 0
        assert len(train_tr_seqs.intersection(test_tr_seqs)) == 0
        assert len(val_tr_seqs.intersection(test_tr_seqs)) == 0
        logger.info("Pass! No overlap in TR sequences")

        train_tr_reps = set(train["tr_cluster_rep"].unique().tolist())
        val_tr_reps = set(val["tr_cluster_rep"].unique().tolist())
        test_tr_reps = set(test["tr_cluster_rep"].unique().tolist())

        assert len(train_tr_reps.intersection(val_tr_reps)) == 0
        assert len(train_tr_reps.intersection(test_tr_reps)) == 0
        assert len(val_tr_reps.intersection(test_tr_reps)) == 0
        logger.info("Pass! No overlap in TR cluster reps")

    if split_by != "protein":
        train_dna_seqs = set(train["dna_sequence"].unique().tolist())
        val_dna_seqs = set(val["dna_sequence"].unique().tolist())
        test_dna_seqs = set(test["dna_sequence"].unique().tolist())

        assert len(train_dna_seqs.intersection(val_dna_seqs)) == 0
        assert len(train_dna_seqs.intersection(test_dna_seqs)) == 0
        assert len(val_dna_seqs.intersection(test_dna_seqs)) == 0
        logger.info("Pass! No overlap in DNA sequences")

        train_dna_reps = set(train["dna_cluster_rep"].unique().tolist())
        val_dna_reps = set(val["dna_cluster_rep"].unique().tolist())
        test_dna_reps = set(test["dna_cluster_rep"].unique().tolist())

        assert len(train_dna_reps.intersection(val_dna_reps)) == 0
        assert len(train_dna_reps.intersection(test_dna_reps)) == 0
        assert len(val_dna_reps.intersection(test_dna_reps)) == 0
        logger.info("Pass! No overlap in DNA cluster reps")
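

# Hedged usage sketch (toy one-row frames, not part of the pipeline): fully
# disjoint splits pass every assertion without raising.
def _example_check_validity():
    cols = ["ID", "dna_sequence", "tr_sequence", "tr_cluster_rep", "dna_cluster_rep"]

    def one_row(i):
        return pd.DataFrame(
            [[f"id{i}", f"D{i}", f"P{i}", f"p{i}", f"d{i}"]], columns=cols
        )

    check_validity(one_row(0), one_row(1), one_row(2))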


def augment_rc(df):
    """
    Get the reverse complement and add it as a datapoint, effectively doubling the dataset.
    Also flip the orientation of the scores.

    columns = ["ID","dna_sequence","tr_sequence","tr_cluster_rep","dna_cluster_rep", "scores","split"]
    """
    df_rc = df.copy(deep=True)

    df_rc["dna_sequence"] = df_rc["dna_sequence"].apply(get_reverse_complement)
    df_rc["ID"] = df_rc["ID"] + "_rc"
    df_rc["scores"] = df_rc["scores"].apply(lambda s: ",".join(s.split(",")[::-1]))

    final_df = pd.concat([df, df_rc]).reset_index(drop=True)

    return final_df
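

# Hedged usage sketch (toy frame, not part of the pipeline; assumes dpacman's
# get_reverse_complement is the standard reverse complement, e.g. "AACG" -> "CGTT").
def _example_augment_rc():
    toy = pd.DataFrame({"ID": ["x"], "dna_sequence": ["AACG"], "scores": ["1,2,3,4"]})
    out = augment_rc(toy)
    assert len(out) == 2 * len(toy)
    assert out.loc[1, "ID"] == "x_rc"
    assert out.loc[1, "scores"] == "4,3,2,1"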


def main(cfg: DictConfig):
    """
    Take a set of DNA clusters + protein clusters, and create the best possible splits into train/val/test.
    """
    edge_df = make_edges(
        processed_fimo_path=Path(root) / cfg.data_task.input_data_path,
        protein_cluster_path=Path(root) / cfg.data_task.cluster_output_paths.protein,
        dna_cluster_path=Path(root) / cfg.data_task.cluster_output_paths.dna,
    )
    edges = edge_df["edge"].unique().tolist()

    total_proteins = len(edge_df["tr_seqid"].unique().tolist())
    total_protein_clusters = len(edge_df["tr_cluster_rep"].unique().tolist())

    no_protein_overlap = total_proteins == total_protein_clusters
    logger.info(f"All proteins are in their own clusters: {no_protein_overlap}")

    if cfg.data_task.split_by == "dna":
        if cfg.data_task.p_exclude:
            # Nothing to do here: runs with pre-excluded proteins are split elsewhere.
            return
        else:
            logger.info("Easy split: all proteins are in their own clusters.")
            dna_clusters = edge_df["dna_cluster_rep"].unique().tolist()
            results = split_bipartite_fast(
                dna_clusters,
                split_names=("train", "val", "test"),
                ratios=(
                    cfg.data_task.train_ratio,
                    cfg.data_task.val_ratio,
                    cfg.data_task.test_ratio,
                ),
            )
            dna_assign, kept_by_split = results

            edge_df["split"] = edge_df["dna_cluster_rep"].map(dna_assign)
    else:
        results = split_bipartite_by_components(
            edges,
            split_names=("train", "val", "test"),
            ratios=(
                cfg.data_task.train_ratio,
                cfg.data_task.val_ratio,
                cfg.data_task.test_ratio,
            ),
            require_nonempty=cfg.data_task.require_nonempty,
            seed=cfg.data_task.seed,
            test_edges_must=None,
        )

        (
            tf_assign,
            dna_assign,
            kept_by_split,
            total_kept,
            split_to_indices,
            split_to_edges,
        ) = results

        logger.debug(tf_assign)
        logger.debug(dna_assign)
        edge_df["tr_split"] = edge_df["tr_cluster_rep"].map(tf_assign)
        edge_df["dna_split"] = edge_df["dna_cluster_rep"].map(dna_assign)
        edge_df["same_split"] = edge_df["tr_split"] == edge_df["dna_split"]
        # Rows whose TF and DNA clusters landed in different splits are leaks.
        edge_df["split"] = np.where(
            edge_df["same_split"],
            edge_df["tr_split"],
            "leak",
        )
        logger.debug(edge_df)

    print_split_ratios(kept_by_split)

    # Every row must have a unique ID before the splits are materialized.
    assert len(edge_df["ID"].unique()) == len(edge_df)
    split_cols = [
        "ID",
        "dna_sequence",
        "tr_sequence",
        "tr_cluster_rep",
        "dna_cluster_rep",
        "scores",
        "split",
    ]
    train = edge_df.loc[edge_df["split"] == "train"].reset_index(drop=True)[split_cols]
    val = edge_df.loc[edge_df["split"] == "val"].reset_index(drop=True)[split_cols]
    test = edge_df.loc[edge_df["split"] == "test"].reset_index(drop=True)[split_cols]

    check_validity(train, val, test, split_by=cfg.data_task.split_by)

    total = sum([len(train), len(val), len(test)])
    logger.info(f"Length of train dataset: {len(train)} ({100*len(train)/total:.2f}%)")
    logger.info(f"Length of val dataset: {len(val)} ({100*len(val)/total:.2f}%)")
    logger.info(f"Length of test dataset: {len(test)} ({100*len(test)/total:.2f}%)")
    logger.info(f"Total sequences = {total}. Same as edges size? {total==len(edge_df)}")

    og_unique_dna = len(pd.concat([train, val, test])["dna_sequence"].unique())

    if cfg.data_task.augment_rc:
        train = augment_rc(train)
        val = augment_rc(val)
        test = augment_rc(test)

        logger.info("Added reverse complement sequences to train, val, and test.")

        check_validity(train, val, test, split_by=cfg.data_task.split_by)

        total = sum([len(train), len(val), len(test)])
        logger.info(
            f"Length of train dataset: {len(train)} ({100*len(train)/total:.2f}%)"
        )
        logger.info(f"Length of val dataset: {len(val)} ({100*len(val)/total:.2f}%)")
        logger.info(f"Length of test dataset: {len(test)} ({100*len(test)/total:.2f}%)")
        logger.info(
            f"Total sequences = {total}. Same as edges size? {total==len(edge_df)}"
        )

    # Rebuild the DNA seqid -> sequence map, now including reverse complements.
    all_data = pd.concat([train, val, test])
    all_data["dna_seqid"] = all_data["ID"].str.split("_", n=1, expand=True)[1]
    dna_dict = dict(zip(all_data["dna_seqid"], all_data["dna_sequence"]))
    assert len(dna_dict) == len(all_data.drop_duplicates(["dna_sequence"]))
    new_map_path = str(Path(root) / cfg.data_task.dna_map_path).replace(
        ".json", "_with_rc.json"
    )

    with open(new_map_path, "w") as f:
        json.dump(dna_dict, f, indent=2)
    logger.info(
        f"Saved DNA map with reverse complements to {new_map_path} "
        f"(size {len(dna_dict)}; doubled from original map of size {og_unique_dna}: "
        f"{len(dna_dict) == 2 * og_unique_dna})"
    )

    split_out_dir = Path(root) / cfg.data_task.split_out_dir
    os.makedirs(split_out_dir, exist_ok=True)

    # Binarize the FIMO scores (1 at every max-scoring position, else 0).
    train["fimo_binary_scores"] = train["scores"].apply(convert_scores)
    val["fimo_binary_scores"] = val["scores"].apply(convert_scores)
    test["fimo_binary_scores"] = test["scores"].apply(convert_scores)

    split_final_cols = ["ID", "dna_sequence", "tr_sequence", "scores", "fimo_binary_scores", "split"]
    train[split_final_cols].to_csv(split_out_dir / "train.csv", index=False)
    val[split_final_cols].to_csv(split_out_dir / "val.csv", index=False)
    test[split_final_cols].to_csv(split_out_dir / "test.csv", index=False)
    logger.info(f"Saved all splits to {split_out_dir}")