martialroberge
Amélioration du prompt pour la détection de langue et de l'interface utilisateur
cbae9ff
raw
history blame
23.4 kB
import gradio as gr
import os
import json
import shutil
from pathlib import Path
import google.generativeai as genai
from huggingface_hub import HfApi, create_repo
import tempfile
import uuid
from PIL import Image
import io
import hashlib
from typing import List, Dict, Set
import pdf2image
import fitz # PyMuPDF
from dotenv import load_dotenv
# Charger les variables d'environnement
load_dotenv()
# Récupérer les clés API depuis les variables d'environnement
DEFAULT_GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
DEFAULT_HF_TOKEN = os.getenv("HF_TOKEN", "")
def generate_unique_id() -> str:
"""Génère un ID unique de 50 caractères"""
# Générer deux UUID4 et les combiner
uuid1 = str(uuid.uuid4())
uuid2 = str(uuid.uuid4())
# Prendre les 25 premiers caractères de chaque UUID
return f"{uuid1[:25]}{uuid2[:25]}"
def check_duplicates(entries: List[Dict]) -> List[Dict]:
"""Vérifie et supprime les doublons dans les entrées du dataset"""
seen_questions = set()
seen_images = set()
unique_entries = []
for entry in entries:
# Créer une clé unique basée sur la question et l'image
question_key = entry['query'].lower().strip()
image_key = entry['image']
# Vérifier si la question est similaire à une question existante
is_duplicate = False
for seen_question in seen_questions:
# Calculer la similarité entre les questions
if question_key in seen_question or seen_question in question_key:
is_duplicate = True
break
# Vérifier si l'image a déjà été utilisée trop de fois
image_count = sum(1 for e in unique_entries if e['image'] == image_key)
if not is_duplicate and image_count < 5: # Limite à 5 questions par image
seen_questions.add(question_key)
seen_images.add(image_key)
unique_entries.append(entry)
return unique_entries
def process_files(api_key: str, hf_token: str, files: List[str], dataset_name: str, progress=gr.Progress()) -> str:
"""
Traite les fichiers (images ou PDFs) et crée le dataset
"""
try:
print(f"🔍 Début du traitement avec {len(files)} fichiers")
# Dossier temporaire pour toutes les images avant mélange
with tempfile.TemporaryDirectory() as temp_images_dir:
print("📂 Création du dossier temporaire pour les images")
# Compteur pour numéroter les images
image_counter = 1
# Liste pour stocker tous les chemins d'images
all_images = []
# Traiter d'abord tous les PDFs
print("📄 Traitement des PDFs...")
for file in files:
if file.name.lower().endswith('.pdf'):
print(f"📄 Conversion du PDF: {file.name}")
try:
# Ouvrir le PDF avec PyMuPDF
pdf_document = fitz.open(file.name)
# Convertir chaque page en image
for page_num in range(len(pdf_document)):
page = pdf_document[page_num]
pix = page.get_pixmap()
image_path = os.path.join(temp_images_dir, f"{image_counter}.png")
pix.save(image_path)
all_images.append(image_path)
print(f" 📄 Page {page_num + 1} convertie en {image_counter}.png")
image_counter += 1
pdf_document.close()
except Exception as e:
print(f"❌ Erreur lors de la conversion du PDF {file.name}: {str(e)}")
continue
# Traiter ensuite toutes les images
print("🖼️ Traitement des images...")
for file in files:
file_lower = file.name.lower()
if file_lower.endswith(('.png', '.jpg', '.jpeg')):
try:
# Copier et renommer l'image
new_path = os.path.join(temp_images_dir, f"{image_counter}.png")
# Convertir en PNG si nécessaire
if file_lower.endswith(('.jpg', '.jpeg')):
img = Image.open(file.name)
img.save(new_path, 'PNG')
else:
shutil.copy2(file.name, new_path)
all_images.append(new_path)
print(f"🖼️ Image {file.name} copiée en {image_counter}.png")
image_counter += 1
except Exception as e:
print(f"❌ Erreur lors du traitement de l'image {file.name}: {str(e)}")
continue
if not all_images:
return "❌ Erreur: Aucune image valide trouvée. Veuillez fournir des fichiers PDF ou des images (PNG, JPG, JPEG)."
# Mélanger toutes les images
print(f"🔄 Mélange des {len(all_images)} images...")
import random
random.shuffle(all_images)
# Créer un nouveau dossier pour les images mélangées et renumérotées
with tempfile.TemporaryDirectory() as final_images_dir:
final_images = []
for i, image_path in enumerate(all_images, 1):
new_path = os.path.join(final_images_dir, f"{i}.png")
shutil.copy2(image_path, new_path)
final_images.append(new_path)
print(f"📝 Image {os.path.basename(image_path)} renumérotée en {i}.png")
print(f"✅ Total des images à traiter: {len(final_images)}")
# Continuer avec le traitement des images
return process_images(api_key, hf_token, final_images, dataset_name, progress)
except Exception as e:
return f"❌ Erreur lors du traitement des fichiers: {str(e)}"
def process_images(api_key, hf_token, images, dataset_name, progress=gr.Progress()):
"""
Traite les images et crée le dataset
"""
try:
print(f"🔍 Début du traitement avec {len(images)} images")
print(f"🔑 API Key: {api_key[:5]}...")
print(f"🔑 HF Token: {hf_token[:5]}...")
print(f"📁 Dataset name: {dataset_name}")
if not api_key or not hf_token:
print("❌ Erreur: API Key ou HF Token manquant")
return "❌ Erreur: Veuillez entrer votre clé API Google Gemini et votre token Hugging Face."
# Configuration de l'API Gemini
print("⚙️ Configuration de l'API Gemini...")
genai.configure(api_key=api_key)
model = genai.GenerativeModel('gemini-1.5-flash')
# Créer d'abord le repository sur Hugging Face
try:
print("📦 Création du repository sur Hugging Face...")
api = HfApi(token=hf_token)
create_repo(dataset_name, repo_type="dataset", token=hf_token, exist_ok=True)
print("✅ Repository créé avec succès")
except Exception as e:
print(f"❌ Erreur lors de la création du repository: {str(e)}")
return f"❌ Erreur lors de la création du repository sur Hugging Face: {str(e)}"
# Création d'un dossier temporaire pour le dataset
with tempfile.TemporaryDirectory() as temp_dir:
print(f"📂 Création du dossier temporaire: {temp_dir}")
# Extraire le nom du dataset sans le nom d'utilisateur
repo_name = dataset_name.split('/')[-1]
dataset_path = Path(temp_dir) / repo_name
dataset_path.mkdir()
# Création des dossiers pour les splits
splits = ['train', 'validation', 'test']
for split in splits:
(dataset_path / split / 'images').mkdir(parents=True)
print(f"📁 Création du dossier {split}")
# Mélanger les images aléatoirement
import random
random.shuffle(images)
total_images = len(images)
progress(0, desc="Démarrage du traitement...")
# Créer un dossier temporaire pour les images renommées
with tempfile.TemporaryDirectory() as renamed_images_dir:
renamed_images = []
print("🔄 Renommage des images...")
# Renommer et copier toutes les images d'abord
for i, image in enumerate(images, 1):
new_image_path = Path(renamed_images_dir) / f"{i}.png"
shutil.copy2(image, new_image_path)
renamed_images.append(str(new_image_path))
print(f"📝 Image {image} renommée en {i}.png")
# Traitement des images renommées
for i, image in enumerate(renamed_images, 1):
print(f"\n🖼️ Traitement de l'image {i}/{total_images}")
progress(i / total_images, desc=f"Traitement de l'image {i}/{total_images}")
# Déterminer le split (80% train, 10% validation, 10% test)
if i <= len(images) * 0.8:
split = 'train'
elif i <= len(images) * 0.9:
split = 'validation'
else:
split = 'test'
print(f"📂 Split: {split}")
# Copier l'image
image_path = dataset_path / split / 'images' / f"{i}.png"
print(f"📄 Copie de l'image vers: {image_path}")
shutil.copy2(image, image_path)
# Générer les questions/réponses avec Gemini
all_qa_pairs = []
# Une seule tentative par image
with open(image, 'rb') as img_file:
img_data = img_file.read()
# Générer un nombre aléatoire de questions à poser
nb_questions = random.randint(1, 5)
print(f"❓ Génération de {nb_questions} questions...")
prompt = f"""Tu es un expert en analyse financière, en création de datasets de haute qualité et en analyse multilingue. Examine attentivement ce document financier, IDENTIFIE D'ABORD SA LANGUE, puis génère exactement {nb_questions} questions/réponses de qualité professionnelle dans la même langue que le document.
ÉTAPE 1 - IDENTIFICATION DE LA LANGUE :
- Analyse attentivement le texte dans l'image
- Identifie la langue principale (fr pour français, en pour anglais, de pour allemand, etc.)
- Utilise cette langue pour toutes les questions et réponses
- Si plusieurs langues sont présentes, choisis la langue dominante
Format de réponse requis (JSON) :
[
{{
"query": "Question financière précise et détaillée dans la langue du document",
"answer": "Réponse complète et exacte dans la langue du document",
"langue": "code ISO de la langue (fr, en, de, etc.)",
"is_negative": false
}}
]
Instructions détaillées pour la création du dataset :
1. TYPES DE QUESTIONS FINANCIÈRES (sois créatif et précis) :
- Analyse quantitative :
* Montants exacts et variations
* Pourcentages et ratios financiers
* Évolutions temporelles
* Comparaisons chiffrées
- Analyse qualitative :
* Stratégies et objectifs
* Risques et opportunités
* Contexte réglementaire
* Implications business
- Dates et échéances :
* Périodes de reporting
* Dates clés
* Échéances importantes
* Historique des événements
2. QUESTIONS NÉGATIVES (TRÈS IMPORTANT) :
- Tu DOIS générer au moins 1 question sur {nb_questions} où l'information n'est PAS dans le document
- Pour ces questions, tu DOIS mettre "is_negative": true
- La réponse DOIT commencer par une phrase indiquant l'absence d'information dans la langue du document :
* FR: "Cette information ne figure pas dans le document"
* EN: "This information is not present in the document"
* DE: "Diese Information ist im Dokument nicht enthalten"
- Les questions négatives doivent être plausibles et pertinentes pour un document financier
3. QUALITÉ DES QUESTIONS :
- Précision : utilise des chiffres exacts quand possible
- Clarté : questions non ambiguës
- Pertinence : focus sur les aspects financiers importants
- Variété : mélange différents types de questions
- Profondeur : questions qui nécessitent une analyse approfondie
4. QUALITÉ DES RÉPONSES :
- Pour les questions normales (is_negative: false) :
* Exactitude : informations vérifiables dans le document
* Complétude : réponses détaillées et exhaustives
* Clarté : formulation professionnelle et précise
* Contexte : inclure les éléments de contexte pertinents
- Pour les questions négatives (is_negative: true) :
* TOUJOURS commencer par la phrase d'absence d'information dans la bonne langue
* Expliquer brièvement pourquoi cette information serait pertinente
5. RÈGLES STRICTES :
- Questions et réponses UNIQUEMENT dans la langue détectée du document
- Pas de questions vagues ou générales
- Pas de répétition de questions similaires
- Pas de devinettes ou d'inférences non documentées
- Respect strict du format JSON demandé
- Au moins 1 question négative (is_negative: true) par image
- Code langue ISO correct (fr, en, de, etc.)
La réponse doit être uniquement le JSON, sans texte supplémentaire."""
response = model.generate_content([
prompt,
{"mime_type": "image/png", "data": img_data}
])
try:
# Nettoyer la réponse pour ne garder que le JSON
response_text = response.text.strip()
if response_text.startswith("```json"):
response_text = response_text[7:]
if response_text.endswith("```"):
response_text = response_text[:-3]
response_text = response_text.strip()
# Extraire et formater les Q/R
qa_pairs = json.loads(response_text)
# Vérifier que c'est une liste
if not isinstance(qa_pairs, list):
raise ValueError("La réponse n'est pas une liste")
# Vérifier qu'il y a au moins une question négative
has_negative = False
for qa in qa_pairs:
if qa.get("is_negative", False):
if not qa["answer"].startswith("Cette information ne figure pas dans le document"):
qa["answer"] = "Cette information ne figure pas dans le document. " + qa["answer"]
has_negative = True
if not has_negative:
print("⚠️ Attention: Aucune question négative générée, on réessaie...")
continue
# Vérifier que chaque élément a les bons champs
for qa in qa_pairs:
if not all(key in qa for key in ["query", "answer", "langue", "is_negative"]):
raise ValueError("Un élément ne contient pas tous les champs requis")
# Vérifier que la langue est fr
if qa["langue"] != "fr":
qa["langue"] = "fr"
# Générer un ID unique
qa["id"] = generate_unique_id()
# Ajouter le chemin de l'image avec le nouveau nom
qa["image"] = f"images/{i}.png"
qa["file_name"] = f"images/{i}.png"
all_qa_pairs.extend(qa_pairs)
except json.JSONDecodeError as e:
print(f"Erreur JSON: {str(e)}")
continue
except ValueError as e:
print(f"Erreur de format: {str(e)}")
continue
# Vérifier et supprimer les doublons
unique_qa_pairs = check_duplicates(all_qa_pairs)
# Créer les entrées pour le JSONL
for qa in unique_qa_pairs:
entry = {
"id": qa["id"],
"image": qa["image"],
"query": qa["query"],
"answer": qa["answer"],
"langue": qa["langue"],
"file_name": qa["file_name"],
"is_negative": qa["is_negative"]
}
# Ajouter au fichier JSONL correspondant
jsonl_path = dataset_path / split / "metadata.jsonl"
with open(jsonl_path, 'a', encoding='utf-8') as f:
f.write(json.dumps(entry, ensure_ascii=False) + '\n')
# Créer le fichier LICENSE
with open(dataset_path / "LICENSE", 'w') as f:
f.write("""Apache License 2.0
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
""")
progress(0.9, desc="Upload du dataset sur Hugging Face...")
try:
# Uploader le dataset
api.upload_folder(
folder_path=str(dataset_path),
repo_id=dataset_name,
repo_type="dataset"
)
progress(1.0, desc="Terminé !")
return f"✅ Dataset créé avec succès !\n\nAccédez à votre dataset : https://huggingface.co/datasets/{dataset_name}"
except Exception as e:
return f"❌ Erreur lors de l'upload du dataset sur Hugging Face: {str(e)}"
except Exception as e:
return f"❌ Erreur: {str(e)}"
# Interface Gradio
with gr.Blocks() as demo:
gr.Markdown("""
# 🎯 Mini-VLM Dataset Builder
### Créez des datasets de haute qualité pour l'entraînement de modèles Vision-Langage (VLM)
## 🎓 Pourquoi utiliser cette application ?
Cette application permet de créer des datasets de questions/réponses de haute qualité en utilisant l'API Gemini de Google.
L'objectif est de permettre l'entraînement de petits modèles VLM (Vision-Language Models) plus légers et plus rapides,
tout en conservant des performances proches des grands modèles comme GPT-4V ou Gemini.
### 🔄 Pipeline de fonctionnement :
1. **Upload des documents** : PDF ou images (PNG, JPG, JPEG)
2. **Prétraitement** :
- Conversion des PDFs en images
- Normalisation des formats
- Mélange aléatoire pour une meilleure distribution
3. **Analyse par Gemini** :
- Détection automatique de la langue (FR, EN, DE, etc.)
- Génération de 1 à 5 questions/réponses par image
- Inclusion de questions négatives pour l'entraînement
4. **Structuration du dataset** :
- Split automatique (80% train, 10% validation, 10% test)
- Format standardisé avec IDs uniques
- Métadonnées complètes (langue, type de question, etc.)
5. **Publication sur Hugging Face** :
- Création automatique du repository
- Upload des images et métadonnées
- Structure optimisée pour l'entraînement
### ⚠️ Prérequis
1. Une [clé API Gemini](https://makersuite.google.com/app/apikey) pour l'analyse des documents
2. Un [token Hugging Face](https://huggingface.co/settings/tokens) avec droits d'écriture pour la publication
""")
with gr.Row():
with gr.Column(scale=1):
api_key = gr.Textbox(
label="🔑 Clé API Google Gemini",
type="password",
placeholder="Entrez votre clé API Gemini",
value=""
)
hf_token = gr.Textbox(
label="🔑 Token Hugging Face",
type="password",
placeholder="Entrez votre token Hugging Face",
value=""
)
dataset_name = gr.Textbox(
label="📁 Nom du dataset",
placeholder="votre-username/nom-du-dataset",
info="Format requis : username/nom-du-dataset (exemple: marsouuu/finance-dataset-fr)"
)
with gr.Column(scale=1):
files = gr.File(
label="📄 Documents (PDF, PNG, JPG, JPEG)",
file_count="multiple",
height=200
)
gr.Markdown("""
### 📊 Statistiques générées :
- 80% des images pour l'entraînement
- 10% pour la validation
- 10% pour les tests
- 1 à 5 questions par image
- Au moins 1 question négative par image
- Détection automatique de la langue
""")
submit_btn = gr.Button("🚀 Créer le dataset", variant="primary", scale=2)
output = gr.Textbox(
label="📝 Résultat",
lines=3,
interactive=False
)
submit_btn.click(
fn=process_files,
inputs=[api_key, hf_token, files, dataset_name],
outputs=output
)
if __name__ == "__main__":
demo.launch()
else:
# Pour Hugging Face Spaces
app = demo