import gradio as gr import os import json import shutil from pathlib import Path import google.generativeai as genai from huggingface_hub import HfApi, create_repo import tempfile import uuid from PIL import Image import io import hashlib from typing import List, Dict, Set import pdf2image import fitz # PyMuPDF from dotenv import load_dotenv # Charger les variables d'environnement load_dotenv() # Récupérer les clés API depuis les variables d'environnement DEFAULT_GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "") DEFAULT_HF_TOKEN = os.getenv("HF_TOKEN", "") def generate_unique_id() -> str: """Génère un ID unique de 50 caractères""" # Générer deux UUID4 et les combiner uuid1 = str(uuid.uuid4()) uuid2 = str(uuid.uuid4()) # Prendre les 25 premiers caractères de chaque UUID return f"{uuid1[:25]}{uuid2[:25]}" def check_duplicates(entries: List[Dict]) -> List[Dict]: """Vérifie et supprime les doublons dans les entrées du dataset""" seen_questions = set() seen_images = set() unique_entries = [] for entry in entries: # Créer une clé unique basée sur la question et l'image question_key = entry['query'].lower().strip() image_key = entry['image'] # Vérifier si la question est similaire à une question existante is_duplicate = False for seen_question in seen_questions: # Calculer la similarité entre les questions if question_key in seen_question or seen_question in question_key: is_duplicate = True break # Vérifier si l'image a déjà été utilisée trop de fois image_count = sum(1 for e in unique_entries if e['image'] == image_key) if not is_duplicate and image_count < 5: # Limite à 5 questions par image seen_questions.add(question_key) seen_images.add(image_key) unique_entries.append(entry) return unique_entries def process_files(api_key: str, hf_token: str, files: List[str], dataset_name: str, progress=gr.Progress()) -> str: """ Traite les fichiers (images ou PDFs) et crée le dataset """ try: print(f"🔍 Début du traitement avec {len(files)} fichiers") # Dossier temporaire pour toutes les images avant mélange with tempfile.TemporaryDirectory() as temp_images_dir: print("📂 Création du dossier temporaire pour les images") # Compteur pour numéroter les images image_counter = 1 # Liste pour stocker tous les chemins d'images all_images = [] # Traiter d'abord tous les PDFs print("📄 Traitement des PDFs...") for file in files: if file.name.lower().endswith('.pdf'): print(f"📄 Conversion du PDF: {file.name}") try: # Ouvrir le PDF avec PyMuPDF pdf_document = fitz.open(file.name) # Convertir chaque page en image for page_num in range(len(pdf_document)): page = pdf_document[page_num] pix = page.get_pixmap() image_path = os.path.join(temp_images_dir, f"{image_counter}.png") pix.save(image_path) all_images.append(image_path) print(f" 📄 Page {page_num + 1} convertie en {image_counter}.png") image_counter += 1 pdf_document.close() except Exception as e: print(f"❌ Erreur lors de la conversion du PDF {file.name}: {str(e)}") continue # Traiter ensuite toutes les images print("🖼️ Traitement des images...") for file in files: file_lower = file.name.lower() if file_lower.endswith(('.png', '.jpg', '.jpeg')): try: # Copier et renommer l'image new_path = os.path.join(temp_images_dir, f"{image_counter}.png") # Convertir en PNG si nécessaire if file_lower.endswith(('.jpg', '.jpeg')): img = Image.open(file.name) img.save(new_path, 'PNG') else: shutil.copy2(file.name, new_path) all_images.append(new_path) print(f"🖼️ Image {file.name} copiée en {image_counter}.png") image_counter += 1 except Exception as e: print(f"❌ Erreur lors du traitement de l'image {file.name}: {str(e)}") continue if not all_images: return "❌ Erreur: Aucune image valide trouvée. Veuillez fournir des fichiers PDF ou des images (PNG, JPG, JPEG)." # Mélanger toutes les images print(f"🔄 Mélange des {len(all_images)} images...") import random random.shuffle(all_images) # Créer un nouveau dossier pour les images mélangées et renumérotées with tempfile.TemporaryDirectory() as final_images_dir: final_images = [] for i, image_path in enumerate(all_images, 1): new_path = os.path.join(final_images_dir, f"{i}.png") shutil.copy2(image_path, new_path) final_images.append(new_path) print(f"📝 Image {os.path.basename(image_path)} renumérotée en {i}.png") print(f"✅ Total des images à traiter: {len(final_images)}") # Continuer avec le traitement des images return process_images(api_key, hf_token, final_images, dataset_name, progress) except Exception as e: return f"❌ Erreur lors du traitement des fichiers: {str(e)}" def process_images(api_key, hf_token, images, dataset_name, progress=gr.Progress()): """ Traite les images et crée le dataset """ try: print(f"🔍 Début du traitement avec {len(images)} images") print(f"🔑 API Key: {api_key[:5]}...") print(f"🔑 HF Token: {hf_token[:5]}...") print(f"📁 Dataset name: {dataset_name}") if not api_key or not hf_token: print("❌ Erreur: API Key ou HF Token manquant") return "❌ Erreur: Veuillez entrer votre clé API Google Gemini et votre token Hugging Face." # Configuration de l'API Gemini print("⚙️ Configuration de l'API Gemini...") genai.configure(api_key=api_key) model = genai.GenerativeModel('gemini-1.5-flash') # Créer d'abord le repository sur Hugging Face try: print("📦 Création du repository sur Hugging Face...") api = HfApi(token=hf_token) create_repo(dataset_name, repo_type="dataset", token=hf_token, exist_ok=True) print("✅ Repository créé avec succès") except Exception as e: print(f"❌ Erreur lors de la création du repository: {str(e)}") return f"❌ Erreur lors de la création du repository sur Hugging Face: {str(e)}" # Création d'un dossier temporaire pour le dataset with tempfile.TemporaryDirectory() as temp_dir: print(f"📂 Création du dossier temporaire: {temp_dir}") # Extraire le nom du dataset sans le nom d'utilisateur repo_name = dataset_name.split('/')[-1] dataset_path = Path(temp_dir) / repo_name dataset_path.mkdir() # Création des dossiers pour les splits splits = ['train', 'validation', 'test'] for split in splits: (dataset_path / split / 'images').mkdir(parents=True) print(f"📁 Création du dossier {split}") # Mélanger les images aléatoirement import random random.shuffle(images) total_images = len(images) progress(0, desc="Démarrage du traitement...") # Créer un dossier temporaire pour les images renommées with tempfile.TemporaryDirectory() as renamed_images_dir: renamed_images = [] print("🔄 Renommage des images...") # Renommer et copier toutes les images d'abord for i, image in enumerate(images, 1): new_image_path = Path(renamed_images_dir) / f"{i}.png" shutil.copy2(image, new_image_path) renamed_images.append(str(new_image_path)) print(f"📝 Image {image} renommée en {i}.png") # Traitement des images renommées for i, image in enumerate(renamed_images, 1): print(f"\n🖼️ Traitement de l'image {i}/{total_images}") progress(i / total_images, desc=f"Traitement de l'image {i}/{total_images}") # Déterminer le split (80% train, 10% validation, 10% test) if i <= len(images) * 0.8: split = 'train' elif i <= len(images) * 0.9: split = 'validation' else: split = 'test' print(f"📂 Split: {split}") # Copier l'image image_path = dataset_path / split / 'images' / f"{i}.png" print(f"📄 Copie de l'image vers: {image_path}") shutil.copy2(image, image_path) # Générer les questions/réponses avec Gemini all_qa_pairs = [] # Une seule tentative par image with open(image, 'rb') as img_file: img_data = img_file.read() # Générer un nombre aléatoire de questions à poser nb_questions = random.randint(1, 5) print(f"❓ Génération de {nb_questions} questions...") prompt = f"""Tu es un expert en analyse financière, en création de datasets de haute qualité et en analyse multilingue. Examine attentivement ce document financier, IDENTIFIE D'ABORD SA LANGUE, puis génère exactement {nb_questions} questions/réponses de qualité professionnelle dans la même langue que le document. ÉTAPE 1 - IDENTIFICATION DE LA LANGUE : - Analyse attentivement le texte dans l'image - Identifie la langue principale (fr pour français, en pour anglais, de pour allemand, etc.) - Utilise cette langue pour toutes les questions et réponses - Si plusieurs langues sont présentes, choisis la langue dominante Format de réponse requis (JSON) : [ {{ "query": "Question financière précise et détaillée dans la langue du document", "answer": "Réponse complète et exacte dans la langue du document", "langue": "code ISO de la langue (fr, en, de, etc.)", "is_negative": false }} ] Instructions détaillées pour la création du dataset : 1. TYPES DE QUESTIONS FINANCIÈRES (sois créatif et précis) : - Analyse quantitative : * Montants exacts et variations * Pourcentages et ratios financiers * Évolutions temporelles * Comparaisons chiffrées - Analyse qualitative : * Stratégies et objectifs * Risques et opportunités * Contexte réglementaire * Implications business - Dates et échéances : * Périodes de reporting * Dates clés * Échéances importantes * Historique des événements 2. QUESTIONS NÉGATIVES (TRÈS IMPORTANT) : - Tu DOIS générer au moins 1 question sur {nb_questions} où l'information n'est PAS dans le document - Pour ces questions, tu DOIS mettre "is_negative": true - La réponse DOIT commencer par une phrase indiquant l'absence d'information dans la langue du document : * FR: "Cette information ne figure pas dans le document" * EN: "This information is not present in the document" * DE: "Diese Information ist im Dokument nicht enthalten" - Les questions négatives doivent être plausibles et pertinentes pour un document financier 3. QUALITÉ DES QUESTIONS : - Précision : utilise des chiffres exacts quand possible - Clarté : questions non ambiguës - Pertinence : focus sur les aspects financiers importants - Variété : mélange différents types de questions - Profondeur : questions qui nécessitent une analyse approfondie 4. QUALITÉ DES RÉPONSES : - Pour les questions normales (is_negative: false) : * Exactitude : informations vérifiables dans le document * Complétude : réponses détaillées et exhaustives * Clarté : formulation professionnelle et précise * Contexte : inclure les éléments de contexte pertinents - Pour les questions négatives (is_negative: true) : * TOUJOURS commencer par la phrase d'absence d'information dans la bonne langue * Expliquer brièvement pourquoi cette information serait pertinente 5. RÈGLES STRICTES : - Questions et réponses UNIQUEMENT dans la langue détectée du document - Pas de questions vagues ou générales - Pas de répétition de questions similaires - Pas de devinettes ou d'inférences non documentées - Respect strict du format JSON demandé - Au moins 1 question négative (is_negative: true) par image - Code langue ISO correct (fr, en, de, etc.) La réponse doit être uniquement le JSON, sans texte supplémentaire.""" response = model.generate_content([ prompt, {"mime_type": "image/png", "data": img_data} ]) try: # Nettoyer la réponse pour ne garder que le JSON response_text = response.text.strip() if response_text.startswith("```json"): response_text = response_text[7:] if response_text.endswith("```"): response_text = response_text[:-3] response_text = response_text.strip() # Extraire et formater les Q/R qa_pairs = json.loads(response_text) # Vérifier que c'est une liste if not isinstance(qa_pairs, list): raise ValueError("La réponse n'est pas une liste") # Vérifier qu'il y a au moins une question négative has_negative = False for qa in qa_pairs: if qa.get("is_negative", False): if not qa["answer"].startswith("Cette information ne figure pas dans le document"): qa["answer"] = "Cette information ne figure pas dans le document. " + qa["answer"] has_negative = True if not has_negative: print("⚠️ Attention: Aucune question négative générée, on réessaie...") continue # Vérifier que chaque élément a les bons champs for qa in qa_pairs: if not all(key in qa for key in ["query", "answer", "langue", "is_negative"]): raise ValueError("Un élément ne contient pas tous les champs requis") # Vérifier que la langue est fr if qa["langue"] != "fr": qa["langue"] = "fr" # Générer un ID unique qa["id"] = generate_unique_id() # Ajouter le chemin de l'image avec le nouveau nom qa["image"] = f"images/{i}.png" qa["file_name"] = f"images/{i}.png" all_qa_pairs.extend(qa_pairs) except json.JSONDecodeError as e: print(f"Erreur JSON: {str(e)}") continue except ValueError as e: print(f"Erreur de format: {str(e)}") continue # Vérifier et supprimer les doublons unique_qa_pairs = check_duplicates(all_qa_pairs) # Créer les entrées pour le JSONL for qa in unique_qa_pairs: entry = { "id": qa["id"], "image": qa["image"], "query": qa["query"], "answer": qa["answer"], "langue": qa["langue"], "file_name": qa["file_name"], "is_negative": qa["is_negative"] } # Ajouter au fichier JSONL correspondant jsonl_path = dataset_path / split / "metadata.jsonl" with open(jsonl_path, 'a', encoding='utf-8') as f: f.write(json.dumps(entry, ensure_ascii=False) + '\n') # Créer le fichier LICENSE with open(dataset_path / "LICENSE", 'w') as f: f.write("""Apache License 2.0 Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """) progress(0.9, desc="Upload du dataset sur Hugging Face...") try: # Uploader le dataset api.upload_folder( folder_path=str(dataset_path), repo_id=dataset_name, repo_type="dataset" ) progress(1.0, desc="Terminé !") return f"✅ Dataset créé avec succès !\n\nAccédez à votre dataset : https://huggingface.co/datasets/{dataset_name}" except Exception as e: return f"❌ Erreur lors de l'upload du dataset sur Hugging Face: {str(e)}" except Exception as e: return f"❌ Erreur: {str(e)}" # Interface Gradio with gr.Blocks() as demo: gr.Markdown(""" # 🎯 Mini-VLM Dataset Builder ### Créez des datasets de haute qualité pour l'entraînement de modèles Vision-Langage (VLM) ## 🎓 Pourquoi utiliser cette application ? Cette application permet de créer des datasets de questions/réponses de haute qualité en utilisant l'API Gemini de Google. L'objectif est de permettre l'entraînement de petits modèles VLM (Vision-Language Models) plus légers et plus rapides, tout en conservant des performances proches des grands modèles comme GPT-4V ou Gemini. ### 🔄 Pipeline de fonctionnement : 1. **Upload des documents** : PDF ou images (PNG, JPG, JPEG) 2. **Prétraitement** : - Conversion des PDFs en images - Normalisation des formats - Mélange aléatoire pour une meilleure distribution 3. **Analyse par Gemini** : - Détection automatique de la langue (FR, EN, DE, etc.) - Génération de 1 à 5 questions/réponses par image - Inclusion de questions négatives pour l'entraînement 4. **Structuration du dataset** : - Split automatique (80% train, 10% validation, 10% test) - Format standardisé avec IDs uniques - Métadonnées complètes (langue, type de question, etc.) 5. **Publication sur Hugging Face** : - Création automatique du repository - Upload des images et métadonnées - Structure optimisée pour l'entraînement ### ⚠️ Prérequis 1. Une [clé API Gemini](https://makersuite.google.com/app/apikey) pour l'analyse des documents 2. Un [token Hugging Face](https://huggingface.co/settings/tokens) avec droits d'écriture pour la publication """) with gr.Row(): with gr.Column(scale=1): api_key = gr.Textbox( label="🔑 Clé API Google Gemini", type="password", placeholder="Entrez votre clé API Gemini", value="" ) hf_token = gr.Textbox( label="🔑 Token Hugging Face", type="password", placeholder="Entrez votre token Hugging Face", value="" ) dataset_name = gr.Textbox( label="📁 Nom du dataset", placeholder="votre-username/nom-du-dataset", info="Format requis : username/nom-du-dataset (exemple: marsouuu/finance-dataset-fr)" ) with gr.Column(scale=1): files = gr.File( label="📄 Documents (PDF, PNG, JPG, JPEG)", file_count="multiple", height=200 ) gr.Markdown(""" ### 📊 Statistiques générées : - 80% des images pour l'entraînement - 10% pour la validation - 10% pour les tests - 1 à 5 questions par image - Au moins 1 question négative par image - Détection automatique de la langue """) submit_btn = gr.Button("🚀 Créer le dataset", variant="primary", scale=2) output = gr.Textbox( label="📝 Résultat", lines=3, interactive=False ) submit_btn.click( fn=process_files, inputs=[api_key, hf_token, files, dataset_name], outputs=output ) if __name__ == "__main__": demo.launch() else: # Pour Hugging Face Spaces app = demo