import os
import io
import re
from typing import List, Tuple, Dict
import torch
import gradio as gr
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
# DOCX read & write
import docx
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.text.paragraph import Paragraph
# PDF read & write
import fitz  # PyMuPDF
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.enums import TA_JUSTIFY
from reportlab.platypus import SimpleDocTemplate, Paragraph as RLParagraph, Spacer, PageBreak
from reportlab.lib.units import cm
# ================= CONFIG =================
MODEL_REPO = "Toadoum/ngambay-fr-v1"
FR_CODE = "fra_Latn"  # French (source)
NG_CODE = "sba_Latn"  # Ngambay (target)
# Inference
MAX_NEW_TOKENS = 256
TEMPERATURE = 0.0  # unused: decoding is deterministic (do_sample=False)
NUM_BEAMS = 1
# Performance knobs
MAX_SRC_TOKENS = 420  # tokens per chunk; reduce to ~320 for faster runs
BATCH_SIZE = 12       # chunks per model call (tune for your hardware)
# Device selection: GPU 0 if available, otherwise CPU (-1)
device = 0 if torch.cuda.is_available() else -1
# Load model & tokenizer once
tokenizer = AutoTokenizer.from_pretrained(MODEL_REPO)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_REPO)
translator = pipeline(
    task="translation",
    model=model,
    tokenizer=tokenizer,
    device=device,
)
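# --- Optional smoke test (a minimal sketch; NGAMBAY_SMOKE_TEST is an opt-in
# --- switch introduced here, not part of the original app). It assumes the
# --- checkpoint accepts the NLLB-style language codes configured above.
if os.environ.get("NGAMBAY_SMOKE_TEST"):
    _out = translator("Bonjour", src_lang=FR_CODE, tgt_lang=NG_CODE, max_new_tokens=32)
    print(f"[smoke test] 'Bonjour' -> {_out[0]['translation_text']!r}")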
# Simple text-box translation
def translate_text_simple(text: str) -> str:
    if not text or not text.strip():
        return ""
    with torch.no_grad():
        out = translator(
            text,
            src_lang=FR_CODE,
            tgt_lang=NG_CODE,
            max_new_tokens=MAX_NEW_TOKENS,
            do_sample=False,
            num_beams=NUM_BEAMS,
        )
    return out[0]["translation_text"]
# ---------- Chunking + Batched Translation + Cache ----------
def tokenize_len(s: str) -> int:
    return len(tokenizer.encode(s, add_special_tokens=False))
def chunk_text_for_translation(text: str, max_src_tokens: int = MAX_SRC_TOKENS) -> List[str]:
    """Split text by sentence-ish boundaries and merge under token limit."""
    if not text.strip():
        return []
    parts = re.split(r'(\s*[\.\!\?…:;]\s+)', text)
    sentences = []
    for i in range(0, len(parts), 2):
        s = parts[i]
        p = parts[i+1] if i+1 < len(parts) else ""
        unit = (s + (p or "")).strip()
        if unit:
            sentences.append(unit)
    chunks, current = [], ""
    for sent in sentences:
        candidate = (current + " " + sent).strip() if current else sent
        if current and tokenize_len(candidate) > max_src_tokens:
            chunks.append(current.strip())
            current = sent
        else:
            current = candidate
    if current.strip():
        chunks.append(current.strip())
    return chunks
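# --- Illustrative, opt-in check of the chunker (NGAMBAY_DEMO_CHUNKING is a
# --- hypothetical switch added for local testing). Short sentences are packed
# --- into a single chunk until MAX_SRC_TOKENS would be exceeded; the exact
# --- split depends on this tokenizer's vocabulary.
if os.environ.get("NGAMBAY_DEMO_CHUNKING"):
    _demo = chunk_text_for_translation(
        "Bonjour à tous. La réunion commence demain. Merci de votre présence."
    )
    print(f"[chunking demo] {len(_demo)} chunk(s): {_demo}")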
# Module-level cache: identical chunks are translated once (unbounded; clear
# TRANSLATION_CACHE manually if memory becomes a concern).
TRANSLATION_CACHE: Dict[str, str] = {}
def translate_chunks_list(chunks: List[str], batch_size: int = BATCH_SIZE) -> List[str]:
    """
    Translate a list of chunks with de-dup + batching.
    Returns translations in the same order as input.
    """
    # Normalize & collect unique chunks that still need translation
    norm_chunks = [c.strip() for c in chunks]
    to_translate = []
    seen = set()
    for c in norm_chunks:
        if c and c not in TRANSLATION_CACHE and c not in seen:
            seen.add(c)
            to_translate.append(c)
    # Batched model calls
    with torch.no_grad():
        for i in range(0, len(to_translate), batch_size):
            batch = to_translate[i:i + batch_size]
            outs = translator(
                batch,
                src_lang=FR_CODE,
                tgt_lang=NG_CODE,
                max_new_tokens=MAX_NEW_TOKENS,
                do_sample=False,
                num_beams=NUM_BEAMS,
            )
            for src, o in zip(batch, outs):
                TRANSLATION_CACHE[src] = o["translation_text"]
    return [TRANSLATION_CACHE.get(c, "") for c in norm_chunks]
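# --- Illustrative, opt-in check of de-dup + caching (NGAMBAY_DEMO_CACHE is a
# --- hypothetical switch; this runs the model). Duplicate chunks in one call
# --- are translated once, and later calls are served from TRANSLATION_CACHE.
if os.environ.get("NGAMBAY_DEMO_CACHE"):
    _outs = translate_chunks_list(["Merci.", "Merci."])  # one unique chunk -> one model call
    print(f"[cache demo] identical: {_outs[0] == _outs[1]}, cached: {'Merci.' in TRANSLATION_CACHE}")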
def translate_long_text(text: str) -> str:
    """Chunk → batch translate → rejoin for one paragraph/block."""
    chs = chunk_text_for_translation(text)
    if not chs:
        return ""
    trs = translate_chunks_list(chs)
    # join with space to reconstruct paragraph smoothly
    return " ".join(trs).strip()
# ---------- DOCX helpers (batched across the whole document) ----------
def is_heading(par: Paragraph) -> Tuple[bool, int]:
    # Match English and French Word heading styles ("Heading 1", "Titre 1", ...)
    name = (par.style.name or "").lower()
    if any(key in name for key in ("heading", "title", "titre")):
        for lvl in range(1, 10):
            if str(lvl) in name:
                return True, lvl
        return True, 1
    return False, 0
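# --- Illustrative, opt-in check of heading detection (NGAMBAY_DEMO_HEADING is
# --- a hypothetical switch). With python-docx's default English template,
# --- add_heading(level=2) yields the style "Heading 2".
if os.environ.get("NGAMBAY_DEMO_HEADING"):
    _doc = docx.Document()
    _h = _doc.add_heading("Exemple", level=2)
    print(f"[heading demo] {_h.style.name!r} -> {is_heading(_h)}")  # expect (True, 2)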
def translate_docx_bytes(file_bytes: bytes) -> bytes:
    """
    Read .docx → collect ALL chunks (paras + table cells) → single batched translation → rebuild .docx.
    Paragraphs and table cell paragraphs are justified; headings kept as headings.
    """
    f = io.BytesIO(file_bytes)
    src_doc = docx.Document(f)
    # 1) Collect work units
    work = []  # list of dict entries describing items with ranges into all_chunks
    all_chunks: List[str] = []
    # paragraphs
    for par in src_doc.paragraphs:
        txt = par.text
        if not txt.strip():
            work.append({"kind": "blank"})
            continue
        is_head, lvl = is_heading(par)
        if is_head:
            # treat as a single chunk (headings are usually short)
            work.append({"kind": "heading", "level": min(max(lvl, 1), 9), "range": (len(all_chunks), len(all_chunks)+1)})
            all_chunks.append(txt.strip())
        else:
            chs = chunk_text_for_translation(txt)
            if chs:
                start = len(all_chunks)
                all_chunks.extend(chs)
                work.append({"kind": "para", "range": (start, start+len(chs))})
            else:
                work.append({"kind": "blank"})
    # tables
    for table in src_doc.tables:
        t_desc = {"kind": "table", "rows": len(table.rows), "cols": len(table.columns), "cells": []}
        for row in table.rows:
            row_cells = []
            for cell in row.cells:
                cell_text = "\n".join([p.text for p in cell.paragraphs]).strip()
                if cell_text:
                    chs = chunk_text_for_translation(cell_text)
                    if chs:
                        start = len(all_chunks)
                        all_chunks.extend(chs)
                        row_cells.append({"range": (start, start+len(chs))})
                    else:
                        row_cells.append({"range": None})
                else:
                    row_cells.append({"range": None})
            t_desc["cells"].append(row_cells)
        work.append(t_desc)
    # 2) Translate all chunks at once (de-dup + batching)
    translated_all = translate_chunks_list(all_chunks) if all_chunks else []
    # 3) Rebuild new document with justified paragraphs
    new_doc = docx.Document()
    # helper: join a translated chunk range back into one string
    def join_range(rng: Tuple[int, int]) -> str:
        if rng is None:
            return ""
        s, e = rng
        return " ".join(translated_all[s:e]).strip()
    # rebuild paragraphs
    for item in work:
        if item["kind"] == "blank":
            new_doc.add_paragraph("")
        elif item["kind"] == "heading":
            text = join_range(item["range"])
            new_doc.add_heading(text, level=item["level"])
        elif item["kind"] == "para":
            text = join_range(item["range"])
            p = new_doc.add_paragraph(text)
            p.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY
        elif item["kind"] == "table":
            tbl = new_doc.add_table(rows=item["rows"], cols=item["cols"])
            for r_idx in range(item["rows"]):
                for c_idx in range(item["cols"]):
                    cell_info = item["cells"][r_idx][c_idx]
                    txt = join_range(cell_info["range"])
                    tgt_cell = tbl.cell(r_idx, c_idx)
                    tgt_cell.text = txt
                    for p in tgt_cell.paragraphs:
                        p.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY
    out = io.BytesIO()
    new_doc.save(out)
    return out.getvalue()
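# --- Usage sketch (opt-in; runs the model). "input.docx" is a hypothetical
# --- local file used for illustration only, and NGAMBAY_DEMO_DOCX is a
# --- switch introduced here.
if os.environ.get("NGAMBAY_DEMO_DOCX"):
    with open("input.docx", "rb") as _f:
        _docx_bytes = translate_docx_bytes(_f.read())
    with open("output_ngambay.docx", "wb") as _f:
        _f.write(_docx_bytes)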
# ---------- PDF helpers (batched across the whole PDF) ----------
def extract_pdf_text_blocks(pdf_bytes: bytes) -> List[List[str]]:
    """
    Returns list of pages, each a list of block texts (visual order).
    """
    pages_blocks: List[List[str]] = []
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    for page in doc:
        blocks = page.get_text("blocks")
        blocks.sort(key=lambda b: (round(b[1], 1), round(b[0], 1)))
        page_texts = []
        for b in blocks:
            text = b[4].strip()
            if text:
                page_texts.append(text)
        pages_blocks.append(page_texts)
    doc.close()
    return pages_blocks
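# --- Illustrative, opt-in inspection of the extracted block structure
# --- (NGAMBAY_DEMO_PDF_BLOCKS is a hypothetical switch; "input.pdf" is a
# --- hypothetical local file).
if os.environ.get("NGAMBAY_DEMO_PDF_BLOCKS"):
    with open("input.pdf", "rb") as _f:
        _pages = extract_pdf_text_blocks(_f.read())
    if _pages:
        print(f"[pdf demo] {len(_pages)} page(s), {len(_pages[0])} block(s) on page 1")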
def build_pdf_from_blocks(translated_pages: List[List[str]]) -> bytes:
    """
    Build a clean paginated PDF with justified paragraphs (not the exact original layout).
    Keeps one translated page per original page via PageBreak.
    """
    buf = io.BytesIO()
    doc = SimpleDocTemplate(
        buf, pagesize=A4,
        rightMargin=2*cm, leftMargin=2*cm,
        topMargin=2*cm, bottomMargin=2*cm
    )
    styles = getSampleStyleSheet()
    body = styles["BodyText"]
    body.alignment = TA_JUSTIFY
    body.leading = 14
    story = []
    for p_idx, blocks in enumerate(translated_pages):
        if p_idx > 0:
            story.append(PageBreak())  # a Spacer does not force a new page
        for blk in blocks:
            story.append(RLParagraph(blk.replace("\n", "<br/>"), body))
            story.append(Spacer(1, 0.35*cm))
    doc.build(story)
    return buf.getvalue()
def translate_pdf_bytes(file_bytes: bytes) -> bytes:
    """
    Read PDF → collect ALL block chunks across pages → single batched translation → rebuild simple justified PDF.
    """
    pages_blocks = extract_pdf_text_blocks(file_bytes)
    # 1) collect chunks for the entire PDF
    all_chunks: List[str] = []
    plan = []  # list of pages, each a list of ranges for blocks
    for blocks in pages_blocks:
        page_plan = []
        for blk in blocks:
            chs = chunk_text_for_translation(blk)
            if chs:
                start = len(all_chunks)
                all_chunks.extend(chs)
                page_plan.append((start, start + len(chs)))
            else:
                page_plan.append(None)
        plan.append(page_plan)
    # 2) translate all chunks at once
    translated_all = translate_chunks_list(all_chunks) if all_chunks else []
    # 3) reconstruct per block
    translated_pages: List[List[str]] = []
    for page_plan in plan:
        page_out = []
        for rng in page_plan:
            if rng is None:
                page_out.append("")
            else:
                s, e = rng
                page_out.append(" ".join(translated_all[s:e]).strip())
        translated_pages.append(page_out)
    return build_pdf_from_blocks(translated_pages)
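# --- Usage sketch (opt-in; runs the model). "input.pdf" is a hypothetical
# --- local file used for illustration only, and NGAMBAY_DEMO_PDF is a
# --- switch introduced here.
if os.environ.get("NGAMBAY_DEMO_PDF"):
    with open("input.pdf", "rb") as _f:
        _pdf_bytes = translate_pdf_bytes(_f.read())
    with open("output_ngambay.pdf", "wb") as _f:
        _f.write(_pdf_bytes)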
# ---------- Gradio file handler (robust) ----------
def translate_document(file_obj):
    """
    Accepts gr.File input (NamedString, filepath str, or dict with binary).
    Returns (output_file_path, status_message).
    """
    if file_obj is None:
        return None, "Veuillez sélectionner un fichier .docx ou .pdf"
    try:
        name = "document"
        data = None
        # Case A: plain filepath string
        if isinstance(file_obj, str):
            name = os.path.basename(file_obj)
            with open(file_obj, "rb") as f:
                data = f.read()
        # Case B: Gradio NamedString with .name (orig name) and .value (temp path)
        elif hasattr(file_obj, "name") and hasattr(file_obj, "value"):
            name = os.path.basename(file_obj.name or "document")
            with open(file_obj.value, "rb") as f:
                data = f.read()
        # Case C: dict (type="binary")
        elif isinstance(file_obj, dict) and "name" in file_obj and "data" in file_obj:
            name = os.path.basename(file_obj["name"] or "document")
            d = file_obj["data"]
            data = d.read() if hasattr(d, "read") else d
        else:
            return None, "Type d'entrée fichier non supporté (filepath/binaire)."
        if data is None:
            return None, "Impossible de lire le fichier sélectionné."
        # Clear cache per document to keep memory predictable (optional)
        # TRANSLATION_CACHE.clear()
        if name.lower().endswith(".docx"):
            out_bytes = translate_docx_bytes(data)
            out_path = "translated_ngambay.docx"
            with open(out_path, "wb") as f:
                f.write(out_bytes)
            return out_path, "✅ Traduction DOCX terminée (paragraphes justifiés)."
        elif name.lower().endswith(".pdf"):
            out_bytes = translate_pdf_bytes(data)
            out_path = "translated_ngambay.pdf"
            with open(out_path, "wb") as f:
                f.write(out_bytes)
            return out_path, "✅ Traduction PDF terminée (paragraphes justifiés)."
        else:
            return None, "Type de fichier non supporté. Choisissez .docx ou .pdf"
    except Exception as e:
        return None, f"❌ Erreur pendant la traduction: {e}"
# ================== UI ==================
theme = gr.themes.Soft(
    primary_hue="indigo",
    radius_size="lg",
    font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"]
).set(
    body_background_fill="#f7f7fb",
    button_primary_text_color="#ffffff"
)
CUSTOM_CSS = """
.gradio-container {max-width: 980px !important;}
.header-card {
  background: linear-gradient(135deg, #4f46e5 0%, #7c3aed 100%);
  color: white; padding: 22px; border-radius: 18px;
  box-shadow: 0 10px 30px rgba(79,70,229,.25);
  transition: transform .2s ease;
}
.header-card:hover { transform: translateY(-1px); }
.header-title { font-size: 26px; font-weight: 800; margin: 0 0 6px 0; letter-spacing: .2px; }
.header-sub { opacity: .98; font-size: 14px; }
.brand { display:flex; align-items:center; gap:10px; justify-content:space-between; flex-wrap:wrap; }
.badge {
  display:inline-block; background: rgba(255,255,255,.18);
  padding: 4px 10px; border-radius: 999px; font-size: 12px;
  border: 1px solid rgba(255,255,255,.25);
}
.footer-note {
  margin-top: 8px; color: #64748b; font-size: 12px; text-align: center;
}
.support-banner {
  margin-top: 14px;
  border-radius: 14px;
  padding: 14px 16px;
  background: linear-gradient(135deg, rgba(79,70,229,.08), rgba(124,58,237,.08));
  border: 1px solid rgba(99,102,241,.25);
  box-shadow: 0 6px 18px rgba(79,70,229,.08);
}
.support-title { font-weight: 700; font-size: 16px; margin-bottom: 4px; }
.support-text { font-size: 13px; color: #334155; line-height: 1.5; }
.support-contacts { display: flex; gap: 10px; flex-wrap: wrap; margin-top: 8px; }
.support-chip {
  display:inline-block; padding: 6px 10px; border-radius: 999px;
  background: white; border: 1px dashed rgba(79,70,229,.45);
  font-size: 12px; color: #3730a3;
}
"""
with gr.Blocks(
    title="Français → Ngambay · Toadoum/ngambay-fr-v1",
    theme=theme,
    css=CUSTOM_CSS,
    fill_height=True,
) as demo:
    with gr.Group(elem_classes=["header-card"]):
        gr.HTML(
            """
            <div class="brand">
              <div>
                <div class="header-title">Français → Ngambay (v1)</div>
                <div class="header-sub">🚀 Version bêta · Merci de tester et partager vos retours pour améliorer la qualité de traduction.</div>
              </div>
              <span class="badge">Modèle : Toadoum/ngambay-fr-v1</span>
            </div>
            """
        )
    with gr.Tabs():
        # -------- Tab 1: Text --------
        with gr.Tab("Traduction de texte"):
            with gr.Row():
                with gr.Column(scale=5):
                    src = gr.Textbox(
                        label="Texte source (Français)",
                        placeholder="Saisissez votre texte en français…",
                        lines=8,
                        autofocus=True
                    )
                    with gr.Row():
                        btn = gr.Button("Traduire", variant="primary", scale=3)
                        clear_btn = gr.Button("Effacer", scale=1)
                    gr.Examples(
                        examples=[
                            ["Bonjour, comment allez-vous aujourd’hui ?"],
                            ["La réunion de sensibilisation aura lieu demain au centre communautaire."],
                            ["Merci pour votre participation et votre soutien."],
                            ["Veuillez suivre les recommandations de santé pour protéger votre famille."]
                        ],
                        inputs=[src],
                        label="Exemples (cliquez pour remplir)"
                    )
                with gr.Column(scale=5):
                    tgt = gr.Textbox(
                        label="Traduction (Ngambay)",
                        lines=8,
                        interactive=False,
                        show_copy_button=True
                    )
            gr.Markdown('<div class="footer-note">Astuce : collez un paragraphe complet pour un meilleur contexte.</div>')
        # -------- Tab 2: Documents --------
        with gr.Tab("Traduction de document (.docx / .pdf)"):
            with gr.Row():
                with gr.Column(scale=5):
                    doc_inp = gr.File(
                        label="Sélectionnez un document (.docx ou .pdf)",
                        file_types=[".docx", ".pdf"],
                        type="filepath"  # yields a temp filepath; the handler also supports binary
                    )
                    run_doc = gr.Button("Traduire le document", variant="primary")
                with gr.Column(scale=5):
                    doc_out = gr.File(label="Fichier traduit (télécharger)")
                    doc_status = gr.Markdown("")
            run_doc.click(translate_document, inputs=doc_inp, outputs=[doc_out, doc_status])
    # Contribution banner
    gr.HTML(
        """
        <div class="support-banner">
          <div class="support-title">💙 Contribuer au projet (recrutement de linguistes)</div>
          <div class="support-text">
            Nous cherchons à <b>recruter des linguistes</b> pour renforcer la construction de données Ngambay.
            Si vous souhaitez soutenir financièrement ou en tant que bénévole, contactez-nous :
          </div>
          <div class="support-contacts">
            <span class="support-chip">📱 WhatsApp, Airtel Money : <b>+235 66 04 90 94</b></span>
            <span class="support-chip">✉️ Email : <a href="mailto:tsakayo@aimsammi.org">tsakayo@aimsammi.org</a></span>
          </div>
        </div>
        """
    )
    # Text actions
    btn.click(translate_text_simple, inputs=src, outputs=tgt)
    clear_btn.click(lambda: ("", ""), outputs=[src, tgt])
if __name__ == "__main__":
    demo.queue(default_concurrency_limit=4).launch(share=True)