|
|
import json
import os
import re
import tempfile

import sentencepiece as spm
|
|
|
|
|
|
|
|
class MTPTokenizer:
    """Tokenizer using SentencePiece BPE.

    Wraps a ``SentencePieceProcessor`` (``self.sp``) and remembers the path of
    the model it was loaded from (``self.model_path``).  Until a model has
    been trained or loaded, ``self.sp`` is ``None``.
    """

    # Error text used by every accessor that needs a loaded model.
    _NOT_LOADED_MESSAGE = "Tokenizer not loaded. Train or load a model first."

    # Trainer settings shared by the first training attempt and the retry, so
    # both runs are guaranteed to use identical options.
    _TRAINER_OPTIONS = {
        'model_type': 'bpe',
        'pad_id': 0,
        'unk_id': 1,
        'bos_id': 2,
        'eos_id': 3,
        'character_coverage': 1.0,
        'normalization_rule_name': 'identity',
        'num_threads': 4,
        'split_digits': True,
        'allow_whitespace_only_pieces': False,
        'byte_fallback': False,
        'max_sentencepiece_length': 16,
    }

    def __init__(self, model_path=None):
        """Create a tokenizer, loading ``model_path`` when it exists.

        Args:
            model_path: Optional path to a trained model file.  If the path
                is falsy or does not exist, nothing is loaded; the value is
                still stored in ``self.model_path``.
        """
        self.sp = None
        self.model_path = model_path

        if model_path and os.path.exists(model_path):
            self.load(model_path)

    def train(self, corpus_path, vocab_size=4000, model_prefix='mtp_tokenizer'):
        """Train SentencePiece BPE tokenizer on corpus, then load the result.

        Args:
            corpus_path: Path to a UTF-8 file with one JSON document per
                line.  The values under the ``'instruction'`` and
                ``'response'`` keys are used as training text.
            vocab_size: Requested vocabulary size.  The size actually passed
                to the trainer is capped at 15% of the corpus character
                count.
            model_prefix: Prefix handed to the trainer; the model is then
                loaded from ``f"{model_prefix}.model"``.

        Raises:
            ValueError: If the corpus yields no training text.
            RuntimeError: Propagated from the trainer, except for a
                "Vocabulary size too high" error that names a usable limit,
                which triggers one retry with that limit.
        """
        texts = self._read_corpus_texts(corpus_path)
        if not texts:
            raise ValueError(
                f"No 'instruction' or 'response' text found in {corpus_path}"
            )

        # Small corpora cannot support a large vocabulary, so cap the request
        # relative to the amount of text available.
        total_chars = sum(len(text) for text in texts)
        max_vocab = min(vocab_size, int(total_chars * 0.15))

        print(f" β Corpus stats: {len(texts)} texts, {total_chars} characters")
        print(f" β Adjusted vocab size: {max_vocab} (requested: {vocab_size})")

        temp_file = self._write_temp_corpus(texts)
        try:
            try:
                self._run_trainer(temp_file, model_prefix, max_vocab)
            except RuntimeError as e:
                suggested_max = self._suggested_vocab_size(e)
                if suggested_max is None:
                    raise
                print(f" β Retrying with vocab size: {suggested_max}")
                self._run_trainer(temp_file, model_prefix, suggested_max)
        finally:
            # Always clean up the scratch corpus, even when training fails.
            try:
                os.remove(temp_file)
            except FileNotFoundError:
                pass

        self.model_path = f"{model_prefix}.model"
        self.load(self.model_path)

        print(f"β Tokenizer trained: {self.vocab_size()} tokens")
        print(f"β Model saved: {self.model_path}")

    @staticmethod
    def _read_corpus_texts(corpus_path):
        """Collect the 'instruction' and 'response' values from a JSONL file."""
        texts = []
        with open(corpus_path, 'r', encoding='utf-8') as f:
            for line in f:
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                if not line.strip():
                    continue
                data = json.loads(line)
                for key in ('instruction', 'response'):
                    if key in data:
                        texts.append(data[key])
        return texts

    @staticmethod
    def _write_temp_corpus(texts):
        """Write the texts, newline-separated, to a unique temp file; return its path."""
        # Join before creating the file so a failure here leaves nothing behind.
        corpus_text = '\n'.join(texts)
        with tempfile.NamedTemporaryFile(
            mode='w',
            encoding='utf-8',
            prefix='temp_corpus_',
            suffix='.txt',
            delete=False,
        ) as f:
            f.write(corpus_text)
            return f.name

    def _run_trainer(self, corpus_file, model_prefix, vocab_size):
        """Run one SentencePiece training pass with the shared options."""
        spm.SentencePieceTrainer.train(
            input=corpus_file,
            model_prefix=model_prefix,
            vocab_size=vocab_size,
            **self._TRAINER_OPTIONS,
        )

    @staticmethod
    def _suggested_vocab_size(error):
        """Return the limit named in a 'Vocabulary size too high' error, else None."""
        message = str(error)
        if "Vocabulary size too high" not in message:
            return None
        match = re.search(r'value <= (\d+)', message)
        return int(match.group(1)) if match else None

    def _require_loaded(self):
        """Return the loaded processor, or raise ValueError if there is none."""
        if self.sp is None:
            raise ValueError(self._NOT_LOADED_MESSAGE)
        return self.sp

    def load(self, model_path):
        """Load trained tokenizer from ``model_path``.

        ``self.sp`` and ``self.model_path`` are only updated once loading has
        succeeded, so a failed load leaves the previous state intact.
        """
        processor = spm.SentencePieceProcessor()
        processor.load(model_path)
        self.sp = processor
        self.model_path = model_path

    def encode(self, text):
        """Encode text to token IDs.

        Raises:
            ValueError: If no model has been trained or loaded.
        """
        return self._require_loaded().encode_as_ids(text)

    def decode(self, ids):
        """Decode token IDs to text.

        Raises:
            ValueError: If no model has been trained or loaded.
        """
        return self._require_loaded().decode_ids(ids)

    def vocab_size(self):
        """Get vocabulary size (0 when no model is loaded)."""
        if self.sp is None:
            return 0
        return self.sp.get_piece_size()

    def bos_id(self):
        """Beginning of sentence token ID.

        Raises:
            ValueError: If no model has been trained or loaded.
        """
        return self._require_loaded().bos_id()

    def eos_id(self):
        """End of sentence token ID.

        Raises:
            ValueError: If no model has been trained or loaded.
        """
        return self._require_loaded().eos_id()

    def pad_id(self):
        """Padding token ID.

        Raises:
            ValueError: If no model has been trained or loaded.
        """
        return self._require_loaded().pad_id()

    def unk_id(self):
        """Unknown token ID.

        Raises:
            ValueError: If no model has been trained or loaded.
        """
        return self._require_loaded().unk_id()