martialroberge
Amélioration du prompt pour la détection de langue et de l'interface utilisateur
cbae9ff
import gradio as gr | |
import os | |
import json | |
import shutil | |
from pathlib import Path | |
import google.generativeai as genai | |
from huggingface_hub import HfApi, create_repo | |
import tempfile | |
import uuid | |
from PIL import Image | |
import io | |
import hashlib | |
from typing import List, Dict, Set | |
import pdf2image | |
import fitz # PyMuPDF | |
from dotenv import load_dotenv | |
# Charger les variables d'environnement | |
load_dotenv() | |
# Récupérer les clés API depuis les variables d'environnement | |
DEFAULT_GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "") | |
DEFAULT_HF_TOKEN = os.getenv("HF_TOKEN", "") | |
def generate_unique_id() -> str: | |
"""Génère un ID unique de 50 caractères""" | |
# Générer deux UUID4 et les combiner | |
uuid1 = str(uuid.uuid4()) | |
uuid2 = str(uuid.uuid4()) | |
# Prendre les 25 premiers caractères de chaque UUID | |
return f"{uuid1[:25]}{uuid2[:25]}" | |
def check_duplicates(entries: List[Dict]) -> List[Dict]: | |
"""Vérifie et supprime les doublons dans les entrées du dataset""" | |
seen_questions = set() | |
seen_images = set() | |
unique_entries = [] | |
for entry in entries: | |
# Créer une clé unique basée sur la question et l'image | |
question_key = entry['query'].lower().strip() | |
image_key = entry['image'] | |
# Vérifier si la question est similaire à une question existante | |
is_duplicate = False | |
for seen_question in seen_questions: | |
# Calculer la similarité entre les questions | |
if question_key in seen_question or seen_question in question_key: | |
is_duplicate = True | |
break | |
# Vérifier si l'image a déjà été utilisée trop de fois | |
image_count = sum(1 for e in unique_entries if e['image'] == image_key) | |
if not is_duplicate and image_count < 5: # Limite à 5 questions par image | |
seen_questions.add(question_key) | |
seen_images.add(image_key) | |
unique_entries.append(entry) | |
return unique_entries | |
def process_files(api_key: str, hf_token: str, files: List[str], dataset_name: str, progress=gr.Progress()) -> str: | |
""" | |
Traite les fichiers (images ou PDFs) et crée le dataset | |
""" | |
try: | |
print(f"🔍 Début du traitement avec {len(files)} fichiers") | |
# Dossier temporaire pour toutes les images avant mélange | |
with tempfile.TemporaryDirectory() as temp_images_dir: | |
print("📂 Création du dossier temporaire pour les images") | |
# Compteur pour numéroter les images | |
image_counter = 1 | |
# Liste pour stocker tous les chemins d'images | |
all_images = [] | |
# Traiter d'abord tous les PDFs | |
print("📄 Traitement des PDFs...") | |
for file in files: | |
if file.name.lower().endswith('.pdf'): | |
print(f"📄 Conversion du PDF: {file.name}") | |
try: | |
# Ouvrir le PDF avec PyMuPDF | |
pdf_document = fitz.open(file.name) | |
# Convertir chaque page en image | |
for page_num in range(len(pdf_document)): | |
page = pdf_document[page_num] | |
pix = page.get_pixmap() | |
image_path = os.path.join(temp_images_dir, f"{image_counter}.png") | |
pix.save(image_path) | |
all_images.append(image_path) | |
print(f" 📄 Page {page_num + 1} convertie en {image_counter}.png") | |
image_counter += 1 | |
pdf_document.close() | |
except Exception as e: | |
print(f"❌ Erreur lors de la conversion du PDF {file.name}: {str(e)}") | |
continue | |
# Traiter ensuite toutes les images | |
print("🖼️ Traitement des images...") | |
for file in files: | |
file_lower = file.name.lower() | |
if file_lower.endswith(('.png', '.jpg', '.jpeg')): | |
try: | |
# Copier et renommer l'image | |
new_path = os.path.join(temp_images_dir, f"{image_counter}.png") | |
# Convertir en PNG si nécessaire | |
if file_lower.endswith(('.jpg', '.jpeg')): | |
img = Image.open(file.name) | |
img.save(new_path, 'PNG') | |
else: | |
shutil.copy2(file.name, new_path) | |
all_images.append(new_path) | |
print(f"🖼️ Image {file.name} copiée en {image_counter}.png") | |
image_counter += 1 | |
except Exception as e: | |
print(f"❌ Erreur lors du traitement de l'image {file.name}: {str(e)}") | |
continue | |
if not all_images: | |
return "❌ Erreur: Aucune image valide trouvée. Veuillez fournir des fichiers PDF ou des images (PNG, JPG, JPEG)." | |
# Mélanger toutes les images | |
print(f"🔄 Mélange des {len(all_images)} images...") | |
import random | |
random.shuffle(all_images) | |
# Créer un nouveau dossier pour les images mélangées et renumérotées | |
with tempfile.TemporaryDirectory() as final_images_dir: | |
final_images = [] | |
for i, image_path in enumerate(all_images, 1): | |
new_path = os.path.join(final_images_dir, f"{i}.png") | |
shutil.copy2(image_path, new_path) | |
final_images.append(new_path) | |
print(f"📝 Image {os.path.basename(image_path)} renumérotée en {i}.png") | |
print(f"✅ Total des images à traiter: {len(final_images)}") | |
# Continuer avec le traitement des images | |
return process_images(api_key, hf_token, final_images, dataset_name, progress) | |
except Exception as e: | |
return f"❌ Erreur lors du traitement des fichiers: {str(e)}" | |
def process_images(api_key, hf_token, images, dataset_name, progress=gr.Progress()): | |
""" | |
Traite les images et crée le dataset | |
""" | |
try: | |
print(f"🔍 Début du traitement avec {len(images)} images") | |
print(f"🔑 API Key: {api_key[:5]}...") | |
print(f"🔑 HF Token: {hf_token[:5]}...") | |
print(f"📁 Dataset name: {dataset_name}") | |
if not api_key or not hf_token: | |
print("❌ Erreur: API Key ou HF Token manquant") | |
return "❌ Erreur: Veuillez entrer votre clé API Google Gemini et votre token Hugging Face." | |
# Configuration de l'API Gemini | |
print("⚙️ Configuration de l'API Gemini...") | |
genai.configure(api_key=api_key) | |
model = genai.GenerativeModel('gemini-1.5-flash') | |
# Créer d'abord le repository sur Hugging Face | |
try: | |
print("📦 Création du repository sur Hugging Face...") | |
api = HfApi(token=hf_token) | |
create_repo(dataset_name, repo_type="dataset", token=hf_token, exist_ok=True) | |
print("✅ Repository créé avec succès") | |
except Exception as e: | |
print(f"❌ Erreur lors de la création du repository: {str(e)}") | |
return f"❌ Erreur lors de la création du repository sur Hugging Face: {str(e)}" | |
# Création d'un dossier temporaire pour le dataset | |
with tempfile.TemporaryDirectory() as temp_dir: | |
print(f"📂 Création du dossier temporaire: {temp_dir}") | |
# Extraire le nom du dataset sans le nom d'utilisateur | |
repo_name = dataset_name.split('/')[-1] | |
dataset_path = Path(temp_dir) / repo_name | |
dataset_path.mkdir() | |
# Création des dossiers pour les splits | |
splits = ['train', 'validation', 'test'] | |
for split in splits: | |
(dataset_path / split / 'images').mkdir(parents=True) | |
print(f"📁 Création du dossier {split}") | |
# Mélanger les images aléatoirement | |
import random | |
random.shuffle(images) | |
total_images = len(images) | |
progress(0, desc="Démarrage du traitement...") | |
# Créer un dossier temporaire pour les images renommées | |
with tempfile.TemporaryDirectory() as renamed_images_dir: | |
renamed_images = [] | |
print("🔄 Renommage des images...") | |
# Renommer et copier toutes les images d'abord | |
for i, image in enumerate(images, 1): | |
new_image_path = Path(renamed_images_dir) / f"{i}.png" | |
shutil.copy2(image, new_image_path) | |
renamed_images.append(str(new_image_path)) | |
print(f"📝 Image {image} renommée en {i}.png") | |
# Traitement des images renommées | |
for i, image in enumerate(renamed_images, 1): | |
print(f"\n🖼️ Traitement de l'image {i}/{total_images}") | |
progress(i / total_images, desc=f"Traitement de l'image {i}/{total_images}") | |
# Déterminer le split (80% train, 10% validation, 10% test) | |
if i <= len(images) * 0.8: | |
split = 'train' | |
elif i <= len(images) * 0.9: | |
split = 'validation' | |
else: | |
split = 'test' | |
print(f"📂 Split: {split}") | |
# Copier l'image | |
image_path = dataset_path / split / 'images' / f"{i}.png" | |
print(f"📄 Copie de l'image vers: {image_path}") | |
shutil.copy2(image, image_path) | |
# Générer les questions/réponses avec Gemini | |
all_qa_pairs = [] | |
# Une seule tentative par image | |
with open(image, 'rb') as img_file: | |
img_data = img_file.read() | |
# Générer un nombre aléatoire de questions à poser | |
nb_questions = random.randint(1, 5) | |
print(f"❓ Génération de {nb_questions} questions...") | |
prompt = f"""Tu es un expert en analyse financière, en création de datasets de haute qualité et en analyse multilingue. Examine attentivement ce document financier, IDENTIFIE D'ABORD SA LANGUE, puis génère exactement {nb_questions} questions/réponses de qualité professionnelle dans la même langue que le document. | |
ÉTAPE 1 - IDENTIFICATION DE LA LANGUE : | |
- Analyse attentivement le texte dans l'image | |
- Identifie la langue principale (fr pour français, en pour anglais, de pour allemand, etc.) | |
- Utilise cette langue pour toutes les questions et réponses | |
- Si plusieurs langues sont présentes, choisis la langue dominante | |
Format de réponse requis (JSON) : | |
[ | |
{{ | |
"query": "Question financière précise et détaillée dans la langue du document", | |
"answer": "Réponse complète et exacte dans la langue du document", | |
"langue": "code ISO de la langue (fr, en, de, etc.)", | |
"is_negative": false | |
}} | |
] | |
Instructions détaillées pour la création du dataset : | |
1. TYPES DE QUESTIONS FINANCIÈRES (sois créatif et précis) : | |
- Analyse quantitative : | |
* Montants exacts et variations | |
* Pourcentages et ratios financiers | |
* Évolutions temporelles | |
* Comparaisons chiffrées | |
- Analyse qualitative : | |
* Stratégies et objectifs | |
* Risques et opportunités | |
* Contexte réglementaire | |
* Implications business | |
- Dates et échéances : | |
* Périodes de reporting | |
* Dates clés | |
* Échéances importantes | |
* Historique des événements | |
2. QUESTIONS NÉGATIVES (TRÈS IMPORTANT) : | |
- Tu DOIS générer au moins 1 question sur {nb_questions} où l'information n'est PAS dans le document | |
- Pour ces questions, tu DOIS mettre "is_negative": true | |
- La réponse DOIT commencer par une phrase indiquant l'absence d'information dans la langue du document : | |
* FR: "Cette information ne figure pas dans le document" | |
* EN: "This information is not present in the document" | |
* DE: "Diese Information ist im Dokument nicht enthalten" | |
- Les questions négatives doivent être plausibles et pertinentes pour un document financier | |
3. QUALITÉ DES QUESTIONS : | |
- Précision : utilise des chiffres exacts quand possible | |
- Clarté : questions non ambiguës | |
- Pertinence : focus sur les aspects financiers importants | |
- Variété : mélange différents types de questions | |
- Profondeur : questions qui nécessitent une analyse approfondie | |
4. QUALITÉ DES RÉPONSES : | |
- Pour les questions normales (is_negative: false) : | |
* Exactitude : informations vérifiables dans le document | |
* Complétude : réponses détaillées et exhaustives | |
* Clarté : formulation professionnelle et précise | |
* Contexte : inclure les éléments de contexte pertinents | |
- Pour les questions négatives (is_negative: true) : | |
* TOUJOURS commencer par la phrase d'absence d'information dans la bonne langue | |
* Expliquer brièvement pourquoi cette information serait pertinente | |
5. RÈGLES STRICTES : | |
- Questions et réponses UNIQUEMENT dans la langue détectée du document | |
- Pas de questions vagues ou générales | |
- Pas de répétition de questions similaires | |
- Pas de devinettes ou d'inférences non documentées | |
- Respect strict du format JSON demandé | |
- Au moins 1 question négative (is_negative: true) par image | |
- Code langue ISO correct (fr, en, de, etc.) | |
La réponse doit être uniquement le JSON, sans texte supplémentaire.""" | |
response = model.generate_content([ | |
prompt, | |
{"mime_type": "image/png", "data": img_data} | |
]) | |
try: | |
# Nettoyer la réponse pour ne garder que le JSON | |
response_text = response.text.strip() | |
if response_text.startswith("```json"): | |
response_text = response_text[7:] | |
if response_text.endswith("```"): | |
response_text = response_text[:-3] | |
response_text = response_text.strip() | |
# Extraire et formater les Q/R | |
qa_pairs = json.loads(response_text) | |
# Vérifier que c'est une liste | |
if not isinstance(qa_pairs, list): | |
raise ValueError("La réponse n'est pas une liste") | |
# Vérifier qu'il y a au moins une question négative | |
has_negative = False | |
for qa in qa_pairs: | |
if qa.get("is_negative", False): | |
if not qa["answer"].startswith("Cette information ne figure pas dans le document"): | |
qa["answer"] = "Cette information ne figure pas dans le document. " + qa["answer"] | |
has_negative = True | |
if not has_negative: | |
print("⚠️ Attention: Aucune question négative générée, on réessaie...") | |
continue | |
# Vérifier que chaque élément a les bons champs | |
for qa in qa_pairs: | |
if not all(key in qa for key in ["query", "answer", "langue", "is_negative"]): | |
raise ValueError("Un élément ne contient pas tous les champs requis") | |
# Vérifier que la langue est fr | |
if qa["langue"] != "fr": | |
qa["langue"] = "fr" | |
# Générer un ID unique | |
qa["id"] = generate_unique_id() | |
# Ajouter le chemin de l'image avec le nouveau nom | |
qa["image"] = f"images/{i}.png" | |
qa["file_name"] = f"images/{i}.png" | |
all_qa_pairs.extend(qa_pairs) | |
except json.JSONDecodeError as e: | |
print(f"Erreur JSON: {str(e)}") | |
continue | |
except ValueError as e: | |
print(f"Erreur de format: {str(e)}") | |
continue | |
# Vérifier et supprimer les doublons | |
unique_qa_pairs = check_duplicates(all_qa_pairs) | |
# Créer les entrées pour le JSONL | |
for qa in unique_qa_pairs: | |
entry = { | |
"id": qa["id"], | |
"image": qa["image"], | |
"query": qa["query"], | |
"answer": qa["answer"], | |
"langue": qa["langue"], | |
"file_name": qa["file_name"], | |
"is_negative": qa["is_negative"] | |
} | |
# Ajouter au fichier JSONL correspondant | |
jsonl_path = dataset_path / split / "metadata.jsonl" | |
with open(jsonl_path, 'a', encoding='utf-8') as f: | |
f.write(json.dumps(entry, ensure_ascii=False) + '\n') | |
# Créer le fichier LICENSE | |
with open(dataset_path / "LICENSE", 'w') as f: | |
f.write("""Apache License 2.0 | |
Copyright [yyyy] [name of copyright owner] | |
Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | |
You may obtain a copy of the License at | |
http://www.apache.org/licenses/LICENSE-2.0 | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
""") | |
progress(0.9, desc="Upload du dataset sur Hugging Face...") | |
try: | |
# Uploader le dataset | |
api.upload_folder( | |
folder_path=str(dataset_path), | |
repo_id=dataset_name, | |
repo_type="dataset" | |
) | |
progress(1.0, desc="Terminé !") | |
return f"✅ Dataset créé avec succès !\n\nAccédez à votre dataset : https://huggingface.co/datasets/{dataset_name}" | |
except Exception as e: | |
return f"❌ Erreur lors de l'upload du dataset sur Hugging Face: {str(e)}" | |
except Exception as e: | |
return f"❌ Erreur: {str(e)}" | |
# Interface Gradio | |
with gr.Blocks() as demo: | |
gr.Markdown(""" | |
# 🎯 Mini-VLM Dataset Builder | |
### Créez des datasets de haute qualité pour l'entraînement de modèles Vision-Langage (VLM) | |
## 🎓 Pourquoi utiliser cette application ? | |
Cette application permet de créer des datasets de questions/réponses de haute qualité en utilisant l'API Gemini de Google. | |
L'objectif est de permettre l'entraînement de petits modèles VLM (Vision-Language Models) plus légers et plus rapides, | |
tout en conservant des performances proches des grands modèles comme GPT-4V ou Gemini. | |
### 🔄 Pipeline de fonctionnement : | |
1. **Upload des documents** : PDF ou images (PNG, JPG, JPEG) | |
2. **Prétraitement** : | |
- Conversion des PDFs en images | |
- Normalisation des formats | |
- Mélange aléatoire pour une meilleure distribution | |
3. **Analyse par Gemini** : | |
- Détection automatique de la langue (FR, EN, DE, etc.) | |
- Génération de 1 à 5 questions/réponses par image | |
- Inclusion de questions négatives pour l'entraînement | |
4. **Structuration du dataset** : | |
- Split automatique (80% train, 10% validation, 10% test) | |
- Format standardisé avec IDs uniques | |
- Métadonnées complètes (langue, type de question, etc.) | |
5. **Publication sur Hugging Face** : | |
- Création automatique du repository | |
- Upload des images et métadonnées | |
- Structure optimisée pour l'entraînement | |
### ⚠️ Prérequis | |
1. Une [clé API Gemini](https://makersuite.google.com/app/apikey) pour l'analyse des documents | |
2. Un [token Hugging Face](https://huggingface.co/settings/tokens) avec droits d'écriture pour la publication | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
api_key = gr.Textbox( | |
label="🔑 Clé API Google Gemini", | |
type="password", | |
placeholder="Entrez votre clé API Gemini", | |
value="" | |
) | |
hf_token = gr.Textbox( | |
label="🔑 Token Hugging Face", | |
type="password", | |
placeholder="Entrez votre token Hugging Face", | |
value="" | |
) | |
dataset_name = gr.Textbox( | |
label="📁 Nom du dataset", | |
placeholder="votre-username/nom-du-dataset", | |
info="Format requis : username/nom-du-dataset (exemple: marsouuu/finance-dataset-fr)" | |
) | |
with gr.Column(scale=1): | |
files = gr.File( | |
label="📄 Documents (PDF, PNG, JPG, JPEG)", | |
file_count="multiple", | |
height=200 | |
) | |
gr.Markdown(""" | |
### 📊 Statistiques générées : | |
- 80% des images pour l'entraînement | |
- 10% pour la validation | |
- 10% pour les tests | |
- 1 à 5 questions par image | |
- Au moins 1 question négative par image | |
- Détection automatique de la langue | |
""") | |
submit_btn = gr.Button("🚀 Créer le dataset", variant="primary", scale=2) | |
output = gr.Textbox( | |
label="📝 Résultat", | |
lines=3, | |
interactive=False | |
) | |
submit_btn.click( | |
fn=process_files, | |
inputs=[api_key, hf_token, files, dataset_name], | |
outputs=output | |
) | |
if __name__ == "__main__": | |
demo.launch() | |
else: | |
# Pour Hugging Face Spaces | |
app = demo |