import logging
import re
from abc import ABC, abstractmethod
from functools import reduce
from typing import Dict, List, Optional, Tuple

import jieba
from pypinyin import Style, lazy_pinyin
from pypinyin.contrib.tone_convert import to_finals_tone3, to_initials

from zipvoice.tokenizer.normalizer import ChineseTextNormalizer, EnglishTextNormalizer

try:
    from piper_phonemize import phonemize_espeak
except Exception as ex:
    raise RuntimeError(
        f"{ex}\nPlease run\n"
        "pip install piper_phonemize -f "
        "https://k2-fsa.github.io/icefall/piper_phonemize.html"
    )
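
# Every token file used below is a plain-text mapping with one
# "{token}\t{token_id}" pair per line, and must contain the "_" token,
# which is looked up as the padding token (pad_id). An illustrative sketch:
#     _    0
#     a    1
#     b    2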
|
|
class Tokenizer(ABC):
    """Abstract base class for tokenizers, defining common interface."""

    @abstractmethod
    def texts_to_token_ids(self, texts: List[str]) -> List[List[int]]:
        """Convert list of texts to list of token id sequences."""
        raise NotImplementedError

    @abstractmethod
    def texts_to_tokens(self, texts: List[str]) -> List[List[str]]:
        """Convert list of texts to list of token sequences."""
        raise NotImplementedError

    @abstractmethod
    def tokens_to_token_ids(self, tokens: List[List[str]]) -> List[List[int]]:
        """Convert list of token sequences to list of token id sequences."""
        raise NotImplementedError
|
|
class SimpleTokenizer(Tokenizer):
    """The simplest tokenizer: treats every character as a token,
    without text normalization.
    """

    def __init__(self, token_file: Optional[str] = None):
        """
        Args:
            token_file: the file that maps tokens to ids, which is a text
                file with one '{token}\t{token_id}' pair per line.
        """
        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initializing Tokenizer without a tokens file; "
                "mapping to ids will fail."
            )
            return
        self.token2id: Dict[str, int] = {}
        with open(token_file, "r", encoding="utf-8") as f:
            for line in f:
                info = line.rstrip().split("\t")
                token, token_id = info[0], int(info[1])
                assert token not in self.token2id, token
                self.token2id[token] = token_id
        self.pad_id = self.token2id["_"]
        self.vocab_size = len(self.token2id)
        self.has_tokens = True

    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        # Each character is its own token.
        return [list(text) for text in texts]

    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."

        token_ids_list = []
        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)
        return token_ids_list
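
# Illustrative usage, assuming the hypothetical token file sketched above:
#     tokenizer = SimpleTokenizer("tokens.txt")
#     tokenizer.texts_to_tokens(["ab"])      # -> [["a", "b"]]
#     tokenizer.texts_to_token_ids(["ab"])   # -> [[1, 2]]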
|
|
class EspeakTokenizer(Tokenizer):
    """A simple tokenizer with the espeak g2p function."""

    def __init__(self, token_file: Optional[str] = None, lang: str = "en-us"):
        """
        Args:
            token_file: the file that maps tokens to ids, which is a text
                file with one '{token}\t{token_id}' pair per line.
            lang: the language identifier, see
                https://github.com/rhasspy/espeak-ng/blob/master/docs/languages.md
        """
        # Set the language first, so that g2p works even when no token
        # file is given (the original code set it after the early return).
        self.lang = lang
        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initializing Tokenizer without a tokens file; "
                "mapping to ids will fail."
            )
            return
        self.token2id: Dict[str, int] = {}
        with open(token_file, "r", encoding="utf-8") as f:
            for line in f:
                info = line.rstrip().split("\t")
                token, token_id = info[0], int(info[1])
                assert token not in self.token2id, token
                self.token2id[token] = token_id
        self.pad_id = self.token2id["_"]
        self.vocab_size = len(self.token2id)
        self.has_tokens = True

    def g2p(self, text: str) -> List[str]:
        try:
            # phonemize_espeak returns one phoneme list per clause;
            # flatten them into a single sequence.
            tokens = phonemize_espeak(text, self.lang)
            tokens = reduce(lambda x, y: x + y, tokens)
            return tokens
        except Exception as ex:
            logging.warning(f"Tokenization of {self.lang} texts failed: {ex}")
            return []

    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        return [self.g2p(text) for text in texts]

    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."

        token_ids_list = []
        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)
        return token_ids_list
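
# Illustrative usage: without a token file, only the token sequences are
# available, e.g. EspeakTokenizer(lang="en-us").texts_to_tokens(["hi"])
# returns one flat list of espeak phonemes per input text.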
|
|
class EmiliaTokenizer(Tokenizer):
    def __init__(self, token_file: Optional[str] = None, token_type="phone"):
        """
        Args:
            token_file: the file that maps tokens to ids, which is a text
                file with one '{token}\t{token_id}' pair per line.
            token_type: the token type; only "phone" is supported.
        """
        assert (
            token_type == "phone"
        ), f"Only support phone tokenizer for Emilia, but got {token_type}."

        self.english_normalizer = EnglishTextNormalizer()
        self.chinese_normalizer = ChineseTextNormalizer()

        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initializing Tokenizer without a tokens file; "
                "mapping to ids will fail."
            )
            return
        self.token2id: Dict[str, int] = {}
        with open(token_file, "r", encoding="utf-8") as f:
            for line in f:
                info = line.rstrip().split("\t")
                token, token_id = info[0], int(info[1])
                assert token not in self.token2id, token
                self.token2id[token] = token_id
        self.pad_id = self.token2id["_"]
        self.vocab_size = len(self.token2id)
        self.has_tokens = True
|
    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def preprocess_text(
        self,
        text: str,
    ) -> str:
        return self.map_punctuations(text)

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        texts = [self.preprocess_text(text) for text in texts]

        phoneme_list = []
        for text in texts:
            # Split the text into language-homogeneous segments and
            # dispatch each one to the matching g2p routine.
            segments = self.get_segment(text)
            all_phoneme = []
            for seg in segments:
                if seg[1] == "zh":
                    phoneme = self.tokenize_ZH(seg[0])
                elif seg[1] == "en":
                    phoneme = self.tokenize_EN(seg[0])
                elif seg[1] == "pinyin":
                    phoneme = self.tokenize_pinyin(seg[0])
                elif seg[1] == "tag":
                    phoneme = [seg[0]]
                else:
                    logging.warning(
                        f"No English or Chinese characters found, "
                        f"skipping segment of unknown language: {seg}"
                    )
                    continue
                all_phoneme += phoneme
            phoneme_list.append(all_phoneme)
        return phoneme_list
|
    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."

        token_ids_list = []
        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)
        return token_ids_list
|
    def tokenize_ZH(self, text: str) -> List[str]:
        try:
            text = self.chinese_normalizer.normalize(text)
            segs = list(jieba.cut(text))
            full = lazy_pinyin(
                segs,
                style=Style.TONE3,
                tone_sandhi=True,
                neutral_tone_with_five=True,
            )
            phones = []
            for x in full:
                # A valid pinyin syllable is alphabetic with a trailing tone
                # digit (1-5); anything else (e.g., punctuation) is kept as a
                # single token.
                if x[0:-1].isalpha() and x[-1] in ("1", "2", "3", "4", "5"):
                    phones.extend(self.seperate_pinyin(x))
                else:
                    phones.append(x)
            return phones
        except Exception as ex:
            logging.warning(f"Tokenization of Chinese texts failed: {ex}")
            return []
|
    def tokenize_EN(self, text: str) -> List[str]:
        try:
            text = self.english_normalizer.normalize(text)
            tokens = phonemize_espeak(text, "en-us")
            # Flatten the per-clause phoneme lists into one sequence.
            tokens = reduce(lambda x, y: x + y, tokens)
            return tokens
        except Exception as ex:
            logging.warning(f"Tokenization of English texts failed: {ex}")
            return []
|
    def tokenize_pinyin(self, text: str) -> List[str]:
        try:
            assert text.startswith("<") and text.endswith(">")
            text = text.lstrip("<").rstrip(">")
            if not (text[0:-1].isalpha() and text[-1] in ("1", "2", "3", "4", "5")):
                logging.warning(
                    f"Strings enclosed with <> should be pinyin, "
                    f"but got: {text}. Skipped it."
                )
                return []
            return self.seperate_pinyin(text)
        except Exception as ex:
            logging.warning(f"Tokenize pinyin failed: {ex}")
            return []

    def seperate_pinyin(self, text: str) -> List[str]:
        """Separate a pinyin syllable into its initial and final,
        e.g. "hao3" -> ["h0", "ao3"].
        """
        pinyins = []
        initial = to_initials(text, strict=False)
        final = to_finals_tone3(
            text,
            strict=False,
            neutral_tone_with_five=True,
        )
        if initial != "":
            # Initials carry no tone; "0" marks them as toneless.
            pinyins.append(initial + "0")
        if final != "":
            pinyins.append(final)
        return pinyins
|
    def map_punctuations(self, text):
        # Map Chinese (fullwidth) punctuation to ASCII equivalents and
        # normalize the various ellipsis spellings.
        text = text.replace("，", ",")
        text = text.replace("。", ".")
        text = text.replace("！", "!")
        text = text.replace("？", "?")
        text = text.replace("；", ";")
        text = text.replace("：", ":")
        text = text.replace("、", ",")
        text = text.replace("‘", "'")
        text = text.replace("“", '"')
        text = text.replace("”", '"')
        text = text.replace("’", "'")
        text = text.replace("⋯", "…")
        text = text.replace("···", "…")
        text = text.replace("・・・", "…")
        text = text.replace("...", "…")
        return text
|
    def get_segment(self, text: str) -> List[Tuple[str, str]]:
        """
        Split a text into segments based on language types
        (Chinese, English, pinyin, tags, etc.)

        Args:
            text (str): Input text to be segmented.

        Returns:
            List[Tuple[str, str]]: Segmented text parts paired with their
                language types.

        Example:
            Input: 我们是小米人,是吗? Yes I think so!霍...啦啦啦
            Output: [('我们是小米人,是吗? ', 'zh'),
                ('Yes I think so!', 'en'), ('霍...啦啦啦', 'zh')]
        """
        segments = []
        types = []
        temp_seg = ""
        temp_lang = ""

        # Split into single characters, but keep <...> and [...] groups
        # (pinyin and special tags) intact as single parts.
        _part_pattern = re.compile(r"[<[].*?[>\]]|.")
        parts = _part_pattern.findall(text)

        for part in parts:
            if self.is_chinese(part) or self.is_pinyin(part):
                types.append("zh")
            elif self.is_alphabet(part):
                types.append("en")
            else:
                types.append("other")

        assert len(types) == len(parts)

        for i in range(len(types)):
            if i == 0:
                temp_seg += parts[i]
                temp_lang = types[i]
            else:
                if temp_lang == "other":
                    # Leading "other" characters attach to the next segment.
                    temp_seg += parts[i]
                    temp_lang = types[i]
                else:
                    if types[i] in [temp_lang, "other"]:
                        # Same language (or neutral): extend current segment.
                        temp_seg += parts[i]
                    else:
                        # Language switch: close the current segment.
                        segments.append((temp_seg, temp_lang))
                        temp_seg = parts[i]
                        temp_lang = types[i]

        segments.append((temp_seg, temp_lang))

        segments = self.split_segments(segments)
        return segments
|
    def split_segments(self, segments):
        """
        Split segments into smaller parts if special strings enclosed by []
        or <> are found, where <> denotes pinyin strings and [] denotes
        other special strings.

        Args:
            segments (list): A list of tuples where each tuple contains:
                - temp_seg (str): The text segment to be split.
                - temp_lang (str): The language code associated with the segment.

        Returns:
            list: A list of smaller segments.
        """
        result = []
        for temp_seg, temp_lang in segments:
            parts = re.split(r"([<[].*?[>\]])", temp_seg)
            for part in parts:
                if not part:
                    continue
                if self.is_pinyin(part):
                    result.append((part, "pinyin"))
                elif self.is_tag(part):
                    result.append((part, "tag"))
                else:
                    result.append((part, temp_lang))
        return result

    def is_chinese(self, char: str) -> bool:
        return "\u4e00" <= char <= "\u9fa5"

    def is_alphabet(self, char: str) -> bool:
        return ("\u0041" <= char <= "\u005a") or ("\u0061" <= char <= "\u007a")

    def is_pinyin(self, part: str) -> bool:
        return part.startswith("<") and part.endswith(">")

    def is_tag(self, part: str) -> bool:
        return part.startswith("[") and part.endswith("]")
|
|
class DialogTokenizer(EmiliaTokenizer):
    def __init__(self, token_file: Optional[str] = None, token_type="phone"):
        super().__init__(token_file=token_file, token_type=token_type)
        if self.has_tokens:
            # Speaker-turn tags; the token file must contain [S1] and [S2].
            self.spk_a_id = self.token2id["[S1]"]
            self.spk_b_id = self.token2id["[S2]"]

    def preprocess_text(
        self,
        text: str,
    ) -> str:
        # Remove whitespace around speaker tags, then normalize punctuation.
        text = re.sub(r"\s*(\[S[12]\])\s*", r"\1", text)
        text = self.map_punctuations(text)
        return text
|
|
class LibriTTSTokenizer(Tokenizer):
    def __init__(self, token_file: Optional[str] = None, token_type="char"):
        """
        Args:
            token_file: the file that maps tokens to ids, which is a text
                file with one '{token}\t{token_id}' pair per line if
                token_type is char or phone; otherwise it is a bpe model file.
            token_type: the type of tokenizer, e.g., bpe, char, phone.
        """
        self.type = token_type
        assert token_type in ["bpe", "char", "phone"]
        try:
            import tacotron_cleaner.cleaners
        except Exception as ex:
            raise RuntimeError(f"{ex}\nPlease run\n" "pip install espnet_tts_frontend")

        self.normalize = tacotron_cleaner.cleaners.custom_english_cleaners

        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initializing Tokenizer without a tokens file; "
                "mapping to ids will fail."
            )
            return
        if token_type == "bpe":
            import sentencepiece as spm

            self.sp = spm.SentencePieceProcessor()
            self.sp.load(token_file)
            self.pad_id = self.sp.piece_to_id("<pad>")
            self.vocab_size = self.sp.get_piece_size()
        else:
            self.token2id: Dict[str, int] = {}
            with open(token_file, "r", encoding="utf-8") as f:
                for line in f:
                    info = line.rstrip().split("\t")
                    token, token_id = info[0], int(info[1])
                    assert token not in self.token2id, token
                    self.token2id[token] = token_id
            self.pad_id = self.token2id["_"]
            self.vocab_size = len(self.token2id)
        self.has_tokens = True

    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        if self.type == "bpe":
            texts = [self.normalize(text) for text in texts]
            return self.sp.encode(texts)
        else:
            return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        texts = [self.normalize(text) for text in texts]

        if self.type == "char":
            tokens_list = [list(text) for text in texts]
        elif self.type == "phone":
            # Flatten the per-clause phoneme lists returned by
            # phonemize_espeak, as done in EspeakTokenizer.g2p above.
            tokens_list = [
                reduce(lambda x, y: x + y, phonemize_espeak(text.lower(), "en-us"))
                for text in texts
            ]
        elif self.type == "bpe":
            tokens_list = self.sp.encode(texts, out_type=str)

        return tokens_list

    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."
        assert self.type != "bpe", "BPE tokenizer does not support this function."

        token_ids_list = []
        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)
        return token_ids_list
|
|
if __name__ == "__main__":
    text = (
        "我们是5年小米人,是吗? Yes I think so! "
        "mr king, 5 years, from 2019 to 2024."
        "霍...啦啦啦超过90%的人<le5>...?!9204"
    )
    tokenizer = EmiliaTokenizer()
    tokens = tokenizer.texts_to_tokens([text])
    print(f"tokens: {'|'.join(tokens[0])}")
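
    # Illustrative extra check (hypothetical input): DialogTokenizer strips
    # whitespace around [S1]/[S2] speaker tags and keeps them as single
    # tokens, while <ni3> is split into pinyin initial and final.
    dialog_tokenizer = DialogTokenizer()
    dialog_tokens = dialog_tokenizer.texts_to_tokens(
        ["[S1] 你好 <ni3> [S2] Good morning!"]
    )
    print(f"dialog tokens: {'|'.join(dialog_tokens[0])}")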
|
|