from typing import Dict, List, Tuple
import math
import numpy as np
import pandas as pd
from collections import defaultdict, Counter
# ---------------- Genetic code (DNA) ----------------
AA2CODONS = {
    'A': ['GCT','GCC','GCA','GCG'],
    'R': ['CGT','CGC','CGA','CGG','AGA','AGG'],
    'N': ['AAT','AAC'],
    'D': ['GAT','GAC'],
    'C': ['TGT','TGC'],
    'Q': ['CAA','CAG'],
    'E': ['GAA','GAG'],
    'G': ['GGT','GGC','GGA','GGG'],
    'H': ['CAT','CAC'],
    'I': ['ATT','ATC','ATA'],
    'L': ['TTA','TTG','CTT','CTC','CTA','CTG'],
    'K': ['AAA','AAG'],
    'M': ['ATG'],
    'F': ['TTT','TTC'],
    'P': ['CCT','CCC','CCA','CCG'],
    'S': ['TCT','TCC','TCA','TCG','AGT','AGC'],
    'T': ['ACT','ACC','ACA','ACG'],
    'W': ['TGG'],
    'Y': ['TAT','TAC'],
    'V': ['GTT','GTC','GTA','GTG'],
    '*': ['TAA','TAG','TGA']
}
DNA_Codons = {
    # 'M' - START, '_' - STOP
    "GCT": "A", "GCC": "A", "GCA": "A", "GCG": "A",
    "TGT": "C", "TGC": "C",
    "GAT": "D", "GAC": "D",
    "GAA": "E", "GAG": "E",
    "TTT": "F", "TTC": "F",
    "GGT": "G", "GGC": "G", "GGA": "G", "GGG": "G",
    "CAT": "H", "CAC": "H",
    "ATA": "I", "ATT": "I", "ATC": "I",
    "AAA": "K", "AAG": "K",
    "TTA": "L", "TTG": "L", "CTT": "L", "CTC": "L", "CTA": "L", "CTG": "L",
    "ATG": "M",
    "AAT": "N", "AAC": "N",
    "CCT": "P", "CCC": "P", "CCA": "P", "CCG": "P",
    "CAA": "Q", "CAG": "Q",
    "CGT": "R", "CGC": "R", "CGA": "R", "CGG": "R", "AGA": "R", "AGG": "R",
    "TCT": "S", "TCC": "S", "TCA": "S", "TCG": "S", "AGT": "S", "AGC": "S",
    "ACT": "T", "ACC": "T", "ACA": "T", "ACG": "T",
    "GTT": "V", "GTC": "V", "GTA": "V", "GTG": "V",
    "TGG": "W",
    "TAT": "Y", "TAC": "Y",
    "TAA": "_", "TAG": "_", "TGA": "_"
}
# ---------------- Helpers ----------------
def aminoacid_percentage(codons):
    '''Calculate the percentage and count of the corresponding codons for each amino acid.
    - codons: list of codons, e.g. ['ACT', 'CCT', 'GTT', ...]
    '''
    amino_dict_count = {}
    amino_dict_per = defaultdict(list)
    for codon in codons:
        amino = DNA_Codons[codon]
        amino_dict_per[amino].append(codon)
    for k, v in amino_dict_per.items():
        c = Counter(v)
        amino_dict_per[k] = {kk: np.round(vv / len(v), 2) for kk, vv in c.items()}
        amino_dict_count[k] = c
    return amino_dict_per, amino_dict_count
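# Illustrative sketch (toy input, not part of the pipeline): with three alanine
# codons and one methionine codon, 'A' splits GCT/GCC at roughly 0.67/0.33.
# >>> per, cnt = aminoacid_percentage(['GCT', 'GCC', 'GCT', 'ATG'])
# >>> per['A']   # {'GCT': 0.67, 'GCC': 0.33}
# >>> cnt['A']   # Counter({'GCT': 2, 'GCC': 1})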
def gc_content(sequence=None, fasta_dir=None):
    """Calculate the GC content overall and at each codon position (1, 2 and 3).
    sequence: a list of sequences without spaces, e.g. ['AGCCCCTTT...']"""
    if sequence is not None:
        sequences = sequence
    else:
        # fasta_to_list is assumed to be defined elsewhere in this package
        sequences = fasta_to_list(fasta_dir, seq_to_codon=False, sos_eos=False)
    # overall GC content per sequence
    gc_rows = [[(seq.lower().count('g') + seq.lower().count('c')) / len(seq) * 100] for seq in sequences]
    for index, seq in enumerate(sequences):
        seq = seq.lower()
        for i in range(3):
            position_nucleotides = seq[i::3]
            # count G and C at codon position i
            gc_count = position_nucleotides.count('g') + position_nucleotides.count('c')
            total_position_nucleotides = len(position_nucleotides)
            # GC content percentage at this position
            gc_rows[index].append((gc_count / total_position_nucleotides) * 100)
    return pd.DataFrame(gc_rows, columns=['Original', 'Position One', 'Position Two', 'Position Three']).round(1)
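# Illustrative sketch (toy input): for 'ATGGCC', overall GC is 4/6 = 66.7%,
# and codon positions 1/2/3 give 50%, 50%, and 100% respectively.
# >>> gc_content(['ATGGCC'])
# #    Original  Position One  Position Two  Position Three
# # 0      66.7          50.0          50.0           100.0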
def parse_kmer_list(x) -> List[str]:
    """Parse semicolon-separated kmers 'AAA;TTT;...' into a list (uppercased)."""
    if x is None:
        return []
    s = str(x).strip()
    if not s:
        return []
    return [k.strip().upper() for k in s.split(";") if k.strip()]
def load_summary(summary_path: str) -> pd.DataFrame:
    """Read a CSV or XLSX summary into a DataFrame."""
    sp = summary_path.lower()
    if sp.endswith(".xlsx") or sp.endswith(".xls"):
        return pd.read_excel(summary_path)
    return pd.read_csv(summary_path)
def scale_interval_codon(a_codon: int, b_codon: int,
                         L_train_cds: int, L_target_cds: int) -> Tuple[int, int]:
    """
    Percentage-map training codon interval [a,b] (1-based, inclusive) to target codon length.
    """
    a2 = 1 + int(((a_codon - 1) / max(1, L_train_cds)) * max(1, L_target_cds - 1))
    b2 = int(math.ceil((b_codon / max(1, L_train_cds)) * L_target_cds))
    a2 = max(1, min(a2, L_target_cds))
    b2 = max(1, min(b2, L_target_cds))
    return a2, b2
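# Illustrative sketch: codons 10..20 of a 100-codon training CDS map onto a
# 50-codon target at roughly the same relative position.
# >>> scale_interval_codon(10, 20, 100, 50)
# (5, 10)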
def codon_region_to_nt_span(a_codon: int, b_codon: int) -> Tuple[int, int]:
    """Convert a codon region to 1-based nucleotide span [nt_start, nt_end]."""
    return 3*(a_codon-1)+1, 3*b_codon
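# Illustrative sketch: codons 2..4 span nucleotides 4..12 (1-based, inclusive).
# >>> codon_region_to_nt_span(2, 4)
# (4, 12)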
def feasible_codons_with_pattern(aa: str, pattern: str) -> List[str]:
    """
    pattern is a 3-char string like '.C.' where '.' = free.
    Return all codons for this amino acid matching the pattern.
    """
    outs = []
    for c in AA2CODONS[aa]:
        ok = True
        for i, ch in enumerate(pattern):
            if ch != '.' and c[i] != ch:
                ok = False
                break
        if ok:
            outs.append(c)
    return outs
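# Illustrative sketch: pinning the first base to 'C' keeps only the CTN
# leucine codons out of the six.
# >>> feasible_codons_with_pattern('L', 'C..')
# ['CTT', 'CTC', 'CTA', 'CTG']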
# ---------------- Wobble preference helpers ----------------
def wobble_bonus(base: str) -> int:
    """3rd-base preference: C=+2, G=-1, A/T=-2; '.' or others -> 0."""
    if base == "C":
        return 2
    if base == "G":
        return -1
    if base in ("A", "T"):
        return -2
    return 0
def expected_wobble_for_pattern(aa: str, pattern: str) -> float:
    """
    If wobble (pos 3) is fixed, return its bonus.
    If wobble is free ('.'), return the average wobble bonus over all feasible codons.
    """
    wob = pattern[2]
    if wob in "ACGT":
        return float(wobble_bonus(wob))
    feas = feasible_codons_with_pattern(aa, pattern)
    if not feas:
        return 0.0
    return sum(wobble_bonus(c[2]) for c in feas) / len(feas)
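# Illustrative sketch: phenylalanine has codons TTT/TTC, so a free wobble
# averages (-2 + 2) / 2 = 0.0, while pinning the wobble to 'C' returns +2.0.
# >>> expected_wobble_for_pattern('F', '...')
# 0.0
# >>> expected_wobble_for_pattern('F', '..C')
# 2.0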
def placement_wobble_score(constraints: Dict[int, str], aa_seq: str) -> float:
    """
    Sum wobble preferences over all codons touched by a placement.
    constraints: {codon_index_0based: pattern}, with patterns like '...', '.C.', '..A'
    """
    total = 0.0
    for ci, patt in constraints.items():
        if ci < 0 or ci >= len(aa_seq):
            continue
        total += expected_wobble_for_pattern(aa_seq[ci], patt)
    return total
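# Illustrative sketch (toy peptide 'MF'): pinning F's wobble to 'C' scores +2,
# and M's only codon ATG contributes its fixed 'G' wobble of -1, so the total is 1.0.
# >>> placement_wobble_score({0: '...', 1: '..C'}, 'MF')
# 1.0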
# ---------------- Seeding best_kmers (enumerate placements) ----------------
def place_kmer_seed_in_region_codon(aa_seq: str,
                                    a_codon: int, b_codon: int,
                                    kmer: str,
                                    fixed_nt: Dict[int, str]) -> List[Dict]:
    """
    Enumerate feasible placements of a k-mer inside codon region [a_codon,b_codon] (1-based, inclusive).
    fixed_nt: dict(nt_index_0based -> 'A/C/G/T') already fixed nts.
    Returns a list of feasible placements (each with nt start/end and codon patterns).
    """
    nt_start, nt_end = codon_region_to_nt_span(a_codon, b_codon)
    region_start_nt0, region_end_nt0 = nt_start - 1, nt_end - 1
    k = len(kmer)
    placements = []
    for u in range(region_start_nt0, region_end_nt0 - k + 2):
        v = u + k - 1
        # conflict with fixed nts?
        conflict = False
        for t in range(k):
            idx = u + t
            need = kmer[t]
            if idx in fixed_nt and fixed_nt[idx] != need:
                conflict = True
                break
        if conflict:
            continue
        # build codon-level constraints
        constraints: Dict[int, str] = {}
        codon_i0 = u // 3
        codon_i1 = v // 3
        ok = True
        for ci in range(codon_i0, codon_i1 + 1):
            if ci >= len(aa_seq):
                ok = False
                break
            patt = list("...")
            for ofs in range(3):
                nt_idx = ci*3 + ofs
                if u <= nt_idx <= v:
                    patt[ofs] = kmer[nt_idx - u]
            patt_s = "".join(patt)
            if not feasible_codons_with_pattern(aa_seq[ci], patt_s):
                ok = False
                break
            constraints[ci] = patt_s
        if not ok:
            continue
        placements.append({"start_nt": u, "end_nt": v, "constraints": constraints})
    return placements
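# Illustrative sketch (toy peptide 'MA', no pre-fixed nts): 'TGG' only fits at
# nt offset 1, constraining codon 0 to '.TG' (matched by ATG) and codon 1 to
# 'G..' (matched by every GCN alanine codon).
# >>> place_kmer_seed_in_region_codon('MA', 1, 2, 'TGG', {})
# [{'start_nt': 1, 'end_nt': 3, 'constraints': {0: '.TG', 1: 'G..'}}]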
# ---------------- Scoring while filling ----------------
def _parse_klist(s):
    if pd.isna(s) or not str(s).strip():
        return []
    return [t.strip().upper() for t in str(s).split(";") if t.strip()]
def _parse_kwmap(s):
    """
    Parse 'kmer:weight;kmer:weight;...' -> dict. If weights absent, return {}.
    """
    if pd.isna(s) or not str(s).strip():
        return {}
    out = {}
    for tok in str(s).split(";"):
        tok = tok.strip()
        if not tok:
            continue
        if ":" in tok:
            k, w = tok.split(":", 1)
            try:
                out[k.strip().upper()] = float(w)
            except ValueError:
                pass  # skip malformed weights
    return out
# ========= New helpers =========
def len_weight(K: int, is_pos: bool) -> float:
    """
    Length-aware default weights.
    Pos: +0.5*K (e.g., {+1,+1.5,+2.5,+3} for K={2,3,5,6})
    Neg: -1.0*K (e.g., {-2,-3,-5,-6})
    Tune or replace with info-weighting if you have stats.
    """
    return (0.5 * K) if is_pos else (-1.0 * K)
def collect_right_known_nt(ci: int,
                           fixed_nt: dict,
                           L_target_cds: int,
                           limit_nt: int) -> str:
    """
    Collect up to 'limit_nt' contiguous known nts to the RIGHT of codon index 'ci'
    (i.e., starting immediately after the newly added codon).
    - ci is the 0-based codon index we're currently filling.
    - fixed_nt: {nt_index_0based: 'A/C/G/T'} (from seeding or previous commits)
    - We stop at the first unknown nt, or after 'limit_nt' bases.
    - L_target_cds is currently unused; kept for API symmetry.
    Returns a string of known nts (may be empty).
    """
    out = []
    nt = (ci + 1) * 3  # first nt AFTER the new codon (0-based)
    # contiguity: stop at the first nt not present in fixed_nt
    while len(out) < limit_nt and nt in fixed_nt:
        out.append(fixed_nt[nt])
        nt += 1
    return "".join(out)
def enumerate_local_windows_overlapping_new(block: str,
                                            new_start: int,  # index in block (0-based)
                                            new_len: int,
                                            K: int):
    """
    Yield all K-length windows (substring indices [s, s+K)) within 'block' that
    overlap the 'new' segment [new_start, new_start+new_len).
    """
    L = len(block)
    if K > L:
        return
    new_end = new_start + new_len  # exclusive
    # window [s, s+K) overlaps the new segment iff s < new_end and s + K > new_start,
    # i.e. s ranges over [new_start - (K-1), new_end - 1], clipped to valid starts
    s_from = max(0, new_start - (K - 1))
    s_to = min(L - K, new_end - 1)
    for s in range(s_from, s_to + 1):
        yield s, block[s:s+K]
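# Illustrative sketch: in block 'AATTCC' the new segment 'TT' sits at index 2,
# and every 3-mer window of the block overlaps it.
# >>> list(enumerate_local_windows_overlapping_new('AATTCC', 2, 2, 3))
# [(0, 'AAT'), (1, 'ATT'), (2, 'TTC'), (3, 'TCC')]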
def score_increment_multiKs(tail_nt: str,
                            new_codon: str,
                            best_sets: dict,          # {K: set(kmer)}
                            avoid_sets: dict,         # {K: set(kmer)}
                            pos_w: dict = None,       # {(K,kmer): weight}
                            neg_w: dict = None,       # {(K,kmer): weight}
                            wobble: bool = True,
                            wobble_scale: float = 1.0,
                            # NEW:
                            scoring_mode: str = "local",   # "local" or "suffix"
                            right_known_nt: str = "",      # only used for "local"
                            hard_forbid: set = None        # optional set of k-mers that must kill a branch
                            ) -> float:
    """
    Returns the incremental score for appending 'new_codon'.
    - 'tail_nt' is the last (Kmax-1) nts already known to the LEFT.
    - In 'local' mode, we also use 'right_known_nt' (contiguous known nts to the right).
    - Only windows that overlap 'new_codon' are counted in 'local' mode.
    - In 'suffix' mode, we keep legacy behavior: only the trailing window per K.
    """
    HARD_KILL = -1e9
    # Build the left -> new -> right local block for "local" mode
    if scoring_mode == "local":
        left = tail_nt or ""
        mid = new_codon
        right = right_known_nt or ""
        block = left + mid + right
        new_start = len(left)
        new_len = 3
        L_block = len(block)
    else:
        # suffix mode uses only the left+mid sequence
        seq = (tail_nt + new_codon) if tail_nt else new_codon
        L_seq = len(seq)
    gain = 0.0
    # Per-K evaluation
    Ks = sorted(best_sets.keys() | avoid_sets.keys())
    for K in Ks:
        # skip if we don't have enough context
        if scoring_mode == "local":
            if K > L_block:
                continue
            windows = list(enumerate_local_windows_overlapping_new(
                block, new_start, new_len, K
            ))
            if not windows:
                continue
            # Sum all windows' contributions
            for _, km in windows:
                # Hard-forbid first (rare, red-flag patterns)
                if hard_forbid and km in hard_forbid:
                    return HARD_KILL
                # soft negatives / positives
                if km in avoid_sets.get(K, set()):
                    if neg_w and (K, km) in neg_w:
                        gain += -abs(neg_w[(K, km)])
                    else:
                        gain += len_weight(K, is_pos=False)
                if km in best_sets.get(K, set()):
                    if pos_w and (K, km) in pos_w:
                        gain += abs(pos_w[(K, km)])
                    else:
                        gain += len_weight(K, is_pos=True)
        else:
            # suffix-only: check only the trailing K-mer
            if K > L_seq:
                continue
            km = seq[-K:]
            if hard_forbid and km in hard_forbid:
                return HARD_KILL
            if km in avoid_sets.get(K, set()):
                if neg_w and (K, km) in neg_w:
                    gain += -abs(neg_w[(K, km)])
                else:
                    gain += len_weight(K, is_pos=False)
            if km in best_sets.get(K, set()):
                if pos_w and (K, km) in pos_w:
                    gain += abs(pos_w[(K, km)])
                else:
                    gain += len_weight(K, is_pos=True)
    # Wobble bonus for the 3rd base of the new codon
    if wobble:
        b3 = new_codon[2]
        wb = 2 if b3 == "C" else (-1 if b3 == "G" else (-2 if b3 in ("A", "T") else 0))
        gain += wobble_scale * wb
    return gain
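# Illustrative sketch: appending 'GCC' with no left/right context, where 'GCC'
# itself is a positive 3-mer, earns the default length weight (0.5 * 3 = 1.5)
# plus the wobble bonus for the trailing 'C' (+2.0), for a total of 3.5.
# >>> score_increment_multiKs(tail_nt='', new_codon='GCC',
# ...                         best_sets={3: {'GCC'}}, avoid_sets={3: set()})
# 3.5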
# ---------------- Main: use ALL best_kmers, forbid avoid_kmers ----------------
def optimization(summary_path: str,
                 aa_seq: str,
                 use_percent_intervals: bool = True) -> Tuple[str, Dict, pd.DataFrame, List[dict]]:
    """
    For each region:
      - Seed: place listed positive k-mers (wobble-aware placement).
      - Fill: beam-search codon-by-codon using multi-K positive/negative k-mer sets
        (+ optional per-kmer weights) and a wobble bonus.
    Regions are CODON-based (start/end = 1-based codon indices).
    Returns (designed_nt, per-amino-acid codon percentages, GC-content DataFrame, per-region log).
    """
    df = load_summary(summary_path)
    # ---- Flexible schema check: accept either legacy or multi-K columns
    has_legacy = all(c in df.columns for c in ["start", "end", "best_kmers", "avoid_kmers"])
    has_multik = all(c in df.columns for c in ["start", "end"]) and any(
        str(col).startswith("K") and (str(col).endswith("_pos") or str(col).endswith("_neg"))
        for col in df.columns
    )
    if not (has_legacy or has_multik):
        raise ValueError(
            "Input must have 'start','end' and either legacy columns "
            "['best_kmers','avoid_kmers'] OR per-K columns like 'K4_pos','K4_neg' (optionally *_pos_w/*_neg_w)."
        )
    L_target_cds = len(aa_seq)
    L_train_cds = int(df["end"].max())  # training CDS length in codons
    # Build codon regions (scaled or clipped)
    regions: List[Tuple[int, int, pd.Series]] = []
    for _, r in df.iterrows():
        a_c, b_c = int(r["start"]), int(r["end"])
        if use_percent_intervals:
            a2, b2 = scale_interval_codon(a_c, b_c, L_train_cds, L_target_cds)
        else:
            a2, b2 = max(1, a_c), min(L_target_cds, b_c)
        if a2 <= b2:
            regions.append((a2, b2, r))
    # Global state accumulated across regions
    chosen_codons: List[str] = [None] * L_target_cds
    fixed_nt: Dict[int, str] = {}  # nt index (0-based) -> 'A/C/G/T'
    tail_nt: str = ""              # last (maxK-1) nts across regions
    log_info: List[dict] = []      # notes per region
    for (a_c, b_c, row) in regions:
        # 0) parse per-K lists if present; else fall back to single best/avoid
        best_sets, avoid_sets = {}, {}
        pos_w, neg_w = {}, {}  # optional weights {(K,kmer) -> w}
        found_any_K = False
        for K in range(2, 10):  # adjust upper bound if you export more K's
            pos_col, neg_col = f"K{K}_pos", f"K{K}_neg"
            if pos_col in row and neg_col in row and (pd.notna(row[pos_col]) or pd.notna(row[neg_col])):
                pos_list = _parse_klist(row.get(pos_col, ""))
                neg_list = _parse_klist(row.get(neg_col, ""))
                if pos_list or neg_list:
                    best_sets[K] = set(pos_list)
                    avoid_sets[K] = set(neg_list)
                    # optional weight columns like "K4_pos_w", "K4_neg_w" in "kmer:weight;..." format
                    posw_col, negw_col = f"K{K}_pos_w", f"K{K}_neg_w"
                    if posw_col in row and pd.notna(row[posw_col]):
                        for kmer, w in _parse_kwmap(row[posw_col]).items():
                            pos_w[(K, kmer)] = w
                    if negw_col in row and pd.notna(row[negw_col]):
                        for kmer, w in _parse_kwmap(row[negw_col]).items():
                            neg_w[(K, kmer)] = w
                    found_any_K = True
        if not found_any_K:
            # fallback: the old single-K interface
            best_kmers = parse_kmer_list(row.get("best_kmers", ""))
            avoid_kmers = parse_kmer_list(row.get("avoid_kmers", ""))
            lengths = set(len(k) for k in best_kmers) if best_kmers else set()
            if len(lengths) > 1:
                raise ValueError(f"Region {a_c}-{b_c}: multiple k lengths found in best_kmers: {lengths}")
            k_len = lengths.pop() if lengths else 4
            best_sets = {k_len: set(best_kmers)}
            avoid_sets = {k_len: set(avoid_kmers)}
        # 1) SEED (unchanged): place all listed best_kmers by wobble-aware placement
        placed, skipped = [], []
        # choose one K to iterate seeds when multi-K (use the longest set for seeding priority)
        seed_K = max(best_sets.keys()) if best_sets else None
        seeds_for_loop = sorted(best_sets.get(seed_K, []), key=len, reverse=True) if seed_K else []
        for km in seeds_for_loop:
            places = place_kmer_seed_in_region_codon(aa_seq, a_c, b_c, km, fixed_nt)
            if not places:
                skipped.append(km)
                continue
            # pick placement with highest wobble score (tie: fewer forced A/T, then earlier)
            best_p, best_s = None, float("-inf")
            for cand in places:
                s = placement_wobble_score(cand["constraints"], aa_seq)
                for patt in cand["constraints"].values():
                    if patt[2] in ("A", "T"):
                        s -= 0.25
                if (s > best_s) or (s == best_s and (best_p is None or cand["start_nt"] < best_p["start_nt"])):
                    best_s, best_p = s, cand
            # check for conflicts, then fix nts
            conflict = False
            for t in range(best_p["start_nt"], best_p["end_nt"] + 1):
                need = km[t - best_p["start_nt"]]
                if t in fixed_nt and fixed_nt[t] != need:
                    conflict = True
                    break
            if conflict:
                skipped.append(km)
                continue
            for t in range(best_p["start_nt"], best_p["end_nt"] + 1):
                fixed_nt[t] = km[t - best_p["start_nt"]]
            placed.append(km)
        # 2) BEAM-SEARCH fill codons in this region (codon-by-codon)
        BEAM = 5  # beam width (tune as you like)
        # derive "reference" k_len for tail buffering: use max K present
        k_len_ref = (max(best_sets.keys()) if best_sets else 4)
        tail_nt = tail_nt[-(k_len_ref - 1):] if k_len_ref > 1 else ""
        # beam state: (score, tail_nt, local_choices[list of codons], local_fixed_nt{nt->base})
        # we'll commit to chosen_codons/fixed_nt after region end
        init_state = (0.0, tail_nt, [], dict())
        beam = [init_state]
        # Kmax for local context (use the largest K defined this region; fallback 6)
        Kmax = max(best_sets.keys() or [6])
        for ci in range(a_c - 1, b_c):
            aa = aa_seq[ci]
            # If already fixed by seeding, only codons matching the fixed bases are allowed
            patt = list("...")
            for ofs in range(3):
                nt_idx = ci * 3 + ofs
                if nt_idx in fixed_nt:
                    patt[ofs] = fixed_nt[nt_idx]
            patt_s = "".join(patt)
            cand_codons = feasible_codons_with_pattern(aa, patt_s) or AA2CODONS[aa]
            # Collect up to Kmax-1 contiguous known nts to the RIGHT
            # NOTE: we use 'fixed_nt' (global seeded/committed nts), not 'local_fix'
            right_known = collect_right_known_nt(ci, fixed_nt, L_target_cds, limit_nt=Kmax - 1)
            new_beam = []
            for score, tail, local_codons, local_fix in beam:
                for c in cand_codons:
                    # build temporary tail and check multi-K score
                    gain = score_increment_multiKs(
                        tail_nt=tail,
                        new_codon=c,
                        best_sets=best_sets,
                        avoid_sets=avoid_sets,
                        pos_w=pos_w if pos_w else None,
                        neg_w=neg_w if neg_w else None,  # now supported
                        wobble=True,
                        wobble_scale=1.0,
                        scoring_mode="local",
                        right_known_nt=right_known,
                        hard_forbid=None  # or a small set for true red-flags
                    )
                    if gain <= -1e5:
                        continue  # forbidden
                    # make copies
                    tail2 = (tail + c)[-(k_len_ref - 1):] if k_len_ref > 1 else ""
                    local2 = local_codons + [c]
                    fix2 = dict(local_fix)
                    for ofs, ch in enumerate(c):
                        fix2[ci * 3 + ofs] = ch
                    new_beam.append((score + gain, tail2, local2, fix2))
            if not new_beam:
                # fall back: force the first feasible codon to keep going
                c = cand_codons[0]
                tail2 = (beam[0][1] + c)[-(k_len_ref - 1):] if k_len_ref > 1 else ""
                local2 = beam[0][2] + [c]
                fix2 = dict(beam[0][3])
                for ofs, ch in enumerate(c):
                    fix2[ci * 3 + ofs] = ch
                new_beam = [(beam[0][0], tail2, local2, fix2)]
            # prune
            new_beam.sort(key=lambda t: t[0], reverse=True)
            beam = new_beam[:BEAM]
        # choose best path from beam and commit to global arrays
        best_score, best_tail, best_local, best_fix = max(beam, key=lambda t: t[0])
        # write chosen codons for this region
        for ofs, c in enumerate(best_local):
            idx = (a_c - 1) + ofs
            chosen_codons[idx] = c
        # update fixed_nt
        fixed_nt.update(best_fix)
        # update running tail
        tail_nt = best_tail
        log_info.append({
            "region": f"{a_c}-{b_c}",
            "k_len_ref": k_len_ref,
            "best_total": sum(len(s) for s in best_sets.values()),
            "best_placed": len(placed),
            "best_skipped": skipped,
            "beam_best_score": best_score,
            "beam_kept": BEAM
        })
    # build final nt string
    designed_nt = "".join(chosen_codons if all(chosen_codons) else [c or "NNN" for c in chosen_codons])
    aa_percent = aminoacid_percentage([c for c in chosen_codons if c])[0]  # skip unfilled positions
    gc_percent = gc_content([designed_nt])
    return designed_nt, aa_percent, gc_percent, log_info
# ---------------- Example usage (comment out in library use) ----------------
if __name__ == "__main__":
    # Example (replace with your real paths and AA sequence)
    # summary_path = r"C:\path\to\region_sweep_summary.csv"
    # aa_seq = "M..."  # your amino-acid sequence
    # nt_seq, aa_percent, gc_percent, log = optimization(summary_path, aa_seq, use_percent_intervals=True)
    # print(nt_seq)
    pass