| | import re |
| | from typing import List, Optional, Union, Dict, Any |
| | from functools import cached_property |
| |
|
| | import pypinyin |
| | import torch |
| | from hangul_romanize import Transliter |
| | from hangul_romanize.rule import academic |
| | from num2words import num2words |
| | from spacy.lang.ar import Arabic |
| | from spacy.lang.en import English |
| | from spacy.lang.es import Spanish |
| | from spacy.lang.ja import Japanese |
| | from spacy.lang.zh import Chinese |
| | from transformers import PreTrainedTokenizerFast, BatchEncoding |
| | from transformers.tokenization_utils_base import TruncationStrategy, PaddingStrategy |
| | from tokenizers import Tokenizer |
| | from tokenizers.pre_tokenizers import WhitespaceSplit |
| | from tokenizers.processors import TemplateProcessing |
| |
|
| | from auralis.models.xttsv2.components.tts.layers.xtts.zh_num2words import TextNorm as zh_num2words |
| |
|
| | import cutlet |
| |
|
| | def get_spacy_lang(lang): |
| | if lang == "zh": |
| | return Chinese() |
| | elif lang == "ja": |
| | return Japanese() |
| | elif lang == "ar": |
| | return Arabic() |
| | elif lang == "es": |
| | return Spanish() |
| | else: |
| | |
| | return English() |
| |
|
| |
|
| | def find_best_split_point(text: str, target_pos: int, window_size: int = 30) -> int: |
| | """ |
| | Find best split point near target position considering punctuation and language markers. |
| | added for better sentence splitting in TTS. |
| | """ |
| | |
| | markers = [ |
| | |
| | (r'[.!?؟။။။]+[\s]*', 1.0), |
| | (r'[\n\r]+\s*[\n\r]+', 1.0), |
| | (r'[:|;;:;][\s]*', 0.9), |
| |
|
| | |
| | (r'[,,،、][\s]*', 0.8), |
| | (r'[)}\])】』»›》\s]+', 0.7), |
| | (r'[-—−]+[\s]*', 0.7), |
| |
|
| | |
| | (r'\s+[&+=/\s]+\s+', 0.6), |
| | (r'[\s]+', 0.5), |
| | ] |
| |
|
| | |
| | start = max(0, target_pos - window_size) |
| | end = min(len(text), target_pos + window_size) |
| | window = text[start:end] |
| |
|
| | best_pos = target_pos |
| | best_score = 0 |
| |
|
| | for pattern, priority in markers: |
| | matches = list(re.finditer(pattern, window)) |
| | for match in matches: |
| | |
| | pos = start + match.end() |
| | distance = abs(pos - target_pos) |
| | distance_score = 1 - (distance / (window_size * 2)) |
| |
|
| | |
| | score = priority * distance_score |
| |
|
| | if score > best_score: |
| | best_score = score |
| | best_pos = pos |
| |
|
| | return best_pos |
| |
|
| |
|
| | def split_sentence(text: str, lang: str, text_split_length: int = 250) -> List[str]: |
| | """ |
| | Enhanced sentence splitting with language awareness and optimal breakpoints. |
| | |
| | Args: |
| | text: Input text to split |
| | lang: Language code |
| | text_split_length: Target length for splits |
| | |
| | Returns: |
| | List of text splits optimized for TTS |
| | """ |
| | text = text.strip() |
| | if len(text) <= text_split_length: |
| | return [text] |
| |
|
| | nlp = get_spacy_lang(lang) |
| | if "sentencizer" not in nlp.pipe_names: |
| | nlp.add_pipe("sentencizer") |
| |
|
| | |
| | doc = nlp(text) |
| | sentences = list(doc.sents) |
| |
|
| | splits = [] |
| | current_split = [] |
| | current_length = 0 |
| |
|
| | for sent in sentences: |
| | sentence_text = str(sent).strip() |
| | sentence_length = len(sentence_text) |
| |
|
| | |
| | if current_length + sentence_length <= text_split_length: |
| | current_split.append(sentence_text) |
| | current_length += sentence_length + 1 |
| |
|
| | |
| | elif sentence_length > text_split_length: |
| | |
| | if current_split: |
| | splits.append(" ".join(current_split)) |
| | current_split = [] |
| | current_length = 0 |
| |
|
| | |
| | remaining = sentence_text |
| | while len(remaining) > text_split_length: |
| | split_pos = find_best_split_point( |
| | remaining, |
| | text_split_length, |
| | window_size=30 |
| | ) |
| |
|
| | |
| | splits.append(remaining[:split_pos].strip()) |
| | remaining = remaining[split_pos:].strip() |
| |
|
| | |
| | if remaining: |
| | current_split = [remaining] |
| | current_length = len(remaining) |
| |
|
| | |
| | else: |
| | splits.append(" ".join(current_split)) |
| | current_split = [sentence_text] |
| | current_length = sentence_length |
| |
|
| | |
| | if current_split: |
| | splits.append(" ".join(current_split)) |
| |
|
| | cleaned_sentences = [s[:-1]+' ' if s.endswith('.') else s for s in splits if s] |
| | |
| | return cleaned_sentences |
| |
|
| | _whitespace_re = re.compile(r"\s+") |
| |
|
| | |
| | _abbreviations = { |
| | "en": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("mrs", "misess"), |
| | ("mr", "mister"), |
| | ("dr", "doctor"), |
| | ("st", "saint"), |
| | ("co", "company"), |
| | ("jr", "junior"), |
| | ("maj", "major"), |
| | ("gen", "general"), |
| | ("drs", "doctors"), |
| | ("rev", "reverend"), |
| | ("lt", "lieutenant"), |
| | ("hon", "honorable"), |
| | ("sgt", "sergeant"), |
| | ("capt", "captain"), |
| | ("esq", "esquire"), |
| | ("ltd", "limited"), |
| | ("col", "colonel"), |
| | ("ft", "fort"), |
| | ] |
| | ], |
| | "es": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("sra", "señora"), |
| | ("sr", "señor"), |
| | ("dr", "doctor"), |
| | ("dra", "doctora"), |
| | ("st", "santo"), |
| | ("co", "compañía"), |
| | ("jr", "junior"), |
| | ("ltd", "limitada"), |
| | ] |
| | ], |
| | "fr": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("mme", "madame"), |
| | ("mr", "monsieur"), |
| | ("dr", "docteur"), |
| | ("st", "saint"), |
| | ("co", "compagnie"), |
| | ("jr", "junior"), |
| | ("ltd", "limitée"), |
| | ] |
| | ], |
| | "de": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("fr", "frau"), |
| | ("dr", "doktor"), |
| | ("st", "sankt"), |
| | ("co", "firma"), |
| | ("jr", "junior"), |
| | ] |
| | ], |
| | "pt": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("sra", "senhora"), |
| | ("sr", "senhor"), |
| | ("dr", "doutor"), |
| | ("dra", "doutora"), |
| | ("st", "santo"), |
| | ("co", "companhia"), |
| | ("jr", "júnior"), |
| | ("ltd", "limitada"), |
| | ] |
| | ], |
| | "it": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | |
| | ("sig", "signore"), |
| | ("dr", "dottore"), |
| | ("st", "santo"), |
| | ("co", "compagnia"), |
| | ("jr", "junior"), |
| | ("ltd", "limitata"), |
| | ] |
| | ], |
| | "pl": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("p", "pani"), |
| | ("m", "pan"), |
| | ("dr", "doktor"), |
| | ("sw", "święty"), |
| | ("jr", "junior"), |
| | ] |
| | ], |
| | "ar": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | |
| | ] |
| | ], |
| | "zh": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | |
| | ] |
| | ], |
| | "cs": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("dr", "doktor"), |
| | ("ing", "inženýr"), |
| | ("p", "pan"), |
| | |
| | ] |
| | ], |
| | "ru": [ |
| | (re.compile("\\b%s\\b" % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("г-жа", "госпожа"), |
| | ("г-н", "господин"), |
| | ("д-р", "доктор"), |
| | |
| | ] |
| | ], |
| | "nl": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("dhr", "de heer"), |
| | ("mevr", "mevrouw"), |
| | ("dr", "dokter"), |
| | ("jhr", "jonkheer"), |
| | |
| | ] |
| | ], |
| | "tr": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("b", "bay"), |
| | ("byk", "büyük"), |
| | ("dr", "doktor"), |
| | |
| | ] |
| | ], |
| | "hu": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("dr", "doktor"), |
| | ("b", "bácsi"), |
| | ("nőv", "nővér"), |
| | |
| | ] |
| | ], |
| | "ko": [ |
| | (re.compile("\\b%s\\." % x[0], re.IGNORECASE), x[1]) |
| | for x in [ |
| | |
| | ] |
| | ], |
| | } |
| |
|
| | def expand_abbreviations_multilingual(text, lang="en"): |
| | if lang in _abbreviations: |
| | for regex, replacement in _abbreviations[lang]: |
| | text = re.sub(regex, replacement, text) |
| | return text |
| |
|
| | _symbols_multilingual = { |
| | "en": [ |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " and "), |
| | ("@", " at "), |
| | ("%", " percent "), |
| | ("#", " hash "), |
| | ("$", " dollar "), |
| | ("£", " pound "), |
| | ("°", " degree "), |
| | ] |
| | ], |
| | "es": [ |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " y "), |
| | ("@", " arroba "), |
| | ("%", " por ciento "), |
| | ("#", " numeral "), |
| | ("$", " dolar "), |
| | ("£", " libra "), |
| | ("°", " grados "), |
| | ] |
| | ], |
| | "fr": [ |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " et "), |
| | ("@", " arobase "), |
| | ("%", " pour cent "), |
| | ("#", " dièse "), |
| | ("$", " dollar "), |
| | ("£", " livre "), |
| | ("°", " degrés "), |
| | ] |
| | ], |
| | "de": [ |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " und "), |
| | ("@", " at "), |
| | ("%", " prozent "), |
| | ("#", " raute "), |
| | ("$", " dollar "), |
| | ("£", " pfund "), |
| | ("°", " grad "), |
| | ] |
| | ], |
| | "pt": [ |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " e "), |
| | ("@", " arroba "), |
| | ("%", " por cento "), |
| | ("#", " cardinal "), |
| | ("$", " dólar "), |
| | ("£", " libra "), |
| | ("°", " graus "), |
| | ] |
| | ], |
| | "it": [ |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " e "), |
| | ("@", " chiocciola "), |
| | ("%", " per cento "), |
| | ("#", " cancelletto "), |
| | ("$", " dollaro "), |
| | ("£", " sterlina "), |
| | ("°", " gradi "), |
| | ] |
| | ], |
| | "pl": [ |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " i "), |
| | ("@", " małpa "), |
| | ("%", " procent "), |
| | ("#", " krzyżyk "), |
| | ("$", " dolar "), |
| | ("£", " funt "), |
| | ("°", " stopnie "), |
| | ] |
| | ], |
| | "ar": [ |
| | |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " و "), |
| | ("@", " على "), |
| | ("%", " في المئة "), |
| | ("#", " رقم "), |
| | ("$", " دولار "), |
| | ("£", " جنيه "), |
| | ("°", " درجة "), |
| | ] |
| | ], |
| | "zh": [ |
| | |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " 和 "), |
| | ("@", " 在 "), |
| | ("%", " 百分之 "), |
| | ("#", " 号 "), |
| | ("$", " 美元 "), |
| | ("£", " 英镑 "), |
| | ("°", " 度 "), |
| | ] |
| | ], |
| | "cs": [ |
| | |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " a "), |
| | ("@", " na "), |
| | ("%", " procento "), |
| | ("#", " křížek "), |
| | ("$", " dolar "), |
| | ("£", " libra "), |
| | ("°", " stupně "), |
| | ] |
| | ], |
| | "ru": [ |
| | |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " и "), |
| | ("@", " собака "), |
| | ("%", " процентов "), |
| | ("#", " номер "), |
| | ("$", " доллар "), |
| | ("£", " фунт "), |
| | ("°", " градус "), |
| | ] |
| | ], |
| | "nl": [ |
| | |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " en "), |
| | ("@", " bij "), |
| | ("%", " procent "), |
| | ("#", " hekje "), |
| | ("$", " dollar "), |
| | ("£", " pond "), |
| | ("°", " graden "), |
| | ] |
| | ], |
| | "tr": [ |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " ve "), |
| | ("@", " at "), |
| | ("%", " yüzde "), |
| | ("#", " diyez "), |
| | ("$", " dolar "), |
| | ("£", " sterlin "), |
| | ("°", " derece "), |
| | ] |
| | ], |
| | "hu": [ |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " és "), |
| | ("@", " kukac "), |
| | ("%", " százalék "), |
| | ("#", " kettőskereszt "), |
| | ("$", " dollár "), |
| | ("£", " font "), |
| | ("°", " fok "), |
| | ] |
| | ], |
| | "ko": [ |
| | |
| | (re.compile(r"%s" % re.escape(x[0]), re.IGNORECASE), x[1]) |
| | for x in [ |
| | ("&", " 그리고 "), |
| | ("@", " 에 "), |
| | ("%", " 퍼센트 "), |
| | ("#", " 번호 "), |
| | ("$", " 달러 "), |
| | ("£", " 파운드 "), |
| | ("°", " 도 "), |
| | ] |
| | ], |
| | } |
| |
|
| | def expand_symbols_multilingual(text, lang="en"): |
| | if lang in _symbols_multilingual: |
| | for regex, replacement in _symbols_multilingual[lang]: |
| | text = re.sub(regex, replacement, text) |
| | text = text.replace(" ", " ") |
| | return text.strip() |
| |
|
| | _ordinal_re = { |
| | "en": re.compile(r"([0-9]+)(st|nd|rd|th)"), |
| | "es": re.compile(r"([0-9]+)(º|ª|er|o|a|os|as)"), |
| | "fr": re.compile(r"([0-9]+)(º|ª|er|re|e|ème)"), |
| | "de": re.compile(r"([0-9]+)(st|nd|rd|th|º|ª|\.(?=\s|$))"), |
| | "pt": re.compile(r"([0-9]+)(º|ª|o|a|os|as)"), |
| | "it": re.compile(r"([0-9]+)(º|°|ª|o|a|i|e)"), |
| | "pl": re.compile(r"([0-9]+)(º|ª|st|nd|rd|th)"), |
| | "ar": re.compile(r"([0-9]+)(ون|ين|ث|ر|ى)"), |
| | "cs": re.compile(r"([0-9]+)\.(?=\s|$)"), |
| | "ru": re.compile(r"([0-9]+)(-й|-я|-е|-ое|-ье|-го)"), |
| | "nl": re.compile(r"([0-9]+)(de|ste|e)"), |
| | "tr": re.compile(r"([0-9]+)(\.|inci|nci|uncu|üncü|\.)"), |
| | "hu": re.compile(r"([0-9]+)(\.|adik|edik|odik|edik|ödik|ödike|ik)"), |
| | "ko": re.compile(r"([0-9]+)(번째|번|차|째)"), |
| | } |
| | _number_re = re.compile(r"[0-9]+") |
| | |
| | _currency_re = { |
| | "USD": re.compile(r"((\$[0-9\.\,]*[0-9]+)|([0-9\.\,]*[0-9]+\$))"), |
| | "GBP": re.compile(r"((£[0-9\.\,]*[0-9]+)|([0-9\.\,]*[0-9]+£))"), |
| | "EUR": re.compile(r"(([0-9\.\,]*[0-9]+€)|((€[0-9\.\,]*[0-9]+)))"), |
| | } |
| |
|
| | _comma_number_re = re.compile(r"\b\d{1,3}(,\d{3})*(\.\d+)?\b") |
| | _dot_number_re = re.compile(r"\b\d{1,3}(\.\d{3})*(\,\d+)?\b") |
| | _decimal_number_re = re.compile(r"([0-9]+[.,][0-9]+)") |
| |
|
| | def _remove_commas(m): |
| | text = m.group(0) |
| | if "," in text: |
| | text = text.replace(",", "") |
| | return text |
| |
|
| | def _remove_dots(m): |
| | text = m.group(0) |
| | if "." in text: |
| | text = text.replace(".", "") |
| | return text |
| |
|
| | def _expand_decimal_point(m, lang="en"): |
| | amount = m.group(1).replace(",", ".") |
| | return num2words(float(amount), lang=lang if lang != "cs" else "cz") |
| |
|
| | def _expand_currency(m, lang="en", currency="USD"): |
| | amount = float((re.sub(r"[^\d.]", "", m.group(0).replace(",", ".")))) |
| | full_amount = num2words(amount, to="currency", currency=currency, lang=lang if lang != "cs" else "cz") |
| |
|
| | and_equivalents = { |
| | "en": ", ", |
| | "es": " con ", |
| | "fr": " et ", |
| | "de": " und ", |
| | "pt": " e ", |
| | "it": " e ", |
| | "pl": ", ", |
| | "cs": ", ", |
| | "ru": ", ", |
| | "nl": ", ", |
| | "ar": ", ", |
| | "tr": ", ", |
| | "hu": ", ", |
| | "ko": ", ", |
| | } |
| |
|
| | if amount.is_integer(): |
| | last_and = full_amount.rfind(and_equivalents.get(lang, ", ")) |
| | if last_and != -1: |
| | full_amount = full_amount[:last_and] |
| |
|
| | return full_amount |
| |
|
| | def _expand_ordinal(m, lang="en"): |
| | return num2words(int(m.group(1)), ordinal=True, lang=lang if lang != "cs" else "cz") |
| |
|
| | def _expand_number(m, lang="en"): |
| | return num2words(int(m.group(0)), lang=lang if lang != "cs" else "cz") |
| |
|
| | def expand_numbers_multilingual(text, lang="en"): |
| | if lang == "zh": |
| | text = zh_num2words()(text) |
| | else: |
| | if lang in ["en", "ru"]: |
| | text = re.sub(_comma_number_re, _remove_commas, text) |
| | else: |
| | text = re.sub(_dot_number_re, _remove_dots, text) |
| | try: |
| | text = re.sub(_currency_re["GBP"], lambda m: _expand_currency(m, lang, "GBP"), text) |
| | text = re.sub(_currency_re["USD"], lambda m: _expand_currency(m, lang, "USD"), text) |
| | text = re.sub(_currency_re["EUR"], lambda m: _expand_currency(m, lang, "EUR"), text) |
| | except Exception as e: |
| | pass |
| | if lang != "tr": |
| | text = re.sub(_decimal_number_re, lambda m: _expand_decimal_point(m, lang), text) |
| | if lang in _ordinal_re: |
| | text = re.sub(_ordinal_re[lang], lambda m: _expand_ordinal(m, lang), text) |
| | text = re.sub(_number_re, lambda m: _expand_number(m, lang), text) |
| | return text |
| |
|
| | def lowercase(text): |
| | return text.lower() |
| |
|
| | def collapse_whitespace(text): |
| | return re.sub(_whitespace_re, " ", text) |
| |
|
| | def multilingual_cleaners(text, lang): |
| | text = text.replace('"', "") |
| | if lang == "tr": |
| | text = text.replace("İ", "i") |
| | text = text.replace("Ö", "ö") |
| | text = text.replace("Ü", "ü") |
| | text = lowercase(text) |
| | text = expand_numbers_multilingual(text, lang) |
| | text = expand_abbreviations_multilingual(text, lang) |
| | text = expand_symbols_multilingual(text, lang=lang) |
| | text = collapse_whitespace(text) |
| | return text |
| |
|
| | def basic_cleaners(text): |
| | """Basic pipeline that lowercases and collapses whitespace without transliteration.""" |
| | text = lowercase(text) |
| | text = collapse_whitespace(text) |
| | return text |
| |
|
| | def chinese_transliterate(text): |
| | return "".join( |
| | [p[0] for p in pypinyin.pinyin(text, style=pypinyin.Style.TONE3, heteronym=False, neutral_tone_with_five=True)] |
| | ) |
| |
|
| | def japanese_cleaners(text, katsu): |
| | text = katsu.romaji(text) |
| | text = lowercase(text) |
| | return text |
| |
|
| | def korean_transliterate(text, transliter): |
| | return transliter.translit(text) |
| |
|
| | |
| |
|
| | class XTTSTokenizerFast(PreTrainedTokenizerFast): |
| | """ |
| | Fast Tokenizer implementation for XTTS model using HuggingFace's PreTrainedTokenizerFast |
| | """ |
| |
|
| | def __init__( |
| | self, |
| | vocab_file: str = None, |
| | tokenizer_object: Optional[Tokenizer] = None, |
| | unk_token: str = "[UNK]", |
| | pad_token: str = "[PAD]", |
| | bos_token: str = "[START]", |
| | eos_token: str = "[STOP]", |
| | auto_map: dict = {"AutoTokenizer": ["AstraMindAI/xtts2-gpt--tokenizer.XTTSTokenizerFast", None]}, |
| | clean_up_tokenization_spaces: bool = True, |
| | **kwargs |
| | ): |
| | if tokenizer_object is None and vocab_file is not None: |
| | tokenizer_object = Tokenizer.from_file(vocab_file) |
| |
|
| | if tokenizer_object is not None: |
| | |
| | tokenizer_object.pre_tokenizer = WhitespaceSplit() |
| | tokenizer_object.post_processor = TemplateProcessing( |
| | single=f"{bos_token} $A {eos_token}", |
| | special_tokens=[ |
| | (bos_token, tokenizer_object.token_to_id(bos_token)), |
| | (eos_token, tokenizer_object.token_to_id(eos_token)), |
| | ], |
| | ) |
| |
|
| | super().__init__( |
| | tokenizer_object=tokenizer_object, |
| | unk_token=unk_token, |
| | pad_token=pad_token, |
| | bos_token=bos_token, |
| | eos_token=eos_token, |
| | clean_up_tokenization_spaces=clean_up_tokenization_spaces, |
| | **kwargs |
| | ) |
| |
|
| | |
| | self.char_limits = { |
| | "en": 250, "de": 253, "fr": 273, "es": 239, |
| | "it": 213, "pt": 203, "pl": 224, "zh": 82, |
| | "ar": 166, "cs": 186, "ru": 182, "nl": 251, |
| | "tr": 226, "ja": 71, "hu": 224, "ko": 95, |
| | } |
| |
|
| | |
| | self._katsu = None |
| | self._korean_transliter = Transliter(academic) |
| |
|
| | |
| | if self.pad_token_id is None: |
| | self.pad_token_id = self.tokenizer.token_to_id(self.pad_token) |
| |
|
| | @cached_property |
| | def katsu(self): |
| | if self._katsu is None: |
| | self._katsu = cutlet.Cutlet() |
| | return self._katsu |
| |
|
| | def preprocess_text(self, text: str, lang: str) -> str: |
| | """Apply text preprocessing for language""" |
| | base_lang = lang.split("-")[0] |
| | if base_lang in {"ar", "cs", "de", "en", "es", "fr", "hu", "it", |
| | "nl", "pl", "pt", "ru", "tr", "zh", "ko"}: |
| | text = multilingual_cleaners(text, base_lang) |
| | if base_lang == "zh": |
| | text = chinese_transliterate(text) |
| | if base_lang == "ko": |
| | text = korean_transliterate(text, self._korean_transliter) |
| | elif base_lang == "ja": |
| | text = japanese_cleaners(text, self.katsu) |
| | else: |
| | text = basic_cleaners(text) |
| | return text |
| |
|
| | def batch_encode_with_split(self, texts: Union[str, List[str]], lang: Union[str, List[str]], |
| | **kwargs) -> torch.Tensor: |
| | """ |
| | Split texts into smaller chunks based on language character limits and encode them using HuggingFace fast tokenizer. |
| | strictly mimic the xttsv2 tokenizer |
| | """ |
| | |
| | if isinstance(texts, str): |
| | texts = [texts] |
| | if isinstance(lang, str): |
| | lang = [lang] |
| | |
| | if len(lang) == 1 and len(texts) > 1: |
| | lang = lang * len(texts) |
| |
|
| | |
| | if len(texts) != len(lang): |
| | raise ValueError(f"Number of texts ({len(texts)}) does not match number of languages ({len(lang)}).") |
| |
|
| | chunk_list = [] |
| | max_splits = 0 |
| |
|
| | |
| | for text, text_lang in zip(texts, lang): |
| | |
| | base_lang = text_lang.split("-")[0] |
| | char_limit = self.char_limits.get(base_lang, 250) |
| |
|
| | |
| | |
| |
|
| | |
| | chunk_list = split_sentence(text, base_lang, text_split_length=char_limit) |
| |
|
| | |
| | if not self.is_fast: |
| | raise ValueError("The tokenizer must be a fast tokenizer.") |
| |
|
| | |
| | encoding: BatchEncoding = self( |
| | chunk_list, |
| | lang = lang, |
| | add_special_tokens=False, |
| | padding=False, |
| | **kwargs |
| | ) |
| |
|
| | |
| | return encoding['input_ids'] |
| |
|
| | def _batch_encode_plus( |
| | self, |
| | batch_text_or_text_pairs, |
| | add_special_tokens: bool = True, |
| | padding_strategy=PaddingStrategy.DO_NOT_PAD, |
| | truncation_strategy=TruncationStrategy.DO_NOT_TRUNCATE, |
| | max_length: Optional[int] = None, |
| | stride: int = 0, |
| | is_split_into_words: bool = False, |
| | pad_to_multiple_of: Optional[int] = None, |
| | return_tensors: Optional[str] = None, |
| | return_token_type_ids: Optional[bool] = None, |
| | return_attention_mask: Optional[bool] = None, |
| | return_overflowing_tokens: bool = False, |
| | return_special_tokens_mask: bool = False, |
| | return_offsets_mapping: bool = False, |
| | return_length: bool = False, |
| | verbose: bool = True, |
| | **kwargs |
| | ) -> Dict[str, Any]: |
| | """ |
| | Override batch encoding to handle language-specific preprocessing |
| | """ |
| | lang = kwargs.pop("lang", ["en"] * len(batch_text_or_text_pairs)) |
| | if isinstance(lang, str): |
| | lang = [lang] |
| | |
| | if len(lang) == 1 and len(batch_text_or_text_pairs) > 1: |
| | lang = lang * len(batch_text_or_text_pairs) |
| |
|
| | |
| | if len(batch_text_or_text_pairs) != len(lang): |
| | raise ValueError(f"Number of texts ({len(batch_text_or_text_pairs)}) does not match number of languages ({len(lang)}).") |
| |
|
| | |
| | processed_texts = [] |
| | for text, text_lang in zip(batch_text_or_text_pairs, lang): |
| | if isinstance(text, str): |
| | |
| | |
| | processed_text = self.preprocess_text(text, text_lang) |
| |
|
| | |
| | base_lang = text_lang.split("-")[0] |
| | lang_code = "zh-cn" if base_lang == "zh" else base_lang |
| | processed_text = f"[{lang_code}]{processed_text}" |
| | processed_text = processed_text.replace(" ", "[SPACE]") |
| |
|
| | processed_texts.append(processed_text) |
| | else: |
| | processed_texts.append(text) |
| |
|
| | |
| | return super()._batch_encode_plus( |
| | processed_texts, |
| | add_special_tokens=add_special_tokens, |
| | padding_strategy=padding_strategy, |
| | truncation_strategy=truncation_strategy, |
| | max_length=max_length, |
| | stride=stride, |
| | is_split_into_words=is_split_into_words, |
| | pad_to_multiple_of=pad_to_multiple_of, |
| | return_tensors=return_tensors, |
| | return_token_type_ids=return_token_type_ids, |
| | return_attention_mask=return_attention_mask, |
| | return_overflowing_tokens=return_overflowing_tokens, |
| | return_special_tokens_mask=return_special_tokens_mask, |
| | return_offsets_mapping=return_offsets_mapping, |
| | return_length=return_length, |
| | verbose=verbose, |
| | **kwargs |
| | ) |
| |
|
| |
|
| | def __call__( |
| | self, |
| | text: Union[str, List[str]], |
| | lang: Union[str, List[str]] = "en", |
| | add_special_tokens: bool = True, |
| | padding: Union[bool, str, PaddingStrategy] = False, |
| | truncation: Union[bool, str, TruncationStrategy] = False, |
| | max_length: Optional[int] = None, |
| | stride: int = 0, |
| | return_tensors: Optional[str] = None, |
| | return_token_type_ids: Optional[bool] = None, |
| | return_attention_mask: Optional[bool] = True, |
| | **kwargs |
| | ): |
| | """ |
| | Main tokenization method |
| | """ |
| | |
| | if isinstance(text, str): |
| | text = [text] |
| | if isinstance(lang, str): |
| | lang = [lang] |
| | |
| | if len(lang) == 1 and len(text) > 1: |
| | lang = lang * len(text) |
| |
|
| | |
| | if len(text) != len(lang): |
| | raise ValueError(f"Number of texts ({len(text)}) does not match number of languages ({len(lang)}).") |
| |
|
| | |
| | if isinstance(padding, bool): |
| | padding_strategy = PaddingStrategy.LONGEST if padding else PaddingStrategy.DO_NOT_PAD |
| | else: |
| | padding_strategy = PaddingStrategy(padding) |
| |
|
| | |
| | if isinstance(truncation, bool): |
| | truncation_strategy = TruncationStrategy.LONGEST_FIRST if truncation else TruncationStrategy.DO_NOT_TRUNCATE |
| | else: |
| | truncation_strategy = TruncationStrategy(truncation) |
| |
|
| | |
| | encoded = self._batch_encode_plus( |
| | text, |
| | add_special_tokens=add_special_tokens, |
| | padding_strategy=padding_strategy, |
| | truncation_strategy=truncation_strategy, |
| | max_length=max_length, |
| | stride=stride, |
| | return_tensors=return_tensors, |
| | return_token_type_ids=return_token_type_ids, |
| | return_attention_mask=return_attention_mask, |
| | lang=lang, |
| | **kwargs |
| | ) |
| |
|
| | return encoded |
| |
|