File size: 20,714 Bytes
ad4ed41
186c0af
acd3a4d
ad4ed41
 
 
 
 
 
 
 
 
41ceab1
186c0af
ad4ed41
acd3a4d
 
41ceab1
ad4ed41
41ceab1
 
 
 
 
ad4ed41
186c0af
ad4ed41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4420a7f
ad4ed41
 
 
 
 
 
 
 
 
 
 
 
c79a1ef
 
 
 
4420a7f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c79a1ef
4420a7f
 
 
 
 
 
 
 
 
 
c79a1ef
4420a7f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad4ed41
c79a1ef
ad4ed41
186c0af
41ceab1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad4ed41
41ceab1
 
186c0af
 
ad4ed41
 
 
 
 
 
41ceab1
ad4ed41
 
 
 
41ceab1
 
186c0af
 
ad4ed41
186c0af
 
 
 
 
 
 
 
 
 
 
ad4ed41
186c0af
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad4ed41
186c0af
 
 
 
 
ad4ed41
186c0af
 
 
ad4ed41
 
186c0af
 
ad4ed41
186c0af
ad4ed41
 
 
 
 
186c0af
 
ad4ed41
 
41ceab1
 
ad4ed41
 
 
 
 
 
 
 
 
 
186c0af
 
 
 
 
 
 
ad4ed41
 
186c0af
ad4ed41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186c0af
ad4ed41
 
186c0af
ad4ed41
 
 
 
 
 
 
 
 
 
 
186c0af
 
 
 
 
ad4ed41
186c0af
ad4ed41
 
 
 
 
 
 
186c0af
ad4ed41
 
 
 
 
 
 
 
c79a1ef
186c0af
 
 
 
 
 
c79a1ef
186c0af
4420a7f
186c0af
4420a7f
 
 
c79a1ef
ad4ed41
 
c79a1ef
 
186c0af
 
 
c79a1ef
186c0af
 
c79a1ef
186c0af
ad4ed41
 
 
6e54822
ad4ed41
 
acd3a4d
 
ad4ed41
5c92fe7
 
ad4ed41
 
 
 
 
 
 
 
 
 
 
 
186c0af
 
 
 
 
 
 
 
acd3a4d
 
 
 
 
186c0af
 
 
 
 
 
acd3a4d
 
 
 
 
 
 
ad4ed41
 
 
acd3a4d
ad4ed41
186c0af
 
 
 
ad4ed41
 
186c0af
 
 
 
 
 
 
ad4ed41
 
186c0af
ad4ed41
 
186c0af
ad4ed41
 
4420a7f
 
 
ad4ed41
4420a7f
 
 
ad4ed41
 
 
 
4420a7f
 
ad4ed41
 
 
 
 
 
 
 
 
 
 
186c0af
 
 
 
 
 
 
ad4ed41
 
 
 
 
 
 
 
 
 
4420a7f
acd3a4d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
import shutil
import string
import sys
import time
import os
from itertools import groupby
from subprocess import Popen, PIPE
import re

from src.aligner import Aligner

import glob
import spacy
from spacy.tokens import Doc

from gradio_client.exceptions import AppError

import tqdm

# Module-level spaCy pipeline, loaded once at import time. The multilingual "xx_ent_wiki_sm" model is used by the
# helpers in this file to split text into sentences and tokens and to rebuild text from tokens (via its vocab).
spacy_nlp = spacy.load("xx_ent_wiki_sm")
# Add the rule-based sentencizer unless the pipeline already has one; the code in this file iterates over
# `doc.sents`, so sentence boundaries must be set by some pipeline component.
if "sentencizer" not in spacy_nlp.pipe_names:
    spacy_nlp.add_pipe("sentencizer")


def doc_to_plain_text(input_file: str, source_lang: str, target_lang: str, tikal_folder: str,
                      original_xliff_file_path: str) -> str:
    """
    Given a document, this function generates an xliff file and then a plain text file with the text contents
    while keeping style and formatting using tags like <g id=1> </g>

    Two tikal commands are run one after the other: "-x" on the input document and "-xm" on the xliff file.
    The second one depends on the output of the first, so a failure of either aborts the conversion.

    Parameters:
    input_file: Path to document to process
    source_lang: Source language of the document
    target_lang: Target language of the document
    tikal_folder: Folder where tikal.sh is located
    original_xliff_file_path: Path to xliff file to generate, which will be used later

    Returns:
    string: Path to plain text file (the xliff path followed by ".<source_lang>")

    Raises:
    RuntimeError: If any of the tikal invocations exits with a non-zero status
    """
    tikal_script = os.path.join(tikal_folder, "tikal.sh")
    language_args = ["-sl", source_lang, "-tl", target_lang]

    commands = [
        # document -> xliff
        [tikal_script, "-x", input_file, "-nocopy", *language_args],
        # xliff -> plain text with inline tags
        [tikal_script, "-xm", original_xliff_file_path, *language_args],
    ]

    for command in commands:
        return_code = Popen(command).wait()
        # Stop right away instead of letting a later step fail on a missing or stale file
        if return_code != 0:
            raise RuntimeError(f"tikal command failed with exit code {return_code}: {' '.join(command)}")

    return f"{original_xliff_file_path}.{source_lang}"


def get_runs_from_paragraph(paragraph: str, paragraph_index: int) -> list[dict[str, str | tuple[str, ...]]]:
    """
    Given some text that may or may not contain some chunks tagged with something like <g id=1> </g>, extract each
    of the runs of text and convert them into dictionaries to keep this information

    Parameters:
    text: Text to process
    paragraph_index: Index of the paragraph in the file

    Returns:
    list[dict]: Where each element is a run with text, tag id (if any, if not None) and paragraph_index
    """

    tag_stack = []
    runs = []
    pos = 0

    # Match any tag: <tag id="123"/>, </tag>, or <tag id="123">
    tag_pattern = re.compile(r'<(/?)(\w+)(?:\s+id="(\d+)")?\s*(/?)>')

    for match in tag_pattern.finditer(paragraph):
        start, end = match.span()
        is_closing = match.group(1) == "/"
        tag_name = match.group(2)
        tag_id = match.group(3)
        is_self_closing = match.group(4) == "/"

        # Text before this tag
        if start > pos:
            text = paragraph[pos:start]
            if text:
                runs.append({
                    "text": text,
                    "id": tag_stack.copy(),
                    "paragraph_index": paragraph_index
                })

        if is_closing:
            # Closing tag </tag>
            expected_prefix = f"{tag_name}_"
            if tag_stack and tag_stack[-1].startswith(expected_prefix):
                tag_stack.pop()
            else:
                raise ValueError(f"Mismatched closing tag </{tag_name}>")
        elif is_self_closing:
            # Self-closing tag like <x id="1"/>
            if tag_id is None:
                raise ValueError(f"Self-closing tag <{tag_name}/> missing id")
            runs.append({
                "text": "",
                "id": [f"{tag_name}_{tag_id}"],
                "paragraph_index": paragraph_index
            })
        else:
            # Opening tag <tag id="...">
            if tag_id is None:
                raise ValueError(f"Opening tag <{tag_name}> missing id")
            tag_stack.append(f"{tag_name}_{tag_id}")

        pos = end

    # Final trailing text
    if pos < len(paragraph):
        text = paragraph[pos:]
        if text:
            runs.append({
                "text": text,
                "id": tag_stack.copy(),
                "paragraph_index": paragraph_index
            })

    return runs


def tokenize_text(text, tokenizer):
    """
    Tokenize some text while making sure the tokenizer does not break the URLs it contains

    Every URL is swapped for an alphanumeric placeholder before tokenizing and swapped back afterwards.

    Parameters:
    text: Text to tokenize
    tokenizer: Object with a tokenize(text) method that returns a list of string tokens

    Returns:
    list[str]: Tokens of the text, with the URLs restored
    """
    url_regex = re.compile(r'https?://[^\s\)\]\}\>]+|www\.[^\s\)\]\}\>]+')

    # Choose a placeholder prefix that does not appear in the input, otherwise text that merely looks like a
    # placeholder (e.g. a literal "URL0") would be replaced by a URL when restoring
    prefix = "URL"
    while prefix in text:
        prefix += "X"

    urls = []

    def hide_url(match):
        urls.append(match.group(0))
        return f"{prefix}{len(urls) - 1}"

    # Single pass: every occurrence gets its own numbered placeholder
    masked_text = url_regex.sub(hide_url, text)

    tokens = tokenizer.tokenize(masked_text)
    if not urls:
        return tokens

    # Match the whole number so that restoring "URL1" can never touch "URL10" or "URL11"
    placeholder_regex = re.compile(re.escape(prefix) + r"([0-9]+)")

    def restore_url(match):
        url_idx = int(match.group(1))
        return urls[url_idx] if url_idx < len(urls) else match.group(0)

    return [placeholder_regex.sub(restore_url, token) for token in tokens]


def tokenize_with_runs(runs: list[dict[str, str]]) -> tuple[list[list[dict[str, str]]], list[list[bool]]]:
    """
    Given a list of runs, we need to tokenize them by sentence and token while keeping the style of each token according
    to its original run

    The runs passed in are not modified.

    Parameters:
    runs: List of runs, where each item is a chunk of text (possibly various tokens) and some style/formatting information

    Returns:
    list[list[dict]]: A list of tokenized sentences where each token contains the style of its original run
    list[list[bool]]: For each of those tokens, whether it is followed by a blank space

    Raises:
    ValueError: If the text of a run cannot be matched against the tokens produced by spaCy
    """

    # it's a bit of a mess but first we get the tokenized sentences
    # join runs and send through spacy to split into clean tokens
    doc_from_runs = spacy_nlp("".join([run["text"] for run in runs]).strip())

    # extract sentences and tokenize each into words, dropping whitespace-only tokens
    tokenized_sentences = []
    tokenized_sentences_spaces = []
    for sent in doc_from_runs.sents:
        kept_tokens = [token for token in sent if token.text.strip()]
        tokenized_sentences.append([token.text.strip() for token in kept_tokens])
        tokenized_sentences_spaces.append([token.whitespace_ != '' for token in kept_tokens])

    flat_tokens = [token for sentence in tokenized_sentences for token in sentence]
    flat_spaces = [token for sentence in tokenized_sentences_spaces for token in sentence]

    # walk over the runs consuming tokens, so every token (or piece of a token) inherits the style of its run
    flat_tokens_with_style = []
    flat_spaces_with_style = []
    token_idx = 0
    for run in runs:
        # work on a local copy of the text so the caller's runs stay intact
        remaining = run["text"].strip()
        while remaining:
            current_token = flat_tokens[token_idx]
            if remaining.startswith(current_token):
                # the whole token is inside this run. Tokens never start with whitespace, so any whitespace left
                # in front of the remaining text (spaces, tabs...) can be dropped whatever spaCy recorded
                remaining = remaining[len(current_token):].lstrip()
                item = run.copy()
                item["text"] = current_token
                flat_tokens_with_style.append(item)
                flat_spaces_with_style.append(flat_spaces[token_idx])
                token_idx += 1
            elif current_token.startswith(remaining):
                # the token spans more than one run: emit the piece that belongs to this run and leave the rest
                # of the token for the following run(s)
                item = run.copy()
                item["text"] = remaining
                flat_tokens_with_style.append(item)
                flat_spaces_with_style.append(False)
                flat_tokens[token_idx] = current_token[len(remaining):]
                remaining = ""
            else:
                # without this branch the loop would never end
                raise ValueError(
                    f"Could not match run text {remaining!r} against token {current_token!r} "
                    f"(token index {token_idx})")

    # reconstruct the sentences
    token_idx = 0
    tokenized_sentences_with_style, tokenized_sentences_spaces_with_style = [], []
    for sentence in tokenized_sentences:
        sentence_with_style, sentence_spaces_with_style = [], []
        for token in sentence:
            if token == flat_tokens_with_style[token_idx]["text"]:
                sentence_with_style.append(flat_tokens_with_style[token_idx])
                sentence_spaces_with_style.append(flat_spaces_with_style[token_idx])
                token_idx += 1
            elif token.startswith(flat_tokens_with_style[token_idx]["text"]):
                # token that was split across several runs: take pieces until the whole token is covered
                while token:
                    token = token[len(flat_tokens_with_style[token_idx]["text"]):]
                    sentence_with_style.append(flat_tokens_with_style[token_idx])
                    sentence_spaces_with_style.append(flat_spaces_with_style[token_idx])
                    token_idx += 1
            else:
                print(token)
                print(sentence)
                print(token_idx)
                print(flat_tokens_with_style)
                raise Exception("Something unexpected happened")
        tokenized_sentences_with_style.append(sentence_with_style)
        tokenized_sentences_spaces_with_style.append(sentence_spaces_with_style)

    return tokenized_sentences_with_style, tokenized_sentences_spaces_with_style


def generate_alignments(original_tokenized_sentences_with_style: list[list[dict[str, str]]],
                        translated_sentences: list[str], aligner, temp_folder: str):
    """
    Given some original sentences with style and formatting and its translation without formatting, try to match
    the translated text formatting with the original. Since we only want to run the aligner once we have to
    temporarily forget about paragraphs and work only in sentences, so the output is a list of sentences but with
    information about from which paragraph that sentence came from

    Parameters:
    original_tokenized_sentences_with_style: Original text split into sentences with style information
    translated_sentences: Translated text, split into sentences
    aligner: Object with an align(original_sentences, translated_sentences) method that returns, for each
             sentence, pairs of (original token index, translated token index)
    temp_folder: Path to folder with the intermediate files; files matching "*align*" in it are deleted

    Returns:
    list[list[dict]]: A list of tokenized sentences where each translated token contains the style of the associated
                        original token
    list[list[bool]]: For each translated token, whether it is followed by a blank space

    Raises:
    AssertionError: If the number of translated sentences differs from the number of original sentences
    """
    # clean temp folder
    for f in glob.glob(os.path.join(temp_folder, "*align*")):
        os.remove(f)

    # tokenize the translated text by sentence and word
    translated_tokenized_sentences = []
    # keep spacing information to detokenize properly later
    translated_tokenized_sentences_spaces = []
    for sentence in translated_sentences:
        tokens = spacy_nlp(sentence)
        translated_tokenized_sentences_spaces.append([token.whitespace_ != '' for token in tokens])
        translated_tokenized_sentences.append([token.text for token in tokens])

    # raised explicitly (instead of using an assert statement) so the check also runs under python -O
    if len(translated_tokenized_sentences) != len(original_tokenized_sentences_with_style):
        raise AssertionError(
            "The original and translated texts contain a different number of sentences, likely due to a translation error")

    # the aligner works with plain sentences whose tokens are separated by blank spaces
    original_plain_sentences = [' '.join(item['text'] for item in original)
                                for original in original_tokenized_sentences_with_style]
    translated_plain_sentences = [' '.join(translated) for translated in translated_tokenized_sentences]

    alignments = aligner.align(original_plain_sentences, translated_plain_sentences)

    # using the alignments generated by the aligner, we need to copy the style of the original token to the
    # translated one
    translated_sentences_with_style = []
    for sentence_idx, sentence_alignments in enumerate(alignments):
        original_sentence = original_tokenized_sentences_with_style[sentence_idx]

        # reverse the order of the alignments and build a dict with it
        target_to_source = {target: source for source, target in sentence_alignments}

        translated_sentence_with_style: list[dict[str, str]] = []
        for token_idx, translated_token in enumerate(translated_tokenized_sentences[sentence_idx]):
            if token_idx in target_to_source:
                # the aligner has found a token aligned with the translated one
                style_source = original_sentence[target_to_source[token_idx]]
            elif translated_sentence_with_style:
                # the aligner doesn't know from which word to reference this token, copy the style of the
                # previous word
                style_source = translated_sentence_with_style[-1]
            else:
                # unaligned token at the very start of the sentence: there is no previous word yet, so take the
                # style of the first original token
                style_source = original_sentence[0]

            new_entry = style_source.copy()
            new_entry["text"] = translated_token
            translated_sentence_with_style.append(new_entry)

        translated_sentences_with_style.append(translated_sentence_with_style)

    return translated_sentences_with_style, translated_tokenized_sentences_spaces


def group_by_style(tokens: list[dict[str, str]], spaces: list[bool]) -> list[dict[str, str]]:
    """
    Merge neighbouring tokens that share the same style ("id") and paragraph into a single run, so the output
    looks again like the runs we started from.

    Parameters:
    tokens: Tokens with style information
    spaces: For each token, whether it is followed by a blank space

    Returns:
    list[dict]: A list of translated runs with format and style
    """

    def style_of(pair):
        token = pair[0]
        return token["id"], token["paragraph_index"]

    merged_runs = []
    for (run_id, paragraph_index), members in groupby(zip(tokens, spaces), key=style_of):
        words, trailing_spaces = [], []
        for member_token, has_space in members:
            words.append(member_token['text'])
            trailing_spaces.append(has_space)

        # let spaCy put the tokens back together using the spacing information
        run_doc = Doc(spacy_nlp.vocab, words=words, spaces=trailing_spaces)

        merged_runs.append({"text": run_doc.text,
                            "id": run_id,
                            "paragraph_index": paragraph_index})
    return merged_runs


def runs_to_plain_text(paragraphs_with_style: dict[int, list[dict]], out_file_path: str):
    """
    Generate a plain text file restoring the original tag structure like <g id=1> </g>

    One line is written per paragraph, in the iteration order of the dictionary. Every run is wrapped in its tags,
    opened from the outermost to the innermost one and closed in the opposite order so the result is properly
    nested. The file is written as UTF-8.

    Parameters:
    paragraphs_with_style: Dictionary where each key is the paragraph_index and its contents are a list of runs.
                           The "id" of a run is a list of "<tag>_<id>" strings, or None/empty for untagged text
    out_file_path: Path to the file where the plain text will be saved
    """

    def split_tag(tag):
        # split on the last underscore only: the tag name may contain underscores, the numeric id can't
        tag_type, tag_id = tag.rsplit("_", 1)
        return tag_type, tag_id

    def open_tags(ids):
        return "".join(f'<{tag_type} id="{tag_id}">' for tag_type, tag_id in map(split_tag, ids))

    def close_tags(ids):
        # innermost tag first
        return "".join(f'</{split_tag(tag)[0]}>' for tag in reversed(ids))

    with open(out_file_path, "w", encoding="utf-8") as out_file:
        for paragraph in paragraphs_with_style.values():
            for run in paragraph:
                ids = list(run["id"]) if run["id"] else []
                # for untagged runs both helpers return an empty string
                out_file.write(open_tags(ids) + run["text"] + close_tags(ids))

            out_file.write("\n")


def translate_document(input_file: str, source_lang: str, target_lang: str,
                       translator,
                       aligner: Aligner,
                       temp_folder: str = "tmp",
                       tikal_folder: str = "okapi-apps_gtk2-linux-x86_64_1.47.0", with_format: bool = True) -> (str,
                                                                                                                str):
    """
    Translate a document sentence by sentence and rebuild it trying to keep the original style and formatting.

    This is a generator: it reports progress while it works and hands over the result in its last item.

    Parameters:
    input_file: Path to the document to translate
    source_lang: Source language of the document
    target_lang: Target language of the translation
    translator: Object with a translate(text, source_lang, target_lang) method that returns the translated text
    aligner: Aligner used to map the translated tokens to the original ones
    temp_folder: Folder where the copy of the document and all the intermediate files are stored
    tikal_folder: Folder where tikal.sh is located
    with_format: Currently not used by this function

    Yields:
    tuple: (status message, None) while the translation is in progress and, as the last item,
           ("", path to the translated file)

    Raises:
    RuntimeError: If the path of the translated file can't be found in the output of the final tikal command
    SystemExit: If the translator raises an AppError
    """
    input_filename = os.path.basename(input_file)
    os.makedirs(temp_folder, exist_ok=True)

    # copy the original file to the temporal folder to avoid common issues with tikal
    temp_input_file = os.path.join(temp_folder, input_filename)
    shutil.copy(input_file, temp_input_file)

    original_xliff_file = os.path.join(temp_folder, input_filename + ".xlf")
    plain_text_file = doc_to_plain_text(temp_input_file, source_lang, target_lang, tikal_folder, original_xliff_file)

    # get paragraphs with runs
    with open(plain_text_file, encoding="utf-8") as plain_text:
        paragraphs_with_runs = [get_runs_from_paragraph(line.strip(), idx) for idx, line in enumerate(plain_text)]

    # split every paragraph into sentences and tokens that remember their style
    original_tokenized_sentences_with_style = []
    original_spacing = []
    for paragraph_runs in paragraphs_with_runs:
        tokens, spaces = tokenize_with_runs(paragraph_runs)
        original_tokenized_sentences_with_style += tokens
        original_spacing += spaces

    # translate sentence by sentence
    translated_sentences = []
    yield "Translating 0%...", None
    total = len(original_tokenized_sentences_with_style)

    # the context manager closes the progress bar even if the generator is abandoned halfway
    with tqdm.tqdm(desc="Translating paragraphs...", total=total) as pbar:
        for i, (sentence, spacing) in enumerate(zip(original_tokenized_sentences_with_style, original_spacing)):
            text = Doc(spacy_nlp.vocab, words=[token["text"] for token in sentence], spaces=spacing).text

            try:
                translated_sentences.append(translator.translate(text, source_lang, target_lang))
            except AppError as e:
                print(e)
                sys.exit()

            pbar.update(1)
            percent_complete = int(((i + 1) / total) * 100)
            yield f"Translating {percent_complete}%...", None

    # time to align the translation with the original
    print("Generating alignments...")
    yield "Aligning...", None
    start_time = time.time()
    translated_sentences_with_style, translated_sentences_spacing = generate_alignments(
        original_tokenized_sentences_with_style,
        translated_sentences, aligner,
        temp_folder)
    print(f"Finished alignments in {time.time() - start_time} seconds")

    # since we tokenized these sentences independently, the spacing information does not contain spaces after punctuation
    # at the end of the sentence (there's no space at the end of a sentence that ends with ".", unless there's a sentence
    # right after
    for sentence, sentence_spaces in zip(translated_sentences_with_style, translated_sentences_spacing):
        # a sentence may be empty if the translator returned an empty string for it
        if sentence and sentence[-1]["text"] in string.punctuation:
            sentence_spaces[-1] = True

    # flatten the sentences into a list of tokens
    translated_tokens_with_style = [item for sublist in translated_sentences_with_style for item in sublist]
    tokens_spaces = [item for sublist in translated_sentences_spacing for item in sublist]

    # group the tokens by style/run
    translated_runs_with_style = group_by_style(translated_tokens_with_style, tokens_spaces)

    # group the runs by original paragraph. Every paragraph starts with an empty placeholder so paragraphs without
    # any translated run still produce a line in the output
    translated_paragraphs_with_style = {key: [{'id': None, 'paragraph_index': key, 'text': ""}] for key in
                                        range(len(paragraphs_with_runs))}

    for item in translated_runs_with_style:
        paragraph_runs = translated_paragraphs_with_style[item['paragraph_index']]
        # first item in the paragraph, remove starting blank space we introduced in group_by_style(), where we
        # didn't know where paragraphs started and ended
        if not paragraph_runs[0]["text"]:
            first_item_in_paragraph = item.copy()
            first_item_in_paragraph["text"] = first_item_in_paragraph["text"].lstrip(" ")
            translated_paragraphs_with_style[item['paragraph_index']] = [first_item_in_paragraph]
        else:
            paragraph_runs.append(item)

    # save to new plain text file
    translated_moses_file = f"{original_xliff_file}.{target_lang}"
    runs_to_plain_text(translated_paragraphs_with_style, translated_moses_file)

    # put the translations into the xlf
    tikal_script = os.path.join(tikal_folder, "tikal.sh")
    tikal_moses_to_xliff_command = [tikal_script, "-lm", original_xliff_file, "-sl",
                                    source_lang, "-tl", target_lang, "-from", translated_moses_file, "-totrg",
                                    "-noalttrans", "-to", original_xliff_file]
    Popen(tikal_moses_to_xliff_command).wait()

    # any tags that are still <g> have not been paired between original and translated texts by tikal so we remove
    # them. This may happen if a word in the original language has been split in more that one words that have other
    # words in between, or an error in the aligner
    with open(original_xliff_file, encoding="utf-8") as xliff:
        text = xliff.read()
    result = re.sub(r'<g id="\d+">(.*?)</g>', r'\1', text)
    with open(original_xliff_file, "w", encoding="utf-8") as xliff:
        xliff.write(result)

    # merge into a docx again
    tikal_merge_doc_command = [tikal_script, "-m", original_xliff_file]
    final_process = Popen(tikal_merge_doc_command, stdout=PIPE, stderr=PIPE)
    # communicate() already waits for the process to finish
    stdout, stderr = final_process.communicate()

    # get the path to the output file
    output = stdout.decode('utf-8')
    output_match = re.search(r'(?<=Output:\s)(.*)', output)
    if output_match is None:
        raise RuntimeError(
            "tikal did not report the path of the merged document. "
            f"stderr: {stderr.decode('utf-8', errors='replace')}")
    translated_file_path = output_match[0]

    print(f"Saved file in {translated_file_path}")
    yield "", translated_file_path