# import os # import io # import re # from typing import List, Tuple, Dict # import torch # import gradio as gr # from transformers import AutoTokenizer, AutoModelForSeq2SeqLM # # --- NEW: docs --- # import docx # from docx.enum.text import WD_ALIGN_PARAGRAPH # from docx.text.paragraph import Paragraph # # PDF read & write # import fitz # PyMuPDF # from reportlab.lib.pagesizes import A4 # from reportlab.lib.styles import getSampleStyleSheet # from reportlab.lib.enums import TA_JUSTIFY # from reportlab.platypus import SimpleDocTemplate, Paragraph as RLParagraph, Spacer, PageBreak # from reportlab.lib.units import cm # # ================= CONFIG ================= # MODEL_REPO = "Toadoum/ngambay-fr-v1" # # Use the lang tokens that actually exist in your tokenizer. # # Switch FR_CODE to "fra_Latn" only if your tokenizer truly has it. # FR_CODE = "sba_Latn" # Français (source) # NG_CODE = "fr_Latn" # Ngambay (cible) # # Inference # MAX_NEW_TOKENS = 256 # TEMPERATURE = 0.0 # NUM_BEAMS = 1 # # Performance knobs # MAX_SRC_TOKENS = 420 # per chunk # BATCH_SIZE_DEFAULT = 12 # base batch size (autoscaled below) # # ================= Helpers ================= # def auto_batch_size(default=BATCH_SIZE_DEFAULT): # if not torch.cuda.is_available(): # return max(2, min(6, default)) # CPU # try: # free, total = torch.cuda.mem_get_info() # gb = free / (1024**3) # if gb < 2: return 2 # if gb < 4: return 6 # if gb < 8: return 10 # return default # except Exception: # return default # BATCH_SIZE = auto_batch_size() # # -------- Load model & tokenizer (meta-safe) -------- # USE_CUDA = torch.cuda.is_available() # tokenizer = AutoTokenizer.from_pretrained(MODEL_REPO, trust_remote_code=True) # model = AutoModelForSeq2SeqLM.from_pretrained( # MODEL_REPO, # device_map="auto" if USE_CUDA else None, # let Accelerate place weights if GPU # torch_dtype=torch.float16 if USE_CUDA else torch.float32, # low_cpu_mem_usage=False, # trust_remote_code=True, # ) # # --- Ensure pad/eos/bos exist and are INTS (not tensors) --- # def _to_int_or_list(x): # if isinstance(x, torch.Tensor): # return int(x.item()) if x.numel() == 1 else [int(v) for v in x.tolist()] # if isinstance(x, (list, tuple)): # return [int(v) for v in x] # return int(x) if x is not None else None # # Safeguard pad token # if tokenizer.pad_token is None and tokenizer.eos_token is not None: # tokenizer.pad_token = tokenizer.eos_token # elif tokenizer.pad_token is None: # tokenizer.add_special_tokens({"pad_token": ""}) # model.resize_token_embeddings(len(tokenizer)) # # Normalize generation config + mirror on model.config # gc = model.generation_config # for attr in ["pad_token_id", "eos_token_id", "bos_token_id", "decoder_start_token_id"]: # tok_val = getattr(tokenizer, attr, None) # cfg_val = getattr(gc, attr, None) # val = tok_val if tok_val is not None else cfg_val # if val is not None: # setattr(gc, attr, _to_int_or_list(val)) # # mirror on model.config # val2 = getattr(model.generation_config, attr, None) # if val2 is not None: # setattr(model.config, attr, _to_int_or_list(val2)) # # ================= Low-level NLLB-style generation ================= # def _forced_bos_id(lang_code: str): # # Try common mappings first # if hasattr(tokenizer, "lang_code_to_id") and isinstance(tokenizer.lang_code_to_id, dict): # if lang_code in tokenizer.lang_code_to_id: # return int(tokenizer.lang_code_to_id[lang_code]) # # Fallback: treat lang code as a token # try: # tok_id = tokenizer.convert_tokens_to_ids(lang_code) # if isinstance(tok_id, int) and tok_id != tokenizer.unk_token_id: # return tok_id # except Exception: # pass # # Final fallback: keep whatever the model already has # return model.generation_config.forced_bos_token_id # def _encode(texts: List[str], src_lang: str): # # NLLB/M2M-style: set source lang on tokenizer if supported # if hasattr(tokenizer, "src_lang"): # tokenizer.src_lang = src_lang # return tokenizer( # texts, # return_tensors="pt", # padding=True, # truncation=True, # add_special_tokens=True, # ) # def _generate_batch(texts: List[str], src_lang: str, tgt_lang: str) -> List[str]: # if not texts: # return [] # inputs = _encode(texts, src_lang) # # NOTE: Do NOT move inputs; with device_map="auto" the hooks handle it. # # Keep tensors on CPU; accelerate offloads as needed. # forced_bos = _forced_bos_id(tgt_lang) # gen_kwargs = dict( # max_new_tokens=MAX_NEW_TOKENS, # do_sample=False, # num_beams=NUM_BEAMS, # eos_token_id=model.generation_config.eos_token_id, # pad_token_id=model.generation_config.pad_token_id, # forced_bos_token_id=forced_bos, # ) # with torch.no_grad(): # output_ids = model.generate(**inputs, **gen_kwargs) # return tokenizer.batch_decode(output_ids, skip_special_tokens=True) # # ================= Simple text translation ================= # def translate_text_simple(text: str) -> str: # if not text or not text.strip(): # return "" # return _generate_batch([text], FR_CODE, NG_CODE)[0] # # ================= Chunking + Batched Translation + Cache ================= # def tokenize_len(s: str) -> int: # return tokenizer(s, add_special_tokens=False, return_length=True)["length"][0] # def chunk_text_for_translation(text: str, max_src_tokens: int = MAX_SRC_TOKENS) -> List[str]: # """Split text by sentence-ish boundaries and merge under token limit.""" # if not text.strip(): # return [] # parts = re.split(r'(\s*[\.\!\?…:;]\s+)', text) # sentences = [] # for i in range(0, len(parts), 2): # s = parts[i] # p = parts[i+1] if i+1 < len(parts) else "" # unit = (s + (p or "")).strip() # if unit: # sentences.append(unit) # chunks, current = [], "" # for sent in sentences: # candidate = (current + " " + sent).strip() if current else sent # if current and tokenize_len(candidate) > max_src_tokens: # chunks.append(current.strip()) # current = sent # else: # current = candidate # if current.strip(): # chunks.append(current.strip()) # return chunks # # Small bounded cache (LRU-like using dict + cap) # TRANSLATION_CACHE: Dict[str, str] = {} # CACHE_CAP = 20000 # def _cache_set(k: str, v: str): # if len(TRANSLATION_CACHE) >= CACHE_CAP: # # drop ~5% oldest items # for i, key in enumerate(list(TRANSLATION_CACHE.keys())): # del TRANSLATION_CACHE[key] # if i > CACHE_CAP // 20: # break # TRANSLATION_CACHE[k] = v # def translate_chunks_list(chunks: List[str], batch_size: int = BATCH_SIZE) -> List[str]: # """ # Translate a list of chunks with de-dup + batching. # Returns translations in the same order as input. # """ # norm_chunks = [c.strip() for c in chunks] # unique_to_translate = [] # seen = set() # for c in norm_chunks: # if c and c not in TRANSLATION_CACHE and c not in seen: # seen.add(c) # unique_to_translate.append(c) # for i in range(0, len(unique_to_translate), batch_size): # batch = unique_to_translate[i:i + batch_size] # outs = _generate_batch(batch, FR_CODE, NG_CODE) # for src, o in zip(batch, outs): # _cache_set(src, o) # return [TRANSLATION_CACHE.get(c, "") for c in norm_chunks] # def translate_long_text(text: str) -> str: # """Chunk → batch translate → rejoin for one paragraph/block.""" # chs = chunk_text_for_translation(text) # if not chs: # return "" # trs = translate_chunks_list(chs) # return " ".join(trs).strip() # # ================= DOCX helpers ================= # def is_heading(par: Paragraph) -> Tuple[bool, int]: # # Works with English and French Word styles # name = (par.style.name or "").lower() # if any(c in name for c in ["heading", "title", "titre"]): # for lvl in range(1, 10): # if str(lvl) in name: # return True, lvl # return True, 1 # return False, 0 # def translate_docx_bytes(file_bytes: bytes) -> bytes: # """ # Read .docx → collect ALL chunks (paras + table cells) → single batched translation → rebuild .docx. # Paragraphs and table cell paragraphs are justified; headings kept as headings. # """ # f = io.BytesIO(file_bytes) # src_doc = docx.Document(f) # # 1) Collect work units # work = [] # list of dict entries describing items with ranges into all_chunks # all_chunks: List[str] = [] # # paragraphs # for par in src_doc.paragraphs: # txt = par.text # if not txt.strip(): # work.append({"kind": "blank"}) # continue # is_head, lvl = is_heading(par) # if is_head: # work.append({"kind": "heading", "level": min(max(lvl, 1), 9), "range": (len(all_chunks), len(all_chunks)+1)}) # all_chunks.append(txt.strip()) # else: # chs = chunk_text_for_translation(txt) # if chs: # start = len(all_chunks) # all_chunks.extend(chs) # work.append({"kind": "para", "range": (start, start+len(chs))}) # else: # work.append({"kind": "blank"}) # # tables # for table in src_doc.tables: # t_desc = {"kind": "table", "rows": len(table.rows), "cols": len(table.columns), "cells": []} # for row in table.rows: # row_cells = [] # for cell in row.cells: # cell_text = "\n".join([p.text for p in cell.paragraphs]).strip() # if cell_text: # chs = chunk_text_for_translation(cell_text) # if chs: # start = len(all_chunks) # all_chunks.extend(chs) # row_cells.append({"range": (start, start+len(chs))}) # else: # row_cells.append({"range": None}) # else: # row_cells.append({"range": None}) # t_desc["cells"].append(row_cells) # work.append(t_desc) # # 2) Translate all chunks at once (de-dup + batching) # translated_all = translate_chunks_list(all_chunks) if all_chunks else [] # # 3) Rebuild new document with justified paragraphs # new_doc = docx.Document() # def join_range(rng: Tuple[int, int]) -> str: # if rng is None: # return "" # s, e = rng # return " ".join(translated_all[s:e]).strip() # for item in work: # if item["kind"] == "blank": # new_doc.add_paragraph("") # elif item["kind"] == "heading": # text = join_range(item["range"]) # new_doc.add_heading(text, level=item["level"]) # elif item["kind"] == "para": # text = join_range(item["range"]) # p = new_doc.add_paragraph(text) # p.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY # elif item["kind"] == "table": # tbl = new_doc.add_table(rows=item["rows"], cols=item["cols"]) # for r_idx in range(item["rows"]): # for c_idx in range(item["cols"]): # cell_info = item["cells"][r_idx][c_idx] # txt = join_range(cell_info["range"]) # tgt_cell = tbl.cell(r_idx, c_idx) # tgt_cell.text = txt # for p in tgt_cell.paragraphs: # p.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY # out = io.BytesIO() # new_doc.save(out) # return out.getvalue() # # ================= PDF helpers ================= # def extract_pdf_text_blocks(pdf_bytes: bytes) -> List[List[str]]: # """ # Returns list of pages, each a list of block texts (visual order). # """ # pages_blocks: List[List[str]] = [] # doc = fitz.open(stream=pdf_bytes, filetype="pdf") # for page in doc: # blocks = page.get_text("blocks") # blocks.sort(key=lambda b: (round(b[1], 1), round(b[0], 1))) # page_texts = [] # for b in blocks: # text = b[4].strip() # if text: # page_texts.append(text) # pages_blocks.append(page_texts) # doc.close() # return pages_blocks # def build_pdf_from_blocks(translated_pages: List[List[str]]) -> bytes: # """ # Build a clean paginated PDF with justified paragraphs. # Keeps one translated page per original page via PageBreak. # """ # buf = io.BytesIO() # doc = SimpleDocTemplate( # buf, pagesize=A4, # rightMargin=2*cm, leftMargin=2*cm, # topMargin=2*cm, bottomMargin=2*cm # ) # styles = getSampleStyleSheet() # body = styles["BodyText"] # body.alignment = TA_JUSTIFY # body.leading = 14 # story = [] # for p_idx, blocks in enumerate(translated_pages): # if p_idx > 0: # story.append(PageBreak()) # for blk in blocks: # story.append(RLParagraph(blk.replace("\n", "
"), body)) # story.append(Spacer(1, 0.35*cm)) # doc.build(story) # return buf.getvalue() # def translate_pdf_bytes(file_bytes: bytes) -> bytes: # """ # Read PDF → collect ALL block chunks across pages → single batched translation → rebuild PDF. # """ # pages_blocks = extract_pdf_text_blocks(file_bytes) # # 1) collect chunks for the entire PDF # all_chunks: List[str] = [] # plan = [] # list of pages, each a list of ranges for blocks # for blocks in pages_blocks: # page_plan = [] # for blk in blocks: # chs = chunk_text_for_translation(blk) # if chs: # start = len(all_chunks) # all_chunks.extend(chs) # page_plan.append((start, start + len(chs))) # else: # page_plan.append(None) # plan.append(page_plan) # # 2) translate all chunks at once # translated_all = translate_chunks_list(all_chunks) if all_chunks else [] # # 3) reconstruct per block # translated_pages: List[List[str]] = [] # for page_plan in plan: # page_out = [] # for rng in page_plan: # if rng is None: # page_out.append("") # else: # s, e = rng # page_out.append(" ".join(translated_all[s:e]).strip()) # translated_pages.append(page_out) # return build_pdf_from_blocks(translated_pages) # # ================= Gradio file handler ================= # def translate_document(file_obj): # """ # Accepts gr.File input (NamedString, filepath str, or dict with binary). # Returns (output_file_path, status_message). # """ # if file_obj is None: # return None, "Veuillez sélectionner un fichier .docx ou .pdf" # try: # name = "document" # data = None # # Case A: plain filepath string # if isinstance(file_obj, str): # name = os.path.basename(file_obj) # with open(file_obj, "rb") as f: # data = f.read() # # Case B: Gradio NamedString with .name (orig name) and .value (temp path) # elif hasattr(file_obj, "name") and hasattr(file_obj, "value"): # name = os.path.basename(file_obj.name or "document") # with open(file_obj.value, "rb") as f: # data = f.read() # # Case C: dict (type="binary") # elif isinstance(file_obj, dict) and "name" in file_obj and "data" in file_obj: # name = os.path.basename(file_obj["name"] or "document") # d = file_obj["data"] # data = d.read() if hasattr(d, "read") else d # else: # return None, "Type d'entrée fichier non supporté (filepath/binaire)." # if data is None: # return None, "Impossible de lire le fichier sélectionné." # if name.lower().endswith(".docx"): # out_bytes = translate_docx_bytes(data) # out_path = "translated_ngambay.docx" # with open(out_path, "wb") as f: # f.write(out_bytes) # return out_path, "✅ Traduction DOCX terminée (paragraphes justifiés)." # elif name.lower().endswith(".pdf"): # out_bytes = translate_pdf_bytes(data) # out_path = "translated_ngambay.pdf" # with open(out_path, "wb") as f: # f.write(out_bytes) # return out_path, "✅ Traduction PDF terminée (paragraphes justifiés)." # else: # return None, "Type de fichier non supporté. Choisissez .docx ou .pdf" # except Exception as e: # return None, f"❌ Erreur pendant la traduction: {e}" # # ================== UI ================== # theme = gr.themes.Soft( # primary_hue="indigo", # radius_size="lg", # font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"] # ).set( # body_background_fill="#f7f7fb", # button_primary_text_color="#ffffff" # ) # CUSTOM_CSS = """ # .gradio-container {max-width: 980px !important;} # .header-card { # background: linear-gradient(135deg, #4f46e5 0%, #7c3aed 100%); # color: white; padding: 22px; border-radius: 18px; # box-shadow: 0 10px 30px rgba(79,70,229,.25); # transition: transform .2s ease; # } # .header-card:hover { transform: translateY(-1px); } # .header-title { font-size: 26px; font-weight: 800; margin: 0 0 6px 0; letter-spacing: .2px; } # .header-sub { opacity: .98; font-size: 14px; } # .brand { display:flex; align-items:center; gap:10px; justify-content:space-between; flex-wrap:wrap; } # .badge { # display:inline-block; background: rgba(255,255,255,.18); # padding: 4px 10px; border-radius: 999px; font-size: 12px; # border: 1px solid rgba(255,255,255,.25); # } # .footer-note { # margin-top: 8px; color: #64748b; font-size: 12px; text-align: center; # } # .support-banner { # margin-top: 14px; # border-radius: 14px; # padding: 14px 16px; # background: linear-gradient(135deg, rgba(79,70,229,.08), rgba(124,58,237,.08)); # border: 1px solid rgba(99,102,241,.25); # box-shadow: 0 6px 18px rgba(79,70,229,.08); # } # .support-title { font-weight: 700; font-size: 16px; margin-bottom: 4px; } # .support-text { font-size: 13px; color: #334155; line-height: 1.5; } # .support-contacts { display: flex; gap: 10px; flex-wrap: wrap; margin-top: 8px; } # .support-chip { # display:inline-block; padding: 6px 10px; border-radius: 999px; # background: white; border: 1px dashed rgba(79,70,229,.45); # font-size: 12px; color: #3730a3; # } # """ # with gr.Blocks( # title="Français → Ngambay · Toadoum/ngambay-fr-v1", # theme=theme, # css=CUSTOM_CSS, # fill_height=True, # ) as demo: # with gr.Group(elem_classes=["header-card"]): # gr.HTML( # """ #
#
#
Français → Ngambay (v1)
#
🚀 Version bêta · Merci de tester et partager vos retours pour améliorer la qualité de traduction.
#
# Modèle : Toadoum/ngambay-fr-v1 #
# """ # ) # with gr.Tabs(): # # -------- Tab 1: Texte -------- # with gr.Tab("Traduction de texte"): # with gr.Row(): # with gr.Column(scale=5): # src = gr.Textbox( # label="Texte source (Français)", # placeholder="Saisissez votre texte en français…", # lines=8, # autofocus=True # ) # with gr.Row(): # btn = gr.Button("Traduire", variant="primary", scale=3) # clear_btn = gr.Button("Effacer", scale=1) # gr.Examples( # examples=[ # ["Bonjour, comment allez-vous aujourd’hui ?"], # ["La réunion de sensibilisation aura lieu demain au centre communautaire."], # ["Merci pour votre participation et votre soutien."], # ["Veuillez suivre les recommandations de santé pour protéger votre famille."] # ], # inputs=[src], # label="Exemples (cliquez pour remplir)" # ) # with gr.Column(scale=5): # tgt = gr.Textbox( # label="Traduction (Ngambay)", # lines=8, # interactive=False, # show_copy_button=True # ) # gr.Markdown('') # # -------- Tab 2: Documents -------- # with gr.Tab("Traduction de document (.docx / .pdf)"): # with gr.Row(): # with gr.Column(scale=5): # doc_inp = gr.File( # label="Sélectionnez un document (.docx ou .pdf)", # file_types=[".docx", ".pdf"], # type="filepath" # ensures a temp filepath; handler also supports binary # ) # run_doc = gr.Button("Traduire le document", variant="primary") # with gr.Column(scale=5): # doc_out = gr.File(label="Fichier traduit (télécharger)") # doc_status = gr.Markdown(visible=False) # def _wrap_translate_document(f): # path, msg = translate_document(f) # return path, gr.update(value=msg, visible=True) # run_doc.click(_wrap_translate_document, inputs=doc_inp, outputs=[doc_out, doc_status]) # # Contribution banner # gr.HTML( # """ #
#
💙 Contribuer au projet (recrutement de linguistes)
#
# Nous cherchons à recruter des linguistes pour renforcer la construction de données Ngambay. # Si vous souhaitez soutenir financièrement ou en tant que bénévole, contactez-nous : #
#
# 📱 WhatsApp, Airtel Money : +235 66 04 90 94 # ✉️ Email : tsakayo@aimsammi.org #
#
# """ # ) # # Text actions # btn.click(translate_text_simple, inputs=src, outputs=tgt) # clear_btn.click(lambda: ("", ""), outputs=[src, tgt]) # if __name__ == "__main__": # # No .to(...) anywhere; model stays where Accelerate placed it (or CPU). # demo.queue(default_concurrency_limit=4).launch(share=True) import os import io import re from typing import List, Tuple, Dict import torch import gradio as gr from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline # --- NEW: docs --- import docx from docx.enum.text import WD_ALIGN_PARAGRAPH from docx.text.paragraph import Paragraph # PDF read & write import fitz # PyMuPDF from reportlab.lib.pagesizes import A4 from reportlab.lib.styles import getSampleStyleSheet from reportlab.lib.enums import TA_JUSTIFY from reportlab.platypus import SimpleDocTemplate, Paragraph as RLParagraph, Spacer from reportlab.lib.units import cm # ================= CONFIG ================= MODEL_REPO = "Toadoum/ngambay-fr-v1" FR_CODE = "fra_Latn" # Français (source) NG_CODE = "sba_Latn" # Ngambay (cible) # Inference MAX_NEW_TOKENS = 256 TEMPERATURE = 0.0 NUM_BEAMS = 1 # Performance knobs MAX_SRC_TOKENS = 420 # per chunk; reduce to ~320 if you want even faster BATCH_SIZE = 12 # number of chunks per model call (tune for your hardware) # Device selection device = 0 if torch.cuda.is_available() else -1 # set -1 on Spaces CPU if needed # Load model & tokenizer once tokenizer = AutoTokenizer.from_pretrained(MODEL_REPO) model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_REPO) translator = pipeline( task="translation", model=model, tokenizer=tokenizer, device=device, ) # Simple text box translation (kept) def translate_text_simple(text: str) -> str: if not text or not text.strip(): return "" with torch.no_grad(): out = translator( text, src_lang=FR_CODE, tgt_lang=NG_CODE, max_new_tokens=MAX_NEW_TOKENS, do_sample=False, num_beams=NUM_BEAMS, ) return out[0]["translation_text"] # ---------- Chunking + Batched Translation + Cache ---------- def tokenize_len(s: str) -> int: return len(tokenizer.encode(s, add_special_tokens=False)) def chunk_text_for_translation(text: str, max_src_tokens: int = MAX_SRC_TOKENS) -> List[str]: """Split text by sentence-ish boundaries and merge under token limit.""" if not text.strip(): return [] parts = re.split(r'(\s*[\.\!\?…:;]\s+)', text) sentences = [] for i in range(0, len(parts), 2): s = parts[i] p = parts[i+1] if i+1 < len(parts) else "" unit = (s + (p or "")).strip() if unit: sentences.append(unit) chunks, current = [], "" for sent in sentences: candidate = (current + " " + sent).strip() if current else sent if current and tokenize_len(candidate) > max_src_tokens: chunks.append(current.strip()) current = sent else: current = candidate if current.strip(): chunks.append(current.strip()) return chunks # module-level cache: identical chunks translated once TRANSLATION_CACHE: Dict[str, str] = {} def translate_chunks_list(chunks: List[str], batch_size: int = BATCH_SIZE) -> List[str]: """ Translate a list of chunks with de-dup + batching. Returns translations in the same order as input. """ # Normalize & collect unique chunks to translate norm_chunks = [c.strip() for c in chunks] to_translate = [] for c in norm_chunks: if c and c not in TRANSLATION_CACHE: to_translate.append(c) # Batched calls with torch.no_grad(): for i in range(0, len(to_translate), batch_size): batch = to_translate[i:i + batch_size] outs = translator( batch, src_lang=FR_CODE, tgt_lang=NG_CODE, max_new_tokens=MAX_NEW_TOKENS, do_sample=False, num_beams=NUM_BEAMS, ) for src, o in zip(batch, outs): TRANSLATION_CACHE[src] = o["translation_text"] return [TRANSLATION_CACHE.get(c, "") for c in norm_chunks] def translate_long_text(text: str) -> str: """Chunk → batch translate → rejoin for one paragraph/block.""" chs = chunk_text_for_translation(text) if not chs: return "" trs = translate_chunks_list(chs) # join with space to reconstruct paragraph smoothly return " ".join(trs).strip() # ---------- DOCX helpers (now fully batched across the whole doc) ---------- def is_heading(par: Paragraph) -> Tuple[bool, int]: style = (par.style.name or "").lower() if "heading" in style: for lvl in range(1, 10): if str(lvl) in style: return True, lvl return True, 1 return False, 0 def translate_docx_bytes(file_bytes: bytes) -> bytes: """ Read .docx → collect ALL chunks (paras + table cells) → single batched translation → rebuild .docx. Paragraphs and table cell paragraphs are justified; headings kept as headings. """ f = io.BytesIO(file_bytes) src_doc = docx.Document(f) # 1) Collect work units work = [] # list of dict entries describing items with ranges into all_chunks all_chunks: List[str] = [] # paragraphs for par in src_doc.paragraphs: txt = par.text if not txt.strip(): work.append({"kind": "blank"}) continue is_head, lvl = is_heading(par) if is_head: # treat as single chunk (usually short) work.append({"kind": "heading", "level": min(max(lvl, 1), 9), "range": (len(all_chunks), len(all_chunks)+1)}) all_chunks.append(txt.strip()) else: chs = chunk_text_for_translation(txt) if chs: start = len(all_chunks) all_chunks.extend(chs) work.append({"kind": "para", "range": (start, start+len(chs))}) else: work.append({"kind": "blank"}) # tables for t_idx, table in enumerate(src_doc.tables): t_desc = {"kind": "table", "rows": len(table.rows), "cols": len(table.columns), "cells": []} for r_idx, row in enumerate(table.rows): row_cells = [] for c_idx, cell in enumerate(row.cells): cell_text = "\n".join([p.text for p in cell.paragraphs]).strip() if cell_text: chs = chunk_text_for_translation(cell_text) if chs: start = len(all_chunks) all_chunks.extend(chs) row_cells.append({"range": (start, start+len(chs))}) else: row_cells.append({"range": None}) else: row_cells.append({"range": None}) t_desc["cells"].append(row_cells) work.append(t_desc) # 2) Translate all chunks at once (de-dup + batching) if all_chunks: translated_all = translate_chunks_list(all_chunks) else: translated_all = [] # 3) Rebuild new document with justified paragraphs new_doc = docx.Document() cursor = 0 # index into translated_all # helper to consume a range and join back def join_range(rng: Tuple[int, int]) -> str: if rng is None: return "" s, e = rng return " ".join(translated_all[s:e]).strip() # rebuild paragraphs for item in work: if item["kind"] == "blank": new_doc.add_paragraph("") elif item["kind"] == "heading": text = join_range(item["range"]) new_doc.add_heading(text, level=item["level"]) elif item["kind"] == "para": text = join_range(item["range"]) p = new_doc.add_paragraph(text) p.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY elif item["kind"] == "table": tbl = new_doc.add_table(rows=item["rows"], cols=item["cols"]) for r_idx in range(item["rows"]): for c_idx in range(item["cols"]): cell_info = item["cells"][r_idx][c_idx] txt = join_range(cell_info["range"]) tgt_cell = tbl.cell(r_idx, c_idx) tgt_cell.text = txt for p in tgt_cell.paragraphs: p.alignment = WD_ALIGN_PARAGRAPH.JUSTIFY out = io.BytesIO() new_doc.save(out) return out.getvalue() # ---------- PDF helpers (batched across the whole PDF) ---------- def extract_pdf_text_blocks(pdf_bytes: bytes) -> List[List[str]]: """ Returns list of pages, each a list of block texts (visual order). """ pages_blocks: List[List[str]] = [] doc = fitz.open(stream=pdf_bytes, filetype="pdf") for page in doc: blocks = page.get_text("blocks") blocks.sort(key=lambda b: (round(b[1], 1), round(b[0], 1))) page_texts = [] for b in blocks: text = b[4].strip() if text: page_texts.append(text) pages_blocks.append(page_texts) doc.close() return pages_blocks def build_pdf_from_blocks(translated_pages: List[List[str]]) -> bytes: """ Build a clean paginated PDF with justified paragraphs (not exact original layout). """ buf = io.BytesIO() doc = SimpleDocTemplate( buf, pagesize=A4, rightMargin=2*cm, leftMargin=2*cm, topMargin=2*cm, bottomMargin=2*cm ) styles = getSampleStyleSheet() body = styles["BodyText"] body.alignment = TA_JUSTIFY body.leading = 14 story = [] first = True for blocks in translated_pages: if not first: story.append(Spacer(1, 0.1*cm)) # page break trigger first = False for blk in blocks: story.append(RLParagraph(blk.replace("\n", "
"), body)) story.append(Spacer(1, 0.35*cm)) doc.build(story) return buf.getvalue() def translate_pdf_bytes(file_bytes: bytes) -> bytes: """ Read PDF → collect ALL block chunks across pages → single batched translation → rebuild simple justified PDF. """ pages_blocks = extract_pdf_text_blocks(file_bytes) # 1) collect chunks for the entire PDF all_chunks: List[str] = [] plan = [] # list of pages, each a list of ranges for blocks for blocks in pages_blocks: page_plan = [] for blk in blocks: chs = chunk_text_for_translation(blk) if chs: start = len(all_chunks) all_chunks.extend(chs) page_plan.append((start, start + len(chs))) else: page_plan.append(None) plan.append(page_plan) # 2) translate all chunks at once translated_all = translate_chunks_list(all_chunks) if all_chunks else [] # 3) reconstruct per block translated_pages: List[List[str]] = [] for page_plan in plan: page_out = [] for rng in page_plan: if rng is None: page_out.append("") else: s, e = rng page_out.append(" ".join(translated_all[s:e]).strip()) translated_pages.append(page_out) return build_pdf_from_blocks(translated_pages) # ---------- Gradio file handler (robust) ---------- def translate_document(file_obj): """ Accepts gr.File input (NamedString, filepath str, or dict with binary). Returns (output_file_path, status_message). """ if file_obj is None: return None, "Veuillez sélectionner un fichier .docx ou .pdf" try: name = "document" data = None # Case A: plain filepath string if isinstance(file_obj, str): name = os.path.basename(file_obj) with open(file_obj, "rb") as f: data = f.read() # Case B: Gradio NamedString with .name (orig name) and .value (temp path) elif hasattr(file_obj, "name") and hasattr(file_obj, "value"): name = os.path.basename(file_obj.name or "document") with open(file_obj.value, "rb") as f: data = f.read() # Case C: dict (type="binary") elif isinstance(file_obj, dict) and "name" in file_obj and "data" in file_obj: name = os.path.basename(file_obj["name"] or "document") d = file_obj["data"] data = d.read() if hasattr(d, "read") else d else: return None, "Type d'entrée fichier non supporté (filepath/binaire)." if data is None: return None, "Impossible de lire le fichier sélectionné." # Clear cache per document to keep memory predictable (optional) # TRANSLATION_CACHE.clear() if name.lower().endswith(".docx"): out_bytes = translate_docx_bytes(data) out_path = "translated_ngambay.docx" with open(out_path, "wb") as f: f.write(out_bytes) return out_path, "✅ Traduction DOCX terminée (paragraphes justifiés)." elif name.lower().endswith(".pdf"): out_bytes = translate_pdf_bytes(data) out_path = "translated_ngambay.pdf" with open(out_path, "wb") as f: f.write(out_bytes) return out_path, "✅ Traduction PDF terminée (paragraphes justifiés)." else: return None, "Type de fichier non supporté. Choisissez .docx ou .pdf" except Exception as e: return None, f"❌ Erreur pendant la traduction: {e}" # ================== UI ================== theme = gr.themes.Soft( primary_hue="indigo", radius_size="lg", font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"] ).set( body_background_fill="#f7f7fb", button_primary_text_color="#ffffff" ) CUSTOM_CSS = """ .gradio-container {max-width: 980px !important;} .header-card { background: linear-gradient(135deg, #4f46e5 0%, #7c3aed 100%); color: white; padding: 22px; border-radius: 18px; box-shadow: 0 10px 30px rgba(79,70,229,.25); transition: transform .2s ease; } .header-card:hover { transform: translateY(-1px); } .header-title { font-size: 26px; font-weight: 800; margin: 0 0 6px 0; letter-spacing: .2px; } .header-sub { opacity: .98; font-size: 14px; } .brand { display:flex; align-items:center; gap:10px; justify-content:space-between; flex-wrap:wrap; } .badge { display:inline-block; background: rgba(255,255,255,.18); padding: 4px 10px; border-radius: 999px; font-size: 12px; border: 1px solid rgba(255,255,255,.25); } .footer-note { margin-top: 8px; color: #64748b; font-size: 12px; text-align: center; } .support-banner { margin-top: 14px; border-radius: 14px; padding: 14px 16px; background: linear-gradient(135deg, rgba(79,70,229,.08), rgba(124,58,237,.08)); border: 1px solid rgba(99,102,241,.25); box-shadow: 0 6px 18px rgba(79,70,229,.08); } .support-title { font-weight: 700; font-size: 16px; margin-bottom: 4px; } .support-text { font-size: 13px; color: #334155; line-height: 1.5; } .support-contacts { display: flex; gap: 10px; flex-wrap: wrap; margin-top: 8px; } .support-chip { display:inline-block; padding: 6px 10px; border-radius: 999px; background: white; border: 1px dashed rgba(79,70,229,.45); font-size: 12px; color: #3730a3; } """ with gr.Blocks( title="Français → Ngambay · Toadoum/ngambay-fr-v1", theme=theme, css=CUSTOM_CSS, fill_height=True, ) as demo: with gr.Group(elem_classes=["header-card"]): gr.HTML( """
Français → Ngambay (v1)
🚀 Version bêta · Merci de tester et partager vos retours pour améliorer la qualité de traduction.
Modèle : Toadoum/ngambay-fr-v1
""" ) with gr.Tabs(): # -------- Tab 1: Texte -------- with gr.Tab("Traduction de texte"): with gr.Row(): with gr.Column(scale=5): src = gr.Textbox( label="Texte source (Français)", placeholder="Saisissez votre texte en français…", lines=8, autofocus=True ) with gr.Row(): btn = gr.Button("Traduire", variant="primary", scale=3) clear_btn = gr.Button("Effacer", scale=1) gr.Examples( examples=[ ["Bonjour, comment allez-vous aujourd’hui ?"], ["La réunion de sensibilisation aura lieu demain au centre communautaire."], ["Merci pour votre participation et votre soutien."], ["Veuillez suivre les recommandations de santé pour protéger votre famille."] ], inputs=[src], label="Exemples (cliquez pour remplir)" ) with gr.Column(scale=5): tgt = gr.Textbox( label="Traduction (Ngambay)", lines=8, interactive=False, show_copy_button=True ) gr.Markdown('') # -------- Tab 2: Documents -------- with gr.Tab("Traduction de document (.docx / .pdf)"): with gr.Row(): with gr.Column(scale=5): doc_inp = gr.File( label="Sélectionnez un document (.docx ou .pdf)", file_types=[".docx", ".pdf"], type="filepath" # ensures a temp filepath; handler also supports binary ) run_doc = gr.Button("Traduire le document", variant="primary") with gr.Column(scale=5): doc_out = gr.File(label="Fichier traduit (télécharger)") doc_status = gr.Markdown("") run_doc.click(translate_document, inputs=doc_inp, outputs=[doc_out, doc_status]) # Contribution banner gr.HTML( """
💙 Contribuer au projet (recrutement de linguistes)
Nous cherchons à recruter des linguistes pour renforcer la construction de données Ngambay. Si vous souhaitez soutenir financièrement ou en tant que bénévole, contactez-nous :
📱 WhatsApp, Airtel Money : +235 66 04 90 94 ✉️ Email : tsakayo@aimsammi.org
""" ) # Text actions btn.click(translate_text_simple, inputs=src, outputs=tgt) clear_btn.click(lambda: ("", ""), outputs=[src, tgt]) if __name__ == "__main__": demo.queue(default_concurrency_limit=4).launch(share=True)