# Copyright    2023-2024    Xiaomi Corp.        (authors: Zengwei Yao
#                                                         Han Zhu,
#                                                         Wei Kang)
#
# See ../../LICENSE for clarification regarding multiple authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import re
from abc import ABC, abstractmethod
from functools import reduce
from typing import Dict, List, Optional, Tuple

import jieba
from pypinyin import Style, lazy_pinyin
from pypinyin.contrib.tone_convert import to_finals_tone3, to_initials

from zipvoice.tokenizer.normalizer import ChineseTextNormalizer, EnglishTextNormalizer

try:
    from piper_phonemize import phonemize_espeak
except Exception as ex:
    raise RuntimeError(
        f"{ex}\nPlease run\n"
        "pip install piper_phonemize -f "
        "https://k2-fsa.github.io/icefall/piper_phonemize.html"
    )


class Tokenizer(ABC):
    """Abstract base class for tokenizers, defining the common interface."""

    @abstractmethod
    def texts_to_token_ids(self, texts: List[str]) -> List[List[int]]:
        """Convert a list of texts to a list of token id sequences."""
        raise NotImplementedError

    @abstractmethod
    def texts_to_tokens(self, texts: List[str]) -> List[List[str]]:
        """Convert a list of texts to a list of token sequences."""
        raise NotImplementedError

    @abstractmethod
    def tokens_to_token_ids(self, tokens: List[List[str]]) -> List[List[int]]:
        """Convert a list of token sequences to a list of token id sequences."""
        raise NotImplementedError


class SimpleTokenizer(Tokenizer):
    """The simplest tokenizer: treats every character as a token,
    without text normalization.
    """

    def __init__(self, token_file: Optional[str] = None):
        """
        Args:
            token_file: the file that maps tokens to ids, which is a text file
                with '{token}\t{token_id}' per line.
        """
        # Parse token file
        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initialized Tokenizer without a tokens file; "
                "mapping to ids will fail."
            )
            return
        self.token2id: Dict[str, int] = {}
        with open(token_file, "r", encoding="utf-8") as f:
            for line in f.readlines():
                info = line.rstrip().split("\t")
                token, id = info[0], int(info[1])
                assert token not in self.token2id, token
                self.token2id[token] = id
        self.pad_id = self.token2id["_"]  # padding
        self.vocab_size = len(self.token2id)
        self.has_tokens = True

    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        tokens_list = [list(texts[i]) for i in range(len(texts))]
        return tokens_list

    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."
        token_ids_list = []

        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)

        return token_ids_list
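
# A minimal usage sketch for SimpleTokenizer. The tokens file path and its
# contents below are hypothetical; real ids depend on the file you pass in:
#
#     tokenizer = SimpleTokenizer("tokens.txt")   # e.g. lines "_\t0", "a\t1", "b\t2"
#     tokenizer.texts_to_tokens(["ab"])            # -> [["a", "b"]]
#     tokenizer.texts_to_token_ids(["ab"])         # -> [[1, 2]]
#
# Characters missing from the tokens file are skipped with a debug message
# rather than raising an error.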


class EspeakTokenizer(Tokenizer):
    """A simple tokenizer with the espeak g2p function."""

    def __init__(self, token_file: Optional[str] = None, lang: str = "en-us"):
        """
        Args:
            token_file: the file that maps tokens to ids, which is a text file
                with '{token}\t{token_id}' per line.
            lang: the language identifier, see
                https://github.com/rhasspy/espeak-ng/blob/master/docs/languages.md
        """
        # Parse token file
        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initialized Tokenizer without a tokens file; "
                "mapping to ids will fail."
            )
            return
        self.token2id: Dict[str, int] = {}
        with open(token_file, "r", encoding="utf-8") as f:
            for line in f.readlines():
                info = line.rstrip().split("\t")
                token, id = info[0], int(info[1])
                assert token not in self.token2id, token
                self.token2id[token] = id
        self.pad_id = self.token2id["_"]  # padding
        self.vocab_size = len(self.token2id)
        self.has_tokens = True
        self.lang = lang

    def g2p(self, text: str) -> List[str]:
        try:
            tokens = phonemize_espeak(text, self.lang)
            tokens = reduce(lambda x, y: x + y, tokens)
            return tokens
        except Exception as ex:
            logging.warning(f"Tokenization of {self.lang} texts failed: {ex}")
            return []

    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        tokens_list = [self.g2p(texts[i]) for i in range(len(texts))]
        return tokens_list

    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."
        token_ids_list = []

        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)

        return token_ids_list
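
# A minimal usage sketch for EspeakTokenizer. The tokens file is hypothetical,
# and the exact phone strings depend on the espeak-ng data exposed through
# piper_phonemize:
#
#     tokenizer = EspeakTokenizer("tokens.txt", lang="en-us")
#     tokenizer.texts_to_tokens(["hello"])   # one flattened phone sequence per text
#
# Phones missing from the tokens file are dropped in tokens_to_token_ids.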


class EmiliaTokenizer(Tokenizer):
    def __init__(self, token_file: Optional[str] = None, token_type="phone"):
        """
        Args:
            token_file: the file that maps tokens to ids, which is a text file
                with '{token}\t{token_id}' per line.
        """
        assert (
            token_type == "phone"
        ), f"Only the phone tokenizer is supported for Emilia, but got {token_type}."
        self.english_normalizer = EnglishTextNormalizer()
        self.chinese_normalizer = ChineseTextNormalizer()

        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initialized Tokenizer without a tokens file; "
                "mapping to ids will fail."
            )
            return
        self.token2id: Dict[str, int] = {}
        with open(token_file, "r", encoding="utf-8") as f:
            for line in f.readlines():
                info = line.rstrip().split("\t")
                token, id = info[0], int(info[1])
                assert token not in self.token2id, token
                self.token2id[token] = id
        self.pad_id = self.token2id["_"]  # padding
        self.vocab_size = len(self.token2id)
        self.has_tokens = True

    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def preprocess_text(
        self,
        text: str,
    ) -> str:
        return self.map_punctuations(text)

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        for i in range(len(texts)):
            # Text normalization
            texts[i] = self.preprocess_text(texts[i])

        phoneme_list = []
        for text in texts:
            # now only en and zh
            segments = self.get_segment(text)
            all_phoneme = []
            for index in range(len(segments)):
                seg = segments[index]
                if seg[1] == "zh":
                    phoneme = self.tokenize_ZH(seg[0])
                elif seg[1] == "en":
                    phoneme = self.tokenize_EN(seg[0])
                elif seg[1] == "pinyin":
                    phoneme = self.tokenize_pinyin(seg[0])
                elif seg[1] == "tag":
                    phoneme = [seg[0]]
                else:
                    logging.warning(
                        "No English or Chinese characters found, "
                        f"skipping segment of unknown language: {seg}"
                    )
                    continue
                all_phoneme += phoneme
            phoneme_list.append(all_phoneme)
        return phoneme_list

    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."
        token_ids_list = []

        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)

        return token_ids_list

    def tokenize_ZH(self, text: str) -> List[str]:
        try:
            text = self.chinese_normalizer.normalize(text)
            segs = list(jieba.cut(text))
            full = lazy_pinyin(
                segs,
                style=Style.TONE3,
                tone_sandhi=True,
                neutral_tone_with_five=True,
            )
            phones = []
            for x in full:
                # valid pinyin (in tone3 style) is alphabet + 1 number in [1-5].
                if not (x[0:-1].isalpha() and x[-1] in ("1", "2", "3", "4", "5")):
                    phones.append(x)
                    continue
                else:
                    phones.extend(self.seperate_pinyin(x))
            return phones
        except Exception as ex:
            logging.warning(f"Tokenization of Chinese texts failed: {ex}")
            return []

    def tokenize_EN(self, text: str) -> List[str]:
        try:
            text = self.english_normalizer.normalize(text)
            tokens = phonemize_espeak(text, "en-us")
            tokens = reduce(lambda x, y: x + y, tokens)
            return tokens
        except Exception as ex:
            logging.warning(f"Tokenization of English texts failed: {ex}")
            return []

    def tokenize_pinyin(self, text: str) -> List[str]:
        try:
            assert text.startswith("<") and text.endswith(">")
            text = text.lstrip("<").rstrip(">")
            # valid pinyin (in tone3 style) is alphabet + 1 number in [1-5].
            if not (text[0:-1].isalpha() and text[-1] in ("1", "2", "3", "4", "5")):
                logging.warning(
                    "Strings enclosed in <> should be pinyin, "
                    f"but got: {text}. Skipping it."
                )
                return []
            else:
                return self.seperate_pinyin(text)
        except Exception as ex:
            logging.warning(f"Tokenize pinyin failed: {ex}")
            return []

    def seperate_pinyin(self, text: str) -> List[str]:
        """
        Separate pinyin into initial and final
        """
        pinyins = []
        initial = to_initials(text, strict=False)
        # don't want to share tokens with espeak tokens,
        # so use tone3 style
        final = to_finals_tone3(
            text,
            strict=False,
            neutral_tone_with_five=True,
        )
        if initial != "":
            # don't want to share tokens with espeak tokens,
            # so add a '0' after each initial
            pinyins.append(initial + "0")
        if final != "":
            pinyins.append(final)
        return pinyins

    def map_punctuations(self, text):
        text = text.replace(",", ",")
        text = text.replace("。", ".")
        text = text.replace("!", "!")
        text = text.replace("?", "?")
        text = text.replace(";", ";")
        text = text.replace(":", ":")
        text = text.replace("、", ",")
        text = text.replace("‘", "'")
        text = text.replace("“", '"')
        text = text.replace("”", '"')
        text = text.replace("’", "'")
        text = text.replace("⋯", "…")
        text = text.replace("···", "…")
        text = text.replace("・・・", "…")
        text = text.replace("...", "…")
        return text

    def get_segment(self, text: str) -> List[Tuple[str, str]]:
        """
        Split a text into segments based on language types
        (Chinese, English, pinyin, tags, etc.)

        Args:
            text (str): Input text to be segmented

        Returns:
            List[Tuple[str, str]]: Segmented text parts with their language types

        Example:
            Input: 我们是小米人,是吗? Yes I think so!霍...啦啦啦
            Output: [('我们是小米人,是吗? ', 'zh'), ('Yes I think so!', 'en'),
                ('霍...啦啦啦', 'zh')]
        """
        # Stores the final segmented parts and their language types
        segments = []
        # Stores the language type of each character in the input text
        types = []
        temp_seg = ""
        temp_lang = ""

        # Each part is a character, or a special string enclosed in <> or [].
        # <> denotes a pinyin string, [] denotes other special strings.
        _part_pattern = re.compile(r"[<[].*?[>\]]|.")
        text = _part_pattern.findall(text)

        for i, part in enumerate(text):
            if self.is_chinese(part) or self.is_pinyin(part):
                types.append("zh")
            elif self.is_alphabet(part):
                types.append("en")
            else:
                types.append("other")

        assert len(types) == len(text)

        for i in range(len(types)):
            # find the first char of the seg
            if i == 0:
                temp_seg += text[i]
                temp_lang = types[i]
            else:
                if temp_lang == "other":
                    temp_seg += text[i]
                    temp_lang = types[i]
                else:
                    if types[i] in [temp_lang, "other"]:
                        temp_seg += text[i]
                    else:
                        segments.append((temp_seg, temp_lang))
                        temp_seg = text[i]
                        temp_lang = types[i]

        segments.append((temp_seg, temp_lang))
        # Handle "pinyin" and "tag" types
        segments = self.split_segments(segments)
        return segments

    def split_segments(self, segments):
        """
        Split segments into smaller parts if special strings enclosed in []
        or <> are found, where <> denotes pinyin strings and [] denotes other
        special strings.

        Args:
            segments (list): A list of tuples where each tuple contains:
                - temp_seg (str): The text segment to be split.
                - temp_lang (str): The language code associated with the segment.

        Returns:
            list: A list of smaller segments.
        """
        result = []
        for temp_seg, temp_lang in segments:
            parts = re.split(r"([<[].*?[>\]])", temp_seg)
            for part in parts:
                if not part:
                    continue
                if self.is_pinyin(part):
                    result.append((part, "pinyin"))
                elif self.is_tag(part):
                    result.append((part, "tag"))
                else:
                    result.append((part, temp_lang))
        return result

    def is_chinese(self, char: str) -> bool:
        if char >= "\u4e00" and char <= "\u9fa5":
            return True
        else:
            return False

    def is_alphabet(self, char: str) -> bool:
        if (char >= "\u0041" and char <= "\u005a") or (
            char >= "\u0061" and char <= "\u007a"
        ):
            return True
        else:
            return False

    def is_pinyin(self, part: str) -> bool:
        if part.startswith("<") and part.endswith(">"):
            return True
        else:
            return False

    def is_tag(self, part: str) -> bool:
        if part.startswith("[") and part.endswith("]"):
            return True
        else:
            return False


class DialogTokenizer(EmiliaTokenizer):
    def __init__(self, token_file: Optional[str] = None, token_type="phone"):
        super().__init__(token_file=token_file, token_type=token_type)
        self.spk_a_id = self.token2id["[S1]"]
        self.spk_b_id = self.token2id["[S2]"]

    def preprocess_text(
        self,
        text: str,
    ) -> str:
        text = re.sub(r"\s*(\[S[12]\])\s*", r"\1", text)
        text = self.map_punctuations(text)
        return text


class LibriTTSTokenizer(Tokenizer):
    def __init__(self, token_file: Optional[str] = None, token_type="char"):
        """
        Args:
            token_type: the type of tokenizer, e.g., bpe, char, phone.
            token_file: the file that maps tokens to ids, which is a text file
                with '{token}\t{token_id}' per line if token_type is char or
                phone, otherwise a bpe model file.
        """
        self.type = token_type
        assert token_type in ["bpe", "char", "phone"]
        try:
            import tacotron_cleaner.cleaners
        except Exception as ex:
            raise RuntimeError(
                f"{ex}\nPlease run\n" "pip install espnet_tts_frontend"
            )

        self.normalize = tacotron_cleaner.cleaners.custom_english_cleaners

        self.has_tokens = False
        if token_file is None:
            logging.debug(
                "Initialized Tokenizer without a tokens file; "
                "mapping to ids will fail."
            )
            return

        if token_type == "bpe":
            import sentencepiece as spm

            self.sp = spm.SentencePieceProcessor()
            self.sp.load(token_file)
            self.pad_id = self.sp.piece_to_id("")
            self.vocab_size = self.sp.get_piece_size()
        else:
            self.token2id: Dict[str, int] = {}
            with open(token_file, "r", encoding="utf-8") as f:
                for line in f.readlines():
                    info = line.rstrip().split("\t")
                    token, id = info[0], int(info[1])
                    assert token not in self.token2id, token
                    self.token2id[token] = id
            self.pad_id = self.token2id["_"]  # padding
            self.vocab_size = len(self.token2id)

        self.has_tokens = True

    def texts_to_token_ids(
        self,
        texts: List[str],
    ) -> List[List[int]]:
        if self.type == "bpe":
            for i in range(len(texts)):
                texts[i] = self.normalize(texts[i])
            return self.sp.encode(texts)
        else:
            return self.tokens_to_token_ids(self.texts_to_tokens(texts))

    def texts_to_tokens(
        self,
        texts: List[str],
    ) -> List[List[str]]:
        for i in range(len(texts)):
            texts[i] = self.normalize(texts[i])

        if self.type == "char":
            tokens_list = [list(texts[i]) for i in range(len(texts))]
        elif self.type == "phone":
            tokens_list = [
                phonemize_espeak(texts[i].lower(), "en-us") for i in range(len(texts))
            ]
        elif self.type == "bpe":
            tokens_list = self.sp.encode(texts, out_type=str)
        return tokens_list

    def tokens_to_token_ids(
        self,
        tokens_list: List[List[str]],
    ) -> List[List[int]]:
        assert self.has_tokens, "Please initialize Tokenizer with a tokens file."
        assert self.type != "bpe", "BPE tokenizer does not support this function."
        token_ids_list = []

        for tokens in tokens_list:
            token_ids = []
            for t in tokens:
                if t not in self.token2id:
                    logging.debug(f"Skip OOV {t}")
                    continue
                token_ids.append(self.token2id[t])
            token_ids_list.append(token_ids)

        return token_ids_list
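
# Note on EmiliaTokenizer's inline markup (illustrative, derived from
# tokenize_pinyin, texts_to_tokens, get_segment and split_segments): a span
# wrapped in <> is treated as a raw pinyin syllable, e.g. "<ni3>" becomes the
# two tokens "n0" and "i3", while a span wrapped in [] is kept as a single tag
# token, e.g. "[S1]" stays "[S1]".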


if __name__ == "__main__":
    text = (
        "我们是5年小米人,是吗? Yes I think so! "
        "mr king, 5 years, from 2019 to 2024."
        "霍...啦啦啦超过90%的人...?!9204"
    )
    tokenizer = EmiliaTokenizer()
    tokens = tokenizer.texts_to_tokens([text])
    print(f"tokens: {'|'.join(tokens[0])}")
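
    # To map the printed tokens to ids, a tokens file is required; the path
    # below is hypothetical:
    #
    #     tokenizer = EmiliaTokenizer(token_file="tokens.txt")
    #     token_ids = tokenizer.texts_to_token_ids([text])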