import glob
import os
import re
import shutil
import string
import sys
import time
from itertools import groupby
from subprocess import Popen, PIPE
from typing import Iterator

import spacy
import tqdm
from gradio_client.exceptions import AppError
from spacy.tokens import Doc

from src.aligner import Aligner

# Load multilingual model to use as sentence tokenizer
spacy_nlp = spacy.load("xx_ent_wiki_sm")

# Add the rule-based sentencizer
if "sentencizer" not in spacy_nlp.pipe_names:
    spacy_nlp.add_pipe("sentencizer")
def doc_to_plain_text(input_file: str, source_lang: str, target_lang: str, tikal_folder: str,
                      original_xliff_file_path: str) -> str:
    """
    Given a document, this function generates an xliff file and then a plain text file with the text contents
    while keeping style and formatting using tags like <g id="1"> </g>

    Parameters:
    input_file: Path to the document to process
    source_lang: Source language of the document
    target_lang: Target language of the document
    tikal_folder: Folder where tikal.sh is located
    original_xliff_file_path: Path to the xliff file to generate, which will be used later

    Returns:
    string: Path to the plain text file
    """
    tikal_xliff_command = [os.path.join(tikal_folder, "tikal.sh"), "-x", input_file, "-nocopy", "-sl", source_lang,
                           "-tl", target_lang]
    Popen(tikal_xliff_command).wait()

    tikal_moses_command = [os.path.join(tikal_folder, "tikal.sh"), "-xm", original_xliff_file_path, "-sl", source_lang,
                           "-tl", target_lang]
    Popen(tikal_moses_command).wait()

    return original_xliff_file_path + f".{source_lang}"
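
# Illustrative usage (hypothetical paths; assumes an Okapi Tikal install in `tikal_folder`):
#
#   plain_text_path = doc_to_plain_text("tmp/report.docx", "en", "es",
#                                       "okapi-apps_gtk2-linux-x86_64_1.47.0", "tmp/report.docx.xlf")
#   # -> "tmp/report.docx.xlf.en": one paragraph per line, with inline tags such as <g id="1">...</g>
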
def get_runs_from_paragraph(paragraph: str, paragraph_index: int) -> list[dict[str, str | list[str] | int]]:
    """
    Given some text that may or may not contain chunks tagged with something like <g id="1"> </g>, extract each
    run of text and convert it into a dictionary that keeps this information

    Parameters:
    paragraph: Text to process
    paragraph_index: Index of the paragraph in the file

    Returns:
    list[dict]: Where each element is a run with its text, the stack of enclosing tag ids (empty if untagged)
                and the paragraph_index
    """
    tag_stack = []
    runs = []
    pos = 0

    # Match any tag: <tag id="123"/>, </tag>, or <tag id="123">
    tag_pattern = re.compile(r'<(/?)(\w+)(?:\s+id="(\d+)")?\s*(/?)>')

    for match in tag_pattern.finditer(paragraph):
        start, end = match.span()
        is_closing = match.group(1) == "/"
        tag_name = match.group(2)
        tag_id = match.group(3)
        is_self_closing = match.group(4) == "/"

        # Text before this tag
        if start > pos:
            text = paragraph[pos:start]
            if text:
                runs.append({
                    "text": text,
                    "id": tag_stack.copy(),
                    "paragraph_index": paragraph_index
                })

        if is_closing:
            # Closing tag </tag>
            expected_prefix = f"{tag_name}_"
            if tag_stack and tag_stack[-1].startswith(expected_prefix):
                tag_stack.pop()
            else:
                raise ValueError(f"Mismatched closing tag </{tag_name}>")
        elif is_self_closing:
            # Self-closing tag like <x id="1"/>
            if tag_id is None:
                raise ValueError(f"Self-closing tag <{tag_name}/> missing id")
            runs.append({
                "text": "",
                "id": [f"{tag_name}_{tag_id}"],
                "paragraph_index": paragraph_index
            })
        else:
            # Opening tag <tag id="...">
            if tag_id is None:
                raise ValueError(f"Opening tag <{tag_name}> missing id")
            tag_stack.append(f"{tag_name}_{tag_id}")

        pos = end

    # Final trailing text
    if pos < len(paragraph):
        text = paragraph[pos:]
        if text:
            runs.append({
                "text": text,
                "id": tag_stack.copy(),
                "paragraph_index": paragraph_index
            })

    return runs
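
# Example (hypothetical input string, showing the shape of the returned runs):
#
#   get_runs_from_paragraph('Hello <g id="1">bold</g> world', paragraph_index=0)
#   # -> [{"text": "Hello ", "id": [], "paragraph_index": 0},
#   #     {"text": "bold", "id": ["g_1"], "paragraph_index": 0},
#   #     {"text": " world", "id": [], "paragraph_index": 0}]
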
def tokenize_text(text, tokenizer):
    # Prevent the tokenizer from splitting URLs apart
    def preserve_urls(text):
        url_pattern = r'https?://[^\s\)\]\}\>]+|www\.[^\s\)\]\}\>]+'
        # Find URLs using regex and replace them with a placeholder
        urls = re.findall(url_pattern, text)
        for idx, url in enumerate(urls):
            placeholder = f"URL{idx}"
            text = text.replace(url, placeholder)
        return text, urls

    # Replace URLs with placeholders
    text, urls = preserve_urls(text)

    # Tokenize using the provided tokenizer (e.g. Sacremoses)
    tokens = tokenizer.tokenize(text)

    # Revert placeholders back to the original URLs
    for idx, url in enumerate(urls):
        placeholder = f"URL{idx}"
        tokens = [token.replace(placeholder, url) for token in tokens]

    return tokens
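
# Sketch of usage, assuming a sacremoses MosesTokenizer as the `tokenizer` argument
# (sacremoses is not imported by this module):
#
#   from sacremoses import MosesTokenizer
#   tokenize_text("See https://example.com/docs for details", MosesTokenizer(lang="en"))
#   # the URL is swapped for a placeholder before tokenization and restored afterwards,
#   # so it comes back as a single token instead of being split apart
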
def tokenize_with_runs(runs: list[dict[str, str]]) -> tuple[list[list[dict[str, str]]], list[list[bool]]]:
    """
    Given a list of runs, tokenize them by sentence and token while keeping the style of each token according
    to its original run

    Parameters:
    runs: List of runs, where each item is a chunk of text (possibly several tokens) plus some style/formatting
          information

    Returns:
    list[list[dict]]: A list of tokenized sentences where each token carries the style of its original run
    list[list[bool]]: For each sentence, whether each token is followed by whitespace
    """
    # it's a bit of a mess but first we get the tokenized sentences
    # join the runs and send them through spacy to split into clean tokens
    doc_from_runs = spacy_nlp("".join([run["text"] for run in runs]).strip())

    # extract sentences and tokenize each into words
    tokenized_sentences = [[token.text.strip() for token in sent if token.text.strip()] for sent in doc_from_runs.sents]
    tokenized_sentences_spaces = [[token.whitespace_ != '' for token in sent if token.text.strip()] for sent in
                                  doc_from_runs.sents]

    flat_tokens = [token for sentence in tokenized_sentences for token in sentence]
    flat_spaces = [space for sentence in tokenized_sentences_spaces for space in sentence]

    # walk through the runs, assigning each token (or fragment of a token) the style of the run it came from
    flat_tokens_with_style = []
    flat_spaces_with_style = []
    token_idx = 0
    for run in runs:
        run["text"] = run["text"].strip()
        while run["text"]:
            if run["text"].startswith(flat_tokens[token_idx]):
                # the whole token belongs to this run
                run["text"] = run["text"][len(flat_tokens[token_idx]):]
                if flat_spaces[token_idx]:
                    run["text"] = run["text"].lstrip()
                item = run.copy()
                item["text"] = flat_tokens[token_idx]
                flat_tokens_with_style.append(item)
                flat_spaces_with_style.append(flat_spaces[token_idx])
                token_idx += 1
            elif flat_tokens[token_idx].startswith(run["text"]):
                # the token spans more than one run, so take only the fragment that belongs to this run
                subtoken = flat_tokens[token_idx][:len(run["text"])]
                item = run.copy()
                item["text"] = subtoken
                flat_tokens_with_style.append(item)
                flat_spaces_with_style.append(False)
                flat_tokens[token_idx] = flat_tokens[token_idx][len(run["text"]):]
                run["text"] = run["text"][len(subtoken):]

    # reconstruct the sentences
    token_idx = 0
    tokenized_sentences_with_style, tokenized_sentences_spaces_with_style = [], []
    for sentence, sentence_spaces in zip(tokenized_sentences, tokenized_sentences_spaces):
        sentence_with_style, sentence_spaces_with_style = [], []
        for token in sentence:
            if token == flat_tokens_with_style[token_idx]["text"]:
                sentence_with_style.append(flat_tokens_with_style[token_idx])
                sentence_spaces_with_style.append(flat_spaces_with_style[token_idx])
                token_idx += 1
            elif token.startswith(flat_tokens_with_style[token_idx]["text"]):
                # the token was split across runs, so consume styled fragments until the full token is rebuilt
                while token:
                    token = token[len(flat_tokens_with_style[token_idx]["text"]):]
                    sentence_with_style.append(flat_tokens_with_style[token_idx])
                    sentence_spaces_with_style.append(flat_spaces_with_style[token_idx])
                    token_idx += 1
            else:
                raise ValueError(f"Could not match token '{token}' against the styled tokens "
                                 f"(sentence: {sentence}, token index: {token_idx})")
        tokenized_sentences_with_style.append(sentence_with_style)
        tokenized_sentences_spaces_with_style.append(sentence_spaces_with_style)

    return tokenized_sentences_with_style, tokenized_sentences_spaces_with_style
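
# Example sketch (hypothetical runs; the exact token split depends on the spacy model):
#
#   runs = [{"text": "Hello ", "id": [], "paragraph_index": 0},
#           {"text": "world.", "id": ["g_1"], "paragraph_index": 0}]
#   sentences, spaces = tokenize_with_runs(runs)
#   # sentences -> [[{"text": "Hello", "id": [], ...}, {"text": "world", "id": ["g_1"], ...},
#   #                {"text": ".", "id": ["g_1"], ...}]]
#   # spaces    -> [[True, False, False]]
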
def generate_alignments(original_tokenized_sentences_with_style: list[list[dict[str, str]]],
                        translated_sentences: list[str], aligner, temp_folder: str):
    """
    Given the original sentences with style and formatting and their translation without formatting, try to match
    the translated text formatting with the original. Since we only want to run fastalign once, we temporarily
    forget about paragraphs and work only with sentences, so the output is a list of sentences where each token
    keeps track of which paragraph it came from

    Parameters:
    original_tokenized_sentences_with_style: Original text split into sentences with style information
    translated_sentences: Translated text, split into sentences
    aligner: Object of the Aligner class, uses fastalign
    temp_folder: Path to the folder where all the intermediate files are written

    Returns:
    list[list[dict]]: A list of tokenized sentences where each translated token carries the style of the associated
                      original token
    list[list[bool]]: For each sentence, whether each translated token is followed by whitespace
    """
    # clean temp folder
    for f in glob.glob(os.path.join(temp_folder, "*align*")):
        os.remove(f)

    # tokenize the translated text by sentence and word
    translated_tokenized_sentences = []
    # keep spacing information to detokenize properly later
    translated_tokenized_sentences_spaces = []
    for sentence in translated_sentences:
        tokens = spacy_nlp(sentence)
        translated_tokenized_sentences_spaces.append([token.whitespace_ != '' for token in tokens])
        translated_tokenized_sentences.append([token.text for token in tokens])

    assert len(translated_tokenized_sentences) == len(original_tokenized_sentences_with_style), \
        "The original and translated texts contain a different number of sentences, likely due to a translation error"

    original_sentences = []
    translated_sentences = []
    for original, translated in zip(original_tokenized_sentences_with_style, translated_tokenized_sentences):
        original_sentences.append(' '.join(item['text'] for item in original))
        translated_sentences.append(' '.join(translated))

    alignments = aligner.align(original_sentences, translated_sentences)

    # using the alignments generated by fastalign, copy the style of each original token to the translated one
    translated_sentences_with_style = []
    for sentence_idx, sentence_alignments in enumerate(alignments):
        # reverse the order of the alignments and build a dict with it
        sentence_alignments = {target: source for source, target in sentence_alignments}

        translated_sentence_with_style: list[dict[str, str]] = []
        for token_idx, translated_token in enumerate(translated_tokenized_sentences[sentence_idx]):
            if token_idx in sentence_alignments:
                # fastalign has found an original token aligned with the translated one
                original_idx = sentence_alignments[token_idx]
                new_entry = original_tokenized_sentences_with_style[sentence_idx][original_idx].copy()
                new_entry["text"] = translated_token
                translated_sentence_with_style.append(new_entry)
            else:
                # WARNING this is a test
                # since fastalign doesn't know which original word this token comes from, copy the style of the
                # previous word
                new_entry = translated_sentence_with_style[-1].copy()
                new_entry["text"] = translated_token
                translated_sentence_with_style.append(new_entry)

        translated_sentences_with_style.append(translated_sentence_with_style)

    return translated_sentences_with_style, translated_tokenized_sentences_spaces
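
# Example sketch (hypothetical objects; `aligner` is an Aligner instance wrapping fastalign):
#
#   styled, spaces = generate_alignments(original_tokenized_sentences_with_style,
#                                        ["Hola mundo ."], aligner, temp_folder="tmp")
#   # styled[0] holds the translated tokens, each carrying the "id"/"paragraph_index" of the
#   # original token fastalign aligned it to (or of the previous token when unaligned)
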
def group_by_style(tokens: list[dict[str, str]], spaces: list[bool]) -> list[dict[str, str]]:
    """
    To avoid issues later on, we group contiguous tokens that have the same style. Basically, we
    reconstruct the runs.

    Parameters:
    tokens: Tokens with style information
    spaces: Whether each token is followed by whitespace

    Returns:
    list[dict]: A list of translated runs with format and style
    """
    groups = []
    zipped = zip(tokens, spaces)
    for key, group in groupby(zipped, key=lambda x: (x[0]["id"], x[0]["paragraph_index"])):
        group = list(group)
        group_tokens = [item[0]['text'] for item in group]
        group_spaces = [item[1] for item in group]
        # rebuild the run text with the original spacing
        text = Doc(spacy_nlp.vocab, words=group_tokens, spaces=group_spaces).text
        groups.append({"text": text,
                       "id": key[0],
                       "paragraph_index": key[1]})
    return groups
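
# Example (contiguous tokens with the same style are merged back into a single run):
#
#   group_by_style([{"text": "Hola", "id": ["g_1"], "paragraph_index": 0},
#                   {"text": "mundo", "id": ["g_1"], "paragraph_index": 0}],
#                  [True, False])
#   # -> [{"text": "Hola mundo", "id": ["g_1"], "paragraph_index": 0}]
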
def runs_to_plain_text(paragraphs_with_style: dict[int, list[dict[str, str]]], out_file_path: str):
    """
    Generate a plain text file restoring the original tag structure like <g id="1"> </g>

    Parameters:
    paragraphs_with_style: Dictionary where each key is the paragraph_index and its value is a list of runs
    out_file_path: Path to the file where the plain text will be saved
    """
    with open(out_file_path, "w") as out_file:
        def close_tags(ids):
            tag = ""
            # close in reverse order so that nested tags stay well-formed
            for gid in reversed(ids):
                tag_type = gid.split("_")[0]
                tag += f'</{tag_type}>'
            return tag

        def open_tags(ids):
            tag = ""
            for gid in ids:
                tag_type, tag_id = gid.split("_")
                tag += f'<{tag_type} id="{tag_id}">'
            return tag

        for key, paragraph in paragraphs_with_style.items():
            for run in paragraph:
                ids = list(run["id"]) if run["id"] else []
                if ids:
                    output = open_tags(ids) + run["text"] + close_tags(ids)
                    out_file.write(output)
                else:
                    out_file.write(run["text"])
            out_file.write("\n")
def translate_document(input_file: str, source_lang: str, target_lang: str,
                       translator,
                       aligner: Aligner,
                       temp_folder: str = "tmp",
                       tikal_folder: str = "okapi-apps_gtk2-linux-x86_64_1.47.0",
                       with_format: bool = True) -> Iterator[tuple[str, str | None]]:
    input_filename = os.path.basename(input_file)
    os.makedirs(temp_folder, exist_ok=True)

    # copy the original file to the temporary folder to avoid common issues with tikal
    temp_input_file = os.path.join(temp_folder, input_filename)
    shutil.copy(input_file, temp_input_file)

    original_xliff_file = os.path.join(temp_folder, input_filename + ".xlf")
    plain_text_file = doc_to_plain_text(temp_input_file, source_lang, target_lang, tikal_folder, original_xliff_file)

    # get paragraphs with runs
    paragraphs_with_runs = [get_runs_from_paragraph(line.strip(), idx) for idx, line in
                            enumerate(open(plain_text_file).readlines())]

    # tokenize the original text into sentences, keeping the style of each token
    original_tokenized_sentences_with_style = []
    original_spacing = []
    for paragraph_runs in paragraphs_with_runs:
        tokens, spaces = tokenize_with_runs(paragraph_runs)
        original_tokenized_sentences_with_style += tokens
        original_spacing += spaces

    # translate sentence by sentence
    translated_sentences = []
    yield "Translating 0%...", None
    total = len(original_tokenized_sentences_with_style)
    pbar = tqdm.tqdm(desc="Translating sentences...", total=total)
    for i, (sentence, spacing) in enumerate(zip(original_tokenized_sentences_with_style, original_spacing)):
        text = Doc(spacy_nlp.vocab, words=[token["text"] for token in sentence], spaces=spacing).text
        while True:
            try:
                translated_sentences.append(translator.translate(text, source_lang, target_lang))
                break
            except AppError as e:
                print(e)
                sys.exit()
        pbar.update(1)
        percent_complete = int(((i + 1) / total) * 100)
        yield f"Translating {percent_complete}%...", None

    # time to align the translation with the original
    print("Generating alignments...")
    yield "Aligning...", None
    start_time = time.time()
    translated_sentences_with_style, translated_sentences_spacing = generate_alignments(
        original_tokenized_sentences_with_style,
        translated_sentences, aligner,
        temp_folder)
    print(f"Finished alignments in {time.time() - start_time} seconds")

    # since we tokenized these sentences independently, the spacing information does not contain spaces after
    # punctuation at the end of a sentence (there is no space after a sentence that ends with ".", unless another
    # sentence comes right after), so we add them back
    for sentence, sentence_spaces in zip(translated_sentences_with_style, translated_sentences_spacing):
        if sentence[-1]["text"] in string.punctuation:
            sentence_spaces[-1] = True

    # flatten the sentences into a list of tokens
    translated_tokens_with_style = [item for sublist in translated_sentences_with_style for item in sublist]
    tokens_spaces = [item for sublist in translated_sentences_spacing for item in sublist]

    # group the tokens by style/run
    translated_runs_with_style = group_by_style(translated_tokens_with_style, tokens_spaces)

    # group the runs by original paragraph
    translated_paragraphs_with_style = {key: [{'id': None, 'paragraph_index': key, 'text': ""}] for key in
                                        range(len(paragraphs_with_runs))}
    for item in translated_runs_with_style:
        # first item in the paragraph: remove the leading blank space introduced in group_by_style(), where we
        # didn't know where paragraphs started and ended
        if not translated_paragraphs_with_style[item['paragraph_index']][0]["text"]:
            first_item_in_paragraph = item.copy()
            first_item_in_paragraph["text"] = first_item_in_paragraph["text"].lstrip(" ")
            translated_paragraphs_with_style[item['paragraph_index']] = [first_item_in_paragraph]
        else:
            translated_paragraphs_with_style[item['paragraph_index']].append(item)

    # save to a new plain text file
    translated_moses_file = original_xliff_file + f".{target_lang}"
    runs_to_plain_text(translated_paragraphs_with_style, translated_moses_file)

    # put the translations into the xlf
    tikal_moses_to_xliff_command = [os.path.join(tikal_folder, "tikal.sh"), "-lm", original_xliff_file, "-sl",
                                    source_lang, "-tl", target_lang, "-from", translated_moses_file, "-totrg",
                                    "-noalttrans", "-to", original_xliff_file]
    Popen(tikal_moses_to_xliff_command).wait()

    # any tags that are still <g> have not been paired between the original and translated texts by tikal, so we
    # remove them. This may happen if a word in the original language has been split into more than one word with
    # other words in between, or due to an error in fastalign
    text = open(original_xliff_file).read()
    result = re.sub(r'<g id="\d+">(.*?)</g>', r'\1', text)
    open(original_xliff_file, "w").write(result)

    # merge into a docx again
    tikal_merge_doc_command = [os.path.join(tikal_folder, "tikal.sh"), "-m", original_xliff_file]
    final_process = Popen(tikal_merge_doc_command, stdout=PIPE, stderr=PIPE)
    stdout, stderr = final_process.communicate()
    final_process.wait()

    # get the path to the output file
    output = stdout.decode('utf-8')
    translated_file_path = re.search(r'(?<=Output:\s)(.*)', output)[0]
    print(f"Saved file in {translated_file_path}")

    yield "", translated_file_path