# ==========================================
# game_engine.py - Avec métriques OCR et dataset optimisé + modèles commutables
# ==========================================
"""
Moteur de jeu avec tracking complet des performances OCR et support modèles commutables
"""
import random
import time
import datetime
import gradio as gr
import os
import uuid
import gc
import numpy as np
from PIL import Image
# Import GPU uniquement
from image_processing_gpu import (
recognize_number_fast_with_image,
create_thumbnail_fast,
create_white_canvas,
cleanup_memory,
get_ocr_model_info,
get_available_models,
set_ocr_model,
get_current_model_info
)
print("✅ Game Engine: Mode GPU avec métriques OCR et modèles commutables")
# Imports dataset
try:
from datasets import Dataset, Image as DatasetImage, load_dataset
DATASET_AVAILABLE = True
print("✅ Modules dataset disponibles")
except ImportError as e:
DATASET_AVAILABLE = False
print(f"⚠️ Modules dataset non disponibles: {e}")
# Dataset name avec nouvelle structure cohérente
DATASET_NAME = "hoololi/CalcTrainer_dataset"
# Configuration des difficultés par opération
DIFFICULTY_RANGES = {
"×": {"Facile": (2, 9), "Difficile": (4, 12)},
"+": {"Facile": (1, 50), "Difficile": (10, 100)},
"-": {"Facile": (1, 50), "Difficile": (10, 100)},
"÷": {"Facile": (1, 10), "Difficile": (2, 12)}
}
def get_ocr_models_info() -> dict:
"""Retourne les informations sur les modèles OCR disponibles"""
try:
available_models = get_available_models()
current_model = get_current_model_info()
return {
"available_models": available_models,
"current_model": current_model,
"model_names": list(available_models.keys())
}
except Exception as e:
print(f"❌ Erreur get_ocr_models_info: {e}")
return {
"available_models": {},
"current_model": {"model_name": "hoololi/trocr-base-handwritten-calctrainer"},
"model_names": []
}
def switch_ocr_model(model_name: str) -> str:
"""Change le modèle OCR et retourne un message de statut"""
try:
success = set_ocr_model(model_name)
if success:
model_info = get_current_model_info()
return f"✅ Modèle changé vers: {model_info['display_name']}\n📍 {model_info['description']}"
else:
return f"❌ Échec du changement vers: {model_name}"
except Exception as e:
return f"❌ Erreur lors du changement: {str(e)}"
def create_result_row_with_metrics(i: int, image: dict | np.ndarray | Image.Image, expected: int, operation_data: tuple[int, int, str, int]) -> dict:
"""Traite une image avec OCR et mesure les métriques"""
print(f"🔍 Traitement OCR image #{i+1}...")
# Mesurer temps OCR précisément
ocr_start_time = time.time()
recognized, optimized_image, dataset_image_data = recognize_number_fast_with_image(image, debug=False)
ocr_processing_time = time.time() - ocr_start_time
print(f" ⏱️ OCR temps: {ocr_processing_time:.3f}s → '{recognized}'")
try:
recognized_num = int(recognized) if recognized.isdigit() else 0
except:
recognized_num = 0
is_correct = recognized_num == expected
a, b, operation, correct_result = operation_data
status_icon = "✅" if is_correct else "❌"
status_text = "Correct" if is_correct else "Incorrect"
row_color = "#e8f5e8" if is_correct else "#ffe8e8"
# Miniature pour affichage
image_thumbnail = create_thumbnail_fast(optimized_image, size=(50, 50))
# Libérer mémoire
if optimized_image and hasattr(optimized_image, 'close'):
try:
optimized_image.close()
except:
pass
return {
'html_row': f"""
{i+1} |
{a} |
{operation} |
{b} |
{expected} |
{image_thumbnail} |
{recognized_num} |
{status_icon} {status_text} |
{ocr_processing_time:.3f}s |
""",
'is_correct': is_correct,
'recognized': recognized,
'recognized_num': recognized_num,
'dataset_image_data': dataset_image_data,
'ocr_processing_time': ocr_processing_time
}
class MathGame:
"""Moteur de jeu avec métriques OCR complètes et modèles commutables"""
def __init__(self):
self.is_running = False
self.start_time = 0
self.current_operation = ""
self.correct_answer = 0
self.user_images = []
self.expected_answers = []
self.operations_history = []
self.question_count = 0
self.time_remaining = 30
self.session_data = []
# Configuration session
self.duration = 30
self.operation_type = "×"
self.difficulty = "Facile"
# Gestion export
self.export_status = "not_exported"
self.export_timestamp = None
self.export_result = None
def get_export_status(self) -> dict[str, str | bool | None]:
return {
"status": self.export_status,
"timestamp": self.export_timestamp,
"result": self.export_result,
"can_export": self.export_status == "not_exported" and len(self.session_data) > 0
}
def mark_export_in_progress(self) -> None:
self.export_status = "exporting"
self.export_timestamp = datetime.datetime.now().isoformat()
def mark_export_completed(self, result: str) -> None:
self.export_status = "exported"
self.export_result = result
def generate_multiplication(self, difficulty: str) -> tuple[str, int]:
"""Génère une multiplication"""
min_val, max_val = DIFFICULTY_RANGES["×"][difficulty]
a = random.randint(min_val, max_val)
b = random.randint(min_val, max_val)
return f"{a} × {b}", a * b
def generate_addition(self, difficulty: str) -> tuple[str, int]:
"""Génère une addition"""
min_val, max_val = DIFFICULTY_RANGES["+"][difficulty]
a = random.randint(min_val, max_val)
b = random.randint(min_val, max_val)
return f"{a} + {b}", a + b
def generate_subtraction(self, difficulty: str) -> tuple[str, int]:
"""Génère une soustraction (résultat toujours positif)"""
min_val, max_val = DIFFICULTY_RANGES["-"][difficulty]
a = random.randint(min_val, max_val)
b = random.randint(min_val, a)
return f"{a} - {b}", a - b
def generate_division(self, difficulty: str) -> tuple[str, int]:
"""Génère une division exacte"""
min_result, max_result = DIFFICULTY_RANGES["÷"][difficulty]
result = random.randint(min_result, max_result)
if difficulty == "Facile":
divisor = random.randint(2, 9)
else:
divisor = random.randint(2, 12)
dividend = result * divisor
return f"{dividend} ÷ {divisor}", result
def generate_operation(self, operation_type: str, difficulty: str) -> tuple[str, int]:
"""Génère une opération selon le type et la difficulté"""
if operation_type == "×":
return self.generate_multiplication(difficulty)
elif operation_type == "+":
return self.generate_addition(difficulty)
elif operation_type == "-":
return self.generate_subtraction(difficulty)
elif operation_type == "÷":
return self.generate_division(difficulty)
elif operation_type == "Aléatoire":
random_op = random.choice(["×", "+", "-", "÷"])
return self.generate_operation(random_op, difficulty)
else:
return self.generate_multiplication(difficulty)
def start_game(self, duration: str, operation: str, difficulty: str) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
"""Démarre le jeu avec la configuration choisie"""
# Configuration
self.duration = 60 if duration == "60 secondes" else 30
self.operation_type = operation
self.difficulty = difficulty
# Nettoyage simple
if hasattr(self, 'user_images') and self.user_images:
for img in self.user_images:
if hasattr(img, 'close'):
try:
img.close()
except:
pass
# Réinitialisation complète
self.is_running = True
self.start_time = time.time()
self.user_images = []
self.expected_answers = []
self.operations_history = []
self.question_count = 0
self.time_remaining = self.duration
self.session_data = []
# Reset export
self.export_status = "not_exported"
self.export_timestamp = None
self.export_result = None
gc.collect()
# Première opération
operation_str, answer = self.generate_operation(self.operation_type, self.difficulty)
self.current_operation = operation_str
self.correct_answer = answer
# Parser l'opération pour l'historique
parts = operation_str.split()
a, op, b = int(parts[0]), parts[1], int(parts[2])
self.operations_history.append((a, b, op, answer))
# Affichage
operation_emoji = {
"×": "✖️", "+": "➕", "-": "➖", "÷": "➗", "Aléatoire": "🎲"
}
emoji = operation_emoji.get(self.operation_type, "🔢")
return (
f'{operation_str}
',
create_white_canvas(),
f"🎯 {emoji} {self.operation_type} • {self.difficulty} • Écrivez votre réponse !",
f"⏱️ Temps restant: {self.time_remaining}s",
gr.update(interactive=False),
gr.update(interactive=True),
""
)
def next_question(self, image_data: dict | np.ndarray | Image.Image | None) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
"""Passe à la question suivante - STOCKAGE SIMPLE, PAS D'OCR"""
if not self.is_running:
return (
f'{self.current_operation}
',
image_data,
"❌ Le jeu n'est pas en cours !",
"⏱️ Temps: 0s",
gr.update(interactive=True),
gr.update(interactive=False),
""
)
elapsed_time = time.time() - self.start_time
if elapsed_time >= self.duration:
return self.end_game(image_data)
# STOCKAGE SIMPLE - PAS D'OCR pendant le jeu !
if image_data is not None:
self.user_images.append(image_data)
self.expected_answers.append(self.correct_answer)
self.question_count += 1
print(f"📝 Image {self.question_count} stockée (pas d'OCR pendant le jeu)")
# Nouvelle opération
operation_str, answer = self.generate_operation(self.operation_type, self.difficulty)
self.current_operation = operation_str
self.correct_answer = answer
# Parser pour l'historique
parts = operation_str.split()
a, op, b = int(parts[0]), parts[1], int(parts[2])
self.operations_history.append((a, b, op, answer))
time_remaining = max(0, self.duration - int(elapsed_time))
self.time_remaining = time_remaining
if time_remaining <= 0:
return self.end_game(image_data)
# Emoji pour l'opération
operation_emoji = {
"×": "✖️", "+": "➕", "-": "➖", "÷": "➗", "Aléatoire": "🎲"
}
emoji = operation_emoji.get(self.operation_type, "🔢")
return (
f'{operation_str}
',
create_white_canvas(),
f"🎯 {emoji} Question {self.question_count + 1} • {self.difficulty}",
f"⏱️ Temps restant: {time_remaining}s",
gr.update(interactive=False),
gr.update(interactive=True),
""
)
def end_game(self, final_image: dict | np.ndarray | Image.Image | None) -> tuple[str, Image.Image, str, str, gr.update, gr.update, str]:
"""Fin de jeu - OCR AVEC MÉTRIQUES COMPLÈTES"""
self.is_running = False
print("🏁 Fin de jeu - Début OCR avec métriques détaillées...")
# Ajouter la dernière image si présente
if final_image is not None:
self.user_images.append(final_image)
self.expected_answers.append(self.correct_answer)
self.question_count += 1
# Ajouter l'opération finale à l'historique si nécessaire
if len(self.operations_history) < len(self.user_images):
parts = self.current_operation.split()
a, op, b = int(parts[0]), parts[1], int(parts[2])
self.operations_history.append((a, b, op, self.correct_answer))
# OCR SÉQUENTIEL AVEC MÉTRIQUES
total_questions = len(self.user_images)
correct_answers = 0
table_rows_html = ""
session_timestamp = datetime.datetime.now().isoformat()
session_id = f"session_{int(datetime.datetime.now().timestamp())}_{str(uuid.uuid4())[:8]}"
# Métriques OCR globales
total_ocr_start_time = time.time()
ocr_times = []
self.session_data = []
images_saved = 0
print(f"🔄 Traitement OCR avec métriques de {total_questions} images...")
# Récupérer infos modèle OCR une seule fois - MODIFIÉ pour utiliser le nouveau système
try:
ocr_model_info = get_ocr_model_info()
model_name = ocr_model_info.get("model_name", "hoololi/trocr-base-handwritten-calctrainer")
hardware = f"{ocr_model_info.get('device', 'Unknown')}-{ocr_model_info.get('gpu_name', 'Unknown')}"
except Exception as e:
print(f"❌ Erreur get_ocr_model_info: {e}")
model_name = "hoololi/trocr-base-handwritten-calctrainer"
hardware = "ZeroGPU-Unknown"
# Boucle OCR avec métriques
for i in range(total_questions):
print(f"📷 OCR image {i+1}/{total_questions}...")
# OCR avec métriques
row_data = create_result_row_with_metrics(
i,
self.user_images[i],
self.expected_answers[i],
self.operations_history[i] if i < len(self.operations_history) else (0, 0, "×", 0)
)
table_rows_html += row_data['html_row']
ocr_times.append(row_data['ocr_processing_time'])
if row_data['is_correct']:
correct_answers += 1
# Structure dataset optimisée
a, b, operation, correct_result = self.operations_history[i] if i < len(self.operations_history) else (0, 0, "×", 0)
# ID unique pour cette question
question_id = f"{session_id}_q{i+1:02d}"
entry = {
# Identification
"session_id": session_id,
"question_id": question_id,
"timestamp": session_timestamp,
# Données mathématiques
"operand_a": a,
"operand_b": b,
"operation": operation,
"correct_answer": self.expected_answers[i],
"difficulty": self.difficulty,
# Données OCR
"ocr_prediction": row_data['recognized'],
"ocr_parsed_number": row_data['recognized_num'],
"is_correct": row_data['is_correct'],
# Métriques modèle OCR
"ocr_model_name": model_name,
"ocr_processing_time": row_data['ocr_processing_time'],
"ocr_confidence": 0.0, # Non disponible avec TrOCR actuel
# Métriques session (calculées à la fin)
"session_duration": self.duration,
"session_total_questions": total_questions,
# Métadonnées techniques
"app_version": "3.2_with_switchable_models",
"hardware": hardware
}
# Image PIL native pour dataset
if row_data['dataset_image_data']:
entry["handwriting_image"] = row_data['dataset_image_data']["handwriting_image"]
images_saved += 1
self.session_data.append(entry)
# Calculs finaux métriques
total_ocr_time = time.time() - total_ocr_start_time
avg_ocr_time = sum(ocr_times) / len(ocr_times) if ocr_times else 0.0
accuracy = (correct_answers / total_questions * 100) if total_questions > 0 else 0
# Ajouter métriques session à toutes les entrées
for entry in self.session_data:
entry["session_accuracy"] = accuracy
entry["session_total_ocr_time"] = total_ocr_time
entry["session_avg_ocr_time"] = avg_ocr_time
# Statistiques détaillées
print(f"📊 === MÉTRIQUES OCR COMPLÈTES ===")
print(f"📷 Images traitées: {total_questions}")
print(f"⏱️ Temps total OCR: {total_ocr_time:.2f}s")
print(f"⚡ Temps moyen/image: {avg_ocr_time:.3f}s")
print(f"🎯 Précision: {accuracy:.1f}%")
print(f"🤖 Modèle: {model_name}")
print(f"💻 Hardware: {hardware}")
# Statistiques par opération
operations_stats = {}
for entry in self.session_data:
op = entry['operation']
if op not in operations_stats:
operations_stats[op] = {'correct': 0, 'total': 0, 'times': []}
operations_stats[op]['total'] += 1
operations_stats[op]['times'].append(entry['ocr_processing_time'])
if entry['is_correct']:
operations_stats[op]['correct'] += 1
print(f"📈 Détail par opération:")
for op, stats in operations_stats.items():
op_accuracy = (stats['correct'] / stats['total'] * 100) if stats['total'] > 0 else 0
op_avg_time = sum(stats['times']) / len(stats['times']) if stats['times'] else 0
print(f" {op}: {op_accuracy:.1f}% précision, {op_avg_time:.3f}s/image ({stats['total']} images)")
# Nettoyage mémoire
for img in self.user_images:
if hasattr(img, 'close'):
try:
img.close()
except:
pass
cleanup_memory()
# HTML résultats avec colonne temps
table_html = f"""
Question |
A |
Op |
B |
Réponse |
Votre dessin |
OCR |
Statut |
Temps OCR |
{table_rows_html}
"""
# Configuration session pour affichage
config_display = f"{self.operation_type} • {self.difficulty} • {self.duration}s"
export_info = self.get_export_status()
if export_info["can_export"]:
export_section = f"""
📊 Métriques de la série
✅ {total_questions} réponses • 📊 {accuracy:.1f}% de précision
🖼️ {images_saved} images sauvegardées
⏱️ OCR: {total_ocr_time:.2f}s total, {avg_ocr_time:.3f}s/image
🤖 Modèle: {model_name}
💻 Hardware: {hardware}
⚙️ Configuration: {config_display}
"""
else:
export_section = ""
final_results = f"""
🎉 Session terminée !
{total_questions}
Questions
{correct_answers}
Correctes
{total_questions - correct_answers}
Incorrectes
{accuracy:.1f}%
Précision
{avg_ocr_time:.3f}s
Temps/image
📊 Détail des Réponses avec Métriques OCR
{table_html}
{export_section}
"""
return (
"""🏁 C'est fini !
""",
create_white_canvas(),
f"✨ Session {config_display} terminée !",
"⏱️ Temps écoulé !",
gr.update(interactive=True),
gr.update(interactive=False),
final_results
)
def export_to_optimized_dataset(session_data: list[dict], dataset_name: str = None) -> str:
"""Export vers le dataset optimisé avec métriques OCR"""
if dataset_name is None:
dataset_name = DATASET_NAME
if not DATASET_AVAILABLE:
return "❌ Modules dataset non disponibles"
hf_token = os.getenv("HF_TOKEN") or os.getenv("tk_calcul_ocr")
if not hf_token:
return "❌ Token HuggingFace manquant"
try:
print(f"\n🚀 === EXPORT DATASET OPTIMISÉ AVEC MÉTRIQUES ===")
print(f"📊 Dataset: {dataset_name}")
# Filtrer les entrées avec images
clean_entries = [entry for entry in session_data if entry.get('handwriting_image') is not None]
if len(clean_entries) == 0:
return "❌ Aucune entrée avec image à exporter"
# Statistiques pré-export
total_ocr_time = clean_entries[0].get('session_total_ocr_time', 0)
avg_ocr_time = clean_entries[0].get('session_avg_ocr_time', 0)
model_name = clean_entries[0].get('ocr_model_name', 'Unknown')
session_accuracy = clean_entries[0].get('session_accuracy', 0)
print(f"📈 Métriques session:")
print(f" - {len(clean_entries)} images")
print(f" - {session_accuracy:.1f}% précision")
print(f" - {total_ocr_time:.2f}s total OCR")
print(f" - {avg_ocr_time:.3f}s/image")
print(f" - Modèle: {model_name}")
# Charger dataset existant et combiner
try:
existing_dataset = load_dataset(dataset_name, split="train")
existing_data = existing_dataset.to_list()
print(f"📊 {len(existing_data)} entrées existantes")
combined_data = existing_data + clean_entries
clean_dataset = Dataset.from_list(combined_data)
print(f"📊 Dataset combiné: {len(combined_data)} total")
except Exception as e:
print(f"📊 Nouveau dataset: {e}")
clean_dataset = Dataset.from_list(clean_entries)
# Conversion colonne image
try:
clean_dataset = clean_dataset.cast_column("handwriting_image", DatasetImage())
print("✅ Colonne image convertie")
except Exception as e:
print(f"⚠️ Conversion image: {e}")
# Statistiques par opération pour commit message
operations_count = {}
for entry in clean_entries:
op = entry.get('operation', 'unknown')
operations_count[op] = operations_count.get(op, 0) + 1
operations_summary = ", ".join([f"{op}: {count}" for op, count in operations_count.items()])
# Message de commit enrichi avec métriques
commit_message = f"""Add {len(clean_entries)} samples with OCR metrics
Model: {model_name}
Accuracy: {session_accuracy:.1f}%
Avg OCR time: {avg_ocr_time:.3f}s/image
Operations: {operations_summary}
Hardware: {clean_entries[0].get('hardware', 'Unknown')}
"""
# Push vers HuggingFace
print(f"📤 Push vers {dataset_name}...")
clean_dataset.push_to_hub(
dataset_name,
private=False,
token=hf_token,
commit_message=commit_message
)
cleanup_memory()
return f"""### ✅ Session ajoutée au dataset optimisé !
📊 **Dataset:** {dataset_name}
🖼️ **Images:** {len(clean_entries)}
🎯 **Précision:** {session_accuracy:.1f}%
⏱️ **Performance:** {avg_ocr_time:.3f}s/image (total: {total_ocr_time:.1f}s)
🤖 **Modèle:** {model_name}
🔢 **Opérations:** {operations_summary}
📈 **Total dataset:** {len(clean_dataset)}
🔗 {dataset_name}
"""
except Exception as e:
print(f"❌ ERREUR: {e}")
return f"❌ Erreur: {str(e)}"
# Fonction de compatibilité pour ne pas casser l'interface
def export_to_clean_dataset(session_data: list[dict], dataset_name: str = None) -> str:
"""Wrapper pour compatibilité avec l'ancienne interface"""
return export_to_optimized_dataset(session_data, dataset_name)