Spaces:
Runtime error
Runtime error
""" | |
callbacks.py | |
------------- | |
Ce module définit les callbacks de l'application Dash en utilisant les abstractions | |
définies dans le sous-package helpers.""" | |
from dash import Input, Output, State, no_update | |
import datetime | |
import os | |
from pathlib import Path | |
from loguru import logger | |
from dotenv import load_dotenv | |
load_dotenv() | |
from helpers.processor import Processor | |
from helpers.s3 import S3Client | |
from helpers.models import S3Config | |
from app import app | |
from global_vars import data, BUCKET_NAME | |
# Name of the local file that holds the persisted transcriptions.
PERSIST_FILE = "results.json"

# Processor instance shared by the callbacks below.
processor = Processor()

# S3 client built from the bucket name and the credentials found in the
# environment (a missing variable yields None).
s3_config = S3Config(
    bucket_name=BUCKET_NAME,
    endpoint_url=os.environ.get("AWS_ENDPOINT_URL_S3"),
    access_key=os.environ.get("AWS_ACCESS_KEY_ID"),
    secret_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
)
s3_client = S3Client(s3_config)
# -----------------------------------------------------------------------------
# SECTION DISPLAY CALLBACKS
# -----------------------------------------------------------------------------
def show_chapter_section(n_clicks, user_info):
    """Show the chapter section once user information has been submitted.

    Returns:
        A pair of style dicts. When the button has been clicked and
        ``user_info`` is non-empty the pair is (shown, hidden); otherwise it
        is (hidden, shown).
    """
    shown = {"display": "block"}
    hidden = {"display": "none"}
    ready = bool(n_clicks and user_info)
    return (shown, hidden) if ready else (hidden, shown)
def show_page_section(n_clicks, chapter_value):
    """Show the page section once a chapter has been selected.

    Returns:
        A pair of style dicts: (shown, hidden) when the button has been
        clicked and ``chapter_value`` is set, (hidden, shown) otherwise.
    """
    # Guard clause: nothing clicked yet, or no chapter chosen.
    if not (n_clicks and chapter_value):
        return {"display": "none"}, {"display": "block"}
    return {"display": "block"}, {"display": "none"}
def show_transcription_section(n_clicks, page_value):
    """Show the transcription section once a page has been selected.

    Returns:
        A pair of style dicts: (shown, hidden) when the button has been
        clicked and ``page_value`` is set, (hidden, shown) otherwise.
    """
    if n_clicks and page_value:
        first, second = "block", "none"
    else:
        first, second = "none", "block"
    return {"display": first}, {"display": second}
# -----------------------------------------------------------------------------
# CALLBACK UPDATING THE PAGES DROPDOWN
# -----------------------------------------------------------------------------
def update_pages(chapter_value):
    """Build the page-dropdown options for the selected chapter.

    Args:
        chapter_value: Filesystem path of the chapter directory; falsy when no
            chapter is selected.

    Returns:
        A list of ``{"label": <directory name>, "value": <directory path>}``
        dicts, one per immediate sub-directory of the chapter, sorted by name.
        The list is empty when no chapter is selected or when the path is not
        an existing directory.
    """
    if not chapter_value:
        return []

    chapter_path = Path(chapter_value)
    # is_dir() rather than exists(): iterdir() raises NotADirectoryError when
    # the path points at a regular file.
    if not chapter_path.is_dir():
        return []

    # iterdir() yields entries in arbitrary order; sort so the dropdown is
    # stable from one call to the next.
    pages = sorted(
        (entry for entry in chapter_path.iterdir() if entry.is_dir()),
        key=lambda entry: entry.name,
    )
    return [{"label": page.name, "value": str(page)} for page in pages]
# -----------------------------------------------------------------------------
# CALLBACK UPDATING THE AUDIO AND THE SUGGESTIONS (via dcc.Store)
# -----------------------------------------------------------------------------
def update_audio_and_suggestions(page_value, chapter_value):
    """Refresh the audio-path and suggestion stores for the selected page.

    When both a page and a chapter are selected, the page's candidate
    transcriptions and audio paths are loaded through
    ``processor.load_page_verses_and_audios``. If at least one audio path is
    returned, the first one becomes the current audio and the first six
    candidates become the checklist options.

    If no audio path is returned, the "hidden" message is made visible and the
    local persistence file is removed on a best-effort basis.

    Returns:
        A 5-tuple ``(audio_paths, possible_values, audio_src, options,
        hidden_style)``. The first four items are ``no_update`` when nothing
        is selected or when the page has no audio.
    """
    hidden_style = {"display": "none"}
    if not (page_value and chapter_value):
        return no_update, no_update, no_update, no_update, hidden_style

    possible_values, audio_paths = processor.load_page_verses_and_audios(s3_client, page_value, data)
    options = [{"label": t, "value": t} for t in possible_values[:6]]

    if len(audio_paths) > 0:
        audio_src = audio_paths[0]
        return audio_paths, possible_values, audio_src, options, hidden_style

    # No audio left for this page: reveal the hidden message.
    hidden_style = {"display": "block"}
    try:
        os.remove(PERSIST_FILE)
    except FileNotFoundError:
        # Nothing was persisted yet; there is nothing to clean up.
        pass
    except OSError as exc:
        # Cleanup is best-effort: report the problem but keep the callback alive.
        logger.warning(f"Could not remove {PERSIST_FILE}: {exc}")
    return no_update, no_update, no_update, no_update, hidden_style
# -----------------------------------------------------------------------------
# CALLBACK FOR PROCESSING THE TRANSCRIPTION
# -----------------------------------------------------------------------------
def update_transcription(n_clicks, selected_transcriptions, user_info, page_value,
                         current_audio, audio_store, values_store, stored_transcriptions):
    """Record a submitted transcription and advance to the next audio segment.

    When the submit button has been clicked and both a page and a current
    audio segment are set, the function:

    - appends an entry (segment path, selected transcriptions, timestamp,
      user id) to ``stored_transcriptions``;
    - drops the first element of ``audio_store`` and takes the new first
      element as the next audio (``""`` when none is left);
    - removes each selected transcription from ``values_store``;
    - rebuilds the checklist options from the first six remaining values plus
      the extra "autre transcription" choice.

    ``audio_store``, ``values_store`` and ``stored_transcriptions`` are
    updated in place when they are lists.

    Args:
        n_clicks: Click count of the submit button; ``None`` is treated as
            "not clicked".
        selected_transcriptions: Checked values; ``None`` is treated as an
            empty selection.
        user_info: Identifier stored as ``user_id`` in the recorded entry.
        page_value: Currently selected page.
        current_audio: Path of the audio segment being transcribed.
        audio_store: Remaining audio paths, current one first.
        values_store: Remaining suggestion values.
        stored_transcriptions: Entries recorded so far, or ``None``.

    Returns:
        A 7-tuple ``(audio_store, values_store, next_audio, next_options,
        checklist_value, message, stored_transcriptions)``. When no
        suggestion remains, the first five items are ``no_update`` and the
        message asks the user to save.
    """
    # `n_clicks and ...` keeps the guard safe when the click count is None.
    if n_clicks and n_clicks > 0 and page_value and current_audio:
        # A checklist with nothing ticked may be None; iterate over an empty
        # selection instead of raising TypeError.
        selected = selected_transcriptions or []

        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        stored_transcriptions = stored_transcriptions if stored_transcriptions is not None else []
        stored_transcriptions.append({
            "segment_path": current_audio,
            "transcriptions": selected_transcriptions,
            "timestamp": timestamp,
            "user_id": user_info,
        })

        # Advance the audio queue: the processed segment is the first element.
        if audio_store and isinstance(audio_store, list):
            audio_store.pop(0)
            next_audio = audio_store[0] if audio_store else ""
        else:
            next_audio = ""

        # Used suggestions are no longer offered.
        if values_store and isinstance(values_store, list):
            for val in selected:
                if val in values_store:
                    values_store.remove(val)

        if values_store:
            next_options = [
                {"label": t, "value": t}
                for t in (values_store[:6] + ["autre transcription"])
            ]
        else:
            next_options = ["autre transcription"]

        confirmation_message = (
            f"Transcriptions sélectionnées : {', '.join(selected)}"
            if selected else "Aucune transcription sélectionnée."
        )

        # More than the lone fallback option means suggestions remain; the
        # empty list resets the checklist selection.
        if len(next_options) > 1:
            return (audio_store, values_store, next_audio, next_options, [],
                    confirmation_message, stored_transcriptions)

    # NOTE(review): the flattened source does not show whether this return was
    # nested inside the guard above; it is kept as the single fall-through so
    # the callback never returns None. Confirm that showing the end-of-page
    # message when nothing was submitted is intended.
    return (no_update, no_update, no_update, no_update, no_update,
            "fin de la page, veuillez sauvegarder", stored_transcriptions)
# -----------------------------------------------------------------------------
# CALLBACK FOR SAVING THE RESULTS
# -----------------------------------------------------------------------------
def save_results(n_clicks, page_value, stored_transcriptions):
    """Persist the transcriptions locally and upload them to S3.

    The entries already present in the local persistence file are combined
    with ``stored_transcriptions``, written back to that file, and the file is
    uploaded under ``labelling/<page>/<PERSIST_FILE>``.

    Args:
        n_clicks: Click count of the save button; ``None`` is treated as
            "not clicked".
        page_value: Currently selected page path, used to build the S3 key.
        stored_transcriptions: Entries recorded during the session, or a
            falsy value when there are none.

    Returns:
        The success message once the upload call has returned, or
        ``no_update`` when the button was not clicked, no page is selected, or
        there is nothing to save.
    """
    # `n_clicks and ...` keeps the guard safe when the click count is None.
    if not (n_clicks and n_clicks > 0 and page_value):
        return no_update

    try:
        initial_transcriptions = processor.load_persistent_data(PERSIST_FILE)
    except Exception as e:
        # Deliberately broad: a failed load must not block saving the new entries.
        logger.error(f"Erreur lors du chargement des données persistantes : {e}")
        initial_transcriptions = []

    combined_transcriptions = initial_transcriptions + (stored_transcriptions or [])
    if not combined_transcriptions:
        return no_update

    processor.save_persistent_data(combined_transcriptions, PERSIST_FILE)

    # Normalise Windows separators and drop the "assets/" part of the path
    # before building the S3 key.
    cleaned_page = page_value.replace("\\", "/").replace("assets/", "")
    s3_key = f"labelling/{cleaned_page}/{PERSIST_FILE}"
    s3_client.upload_file(PERSIST_FILE, s3_key)
    return "Les résultats ont été sauvegardés avec succès."