"""Codon-level sequence design helpers.

The module seeds preferred k-mers into a coding sequence and then fills the
remaining codons with a beam search that rewards/penalises k-mers and applies
a third-base ("wobble") preference.
"""

from collections import Counter, defaultdict
import json
import math
import os
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

# ---------------- Genetic code (DNA) ----------------
# Amino acid (one-letter, '*' = stop) -> synonymous DNA codons.
AA2CODONS = {
    'A': ['GCT', 'GCC', 'GCA', 'GCG'],
    'R': ['CGT', 'CGC', 'CGA', 'CGG', 'AGA', 'AGG'],
    'N': ['AAT', 'AAC'],
    'D': ['GAT', 'GAC'],
    'C': ['TGT', 'TGC'],
    'Q': ['CAA', 'CAG'],
    'E': ['GAA', 'GAG'],
    'G': ['GGT', 'GGC', 'GGA', 'GGG'],
    'H': ['CAT', 'CAC'],
    'I': ['ATT', 'ATC', 'ATA'],
    'L': ['TTA', 'TTG', 'CTT', 'CTC', 'CTA', 'CTG'],
    'K': ['AAA', 'AAG'],
    'M': ['ATG'],
    'F': ['TTT', 'TTC'],
    'P': ['CCT', 'CCC', 'CCA', 'CCG'],
    'S': ['TCT', 'TCC', 'TCA', 'TCG', 'AGT', 'AGC'],
    'T': ['ACT', 'ACC', 'ACA', 'ACG'],
    'W': ['TGG'],
    'Y': ['TAT', 'TAC'],
    'V': ['GTT', 'GTC', 'GTA', 'GTG'],
    '*': ['TAA', 'TAG', 'TGA'],
}

# DNA codon -> amino acid. 'M' - START, '_' - STOP
# (note: stop is '_' here but '*' in AA2CODONS).
DNA_Codons = {
    "GCT": "A", "GCC": "A", "GCA": "A", "GCG": "A",
    "TGT": "C", "TGC": "C",
    "GAT": "D", "GAC": "D",
    "GAA": "E", "GAG": "E",
    "TTT": "F", "TTC": "F",
    "GGT": "G", "GGC": "G", "GGA": "G", "GGG": "G",
    "CAT": "H", "CAC": "H",
    "ATA": "I", "ATT": "I", "ATC": "I",
    "AAA": "K", "AAG": "K",
    "TTA": "L", "TTG": "L", "CTT": "L", "CTC": "L", "CTA": "L", "CTG": "L",
    "ATG": "M",
    "AAT": "N", "AAC": "N",
    "CCT": "P", "CCC": "P", "CCA": "P", "CCG": "P",
    "CAA": "Q", "CAG": "Q",
    "CGT": "R", "CGC": "R", "CGA": "R", "CGG": "R", "AGA": "R", "AGG": "R",
    "TCT": "S", "TCC": "S", "TCA": "S", "TCG": "S", "AGT": "S", "AGC": "S",
    "ACT": "T", "ACC": "T", "ACA": "T", "ACG": "T",
    "GTT": "V", "GTC": "V", "GTA": "V", "GTG": "V",
    "TGG": "W",
    "TAT": "Y", "TAC": "Y",
    "TAA": "_", "TAG": "_", "TGA": "_",
}


# ---------------- Helpers ----------------
def aminoacid_percentage(codons):
    """Compute per-amino-acid codon usage fractions and counts.

    Args:
        codons: iterable of DNA codons, e.g. ``['ACT', 'CCT', 'GTT', ...]``.
            Each codon must be a key of ``DNA_Codons`` (upper-case, T not U).
            ``None`` entries (placeholders for codons that were never
            designed) are skipped.

    Returns:
        ``(amino_dict_per, amino_dict_count)``: two ``defaultdict`` objects
        keyed by amino acid. The first maps to ``{codon: fraction}`` with the
        fraction rounded to 2 decimals; the second maps to a ``Counter`` of
        codon occurrences.

    Raises:
        KeyError: if a codon is not present in ``DNA_Codons``.
    """
    codons_by_aa = defaultdict(list)
    for codon in codons:
        if codon is None:
            # Unfilled position: nothing to count.
            continue
        codons_by_aa[DNA_Codons[codon]].append(codon)

    # defaultdict(list) is kept as the return type for backward compatibility.
    amino_dict_per = defaultdict(list)
    amino_dict_count = defaultdict(list)
    for amino, used in codons_by_aa.items():
        counts = Counter(used)
        amino_dict_per[amino] = {
            codon: np.round(n / len(used), 2) for codon, n in counts.items()
        }
        amino_dict_count[amino] = counts
    return amino_dict_per, amino_dict_count


def _gc_percent(nucleotides: str) -> float:
    """Return the G+C percentage of a lower-case string (0.0 when empty)."""
    if not nucleotides:
        return 0.0
    gc_count = nucleotides.count('g') + nucleotides.count('c')
    return gc_count / len(nucleotides) * 100


def gc_content(sequence=None, fasta_dir=None):
    """Calculate GC content overall and per codon position (1, 2 and 3).

    Args:
        sequence: list of nucleotide strings without spaces, e.g.
            ``['AGCCCCTTT...']``. A single string is treated as a
            one-element list.
        fasta_dir: used only when ``sequence`` is ``None``; passed to
            ``fasta_to_list``.

    Returns:
        ``pandas.DataFrame`` with one row per sequence and the columns
        ``'Original'``, ``'Position One'``, ``'Position Two'``,
        ``'Position Three'`` (percentages rounded to 1 decimal). An empty
        sequence or an empty codon position yields 0.0 instead of raising
        ``ZeroDivisionError``.
    """
    if sequence is not None:
        sequences = [sequence] if isinstance(sequence, str) else sequence
    else:
        # NOTE(review): `fasta_to_list` is not defined in this file; it must
        # be supplied by the surrounding project for this branch to work.
        sequences = fasta_to_list(fasta_dir, seq_to_codon=False, sos_eos=False)

    rows = []
    for seq in sequences:
        seq = seq.lower()
        # Whole-sequence GC first, then every third nt starting at 0, 1, 2.
        rows.append([_gc_percent(seq)] + [_gc_percent(seq[i::3]) for i in range(3)])
    return pd.DataFrame(
        rows,
        columns=['Original', 'Position One', 'Position Two', 'Position Three'],
    ).round(1)


def parse_kmer_list(x) -> List[str]:
    """Parse semicolon-separated kmers 'AAA;TTT;...' into a list (uppercased).

    ``None``, ``pandas.NA`` and float NaN (an empty spreadsheet cell) all
    yield an empty list; without this an empty cell would become the bogus
    k-mer ``'NAN'``.
    """
    if x is None or x is pd.NA or (isinstance(x, float) and math.isnan(x)):
        return []
    s = str(x).strip()
    if not s:
        return []
    return [k.strip().upper() for k in s.split(";") if k.strip()]


def load_summary(summary_path: str) -> pd.DataFrame:
    """Read a CSV or Excel (.xlsx/.xls) summary into a DataFrame.

    The format is chosen from the (case-insensitive) file extension;
    anything that is not .xlsx/.xls is read as CSV.
    """
    if summary_path.lower().endswith((".xlsx", ".xls")):
        return pd.read_excel(summary_path)
    return pd.read_csv(summary_path)


def scale_interval_codon(a_codon: int, b_codon: int,
                         L_train_cds: int, L_target_cds: int) -> Tuple[int, int]:
    """Percentage-map a training codon interval onto the target length.

    ``[a_codon, b_codon]`` is 1-based and inclusive. The start is scaled
    with truncation and the end with ``ceil``; both results are clamped to
    ``[1, L_target_cds]`` (and to at least 1).
    """
    a2 = 1 + int(((a_codon - 1) / max(1, L_train_cds)) * max(1, L_target_cds - 1))
    b2 = int(math.ceil((b_codon / max(1, L_train_cds)) * L_target_cds))
    a2 = max(1, min(a2, L_target_cds))
    b2 = max(1, min(b2, L_target_cds))
    return a2, b2


def codon_region_to_nt_span(a_codon: int, b_codon: int) -> Tuple[int, int]:
    """Convert a codon region to 1-based nucleotide span [nt_start, nt_end]."""
    return 3 * (a_codon - 1) + 1, 3 * b_codon


def feasible_codons_with_pattern(aa: str, pattern: str) -> List[str]:
    """Return the codons of amino acid ``aa`` that match ``pattern``.

    ``pattern`` is a 3-char string like '.C.' where '.' means "free".
    The result keeps the order of ``AA2CODONS[aa]``.

    Raises:
        KeyError: if ``aa`` is not a key of ``AA2CODONS``.
    """
    return [
        codon for codon in AA2CODONS[aa]
        if all(ch == '.' or codon[i] == ch for i, ch in enumerate(pattern))
    ]


# ---------------- Wobble preference helpers ----------------
def wobble_bonus(base: str) -> int:
    """3rd-base preference: C=+2, G=-1, A/T=-2; '.' or others -> 0."""
    if base == "C":
        return 2
    if base == "G":
        return -1
    if base in ("A", "T"):
        return -2
    return 0


def expected_wobble_for_pattern(aa: str, pattern: str) -> float:
    """Return the (expected) wobble bonus of a codon pattern.

    If the wobble position (3rd char) is fixed, return its bonus. Otherwise
    return the average bonus over all codons of ``aa`` that match the
    pattern, or 0.0 when none match.
    """
    wob = pattern[2]
    if wob in "ACGT":
        return float(wobble_bonus(wob))
    feas = feasible_codons_with_pattern(aa, pattern)
    if not feas:
        return 0.0
    return sum(wobble_bonus(c[2]) for c in feas) / len(feas)


def placement_wobble_score(constraints: Dict[int, str], aa_seq: str) -> float:
    """Sum wobble preferences over all codons touched by a placement.

    Args:
        constraints: ``{codon_index_0based: pattern}`` with patterns such as
            '...', '.C.' or '..A'.
        aa_seq: amino-acid sequence; codon indices outside it are ignored.
    """
    return sum(
        (expected_wobble_for_pattern(aa_seq[ci], patt)
         for ci, patt in constraints.items()
         if 0 <= ci < len(aa_seq)),
        0.0,
    )


# ---------------- Seeding best_kmers (enumerate placements) ----------------
def place_kmer_seed_in_region_codon(aa_seq: str,
                                    a_codon: int, b_codon: int,
                                    kmer: str,
                                    fixed_nt: Dict[int, str]) -> List[Dict]:
    """Enumerate feasible placements of a k-mer inside a codon region.

    Args:
        aa_seq: amino-acid sequence being encoded.
        a_codon, b_codon: region bounds (1-based, inclusive codon indices).
        kmer: nucleotide k-mer to place.
        fixed_nt: ``{nt_index_0based: 'A/C/G/T'}`` of already fixed nts.

    Returns:
        A list of dicts ``{"start_nt", "end_nt", "constraints"}`` in order of
        increasing start; ``constraints`` maps each touched 0-based codon
        index to the 3-char pattern imposed by the k-mer ('.' = free).

    A placement is kept only if it does not contradict ``fixed_nt`` and
    every touched codon can still encode its amino acid once the k-mer
    bases AND the nucleotides already fixed in that codon are combined.
    """
    k = len(kmer)
    if k == 0:
        return []
    nt_start, nt_end = codon_region_to_nt_span(a_codon, b_codon)
    region_first_nt0, region_last_nt0 = nt_start - 1, nt_end - 1

    placements = []
    for u in range(region_first_nt0, region_last_nt0 - k + 2):
        v = u + k - 1
        # conflict with fixed nts?
        if any(u + t in fixed_nt and fixed_nt[u + t] != base
               for t, base in enumerate(kmer)):
            continue

        # build codon-level constraints
        constraints: Dict[int, str] = {}
        feasible = True
        for ci in range(u // 3, v // 3 + 1):
            if ci >= len(aa_seq):
                feasible = False
                break
            kmer_patt = []    # what the k-mer alone imposes on this codon
            merged_patt = []  # k-mer plus nts already fixed in this codon
            for nt_idx in range(ci * 3, ci * 3 + 3):
                if u <= nt_idx <= v:
                    base = kmer[nt_idx - u]
                    kmer_patt.append(base)
                    merged_patt.append(base)
                else:
                    kmer_patt.append(".")
                    merged_patt.append(fixed_nt.get(nt_idx, "."))
            # Feasibility must honour previously fixed nts, otherwise the
            # seed could force a codon that no longer encodes aa_seq[ci].
            if not feasible_codons_with_pattern(aa_seq[ci], "".join(merged_patt)):
                feasible = False
                break
            constraints[ci] = "".join(kmer_patt)
        if feasible:
            placements.append({"start_nt": u, "end_nt": v, "constraints": constraints})
    return placements


# ---------------- Scoring while filling ----------------
def _parse_klist(s):
    """Parse a 'kmer;kmer;...' cell (NaN/None/blank -> [])."""
    return parse_kmer_list(s)


def _parse_kwmap(s):
    """Parse 'kmer:weight;kmer:weight;...' into ``{KMER: float_weight}``.

    Tokens without ':' or with a non-numeric weight are skipped. If no
    weights are present, returns {}.
    """
    if pd.isna(s) or not str(s).strip():
        return {}
    out = {}
    for tok in str(s).split(";"):
        kmer, sep, weight = tok.strip().partition(":")
        if not sep:
            continue
        try:
            out[kmer.strip().upper()] = float(weight)
        except ValueError:
            # Malformed weight: best-effort parsing, ignore this token only.
            continue
    return out


# ========= New helpers =========
def len_weight(K: int, is_pos: bool) -> float:
    """Length-aware default weights.

    Pos: +0.5*K (e.g. {+1, +1.5, +2.5, +3} for K = {2, 3, 5, 6}).
    Neg: -1.0*K (e.g. {-2, -3, -5, -6}).
    Tune or replace with info-weighting if you have stats.
    """
    return (0.5 * K) if is_pos else (-1.0 * K)


def collect_right_known_nt(ci: int, fixed_nt: dict, L_target_cds: int, limit_nt: int) -> str:
    """Collect contiguous known nts to the RIGHT of codon ``ci``.

    Args:
        ci: 0-based index of the codon currently being filled.
        fixed_nt: ``{nt_index_0based: 'A/C/G/T'}`` (seeded or committed nts).
        L_target_cds: unused; kept for interface compatibility.
        limit_nt: maximum number of nts to collect.

    Returns:
        The known nts starting immediately after codon ``ci``, stopping at
        the first unknown nt or at ``limit_nt`` (may be empty).
    """
    out = []
    nt = (ci + 1) * 3  # first nt AFTER the new codon (0-based)
    while len(out) < limit_nt and nt in fixed_nt:
        out.append(fixed_nt[nt])
        nt += 1
    return "".join(out)


def _collect_left_known_nt(first_nt: int, fixed_nt: dict, limit_nt: int) -> str:
    """Collect up to ``limit_nt`` contiguous known nts just LEFT of ``first_nt``."""
    out = []
    nt = first_nt - 1
    while len(out) < limit_nt and nt >= 0 and nt in fixed_nt:
        out.append(fixed_nt[nt])
        nt -= 1
    return "".join(reversed(out))


def enumerate_local_windows_overlapping_new(block: str,
                                            new_start: int,  # index in block (0-based)
                                            new_len: int,
                                            K: int):
    """Yield ``(s, block[s:s+K])`` for every K-window overlapping the new segment.

    The new segment is ``[new_start, new_start + new_len)`` inside ``block``.
    A window ``[s, s+K)`` overlaps it iff ``s < new_end`` and
    ``s + K > new_start``, i.e. ``new_start - K + 1 <= s <= new_end - 1``;
    that range is additionally clipped to windows that fit in ``block``.
    Yields nothing when ``K > len(block)``.
    """
    L = len(block)
    if K > L:
        return
    new_end = new_start + new_len  # exclusive
    first = max(0, new_start - K + 1)
    last = min(L - K, new_end - 1)
    for s in range(first, last + 1):
        yield s, block[s:s + K]


def _kmer_gain(K: int, km: str, best_sets: dict, avoid_sets: dict,
               pos_w: Optional[dict], neg_w: Optional[dict]) -> float:
    """Score one K-mer: explicit weight if given, else the length default."""
    gain = 0.0
    if km in avoid_sets.get(K, ()):
        if neg_w and (K, km) in neg_w:
            gain -= abs(neg_w[(K, km)])  # negatives always penalise
        else:
            gain += len_weight(K, is_pos=False)
    if km in best_sets.get(K, ()):
        if pos_w and (K, km) in pos_w:
            gain += abs(pos_w[(K, km)])  # positives always reward
        else:
            gain += len_weight(K, is_pos=True)
    return gain


def score_increment_multiKs(tail_nt: str,
                            new_codon: str,
                            best_sets: dict,            # {K: set(kmer)}
                            avoid_sets: dict,           # {K: set(kmer)}
                            pos_w: dict = None,         # {(K,kmer): weight}
                            neg_w: dict = None,         # {(K,kmer): weight}
                            wobble: bool = True,
                            wobble_scale: float = 1.0,
                            scoring_mode: str = "local",   # "local" or "suffix"
                            right_known_nt: str = "",      # only used for "local"
                            hard_forbid: set = None        # k-mers that must kill a branch
                            ) -> float:
    """Return the incremental score for appending ``new_codon``.

    Args:
        tail_nt: nts already known immediately to the LEFT of the new codon.
        new_codon: the 3-nt codon being appended.
        best_sets, avoid_sets: ``{K: set(kmer)}`` of rewarded / penalised k-mers.
        pos_w, neg_w: optional ``{(K, kmer): weight}``; the absolute value is
            used (added for positives, subtracted for negatives). K-mers
            without a weight fall back to ``len_weight``.
        wobble, wobble_scale: add ``wobble_scale * wobble_bonus(3rd base)``.
        scoring_mode: ``"local"`` scores every K-window of
            ``tail_nt + new_codon + right_known_nt`` that overlaps the new
            codon; any other value scores only the trailing K-mer of
            ``tail_nt + new_codon`` (legacy "suffix" behaviour).
        right_known_nt: contiguous known nts to the right ("local" only).
        hard_forbid: if any scored k-mer is in this set, ``-1e9`` is
            returned immediately.
    """
    HARD_KILL = -1e9
    local_mode = scoring_mode == "local"
    left = tail_nt or ""
    if local_mode:
        block = left + new_codon + (right_known_nt or "")
    else:
        block = left + new_codon

    gain = 0.0
    for K in sorted(best_sets.keys() | avoid_sets.keys()):
        if K > len(block):
            continue  # not enough context for this K
        if local_mode:
            kmers = [km for _, km in enumerate_local_windows_overlapping_new(
                block, len(left), len(new_codon), K)]
        else:
            kmers = [block[-K:]]
        for km in kmers:
            # Hard-forbid first (rare, red-flag patterns)
            if hard_forbid and km in hard_forbid:
                return HARD_KILL
            gain += _kmer_gain(K, km, best_sets, avoid_sets, pos_w, neg_w)

    # Wobble bonus for the 3rd base of the new codon
    if wobble:
        gain += wobble_scale * wobble_bonus(new_codon[2])
    return gain


# ---------------- Main: use ALL best_kmers, forbid avoid_kmers ----------------
def _region_kmer_sets(row, a_c: int, b_c: int):
    """Extract ``(best_sets, avoid_sets, pos_w, neg_w)`` from one summary row.

    Per-K columns ``K{K}_pos`` / ``K{K}_neg`` (K = 2..9, both must exist)
    take precedence, with optional ``K{K}_pos_w`` / ``K{K}_neg_w`` weight
    columns. If no per-K column holds any k-mer, the legacy
    ``best_kmers`` / ``avoid_kmers`` columns are used instead.
    """
    best_sets, avoid_sets = {}, {}
    pos_w, neg_w = {}, {}
    for K in range(2, 10):  # adjust upper bound if you export more K's
        pos_col, neg_col = f"K{K}_pos", f"K{K}_neg"
        if not (pos_col in row and neg_col in row):
            continue
        if not (pd.notna(row[pos_col]) or pd.notna(row[neg_col])):
            continue
        pos_list = _parse_klist(row.get(pos_col, ""))
        neg_list = _parse_klist(row.get(neg_col, ""))
        if not (pos_list or neg_list):
            continue
        best_sets[K] = set(pos_list)
        avoid_sets[K] = set(neg_list)
        # optional weight columns like "K4_pos_w", "K4_neg_w" in "kmer:weight;..."
        posw_col, negw_col = f"K{K}_pos_w", f"K{K}_neg_w"
        if posw_col in row and pd.notna(row[posw_col]):
            for kmer, w in _parse_kwmap(row[posw_col]).items():
                pos_w[(K, kmer)] = w
        if negw_col in row and pd.notna(row[negw_col]):
            for kmer, w in _parse_kwmap(row[negw_col]).items():
                neg_w[(K, kmer)] = w
    if best_sets:
        return best_sets, avoid_sets, pos_w, neg_w

    # fallback: legacy single-K interface
    best_kmers = parse_kmer_list(row.get("best_kmers", ""))
    avoid_kmers = parse_kmer_list(row.get("avoid_kmers", ""))
    lengths = set(len(k) for k in best_kmers) if best_kmers else set()
    if len(lengths) > 1:
        raise ValueError(f"Region {a_c}-{b_c}: multiple k lengths found in best_kmers: {lengths}")
    k_len = lengths.pop() if lengths else 4
    best_sets = {k_len: set(best_kmers)}
    # Group avoid k-mers by their own length: scoring looks k-mers up per K,
    # so filing them all under k_len would silently ignore other lengths.
    avoid_sets = {}
    for km in avoid_kmers:
        avoid_sets.setdefault(len(km), set()).add(km)
    avoid_sets.setdefault(k_len, set())
    return best_sets, avoid_sets, pos_w, neg_w


def _seed_region(aa_seq: str, a_c: int, b_c: int, best_sets: dict,
                 fixed_nt: Dict[int, str]) -> Tuple[List[str], List[str]]:
    """Place seed k-mers in a region, fixing their nts in ``fixed_nt`` in place.

    Returns ``(placed, skipped)`` lists of k-mers.
    """
    placed, skipped = [], []
    # Seed from the largest K that actually has positive k-mers; a K that
    # only carries negatives has an empty positive set and seeds nothing.
    seed_Ks = [K for K, kmers in best_sets.items() if kmers]
    if not seed_Ks:
        return placed, skipped
    # Longest first, then alphabetical, so the order never depends on set
    # iteration order (which varies between interpreter runs).
    seeds = sorted(best_sets[max(seed_Ks)], key=lambda km: (-len(km), km))
    for km in seeds:
        places = place_kmer_seed_in_region_codon(aa_seq, a_c, b_c, km, fixed_nt)
        if not places:
            skipped.append(km)
            continue
        # pick placement with highest wobble score (tie: fewer forced A/T, then earlier)
        best_p, best_s = None, float("-inf")
        for cand in places:
            s = placement_wobble_score(cand["constraints"], aa_seq)
            for patt in cand["constraints"].values():
                if patt[2] in ("A", "T"):
                    s -= 0.25
            if (s > best_s) or (s == best_s and (best_p is None or cand["start_nt"] < best_p["start_nt"])):
                best_s, best_p = s, cand
        # Placements were already checked against fixed_nt, so just commit.
        for offset, base in enumerate(km):
            fixed_nt[best_p["start_nt"] + offset] = base
        placed.append(km)
    return placed, skipped


def optimization(summary_path: str,
                 aa_seq: str,
                 use_percent_intervals: bool = True,
                 beam_width: int = 5
                 ) -> Tuple[str, Dict[str, Dict[str, float]], pd.DataFrame, List[dict]]:
    """Design a nucleotide sequence for ``aa_seq`` from a region summary.

    For each region (one summary row):
      - Seed: place listed positive k-mers (wobble-aware placement).
      - Fill: beam-search codon-by-codon using multi-K positive/negative
        k-mer sets (+ optional per-kmer weights) and a wobble bonus.
    Regions are CODON-based (start/end = 1-based codon indices).

    Args:
        summary_path: CSV/XLSX file with columns 'start', 'end' and either
            legacy 'best_kmers'/'avoid_kmers' or per-K columns such as
            'K4_pos'/'K4_neg' (optionally 'K4_pos_w'/'K4_neg_w').
        aa_seq: target amino-acid sequence (one-letter codes in AA2CODONS).
        use_percent_intervals: if True, regions are rescaled from the
            training length (max of 'end') to ``len(aa_seq)``; otherwise
            they are clipped to the target length.
        beam_width: number of partial designs kept per codon (default 5).

    Returns:
        ``(designed_nt, aa_percent, gc_percent, log_info)``:
        the designed nucleotide string ('NNN' for codons outside every
        region), the per-amino-acid codon fractions from
        ``aminoacid_percentage``, the ``gc_content`` DataFrame of the
        design, and one log dict per region.

    Raises:
        ValueError: if the required columns are missing, or a legacy row
            lists best k-mers of several lengths.
    """
    df = load_summary(summary_path)

    # ---- Flexible schema check: accept either legacy or multi-K columns
    has_legacy = all(c in df.columns for c in ["start", "end", "best_kmers", "avoid_kmers"])
    has_multik = all(c in df.columns for c in ["start", "end"]) and any(
        str(col).startswith("K") and (str(col).endswith("_pos") or str(col).endswith("_neg"))
        for col in df.columns
    )
    if not (has_legacy or has_multik):
        raise ValueError(
            "Input must have 'start','end' and either legacy columns "
            "['best_kmers','avoid_kmers'] OR per-K columns like 'K4_pos','K4_neg' (optionally *_pos_w/*_neg_w)."
        )

    L_target_cds = len(aa_seq)
    L_train_cds = int(df["end"].max())  # training CDS length in codons
    beam_width = max(1, beam_width)

    # Build codon regions (scaled or clipped)
    regions: List[Tuple[int, int, pd.Series]] = []
    for _, r in df.iterrows():
        a_c, b_c = int(r["start"]), int(r["end"])
        if use_percent_intervals:
            a2, b2 = scale_interval_codon(a_c, b_c, L_train_cds, L_target_cds)
        else:
            a2, b2 = max(1, a_c), min(L_target_cds, b_c)
        if a2 <= b2:
            regions.append((a2, b2, r))

    # Global state accumulated across regions
    chosen_codons: List[Optional[str]] = [None] * L_target_cds
    fixed_nt: Dict[int, str] = {}   # nt index (0-based) -> 'A/C/G/T'
    log_info: List[dict] = []       # notes per region

    for (a_c, b_c, row) in regions:
        # 0) per-K sets if present; else legacy single best/avoid lists
        best_sets, avoid_sets, pos_w, neg_w = _region_kmer_sets(row, a_c, b_c)

        # 1) SEED: fixes the nts of the placed k-mers in fixed_nt
        placed, skipped = _seed_region(aa_seq, a_c, b_c, best_sets, fixed_nt)

        # 2) BEAM-SEARCH fill codons in this region (codon-by-codon)
        # Context length follows the largest K used for scoring.
        k_len_ref = max(best_sets.keys() | avoid_sets.keys(), default=4)
        tail_len = k_len_ref - 1

        # Left context = nts really adjacent to this region's first codon.
        # Reusing the previous region's tail would be wrong whenever regions
        # are not processed in adjacent, left-to-right order.
        start_tail = _collect_left_known_nt((a_c - 1) * 3, fixed_nt, tail_len)

        # beam state: (score, tail_nt, local codons, local {nt -> base});
        # committed to chosen_codons/fixed_nt after the region ends
        beam = [(0.0, start_tail, [], {})]

        for ci in range(a_c - 1, b_c):
            aa = aa_seq[ci]
            # Nts already fixed (by seeding or earlier regions) restrict the codon.
            patt_s = "".join(fixed_nt.get(ci * 3 + ofs, ".") for ofs in range(3))
            cand_codons = feasible_codons_with_pattern(aa, patt_s) or AA2CODONS[aa]

            # Contiguous known nts to the RIGHT; uses the global fixed_nt
            # (not the per-path state), so it is the same for every path.
            right_known = collect_right_known_nt(ci, fixed_nt, L_target_cds, limit_nt=tail_len)

            new_beam = []
            for score, tail, local_codons, local_fix in beam:
                for c in cand_codons:
                    gain = score_increment_multiKs(
                        tail_nt=tail,
                        new_codon=c,
                        best_sets=best_sets,
                        avoid_sets=avoid_sets,
                        pos_w=pos_w if pos_w else None,
                        neg_w=neg_w if neg_w else None,
                        wobble=True,
                        wobble_scale=1.0,
                        scoring_mode="local",
                        right_known_nt=right_known,
                        hard_forbid=None  # or a small set for true red-flags
                    )
                    if gain <= -1e5:
                        continue  # forbidden
                    tail2 = (tail + c)[-tail_len:] if tail_len > 0 else ""
                    fix2 = dict(local_fix)
                    for ofs, ch in enumerate(c):
                        fix2[ci * 3 + ofs] = ch
                    new_beam.append((score + gain, tail2, local_codons + [c], fix2))

            if not new_beam:
                # fall back: force first feasible codon onto the best path
                c = cand_codons[0]
                score, tail, local_codons, local_fix = beam[0]
                tail2 = (tail + c)[-tail_len:] if tail_len > 0 else ""
                fix2 = dict(local_fix)
                for ofs, ch in enumerate(c):
                    fix2[ci * 3 + ofs] = ch
                new_beam = [(score, tail2, local_codons + [c], fix2)]

            # prune (stable sort keeps earlier candidates first on ties)
            new_beam.sort(key=lambda t: t[0], reverse=True)
            beam = new_beam[:beam_width]

        # choose best path from beam and commit to global arrays
        best_score, _best_tail, best_local, best_fix = max(beam, key=lambda t: t[0])
        for ofs, c in enumerate(best_local):
            chosen_codons[(a_c - 1) + ofs] = c
        fixed_nt.update(best_fix)

        log_info.append({
            "region": f"{a_c}-{b_c}",
            "k_len_ref": k_len_ref,
            "best_total": sum(len(s) for s in best_sets.values()),
            "best_placed": len(placed),
            "best_skipped": skipped,
            "beam_best_score": best_score,
            "beam_kept": beam_width
        })

    # build final nt string; codons no region covered become 'NNN'
    designed_nt = "".join(c or "NNN" for c in chosen_codons)
    aa_percent = aminoacid_percentage(chosen_codons)[0]
    gc_percent = gc_content([designed_nt])
    return designed_nt, aa_percent, gc_percent, log_info


# ---------------- Example usage (comment out in library use) ----------------
if __name__ == "__main__":
    # Example (replace with your real paths and AA sequence)
    # summary_path = r"C:\path\to\region_sweep_summary.csv"
    # aa_seq = "M..."  # your amino-acid sequence
    # nt_seq, aa_percent, gc_percent, log = optimization(summary_path, aa_seq, use_percent_intervals=True)
    # print(nt_seq)
    pass