import shutil
import string
import sys
import time
import os
from itertools import groupby
from subprocess import Popen, PIPE
import re
from src.aligner import Aligner
import glob
import spacy
from spacy.tokens import Doc
from gradio_client.exceptions import AppError
import tqdm
# Load multilingual model to use as sentence tokenizer
spacy_nlp = spacy.load("xx_ent_wiki_sm")
# Add the rule-based sentencizer
if "sentencizer" not in spacy_nlp.pipe_names:
spacy_nlp.add_pipe("sentencizer")
def doc_to_plain_text(input_file: str, source_lang: str, target_lang: str, tikal_folder: str,
original_xliff_file_path: str) -> str:
"""
Given a document, this function generates an xliff file and then a plain text file with the text contents
while keeping style and formatting using tags like
Parameters:
input_file: Path to document to process
source_lang: Source language of the document
target_lang: Target language of the document
tikal_folder: Folder where tikal.sh is located
original_xliff_file_path: Path to xliff file to generate, which will be use later
Returns:
string: Path to plain text file
"""
tikal_xliff_command = [os.path.join(tikal_folder, "tikal.sh"), "-x", input_file, "-nocopy", "-sl", source_lang,
"-tl", target_lang]
Popen(tikal_xliff_command).wait()
tikal_moses_command = [os.path.join(tikal_folder, "tikal.sh"), "-xm", original_xliff_file_path, "-sl", source_lang,
"-tl", target_lang]
Popen(tikal_moses_command).wait()
return os.path.join(original_xliff_file_path + f".{source_lang}")
def get_runs_from_paragraph(paragraph: str, paragraph_index: int) -> list[dict[str, str | tuple[str, ...]]]:
"""
Given some text that may or may not contain some chunks tagged with something like , extract each
of the runs of text and convert them into dictionaries to keep this information
Parameters:
text: Text to process
paragraph_index: Index of the paragraph in the file
Returns:
list[dict]: Where each element is a run with text, tag id (if any, if not None) and paragraph_index
"""
tag_stack = []
runs = []
pos = 0
# Match any tag: , , or
tag_pattern = re.compile(r'<(/?)(\w+)(?:\s+id="(\d+)")?\s*(/?)>')
for match in tag_pattern.finditer(paragraph):
start, end = match.span()
is_closing = match.group(1) == "/"
tag_name = match.group(2)
tag_id = match.group(3)
is_self_closing = match.group(4) == "/"
# Text before this tag
if start > pos:
text = paragraph[pos:start]
if text:
runs.append({
"text": text,
"id": tag_stack.copy(),
"paragraph_index": paragraph_index
})
if is_closing:
# Closing tag
expected_prefix = f"{tag_name}_"
if tag_stack and tag_stack[-1].startswith(expected_prefix):
tag_stack.pop()
else:
raise ValueError(f"Mismatched closing tag {tag_name}>")
elif is_self_closing:
# Self-closing tag like
if tag_id is None:
raise ValueError(f"Self-closing tag <{tag_name}/> missing id")
runs.append({
"text": "",
"id": [f"{tag_name}_{tag_id}"],
"paragraph_index": paragraph_index
})
else:
# Opening tag
if tag_id is None:
raise ValueError(f"Opening tag <{tag_name}> missing id")
tag_stack.append(f"{tag_name}_{tag_id}")
pos = end
# Final trailing text
if pos < len(paragraph):
text = paragraph[pos:]
if text:
runs.append({
"text": text,
"id": tag_stack.copy(),
"paragraph_index": paragraph_index
})
return runs
def tokenize_text(text, tokenizer):
# To avoid the tokenizer destroying the url
def preserve_urls(text):
url_pattern = r'https?://[^\s\)\]\}\>]+|www\.[^\s\)\]\}\>]+'
# Find URLs using regex and replace them with a placeholder
urls = re.findall(url_pattern, text)
for idx, url in enumerate(urls):
placeholder = f"URL{idx}"
text = text.replace(url, placeholder)
return text, urls
# Replace URLs with placeholders
text, urls = preserve_urls(text)
# Tokenize using Sacremoses
tokens = tokenizer.tokenize(text)
# Revert placeholders back to original URLs
for idx, url in enumerate(urls):
placeholder = f"URL{idx}"
tokens = [token.replace(placeholder, url) for token in tokens]
return tokens
def tokenize_with_runs(runs: list[dict[str, str]]) -> tuple[list[list[dict[str, str]]], list[list[bool]]]:
"""
Given a list of runs, we need to tokenize them by sentence and token while keeping the style of each token according
to its original run
Parameters:
runs: List of runs, where each item is a chunk of text (possibly various tokens) and some style/formatting information
source_lang: Language of the document
Returns:
list[list[dict]]: A list of tokenized sentences where each token contains the style of its original run
"""
# it's a bit of a mess but first we get the tokenized sentences
# join runs and send through spacy to split into clean tokens
doc_from_runs = spacy_nlp("".join([run["text"] for run in runs]).strip())
# extract sentences and tokenize each into words
tokenized_sentences = [[token.text.strip() for token in sent if token.text.strip()] for sent in doc_from_runs.sents]
tokenized_sentences_spaces = [[token.whitespace_ != '' for token in sent if token.text.strip()] for sent in
doc_from_runs.sents]
flat_tokens = [token for sentence in tokenized_sentences for token in sentence]
flat_spaces = [token for sentence in tokenized_sentences_spaces for token in sentence]
flat_tokens_with_style = []
flat_spaces_with_style = []
token_idx = 0
for run in runs:
run["text"] = run["text"].strip()
while run["text"]:
if run["text"].startswith(flat_tokens[token_idx]):
run["text"] = run["text"][len(flat_tokens[token_idx]):]
if flat_spaces[token_idx]:
run["text"] = run["text"].lstrip()
item = run.copy()
item["text"] = flat_tokens[token_idx]
flat_tokens_with_style.append(item)
flat_spaces_with_style.append(flat_spaces[token_idx])
token_idx += 1
elif flat_tokens[token_idx].startswith(run["text"]):
subtoken = flat_tokens[token_idx][:len(run["text"])]
item = run.copy()
item["text"] = subtoken
flat_tokens_with_style.append(item)
flat_spaces_with_style.append(False)
flat_tokens[token_idx] = flat_tokens[token_idx][len(run["text"]):]
run["text"] = run["text"][len(subtoken):]
# reconstruct the sentences
token_idx = 0
tokenized_sentences_with_style, tokenized_sentences_spaces_with_style = [], []
for sentence, sentence_spaces in zip(tokenized_sentences, tokenized_sentences_spaces):
sentence_with_style, sentence_spaces_with_style = [], []
for token in sentence:
if token == flat_tokens_with_style[token_idx]["text"]:
sentence_with_style.append(flat_tokens_with_style[token_idx])
sentence_spaces_with_style.append(flat_spaces_with_style[token_idx])
token_idx += 1
elif token.startswith(flat_tokens_with_style[token_idx]["text"]):
while token:
token = token[len(flat_tokens_with_style[token_idx]["text"]):]
sentence_with_style.append(flat_tokens_with_style[token_idx])
sentence_spaces_with_style.append(flat_spaces_with_style[token_idx])
token_idx += 1
else:
print(token)
print(sentence)
print(token_idx)
print(flat_tokens_with_style)
raise Exception(f"Something unexpected happened")
tokenized_sentences_with_style.append(sentence_with_style)
tokenized_sentences_spaces_with_style.append(sentence_spaces_with_style)
return tokenized_sentences_with_style, tokenized_sentences_spaces_with_style
def generate_alignments(original_tokenized_sentences_with_style: list[list[dict[str, str]]],
translated_sentences: list[str], aligner, temp_folder: str):
"""
Given some original sentences with style and formatting and its translation without formatting, try to match
the translated text formatting with the original. Since we only want to run fastalign once we have to temporarily
forget about paragraphs and work only in sentences, so the output is a list of sentences but with information about
from which paragraph that sentence came from
Parameters:
original_tokenized_sentences_with_style: Original text split into sentences with style information
translated_sentences: Translated text, split into sentences
aligner: Object of the aligner class, uses fastalign
temp_folder: Path to folder where to put all the intermediate files
source_lang: original language of the document
target_lang: target language of the translation
Returns:
list[list[dict]]: A list of tokenized sentences where each translated token contains the style of the associated
original token
"""
# clean temp folder
for f in glob.glob(os.path.join(temp_folder, "*align*")):
os.remove(f)
# tokenize the translated text by sentence and word
translated_tokenized_sentences = []
# keep spacing information to detokenize properly later
translated_tokenized_sentences_spaces = []
for sentence in translated_sentences:
tokens = spacy_nlp(sentence)
translated_tokenized_sentences_spaces.append([token.whitespace_ != '' for token in tokens])
translated_tokenized_sentences.append([token.text for token in tokens])
assert len(translated_tokenized_sentences) == len(
original_tokenized_sentences_with_style), "The original and translated texts contain a different number of sentences, likely due to a translation error"
original_sentences = []
translated_sentences = []
for original, translated in zip(original_tokenized_sentences_with_style, translated_tokenized_sentences):
original_sentences.append(' '.join(item['text'] for item in original))
translated_sentences.append(' '.join(translated))
alignments = aligner.align(original_sentences, translated_sentences)
# using the alignments generated by fastalign, we need to copy the style of the original token to the translated one
translated_sentences_with_style = []
for sentence_idx, sentence_alignments in enumerate(alignments):
# reverse the order of the alignments and build a dict with it
sentence_alignments = {target: source for source, target in sentence_alignments}
translated_sentence_with_style: list[dict[str, str]] = []
for token_idx, translated_token in enumerate(translated_tokenized_sentences[sentence_idx]):
# fastalign has found a token aligned with the translated one
if token_idx in sentence_alignments.keys():
# get the aligned token
original_idx = sentence_alignments[token_idx]
new_entry = original_tokenized_sentences_with_style[sentence_idx][original_idx].copy()
new_entry["text"] = translated_token
translated_sentence_with_style.append(new_entry)
else:
# WARNING this is a test
# since fastalign doesn't know from which word to reference this token, copy the style of the previous word
new_entry = translated_sentence_with_style[-1].copy()
new_entry["text"] = translated_token
translated_sentence_with_style.append(new_entry)
translated_sentences_with_style.append(translated_sentence_with_style)
return translated_sentences_with_style, translated_tokenized_sentences_spaces
def group_by_style(tokens: list[dict[str, str]], spaces: list[bool]) -> list[dict[str, str]]:
"""
To avoid having issues in the future, we group the contiguous tokens that have the same style. Basically, we
reconstruct the runs.
Parameters:
tokens: Tokens with style information
Returns:
list[dict]: A list of translated runs with format and style
"""
groups = []
zipped = zip(tokens, spaces)
for key, group in groupby(zipped, key=lambda x: (x[0]["id"], x[0]["paragraph_index"])):
group = list(group)
tokens = [item[0]['text'] for item in group]
spaces = [item[1] for item in group]
text = Doc(spacy_nlp.vocab, words=tokens, spaces=spaces).text
groups.append({"text": text,
"id": key[0],
"paragraph_index": key[1]})
return groups
def runs_to_plain_text(paragraphs_with_style: dict[int, list[dict[str, str, str]]], out_file_path: str):
"""
Generate a plain text file restoring the original tag structure like
Parameters:
paragraphs_with_style: Dictionary where each key is the paragraph_index and its contents are a list of runs
out_file_path: Path to the file where the plain text will be saved
"""
with open(out_file_path, "w") as out_file:
def close_tags(ids):
tag = ""
for gid in ids:
tag_type, tag_id = gid.split("_")
tag += f'{tag_type}>'
return tag
def open_tags(ids):
tag = ""
for gid in ids:
tag_type, tag_id = gid.split("_")
tag += f'<{tag_type} id="{tag_id}">'
return tag
for key, paragraph in paragraphs_with_style.items():
for run in paragraph:
ids = list(run["id"]) if run["id"] else []
if ids:
output = open_tags(ids) + run["text"] + close_tags(ids)
out_file.write(output)
else:
out_file.write("".join(run["text"]))
out_file.write("\n")
def translate_document(input_file: str, source_lang: str, target_lang: str,
translator,
aligner: Aligner,
temp_folder: str = "tmp",
tikal_folder: str = "okapi-apps_gtk2-linux-x86_64_1.47.0", with_format: bool = True) -> (str,
str):
input_filename = input_file.split("/")[-1]
os.makedirs(temp_folder, exist_ok=True)
# copy the original file to the temporal folder to avoid common issues with tikal
temp_input_file = os.path.join(temp_folder, input_filename)
shutil.copy(input_file, temp_input_file)
original_xliff_file = os.path.join(temp_folder, input_filename + ".xlf")
plain_text_file = doc_to_plain_text(temp_input_file, source_lang, target_lang, tikal_folder, original_xliff_file)
# get paragraphs with runs
paragraphs_with_runs = [get_runs_from_paragraph(line.strip(), idx) for idx, line in
enumerate(open(plain_text_file).readlines())]
# translate using plaintext file
original_tokenized_sentences_with_style = []
original_spacing = []
for run in paragraphs_with_runs:
tokens, spaces = tokenize_with_runs(run)
original_tokenized_sentences_with_style += tokens
original_spacing += spaces
translated_sentences = []
yield "Translating 0%...", None
total = len(original_tokenized_sentences_with_style)
pbar = tqdm.tqdm(desc="Translating paragraphs...", total=total)
for i, (sentence, spacing) in enumerate(zip(original_tokenized_sentences_with_style, original_spacing)):
text = Doc(spacy_nlp.vocab, words=[token["text"] for token in sentence], spaces=spacing).text
while True:
try:
translated_sentences.append(translator.translate(text, source_lang, target_lang))
break
except AppError as e:
print(e)
sys.exit()
pbar.update(1)
percent_complete = int(((i + 1) / total) * 100)
yield f"Translating {percent_complete}%...", None
# time to align the translation with the original
print("Generating alignments...")
yield "Aligning...", None
start_time = time.time()
translated_sentences_with_style, translated_sentences_spacing = generate_alignments(
original_tokenized_sentences_with_style,
translated_sentences, aligner,
temp_folder)
print(f"Finished alignments in {time.time() - start_time} seconds")
# since we tokenized these sentences independently, the spacing information does not contain spaces after punctuation
# at the end of the sentence (there's no space at the end of a sentence that ends with ".", unless there's a sentence
# right after
for sentence, sentence_spaces in zip(translated_sentences_with_style, translated_sentences_spacing):
if sentence[-1]["text"] in string.punctuation:
sentence_spaces[-1] = True
# flatten the sentences into a list of tokens
translated_tokens_with_style = [item for sublist in translated_sentences_with_style for item in sublist]
tokens_spaces = [item for sublist in translated_sentences_spacing for item in sublist]
# group the tokens by style/run
translated_runs_with_style = group_by_style(translated_tokens_with_style, tokens_spaces)
# group the runs by original paragraph
translated_paragraphs_with_style = {key: [{'id': None, 'paragraph_index': key, 'text': ""}] for key in
range(len(paragraphs_with_runs))}
for item in translated_runs_with_style:
# first item in the paragraph, remove starting blank space we introduced in group_by_style(), where we
# didn't know where paragraphs started and ended
if not translated_paragraphs_with_style[item['paragraph_index']][0]["text"]:
first_item_in_paragraph = item.copy()
first_item_in_paragraph["text"] = first_item_in_paragraph["text"].lstrip(" ")
translated_paragraphs_with_style[item['paragraph_index']] = []
translated_paragraphs_with_style[item['paragraph_index']].append(first_item_in_paragraph)
else:
translated_paragraphs_with_style[item['paragraph_index']].append(item)
# save to new plain text file
translated_moses_file = os.path.join(original_xliff_file + f".{target_lang}")
runs_to_plain_text(translated_paragraphs_with_style, translated_moses_file)
# put the translations into the xlf
tikal_moses_to_xliff_command = [os.path.join(tikal_folder, "tikal.sh"), "-lm", original_xliff_file, "-sl",
source_lang, "-tl", target_lang, "-from", translated_moses_file, "-totrg",
"-noalttrans", "-to", original_xliff_file]
Popen(tikal_moses_to_xliff_command).wait()
# any tags that are still have not been paired between original and translated texts by tikal so we remove
# them. This may happen if a word in the original language has been split in more that one words that have other
# words in between, or an error in fastalign
text = open(original_xliff_file).read()
result = re.sub(r'(.*?)', r'\1', text)
open(original_xliff_file, "w").write(result)
# merge into a docx again
tikal_merge_doc_command = [os.path.join(tikal_folder, "tikal.sh"), "-m", original_xliff_file]
final_process = Popen(tikal_merge_doc_command, stdout=PIPE, stderr=PIPE)
stdout, stderr = final_process.communicate()
final_process.wait()
# get the path to the output file
output = stdout.decode('utf-8')
translated_file_path = re.search(r'(?<=Output:\s)(.*)', output)[0]
print(f"Saved file in {translated_file_path}")
yield "", translated_file_path