|
|
from transformers.processing_utils import ProcessorMixin |
|
|
from transformers.tokenization_utils_base import BatchEncoding |
|
|
|
|
|
|
|
|
class PDeepPPProcessor(ProcessorMixin): |
|
|
def __init__(self, pad_char="X", target_length=33): |
|
|
self.pad_char = pad_char |
|
|
self.target_length = target_length |
|
|
|
|
|
def pad_sequence(self, seq): |
|
|
"""确保序列长度为 target_length,不足的部分用 pad_char 在两侧均匀填充""" |
|
|
if len(seq) < self.target_length: |
|
|
total_padding = self.target_length - len(seq) |
|
|
left_padding = total_padding // 2 |
|
|
right_padding = total_padding - left_padding |
|
|
seq = self.pad_char * left_padding + seq + self.pad_char * right_padding |
|
|
return seq[:self.target_length] |
|
|
|
|
|
def extract_ptm_sequences(self, sequences): |
|
|
"""处理 PTM 数据,确保目标氨基酸(S、T、Y)位于序列中心""" |
|
|
ptm_data = [] |
|
|
for seq in sequences: |
|
|
for i in range(len(seq)): |
|
|
if seq[i] in {'S', 'T', 'Y'}: |
|
|
start = max(0, i - self.target_length // 2) |
|
|
end = min(len(seq), start + self.target_length) |
|
|
padded_seq = self.pad_sequence(seq[start:end]) |
|
|
ptm_data.append(padded_seq) |
|
|
return ptm_data |
|
|
|
|
|
def extract_bps_sequences(self, sequences, overlapping=True, step_size=5): |
|
|
"""处理生物活性数据(BPS),关注整个序列,可重叠""" |
|
|
bioactive_data = [] |
|
|
for seq in sequences: |
|
|
if len(seq) < self.target_length: |
|
|
|
|
|
padded_seq = self.pad_sequence(seq) |
|
|
bioactive_data.append(padded_seq) |
|
|
else: |
|
|
|
|
|
for i in range(0, len(seq) - self.target_length + 1, |
|
|
step_size if overlapping else self.target_length): |
|
|
bioactive_data.append(self.pad_sequence(seq[i:i + self.target_length])) |
|
|
return bioactive_data |
|
|
|
|
|
def __call__( |
|
|
self, |
|
|
sequences, |
|
|
mode, |
|
|
overlapping=True, |
|
|
step_size=5, |
|
|
**kwargs |
|
|
): |
|
|
""" |
|
|
预处理蛋白质序列,仅处理数据到指定长度。 |
|
|
|
|
|
Args: |
|
|
sequences: 序列列表或单个序列字符串。 |
|
|
mode: 选择处理模式,必须从外部传入,"PTM" 或 "BPS"。 |
|
|
overlapping: BPS 模式下是否使用重叠窗口。 |
|
|
step_size: BPS 模式下的步长。 |
|
|
""" |
|
|
|
|
|
if isinstance(sequences, str): |
|
|
sequences = [sequences] |
|
|
|
|
|
|
|
|
if mode == "PTM": |
|
|
processed_sequences = self.extract_ptm_sequences(sequences) |
|
|
elif mode == "BPS": |
|
|
processed_sequences = self.extract_bps_sequences( |
|
|
sequences, |
|
|
overlapping=overlapping, |
|
|
step_size=step_size |
|
|
) |
|
|
else: |
|
|
raise ValueError("Invalid mode. Please choose 'PTM' or 'BPS'.") |
|
|
|
|
|
if len(processed_sequences) == 0: |
|
|
raise ValueError("No sequences processed. Check input data and processing logic.") |
|
|
|
|
|
|
|
|
model_inputs = { |
|
|
"raw_sequences": processed_sequences, |
|
|
} |
|
|
|
|
|
return BatchEncoding(data=model_inputs) |