from typing import Dict, List, Tuple
import math
from collections import defaultdict, Counter

import numpy as np
import pandas as pd
# ---------------- Genetic code (DNA) ----------------
AA2CODONS = {
'A':['GCT','GCC','GCA','GCG'],
'R':['CGT','CGC','CGA','CGG','AGA','AGG'],
'N':['AAT','AAC'],
'D':['GAT','GAC'],
'C':['TGT','TGC'],
'Q':['CAA','CAG'],
'E':['GAA','GAG'],
'G':['GGT','GGC','GGA','GGG'],
'H':['CAT','CAC'],
'I':['ATT','ATC','ATA'],
'L':['TTA','TTG','CTT','CTC','CTA','CTG'],
'K':['AAA','AAG'],
'M':['ATG'],
'F':['TTT','TTC'],
'P':['CCT','CCC','CCA','CCG'],
'S':['TCT','TCC','TCA','TCG','AGT','AGC'],
'T':['ACT','ACC','ACA','ACG'],
'W':['TGG'],
'Y':['TAT','TAC'],
'V':['GTT','GTC','GTA','GTG'],
    '*':['TAA','TAG','TGA']  # stop codons (note: DNA_Codons below uses '_' for stop)
}
DNA_Codons = {
# 'M' - START, '_' - STOP
"GCT": "A", "GCC": "A", "GCA": "A", "GCG": "A",
"TGT": "C", "TGC": "C",
"GAT": "D", "GAC": "D",
"GAA": "E", "GAG": "E",
"TTT": "F", "TTC": "F",
"GGT": "G", "GGC": "G", "GGA": "G", "GGG": "G",
"CAT": "H", "CAC": "H",
"ATA": "I", "ATT": "I", "ATC": "I",
"AAA": "K", "AAG": "K",
"TTA": "L", "TTG": "L", "CTT": "L", "CTC": "L", "CTA": "L", "CTG": "L",
"ATG": "M",
"AAT": "N", "AAC": "N",
"CCT": "P", "CCC": "P", "CCA": "P", "CCG": "P",
"CAA": "Q", "CAG": "Q",
"CGT": "R", "CGC": "R", "CGA": "R", "CGG": "R", "AGA": "R", "AGG": "R",
"TCT": "S", "TCC": "S", "TCA": "S", "TCG": "S", "AGT": "S", "AGC": "S",
"ACT": "T", "ACC": "T", "ACA": "T", "ACG": "T",
"GTT": "V", "GTC": "V", "GTA": "V", "GTG": "V",
"TGG": "W",
"TAT": "Y", "TAC": "Y",
"TAA": "_", "TAG": "_", "TGA": "_"
}
# ---------------- Helpers ----------------
def aminoacid_percentage(codons):
    '''Calculate the percentage and count of the corresponding codons for each amino acid.
    - codons: list of codons, like ['ACT', 'CCT', 'GTT', ...]
    '''
    amino_dict_count = {}
    amino_dict_per = defaultdict(list)
    for codon in codons:
        amino = DNA_Codons[codon]
        amino_dict_per[amino].append(codon)
    for k, v in amino_dict_per.items():
        c = Counter(v)
        amino_dict_per[k] = {kk: np.round(vv / len(v), 2) for kk, vv in c.items()}
        amino_dict_count[k] = c
    return amino_dict_per, amino_dict_count
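# Example (minimal sketch; outputs shown as plain floats/Counters):
#   per, cnt = aminoacid_percentage(['ATG', 'GCT', 'GCC', 'GCT'])
#   per['A']  -> {'GCT': 0.67, 'GCC': 0.33}
#   cnt['A']  -> Counter({'GCT': 2, 'GCC': 1})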
def gc_content(sequence=None, fasta_dir=None):
    """Calculate the GC content of the whole sequence and of each codon position (1, 2 and 3).
    sequence: list of sequences without spaces, e.g. ['AGCCCCTTT...']
    fasta_dir: used when sequence is None; requires an external fasta_to_list() helper."""
    if sequence is not None:
        sequences = sequence
    else:
        sequences = fasta_to_list(fasta_dir, seq_to_codon=False, sos_eos=False)  # not defined in this module
    # overall GC content per sequence
    gc_rows = [[(seq.lower().count('g') + seq.lower().count('c')) / len(seq) * 100] for seq in sequences]
    for index, seq in enumerate(sequences):
        seq = seq.lower()
        for i in range(3):
            position_nucleotides = seq[i::3]
            # count G and C at codon position i+1
            gc_count = position_nucleotides.count('g') + position_nucleotides.count('c')
            gc_rows[index].append(gc_count / len(position_nucleotides) * 100)
    return pd.DataFrame(gc_rows, columns=['Original', 'Position One', 'Position Two', 'Position Three']).round(1)
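# Example (minimal sketch): one 6-nt sequence, GC% overall and per codon position.
#   gc_content(['ATGGCC'])
#   ->    Original  Position One  Position Two  Position Three
#   0         66.7          50.0          50.0           100.0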
def parse_kmer_list(x) -> List[str]:
"""Parse semicolon-separated kmers 'AAA;TTT;...' into a list (uppercased)."""
if x is None:
return []
s = str(x).strip()
if not s:
return []
return [k.strip().upper() for k in s.split(";") if k.strip()]
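# Example: parse_kmer_list("aaa; ttt;;gcg ") -> ['AAA', 'TTT', 'GCG']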
def load_summary(summary_path: str) -> pd.DataFrame:
"""Read CSV or XLSX summary to DataFrame."""
sp = summary_path.lower()
if sp.endswith(".xlsx") or sp.endswith(".xls"):
return pd.read_excel(summary_path)
return pd.read_csv(summary_path)
def scale_interval_codon(a_codon: int, b_codon: int,
L_train_cds: int, L_target_cds: int) -> Tuple[int, int]:
"""
Percentage-map training codon interval [a,b] (1-based, inclusive) to target codon length.
"""
a2 = 1 + int(((a_codon - 1) / max(1, L_train_cds)) * max(1, L_target_cds - 1))
b2 = int(math.ceil((b_codon / max(1, L_train_cds)) * L_target_cds))
a2 = max(1, min(a2, L_target_cds))
b2 = max(1, min(b2, L_target_cds))
return a2, b2
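# Example (minimal sketch): map codons 10-20 of a 100-codon training CDS onto a 50-codon target.
#   scale_interval_codon(10, 20, 100, 50) -> (5, 10)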
def codon_region_to_nt_span(a_codon: int, b_codon: int) -> Tuple[int, int]:
"""Convert a codon region to 1-based nucleotide span [nt_start, nt_end]."""
return 3*(a_codon-1)+1, 3*b_codon
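# Example: codon_region_to_nt_span(2, 4) -> (4, 12)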
def feasible_codons_with_pattern(aa: str, pattern: str) -> List[str]:
"""
pattern is a 3-char string like '.C.' where '.' = free.
Return all codons for this amino acid matching the pattern.
"""
outs = []
for c in AA2CODONS[aa]:
ok = True
for i, ch in enumerate(pattern):
if ch != '.' and c[i] != ch:
ok = False
break
if ok:
outs.append(c)
return outs
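# Example: leucine codons forced to start with 'C':
#   feasible_codons_with_pattern('L', 'C..') -> ['CTT', 'CTC', 'CTA', 'CTG']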
# ---------------- Wobble preference helpers ----------------
def wobble_bonus(base: str) -> int:
"""3rd-base preference: C=+2, G=-1, A/T=-2; '.' or others -> 0."""
if base == "C":
return 2
if base == "G":
return -1
if base in ("A", "T"):
return -2
return 0
def expected_wobble_for_pattern(aa: str, pattern: str) -> float:
"""
If wobble (pos 3) is fixed, return its bonus.
If wobble is free ('.'), return the average wobble bonus over all feasible codons.
"""
wob = pattern[2]
if wob in "ACGT":
return float(wobble_bonus(wob))
feas = feasible_codons_with_pattern(aa, pattern)
if not feas:
return 0.0
return sum(wobble_bonus(c[2]) for c in feas) / len(feas)
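# Examples (minimal sketch):
#   expected_wobble_for_pattern('A', '..C') -> 2.0    (wobble fixed to C)
#   expected_wobble_for_pattern('K', '...') -> -1.5   (mean over AAA: -2 and AAG: -1)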
def placement_wobble_score(constraints: Dict[int, str], aa_seq: str) -> float:
"""
Sum wobble preferences over all codons touched by a placement.
    constraints: {codon_index_0based: 3-char pattern, e.g. '...', '.C.', '..A'}
"""
total = 0.0
for ci, patt in constraints.items():
if ci < 0 or ci >= len(aa_seq):
continue
total += expected_wobble_for_pattern(aa_seq[ci], patt)
return total
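# Example (minimal sketch): constraints from placing 'AAA' across codons 2-3 of "MKT":
#   placement_wobble_score({1: '.AA', 2: 'A..'}, "MKT") -> -2.75
#   (-2.0 for the forced A wobble on 'K'; -0.75 mean wobble over ACT/ACC/ACA/ACG for 'T')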
# ---------------- Seeding best_kmers (enumerate placements) ----------------
def place_kmer_seed_in_region_codon(aa_seq: str,
a_codon: int, b_codon: int,
kmer: str,
fixed_nt: Dict[int, str]) -> List[Dict]:
"""
Enumerate feasible placements of a k-mer inside codon region [a_codon,b_codon] (1-based, inclusive).
fixed_nt: dict(nt_index_0based -> 'A/C/G/T') already fixed nts.
Returns a list of feasible placements (each with nt start/end and codon patterns).
"""
nt_start, nt_end = codon_region_to_nt_span(a_codon, b_codon)
region_start_nt0, region_end_nt0 = nt_start-1, nt_end-1
k = len(kmer)
placements = []
for u in range(region_start_nt0, region_end_nt0 - k + 2):
v = u + k - 1
# conflict with fixed nts?
conflict = False
for t in range(k):
idx = u + t
need = kmer[t]
if idx in fixed_nt and fixed_nt[idx] != need:
conflict = True
break
if conflict:
continue
# build codon-level constraints
constraints: Dict[int, str] = {}
codon_i0 = u // 3
codon_i1 = v // 3
ok = True
for ci in range(codon_i0, codon_i1 + 1):
if ci >= len(aa_seq):
ok = False
break
patt = list("...")
for ofs in range(3):
nt_idx = ci*3 + ofs
if u <= nt_idx <= v:
patt[ofs] = kmer[nt_idx - u]
patt_s = "".join(patt)
if not feasible_codons_with_pattern(aa_seq[ci], patt_s):
ok = False
break
constraints[ci] = patt_s
if not ok:
continue
placements.append({"start_nt": u, "end_nt": v, "constraints": constraints})
return placements
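# Example (minimal sketch): two feasible placements of 'AAA' in codons 1-3 of "MKT".
#   place_kmer_seed_in_region_codon("MKT", 1, 3, "AAA", {})
#   -> [{'start_nt': 3, 'end_nt': 5, 'constraints': {1: 'AAA'}},
#       {'start_nt': 4, 'end_nt': 6, 'constraints': {1: '.AA', 2: 'A..'}}]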
# ---------------- Scoring while filling ----------------
def _parse_klist(s):
if pd.isna(s) or not str(s).strip():
return []
return [t.strip().upper() for t in str(s).split(";") if t.strip()]
def _parse_kwmap(s):
    """
    Parse 'kmer:weight;kmer:weight;...' -> dict. If weights absent, return {}.
    """
    if pd.isna(s) or not str(s).strip():
        return {}
    out = {}
    for tok in str(s).split(";"):
        tok = tok.strip()
        if not tok:
            continue
        if ":" in tok:
            k, w = tok.split(":", 1)
            try:
                out[k.strip().upper()] = float(w)
            except ValueError:
                pass  # skip malformed weights
    return out
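# Example: _parse_kwmap("GCGC:1.5;TTTT:0.8;badtoken") -> {'GCGC': 1.5, 'TTTT': 0.8}
#   (tokens without ':' or with non-numeric weights are silently skipped)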
# ========= New helpers =========
def len_weight(K: int, is_pos: bool) -> float:
"""
Length-aware default weights.
Pos: +0.5*K (e.g., {+1,+1.5,+2.5,+3} for K={2,3,5,6})
Neg: -1.0*K (e.g., {-2,-3,-5,-6})
Tune or replace with info-weighting if you have stats.
"""
return (0.5 * K) if is_pos else (-1.0 * K)
def collect_right_known_nt(ci: int,
fixed_nt: dict,
L_target_cds: int,
limit_nt: int) -> str:
"""
Collect up to 'limit_nt' contiguous known nts to the RIGHT of codon index 'ci'
(i.e., starting immediately after the newly added codon).
- ci is 0-based codon index we're currently filling.
- fixed_nt: {nt_index_0based: 'A/C/G/T'} (from seeding or previous commits)
- We stop when we hit the first unknown nt, or reach 'limit_nt'.
Returns a string of known nts (may be empty).
"""
out = []
start_nt = (ci + 1) * 3 # first nt AFTER the new codon (0-based)
nt = start_nt
while len(out) < limit_nt:
if nt in fixed_nt:
out.append(fixed_nt[nt])
nt += 1
# also require contiguity: if (nt) not in fixed_nt we break below
continue
break
return "".join(out)
def enumerate_local_windows_overlapping_new(block: str,
new_start: int, # index in block (0-based)
new_len: int,
K: int):
"""
Yield all K-length windows (substring indices [s, s+K)) within 'block' that
overlap the 'new' segment [new_start, new_start+new_len).
"""
    L = len(block)
    if K > L:
        return
    new_end = new_start + new_len  # exclusive
    # window [s, s+K) overlaps [new_start, new_end) iff s < new_end and s + K > new_start,
    # i.e. s in [new_start - (K - 1), new_end - 1], clipped to valid window starts
    s_from = max(0, new_start - (K - 1))
    s_to = min(L - K, new_end - 1)
    for s in range(s_from, s_to + 1):
        yield s, block[s:s+K]
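# Example (minimal sketch): block "GCGCCA" with the new codon at indices 2-4, K=3:
#   list(enumerate_local_windows_overlapping_new("GCGCCA", 2, 3, 3))
#   -> [(0, 'GCG'), (1, 'CGC'), (2, 'GCC'), (3, 'CCA')]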
def score_increment_multiKs(tail_nt: str,
new_codon: str,
best_sets: dict, # {K: set(kmer)}
avoid_sets: dict, # {K: set(kmer)}
pos_w: dict = None, # {(K,kmer): weight}
neg_w: dict = None, # {(K,kmer): weight}
wobble: bool = True,
wobble_scale: float = 1.0,
# NEW:
scoring_mode: str = "local", # "local" or "suffix"
right_known_nt: str = "", # only used for "local"
hard_forbid: set = None # optional set of k-mers that must kill a branch
) -> float:
"""
Returns the incremental score for appending 'new_codon'.
- 'tail_nt' is the last (Kmax-1) nts already known to the LEFT.
- In 'local' mode, we also use 'right_known_nt' (contiguous known nts to the right).
- Only windows that overlap 'new_codon' are counted in 'local' mode.
- In 'suffix' mode, we keep legacy behavior: only the trailing window per K.
"""
HARD_KILL = -1e9
# Build the left→new→right local block for "local" mode
if scoring_mode == "local":
left = tail_nt or ""
mid = new_codon
right = right_known_nt or ""
block = left + mid + right
new_start = len(left)
new_len = 3
L_block = len(block)
else:
# suffix mode uses only left+mid sequence
seq = (tail_nt + new_codon) if tail_nt else new_codon
L_seq = len(seq)
gain = 0.0
# Per-K evaluation
Ks = sorted(best_sets.keys() | avoid_sets.keys())
for K in Ks:
# skip if we don't have enough context
if scoring_mode == "local":
if K > (L_block):
continue
windows = list(enumerate_local_windows_overlapping_new(
block, new_start, new_len, K
))
if not windows:
continue
# Sum all windows’ contributions
for _, km in windows:
# Hard-forbid first (rare, red-flag patterns)
if hard_forbid and km in hard_forbid:
return HARD_KILL
# soft negatives / pos
if km in avoid_sets.get(K, set()):
if neg_w and (K, km) in neg_w:
gain += -abs(neg_w[(K, km)])
else:
gain += len_weight(K, is_pos=False)
if km in best_sets.get(K, set()):
if pos_w and (K, km) in pos_w:
gain += abs(pos_w[(K, km)])
else:
gain += len_weight(K, is_pos=True)
else:
# suffix-only: check only the trailing K-mer
if K > L_seq:
continue
km = seq[-K:]
if hard_forbid and km in hard_forbid:
return HARD_KILL
if km in avoid_sets.get(K, set()):
if neg_w and (K, km) in neg_w:
gain += -abs(neg_w[(K, km)])
else:
gain += len_weight(K, is_pos=False)
if km in best_sets.get(K, set()):
if pos_w and (K, km) in pos_w:
gain += abs(pos_w[(K, km)])
else:
gain += len_weight(K, is_pos=True)
    # Wobble bonus for the 3rd base of the new codon
    if wobble:
        gain += wobble_scale * wobble_bonus(new_codon[2])
return gain
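# Example (minimal sketch; toy k-mer sets, default weights):
#   score_increment_multiKs(tail_nt="GC", new_codon="GCC",
#                           best_sets={3: {"CGC"}}, avoid_sets={},
#                           right_known_nt="A") -> 3.5
#   block = "GCGCCA"; overlapping 3-mers GCG, CGC, GCC, CCA -> one hit on 'CGC'
#   (+0.5*3 = +1.5 via len_weight) plus the wobble bonus +2 for the third base 'C'.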
# ---------------- Main: use ALL best_kmers, forbid avoid_kmers ----------------
def optimization(summary_path: str,
                 aa_seq: str,
                 use_percent_intervals: bool = True) -> Tuple[str, Dict, pd.DataFrame, List[dict]]:
    """
    For each region:
      - Seed: place listed positive k-mers (wobble-aware placement).
      - Fill: beam-search codon-by-codon using multi-K positive/negative k-mer sets
        (+ optional per-kmer weights) and a wobble bonus.
    Regions are CODON-based (start/end = 1-based codon indices).
    Returns (designed_nt, per-amino-acid codon percentages, GC-content DataFrame, per-region log).
    """
df = load_summary(summary_path)
# ---- Flexible schema check: accept either legacy or multi-K columns
has_legacy = all(c in df.columns for c in ["start","end","best_kmers","avoid_kmers"])
has_multik = all(c in df.columns for c in ["start","end"]) and any(
str(col).startswith("K") and (str(col).endswith("_pos") or str(col).endswith("_neg"))
for col in df.columns
)
if not (has_legacy or has_multik):
raise ValueError(
"Input must have 'start','end' and either legacy columns "
"['best_kmers','avoid_kmers'] OR per-K columns like 'K4_pos','K4_neg' (optionally *_pos_w/*_neg_w)."
)
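    # Example summary row (minimal sketch; toy values, multi-K schema):
    #   start,end,K3_pos,K3_neg,K3_pos_w
    #   1,40,GCC;CGC,ATA,GCC:2.0;CGC:1.5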
L_target_cds = len(aa_seq)
L_train_cds = int(df["end"].max()) # training CDS length in codons
# Build codon regions (scaled or clipped)
regions: List[Tuple[int, int, pd.Series]] = []
for _, r in df.iterrows():
a_c, b_c = int(r["start"]), int(r["end"])
if use_percent_intervals:
a2, b2 = scale_interval_codon(a_c, b_c, L_train_cds, L_target_cds)
else:
a2, b2 = max(1, a_c), min(L_target_cds, b_c)
if a2 <= b2:
regions.append((a2, b2, r))
# Global state accumulated across regions
chosen_codons: List[str] = [None] * L_target_cds
fixed_nt: Dict[int, str] = {} # nt index (0-based) -> 'A/C/G/T'
tail_nt: str = "" # last (maxK-1) nts across regions
log_info: List[dict] = [] # notes per region
for (a_c, b_c, row) in regions:
# 0) parse per-K lists if present; else fall back to single best/avoid
best_sets, avoid_sets = {}, {}
pos_w, neg_w = {}, {} # optional weights {(K,kmer)->w}
found_any_K = False
for K in range(2, 10): # adjust upper bound if you export more K's
pos_col, neg_col = f"K{K}_pos", f"K{K}_neg"
if pos_col in row and neg_col in row and (pd.notna(row[pos_col]) or pd.notna(row[neg_col])):
pos_list = _parse_klist(row.get(pos_col, ""))
neg_list = _parse_klist(row.get(neg_col, ""))
if pos_list or neg_list:
best_sets[K] = set(pos_list)
avoid_sets[K] = set(neg_list)
# optional weight columns like "K4_pos_w", "K4_neg_w" in "kmer:weight;..."
posw_col, negw_col = f"K{K}_pos_w", f"K{K}_neg_w"
if posw_col in row and pd.notna(row[posw_col]):
for kmer, w in _parse_kwmap(row[posw_col]).items():
pos_w[(K, kmer)] = w
if negw_col in row and pd.notna(row[negw_col]):
for kmer, w in _parse_kwmap(row[negw_col]).items():
neg_w[(K, kmer)] = w
found_any_K = True
if not found_any_K:
            # fallback: legacy single-K interface (best_kmers / avoid_kmers columns)
best_kmers = parse_kmer_list(row.get("best_kmers", ""))
avoid_kmers = parse_kmer_list(row.get("avoid_kmers", ""))
lengths = set(len(k) for k in best_kmers) if best_kmers else set()
if len(lengths) > 1:
raise ValueError(f"Region {a_c}-{b_c}: multiple k lengths found in best_kmers: {lengths}")
k_len = lengths.pop() if lengths else 4
best_sets = {k_len: set(best_kmers)}
avoid_sets = {k_len: set(avoid_kmers)}
# 1) SEED (unchanged): place all listed best_kmers by wobble-aware placement
placed, skipped = [], []
# choose one K to iterate seeds when multi-K (use the longest set for seeding priority)
seed_K = max(best_sets.keys()) if best_sets else None
seeds_for_loop = sorted(best_sets.get(seed_K, []), key=len, reverse=True) if seed_K else []
for km in seeds_for_loop:
places = place_kmer_seed_in_region_codon(aa_seq, a_c, b_c, km, fixed_nt)
if not places:
skipped.append(km)
continue
            # pick the placement with the highest wobble score, penalizing forced A/T wobbles; ties -> earlier start
best_p, best_s = None, float("-inf")
for cand in places:
s = placement_wobble_score(cand["constraints"], aa_seq)
for patt in cand["constraints"].values():
if patt[2] in ("A", "T"):
s -= 0.25
if (s > best_s) or (s == best_s and (best_p is None or cand["start_nt"] < best_p["start_nt"])):
best_s, best_p = s, cand
# check conflict then fix nts
conflict = False
for t in range(best_p["start_nt"], best_p["end_nt"] + 1):
need = km[t - best_p["start_nt"]]
if t in fixed_nt and fixed_nt[t] != need:
conflict = True
break
if conflict:
skipped.append(km)
continue
for t in range(best_p["start_nt"], best_p["end_nt"] + 1):
fixed_nt[t] = km[t - best_p["start_nt"]]
placed.append(km)
# 2) BEAM-SEARCH fill codons in this region (codon-by-codon)
BEAM = 5 # beam width (tune as you like)
# derive “reference” k_len for tail buffering: use max K present
k_len_ref = (max(best_sets.keys()) if best_sets else 4)
tail_nt = tail_nt[-(k_len_ref - 1):] if k_len_ref > 1 else ""
# beam state: (score, tail_nt, local_choices[list of codons], local_fixed_nt{nt->base})
# we'll commit to chosen_codons/fixed_nt after region end
init_state = (0.0, tail_nt, [], dict())
beam = [init_state]
for ci in range(a_c - 1, b_c):
aa = aa_seq[ci]
# If already fixed by seeding, only that codon is allowed
patt = list("...")
for ofs in range(3):
nt_idx = ci * 3 + ofs
if nt_idx in fixed_nt:
patt[ofs] = fixed_nt[nt_idx]
patt_s = "".join(patt)
cand_codons = feasible_codons_with_pattern(aa, patt_s) or AA2CODONS[aa]
new_beam = []
for score, tail, local_codons, local_fix in beam:
# Kmax for local context (use the largest K defined this region; fallback 6)
Kmax = max(best_sets.keys() or [6])
# Collect up to Kmax-1 contiguous known nts to the RIGHT
# NOTE: we use 'fixed_nt' (global seeded/committed nts), not 'local_fix'
right_known = collect_right_known_nt(ci, fixed_nt, L_target_cds, limit_nt=Kmax-1)
for c in cand_codons:
# build temporary tail and check multi-K score
gain = score_increment_multiKs(
tail_nt=tail,
new_codon=c,
best_sets=best_sets,
avoid_sets=avoid_sets,
pos_w=pos_w if pos_w else None,
neg_w=neg_w if neg_w else None, # now supported
wobble=True,
wobble_scale=1.0,
scoring_mode="local",
right_known_nt=right_known,
hard_forbid=None # or a small set for true red-flags
)
if gain <= -1e5:
continue # forbidden
# make copies
tail2 = (tail + c)[-(k_len_ref - 1):] if k_len_ref > 1 else ""
local2 = local_codons + [c]
fix2 = dict(local_fix)
for ofs, ch in enumerate(c):
fix2[ci * 3 + ofs] = ch
new_beam.append((score + gain, tail2, local2, fix2))
if not new_beam:
# fall back: force first feasible to keep going
c = cand_codons[0]
tail2 = (beam[0][1] + c)[-(k_len_ref - 1):] if k_len_ref > 1 else ""
local2 = beam[0][2] + [c]
fix2 = dict(beam[0][3])
for ofs, ch in enumerate(c):
fix2[ci * 3 + ofs] = ch
new_beam = [(beam[0][0], tail2, local2, fix2)]
# prune
new_beam.sort(key=lambda t: t[0], reverse=True)
beam = new_beam[:BEAM]
# choose best path from beam and commit to global arrays
best_score, best_tail, best_local, best_fix = max(beam, key=lambda t: t[0])
# write chosen codons for this region
for ofs, c in enumerate(best_local):
idx = (a_c - 1) + ofs
chosen_codons[idx] = c
# update fixed_nt
fixed_nt.update(best_fix)
# update running tail
tail_nt = best_tail
log_info.append({
"region": f"{a_c}-{b_c}",
"k_len_ref": k_len_ref,
"best_total": sum(len(s) for s in best_sets.values()),
"best_placed": len(placed),
"best_skipped": skipped,
"beam_best_score": best_score,
"beam_kept": BEAM
})
    # build final nt string (codons left unfilled become 'NNN')
    designed_nt = "".join(c or "NNN" for c in chosen_codons)
    aa_percent = aminoacid_percentage([c for c in chosen_codons if c])[0]
    gc_percent = gc_content([designed_nt])
    return designed_nt, aa_percent, gc_percent, log_info
# ---------------- Example usage ----------------
if __name__ == "__main__":
    # Example (replace with your real paths and AA sequence)
    # summary_path = r"C:\path\to\region_sweep_summary.csv"
    # aa_seq = "M..."  # your amino-acid sequence
    # nt_seq, aa_percent, gc_percent, log = optimization(summary_path, aa_seq, use_percent_intervals=True)
    # print(nt_seq)
    pass