import gradio as gr
import os
import subprocess
import sys
import requests
import json
import logging
from typing import Dict, List, Optional, Union
import time
import tempfile
import shutil
import importlib

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Automatic installation helper
def install_package(package_name):
    """Install a Python package with pip."""
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package_name, "--quiet"])
        logger.info(f"✅ {package_name} installed successfully")
        return True
    except subprocess.CalledProcessError as e:
        logger.error(f"❌ Error installing {package_name}: {e}")
        return False


# Reload modules after installation
def reload_module(module_name):
    """Reload a module after installation."""
    try:
        if module_name in sys.modules:
            importlib.reload(sys.modules[module_name])
        else:
            __import__(module_name)
        return True
    except Exception as e:
        logger.error(f"Error reloading {module_name}: {e}")
        return False


# Conditional imports with availability flags
def check_and_import_dependencies():
    """Check and import all optional dependencies."""
    global numpy, torch, NUMPY_AVAILABLE, TORCH_AVAILABLE, TRANSFORMERS_AVAILABLE
    global DATASETS_AVAILABLE, HF_HUB_AVAILABLE, PIL_AVAILABLE, LIBROSA_AVAILABLE, CV2_AVAILABLE
    global AutoTokenizer, AutoModel, AutoProcessor, AutoModelForCausalLM
    global TrainingArguments, Trainer, DataCollatorForLanguageModeling
    global Dataset, load_dataset, concatenate_datasets, HfApi, Image, librosa, cv2

    # NumPy
    try:
        import numpy
        NUMPY_AVAILABLE = True
    except ImportError:
        numpy = None
        NUMPY_AVAILABLE = False

    # PyTorch
    try:
        import torch
        TORCH_AVAILABLE = True
    except ImportError:
        torch = None
        TORCH_AVAILABLE = False

    # Transformers
    try:
        from transformers import (
            AutoTokenizer, AutoModel, AutoProcessor, AutoModelForCausalLM,
            TrainingArguments, Trainer, DataCollatorForLanguageModeling
        )
        TRANSFORMERS_AVAILABLE = True
    except ImportError:
        TRANSFORMERS_AVAILABLE = False
        AutoTokenizer = AutoModel = AutoProcessor = None
        AutoModelForCausalLM = TrainingArguments = Trainer = None
        DataCollatorForLanguageModeling = None

    # Datasets
    try:
        from datasets import Dataset, load_dataset, concatenate_datasets
        DATASETS_AVAILABLE = True
    except ImportError:
        DATASETS_AVAILABLE = False
        Dataset = load_dataset = concatenate_datasets = None

    # HuggingFace Hub
    try:
        from huggingface_hub import HfApi
        HF_HUB_AVAILABLE = True
    except ImportError:
        HF_HUB_AVAILABLE = False
        HfApi = None

    # PIL
    try:
        from PIL import Image
        PIL_AVAILABLE = True
    except ImportError:
        PIL_AVAILABLE = False
        Image = None

    # Librosa
    try:
        import librosa
        LIBROSA_AVAILABLE = True
    except ImportError:
        LIBROSA_AVAILABLE = False
        librosa = None

    # OpenCV
    try:
        import cv2
        CV2_AVAILABLE = True
    except ImportError:
        CV2_AVAILABLE = False
        cv2 = None


# Initialize imports
check_and_import_dependencies()


class MultimodalTrainer:
    def __init__(self):
        self.current_model = None
        self.current_tokenizer = None
        self.current_processor = None
        self.training_data = []

        # Device selection
        if TORCH_AVAILABLE and torch and torch.cuda.is_available():
            self.device = torch.device("cuda")
        else:
            self.device = "cpu"

        # HF API
        if HF_HUB_AVAILABLE and HfApi:
            self.hf_api = HfApi()
        else:
            self.hf_api = None

    def install_dependencies(self, packages_to_install):
        """Install missing dependencies."""
        installation_results = []

        # Map friendly names to pip install targets
        package_mapping = {
            "torch": "torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu",
            "transformers": "transformers",
"transformers", "datasets": "datasets", "accelerate": "accelerate", "pillow": "pillow", "librosa": "librosa", "opencv": "opencv-python", "huggingface_hub": "huggingface_hub" } for package in packages_to_install: installation_results.append(f"📦 Installation de {package}...") # Utilise le mapping si disponible install_cmd = package_mapping.get(package.lower(), package) if package.lower() == "torch": # Installation spéciale pour PyTorch try: subprocess.check_call([ sys.executable, "-m", "pip", "install", "torch", "torchvision", "torchaudio", "--index-url", "https://download.pytorch.org/whl/cpu", "--quiet" ]) success = True except subprocess.CalledProcessError: success = False else: success = install_package(install_cmd) if success: installation_results.append(f"✅ {package} installé avec succès!") else: installation_results.append(f"❌ Échec installation {package}") # Recharge les dépendances après installation installation_results.append("\n🔄 Rechargement des modules...") check_and_import_dependencies() self.__init__() # Réinitialise l'instance installation_results.append("✅ Modules rechargés!") return "\n".join(installation_results) def check_dependencies(self): """Vérifie et affiche l'état des dépendances""" # Force la vérification check_and_import_dependencies() deps = { "PyTorch": TORCH_AVAILABLE, "Transformers": TRANSFORMERS_AVAILABLE, "Datasets": DATASETS_AVAILABLE, "NumPy": NUMPY_AVAILABLE, "HuggingFace Hub": HF_HUB_AVAILABLE, "PIL": PIL_AVAILABLE, "Librosa": LIBROSA_AVAILABLE, "OpenCV": CV2_AVAILABLE } status = "📦 État des dépendances:\n\n" # Dépendances critiques critical_deps = ["PyTorch", "Transformers", "Datasets"] status += "🔥 CRITIQUES:\n" for dep in critical_deps: icon = "✅" if deps.get(dep) else "❌" status += f"{icon} {dep}\n" status += "\n🔧 OPTIONNELLES:\n" optional_deps = ["NumPy", "HuggingFace Hub", "PIL", "Librosa", "OpenCV"] for dep in optional_deps: icon = "✅" if deps.get(dep) else "⚠️" status += f"{icon} {dep}\n" # Système info status += f"\n💻 SYSTÈME:\n" status += f"🐍 Python: {sys.version.split()[0]}\n" status += f"💾 Device: {self.device}\n" if TORCH_AVAILABLE and torch and torch.cuda.is_available(): status += f"🚀 GPU: {torch.cuda.get_device_name()}\n" status += f"🔋 VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f}GB\n" return status def load_model(self, model_name: str, model_type: str = "causal"): """Charge un modèle depuis Hugging Face""" if not TRANSFORMERS_AVAILABLE: return "❌ Transformers non installé! Utilisez l'outil d'installation." if not TORCH_AVAILABLE or not torch: return "❌ PyTorch non installé! Utilisez l'outil d'installation." 
        if not model_name.strip():
            return "❌ Please enter a model name"

        try:
            logger.info(f"Loading model: {model_name}")

            # Multiple loading strategies
            model_loaded = False
            error_messages = []

            # Strategy 1: AutoModelForCausalLM with trust_remote_code
            if model_type == "causal" and not model_loaded:
                try:
                    self.current_model = AutoModelForCausalLM.from_pretrained(
                        model_name,
                        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                        device_map="auto" if torch.cuda.is_available() else None,
                        trust_remote_code=True
                    )
                    model_loaded = True
                except Exception as e:
                    error_messages.append(f"AutoModelForCausalLM: {str(e)}")

            # Strategy 2: generic AutoModel
            if not model_loaded:
                try:
                    self.current_model = AutoModel.from_pretrained(
                        model_name,
                        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                        device_map="auto" if torch.cuda.is_available() else None,
                        trust_remote_code=True
                    )
                    model_loaded = True
                except Exception as e:
                    error_messages.append(f"AutoModel: {str(e)}")

            # Strategy 3: automatic detection based on the model name
            if not model_loaded and any(x in model_name.lower() for x in ['llama', 'mistral', 'qwen', 'phi']):
                try:
                    # For LLaMA/Mistral-style models; only mark as loaded when a class actually matched
                    from transformers import LlamaForCausalLM, MistralForCausalLM
                    if 'llama' in model_name.lower():
                        self.current_model = LlamaForCausalLM.from_pretrained(
                            model_name,
                            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                            device_map="auto" if torch.cuda.is_available() else None,
                            trust_remote_code=True
                        )
                        model_loaded = True
                    elif 'mistral' in model_name.lower():
                        self.current_model = MistralForCausalLM.from_pretrained(
                            model_name,
                            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                            device_map="auto" if torch.cuda.is_available() else None,
                            trust_remote_code=True
                        )
                        model_loaded = True
                except Exception as e:
                    error_messages.append(f"Specific model class: {str(e)}")

            # Strategy 4: manual configuration
            if not model_loaded:
                try:
                    # Download the configuration first
                    from transformers import AutoConfig
                    config = AutoConfig.from_pretrained(model_name, trust_remote_code=True)

                    # Force model_type if it is missing
                    if not hasattr(config, 'model_type') or config.model_type is None:
                        # Detection based on the architecture
                        if hasattr(config, 'architectures') and config.architectures:
                            arch = config.architectures[0].lower()
                            if 'llama' in arch:
                                config.model_type = 'llama'
                            elif 'mistral' in arch:
                                config.model_type = 'mistral'
                            elif 'qwen' in arch:
                                config.model_type = 'qwen2'
                            elif 'phi' in arch:
                                config.model_type = 'phi'
                            else:
                                config.model_type = 'llama'  # Default

                    self.current_model = AutoModelForCausalLM.from_pretrained(
                        model_name,
                        config=config,
                        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                        device_map="auto" if torch.cuda.is_available() else None,
                        trust_remote_code=True
                    )
                    model_loaded = True
                except Exception as e:
                    error_messages.append(f"Manual configuration: {str(e)}")

            if not model_loaded:
                return "❌ Could not load the model. Errors:\n" + "\n".join(error_messages)
            # Load the tokenizer
            try:
                self.current_tokenizer = AutoTokenizer.from_pretrained(
                    model_name,
                    trust_remote_code=True
                )
                if self.current_tokenizer.pad_token is None:
                    self.current_tokenizer.pad_token = self.current_tokenizer.eos_token
            except Exception as e:
                logger.warning(f"Tokenizer not found: {e}")
                try:
                    # Fall back to a generic tokenizer
                    from transformers import LlamaTokenizer
                    self.current_tokenizer = LlamaTokenizer.from_pretrained(
                        model_name,
                        trust_remote_code=True
                    )
                except Exception:
                    logger.warning("No tokenizer found")

            # Load the processor
            try:
                self.current_processor = AutoProcessor.from_pretrained(
                    model_name,
                    trust_remote_code=True
                )
            except Exception as e:
                logger.warning(f"Processor not found: {e}")

            architecture = "Unknown"
            if hasattr(self.current_model, 'config'):
                architecture = (getattr(self.current_model.config, 'architectures', None) or ['Unknown'])[0]
            return (
                f"✅ Model {model_name} loaded successfully!\n"
                f"Type: {type(self.current_model).__name__}\n"
                f"Architecture: {architecture}"
            )

        except Exception as e:
            error_msg = f"❌ Error while loading: {str(e)}"
            logger.error(error_msg)
            return error_msg

    def load_single_dataset(self, dataset_name: str, split: str = "train", config_name: Optional[str] = None):
        """Load a single dataset and append it to the training data."""
        if not DATASETS_AVAILABLE or not load_dataset:
            return "❌ Datasets is not installed! Use the installation tool."

        if not dataset_name.strip():
            return "❌ Please enter a dataset name"

        try:
            # Pass the optional configuration name as the second positional argument
            if config_name:
                dataset = load_dataset(dataset_name, config_name, split=split)
            else:
                dataset = load_dataset(dataset_name, split=split)

            if hasattr(self, 'training_data') and self.training_data:
                self.training_data = concatenate_datasets([self.training_data, dataset])
            else:
                self.training_data = dataset

            return (
                f"✅ Dataset {dataset_name} added!\n"
                f"📊 Total: {len(self.training_data)} examples\n"
                f"🔍 Columns: {list(self.training_data.column_names)}"
            )
        except Exception as e:
            error_msg = f"❌ Dataset error: {str(e)}"
            logger.error(error_msg)
            return error_msg

    def simulate_training(self, output_dir: str, num_epochs: int, learning_rate: float, batch_size: int):
        """Training simulation (demo mode)."""
        if not self.current_model and not self.training_data:
            return "❌ No model or data loaded!"

        # Simulation
        steps = ["🏗️ Preparing data", "🔧 Configuring model", "🚀 Starting training"]

        result = "📋 TRAINING SIMULATION:\n\n"
        result += f"📂 Output: {output_dir}\n"
        result += f"🔄 Epochs: {num_epochs}\n"
        result += f"📚 Learning rate: {learning_rate}\n"
        result += f"📦 Batch size: {batch_size}\n\n"

        for i, step in enumerate(steps):
            result += f"Step {i+1}: {step} ✅\n"

        if TORCH_AVAILABLE and TRANSFORMERS_AVAILABLE:
            result += "\n✅ Ready for real training!"
        else:
            result += "\n⚠️ DEMO MODE - install PyTorch + Transformers for real training"

        return result

    def get_model_info(self):
        """Return information about the currently loaded model."""
        if not self.current_model:
            return "❌ No model loaded"

        info = "📋 MODEL INFORMATION:\n\n"
        info += f"🏷️ Type: {type(self.current_model).__name__}\n"

        if TORCH_AVAILABLE and torch:
            info += f"💾 Device: {next(self.current_model.parameters()).device}\n"

            # Count the parameters
            total_params = sum(p.numel() for p in self.current_model.parameters())
            trainable_params = sum(p.numel() for p in self.current_model.parameters() if p.requires_grad)
            info += f"🔢 Total parameters: {total_params:,}\n"
            info += f"🎯 Trainable parameters: {trainable_params:,}\n"

        if hasattr(self, 'training_data') and self.training_data:
            info += "\n📊 DATA:\n"
            info += f"📈 Examples: {len(self.training_data):,}\n"
            info += f"📝 Columns: {list(self.training_data.column_names)}\n"

        return info

    def diagnose_model(self, model_name: str):
        """Diagnose a model before loading it."""
        if not model_name.strip():
            return "❌ Please enter a model name"

        try:
            from transformers import AutoConfig

            result = f"🔍 MODEL DIAGNOSTIC: {model_name}\n\n"

            # Check that the model is reachable
            try:
                config = AutoConfig.from_pretrained(model_name, trust_remote_code=True)
                result += "✅ Model accessible\n"

                # Configuration details
                result += f"📋 Model type: {getattr(config, 'model_type', 'Not defined')}\n"
                result += f"🏗️ Architecture: {getattr(config, 'architectures', ['Unknown'])}\n"
                result += f"📚 Vocabulary size: {getattr(config, 'vocab_size', 'Unknown')}\n"
                result += f"🧠 Hidden size: {getattr(config, 'hidden_size', 'Unknown')}\n"
                result += f"🔢 Number of layers: {getattr(config, 'num_hidden_layers', 'Unknown')}\n"

                # Recommendations
                if not hasattr(config, 'model_type') or config.model_type is None:
                    result += "\n⚠️ ISSUE: model_type is missing\n"
                    result += "💡 SOLUTION: the loader will try to detect it automatically\n"

                    if hasattr(config, 'architectures') and config.architectures:
                        arch = config.architectures[0].lower()
                        if 'llama' in arch:
                            result += "🎯 Detected type: LLaMA\n"
                        elif 'mistral' in arch:
                            result += "🎯 Detected type: Mistral\n"
                        elif 'qwen' in arch:
                            result += "🎯 Detected type: Qwen\n"
                        elif 'phi' in arch:
                            result += "🎯 Detected type: Phi\n"

                result += "\n✅ Loading should be possible with the fallback strategies"

            except Exception as e:
                result += f"❌ Access error: {str(e)}\n"
                result += "💡 Check that the model exists and is public\n"

            return result

        except Exception as e:
            return f"❌ Diagnostic error: {str(e)}"


# Initialization
trainer = MultimodalTrainer()


# Gradio interface
def create_interface():
    with gr.Blocks(title="🔥 Multimodal Training Hub", theme=gr.themes.Soft()) as app:
        gr.Markdown("""
        # 🔥 Multimodal Training Hub
        ### Multimodal model training platform

        🤖 Models • 📊 Datasets • 🏋️ Training • 🛠️ Tools
        """)

        with gr.Tab("🔧 Diagnostic"):
            gr.Markdown("### 🩺 System check")

            with gr.Row():
                check_deps_btn = gr.Button("🔍 Check dependencies", variant="primary")
                install_core_btn = gr.Button("📦 Install critical packages", variant="secondary")

            deps_status = gr.Textbox(
                label="Dependency status",
                lines=15,
                interactive=False
            )

            with gr.Row():
                install_transformers_btn = gr.Button("🤗 Install Transformers")
                install_torch_btn = gr.Button("🔥 Install PyTorch")
                install_datasets_btn = gr.Button("📊 Install Datasets")

            install_status = gr.Textbox(
                label="Installation status",
                lines=8,
                interactive=False
            )

            # Events
            check_deps_btn.click(trainer.check_dependencies, outputs=deps_status)
            install_transformers_btn.click(
                lambda: trainer.install_dependencies(["transformers"]),
                outputs=install_status
            )
            install_torch_btn.click(
                lambda: trainer.install_dependencies(["torch"]),
                outputs=install_status
            )
            install_datasets_btn.click(
                lambda: trainer.install_dependencies(["datasets"]),
                outputs=install_status
            )
            install_core_btn.click(
                lambda: trainer.install_dependencies(["torch", "transformers", "datasets", "accelerate"]),
                outputs=install_status
            )

        with gr.Tab("🤖 Model"):
            with gr.Row():
                with gr.Column():
                    model_input = gr.Textbox(
                        label="Hugging Face model name",
                        placeholder="microsoft/DialoGPT-medium",
                        value="microsoft/DialoGPT-medium"
                    )
                    model_type = gr.Dropdown(
                        label="Model type",
                        choices=["causal", "base"],
                        value="causal"
                    )
                    load_model_btn = gr.Button("🔄 Load model", variant="primary")
                    diagnose_btn = gr.Button("🔍 Diagnose model", variant="secondary")

                with gr.Column():
                    model_status = gr.Textbox(
                        label="Model status",
                        interactive=False,
                        lines=8
                    )
                    info_btn = gr.Button("ℹ️ Model info")
                    model_info = gr.Textbox(
                        label="Detailed information",
                        interactive=False,
                        lines=8
                    )

            load_model_btn.click(
                trainer.load_model,
                inputs=[model_input, model_type],
                outputs=model_status
            )
            diagnose_btn.click(
                trainer.diagnose_model,
                inputs=[model_input],
                outputs=model_status
            )
            info_btn.click(trainer.get_model_info, outputs=model_info)

        with gr.Tab("📊 Data"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### 📝 Single dataset")
                    dataset_input = gr.Textbox(
                        label="Dataset name",
                        placeholder="wikitext",
                        value="wikitext"
                    )
                    dataset_config = gr.Textbox(
                        label="Configuration (optional)",
                        placeholder="wikitext-2-raw-v1"
                    )
                    dataset_split = gr.Textbox(
                        label="Split",
                        value="train"
                    )
                    load_dataset_btn = gr.Button("➕ Add dataset", variant="primary")

                with gr.Column():
                    data_status = gr.Textbox(
                        label="Data status",
                        interactive=False,
                        lines=12
                    )

            def load_dataset_with_config(dataset_name, config_name, split):
                # Pass the configuration as a separate argument (e.g. "wikitext" + "wikitext-2-raw-v1")
                config = config_name.strip() or None
                return trainer.load_single_dataset(dataset_name, split, config)

            load_dataset_btn.click(
                load_dataset_with_config,
                inputs=[dataset_input, dataset_config, dataset_split],
                outputs=data_status
            )

        with gr.Tab("🏋️ Training"):
            with gr.Row():
                with gr.Column():
                    output_dir = gr.Textbox(
                        label="Output directory",
                        value="./trained_model"
                    )
                    with gr.Row():
                        num_epochs = gr.Number(
                            label="Epochs",
                            value=3,
                            minimum=1
                        )
                        batch_size = gr.Number(
                            label="Batch size",
                            value=4,
                            minimum=1
                        )
                    learning_rate = gr.Number(
                        label="Learning rate",
                        value=5e-5,
                        step=1e-6
                    )
                    train_btn = gr.Button("🚀 Simulate training", variant="primary", size="lg")

                with gr.Column():
                    training_status = gr.Textbox(
                        label="Training status",
                        interactive=False,
                        lines=12
                    )

            train_btn.click(
                trainer.simulate_training,
                inputs=[output_dir, num_epochs, learning_rate, batch_size],
                outputs=training_status
            )

        # Auto-check on startup
        app.load(trainer.check_dependencies, outputs=deps_status)

    return app


# Launch
if __name__ == "__main__":
    app = create_interface()
    app.launch(share=True, server_name="0.0.0.0", server_port=7860)
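

# Illustrative sketch only: a headless way to exercise MultimodalTrainer without the
# Gradio UI, for example from a Python shell or a quick smoke test. It assumes the
# defaults shown in the interface above ("microsoft/DialoGPT-medium" as the model,
# "wikitext" with the "wikitext-2-raw-v1" configuration as the dataset); any other
# values are placeholders. The function is defined but never called automatically.
def _example_headless_run():
    # Report which dependencies are available on this machine
    print(trainer.check_dependencies())
    # Inspect the model configuration before attempting to load it
    print(trainer.diagnose_model("microsoft/DialoGPT-medium"))
    # Load the model and tokenizer, then add one dataset
    print(trainer.load_model("microsoft/DialoGPT-medium", model_type="causal"))
    print(trainer.load_single_dataset("wikitext", split="train", config_name="wikitext-2-raw-v1"))
    # Summarize the loaded model and run the demo-mode training simulation
    print(trainer.get_model_info())
    print(trainer.simulate_training("./trained_model", num_epochs=1, learning_rate=5e-5, batch_size=2))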