# Source: anonymous paste by "Anonymous1223334444", commit c2e3cf5
# "Initial commit of multimodal multilingual PDF embedding pipeline"
import os
import random
import re
import time
import traceback

import google.generativeai as genai
import tiktoken
import vertexai
from vertexai.language_models import TextEmbeddingModel
# Configuration (will be initialized from run_pipeline.py)
# For module, these should ideally be arguments or imported from a config
# GENAI_API_KEY = os.getenv("GENAI_API_KEY")
# PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
# LOCATION = os.getenv("VERTEX_AI_LOCATION")
MULTIMODAL_MODEL_GENAI = "models/gemini-1.5-flash-latest"
TEXT_EMBEDDING_MODEL_VERTEXAI = "text-multilingual-embedding-002"
EMBEDDING_DIMENSION = 768 # text-multilingual-embedding-002 has 768 dimensions
MAX_TOKENS_NORMAL = 500
ENCODING_NAME = "cl100k_base"
# Global client for Vertex AI Text Embedding Model
text_embedding_model_client = None
def initialize_clients(project_id, location, genai_api_key):
    """Configure the google.generativeai client and initialize the Vertex AI
    SDK, loading the text-embedding model into the module-level global
    ``text_embedding_model_client`` (left as None on any failure)."""
    global text_embedding_model_client

    # --- Gemini (google.generativeai) configuration ---
    if not genai_api_key:
        print("⚠️ AVERTISSEMENT: La clé API Gemini n'est pas définie. La génération de descriptions multimodales échouera.")
    else:
        genai.configure(api_key=genai_api_key)
        print("✓ Google Generative AI configured.")

    # --- Vertex AI SDK + text-embedding model ---
    if not (project_id and location):
        print("⚠️ Vertex AI SDK non initialisé car l'ID du projet Google Cloud ou la localisation sont manquants.")
        print("⚠️ La génération d'embeddings textuels échouera.")
        text_embedding_model_client = None
        return

    try:
        vertexai.init(project=project_id, location=location)
        print(f"✓ Vertex AI SDK initialisé pour le projet {project_id} dans la région {location}.")
        text_embedding_model_client = TextEmbeddingModel.from_pretrained(TEXT_EMBEDDING_MODEL_VERTEXAI)
        print(f"✓ Modèle d'embedding textuel Vertex AI '{TEXT_EMBEDDING_MODEL_VERTEXAI}' chargé avec succès.")
    except Exception as exc:
        print(f"❌ ERREUR: Échec de l'initialisation du Vertex AI SDK ou du chargement du modèle d'embedding textuel : {str(exc)}")
        print("⚠️ La génération d'embeddings textuels échouera.")
        text_embedding_model_client = None
def token_chunking(text, max_tokens, encoding):
    """Split *text* into chunks of roughly ``max_tokens`` tokens each.

    Chunk boundaries prefer, in order: the last paragraph break (``\\n\\n``)
    before the limit, the last sentence end (``[.!?]`` followed by
    whitespace) before the limit, and finally a hard cut at ``max_tokens``
    tokens.

    Args:
        text: Text to split; an empty/falsy value yields ``[]``.
        max_tokens: Target maximum number of tokens per chunk.
        encoding: Tokenizer exposing ``encode(str) -> list`` and
            ``decode(list) -> str`` (e.g. a tiktoken encoding).

    Returns:
        A list of non-empty, whitespace-stripped chunk strings.
    """
    if not text:
        return []
    tokens = encoding.encode(text)
    chunks = []
    start_token_idx = 0
    while start_token_idx < len(tokens):
        end_token_idx = min(start_token_idx + max_tokens, len(tokens))
        if end_token_idx < len(tokens):
            # Decode a window of up to 2x the chunk size so a natural boundary
            # near (but not past) the token limit can be found.
            look_ahead_limit = min(start_token_idx + max_tokens * 2, len(tokens))
            text_segment_to_check = encoding.decode(tokens[start_token_idx:look_ahead_limit])
            # NOTE(review): this bound subtracts a *token* count from a
            # *character* length, so it is only a heuristic cut-off point.
            search_end = len(text_segment_to_check) - (look_ahead_limit - (start_token_idx + max_tokens))
            paragraph_break = text_segment_to_check.rfind('\n\n', 0, search_end)
            if paragraph_break != -1:
                tokens_up_to_break = encoding.encode(text_segment_to_check[:paragraph_break])
                end_token_idx = start_token_idx + len(tokens_up_to_break)
            else:
                # Fix: the previous reversed-string search both inverted the
                # pattern (matching whitespace-before-punctuation in forward
                # text, so real ". " boundaries never matched) and mapped the
                # reversed index back with the full segment length instead of
                # the truncated prefix length. Search forward instead and keep
                # the last sentence-ending punctuation.
                sentence_matches = list(re.finditer(r'[.!?]\s+', text_segment_to_check[:search_end]))
                if sentence_matches:
                    boundary = sentence_matches[-1].start() + 1  # include the punctuation
                    tokens_up_to_end = encoding.encode(text_segment_to_check[:boundary])
                    end_token_idx = start_token_idx + len(tokens_up_to_end)
        current_chunk_tokens = tokens[start_token_idx:end_token_idx]
        chunk_text = encoding.decode(current_chunk_tokens).strip()
        if chunk_text:
            chunks.append(chunk_text)
        # Always make forward progress, even if a boundary search produced an
        # empty span (prevents an infinite loop).
        if start_token_idx == end_token_idx:
            start_token_idx += 1
        else:
            start_token_idx = end_token_idx
    return chunks
def generate_multimodal_description(image_bytes, prompt_text, multimodal_model_genai_name=MULTIMODAL_MODEL_GENAI, max_retries=5, delay=10):
    """
    Generate a text description for an image using a multimodal model (google.generativeai).

    Args:
        image_bytes: Raw PNG image bytes to describe.
        prompt_text: Instruction prompt sent alongside the image.
        multimodal_model_genai_name: Gemini model name to instantiate.
        max_retries: Maximum number of attempts before giving up.
        delay: Base delay in seconds, used both as an unconditional pre-call
            pause and as the unit of the exponential retry backoff.

    Returns description text or None if all retries fail or API key is missing.
    """
    # NOTE(review): this relies on `genai.api_key` existing as a module
    # attribute after genai.configure() — confirm for the installed
    # google.generativeai version, otherwise this line raises AttributeError.
    if not genai.api_key: # Check if API key is configured
        print("  Skipping multimodal description generation: GEMINI_API_KEY is not set.")
        return None
    for attempt in range(max_retries):
        try:
            # Unconditional pause with jitter before every call (rate limiting).
            time.sleep(delay + random.uniform(0, 5))
            # Request payload: prompt text plus inline PNG bytes.
            content = [
                prompt_text,
                {
                    'mime_type': 'image/png',
                    'data': image_bytes
                }
            ]
            model = genai.GenerativeModel(multimodal_model_genai_name)
            response = model.generate_content(content)
            # response.text may raise (e.g. blocked response); caught below.
            description = response.text.strip()
            if description:
                return description
            else:
                print(f"    Tentative {attempt+1}/{max_retries}: Réponse vide ou inattendue du modèle multimodal.")
                if attempt < max_retries - 1:
                    # Exponential backoff with jitter before retrying an empty response.
                    retry_delay = delay * (2 ** attempt) + random.uniform(1, 5)
                    print(f"      Réessai dans {retry_delay:.2f}s...")
                    time.sleep(retry_delay)
                    continue
                # Last attempt: fall through to the final failure message below.
        except Exception as e:
            error_msg = str(e)
            print(f"  Tentative {attempt+1}/{max_retries} échouée pour la description : {error_msg}")
            # Retry only errors that look transient (quota / rate-limit / 5xx-style).
            if "429" in error_msg or "quota" in error_msg.lower() or "rate limit" in error_msg.lower() or "unavailable" in error_msg.lower() or "internal error" in error_msg.lower():
                if attempt < max_retries - 1:
                    retry_delay = delay * (2 ** attempt) + random.uniform(1, 5)
                    print(f"    Erreur d'API retryable détectée. Réessai dans {retry_delay:.2f}s...")
                    time.sleep(retry_delay)
                    continue
                # Last attempt: fall through to the final failure message below.
            else:
                # Non-transient error: report it and give up immediately.
                print(f"    Erreur d'API non retryable détectée : {error_msg}")
                traceback.print_exc()
                return None
    print(f"  Toutes les {max_retries} tentatives ont échoué pour la description (fin de boucle).")
    return None
def generate_text_embedding(text_content, max_retries=5, delay=5):
    """
    Embed *text_content* with the Vertex AI multilingual embedding model.

    Transient API failures are retried with exponential backoff. Returns the
    embedding vector (a list of EMBEDDING_DIMENSION floats) on success, or
    None when the client is uninitialized, the text is empty, the response
    has an unexpected shape, or every attempt fails.
    """
    global text_embedding_model_client
    if not text_embedding_model_client:
        print("  Skipping text embedding generation: Vertex AI embedding client is not initialized.")
        return None
    if not text_content or not text_content.strip():
        # Nothing to embed.
        return None

    for attempt in range(max_retries):
        try:
            # Small jitter before each call to spread out request bursts.
            time.sleep(delay + random.uniform(0, 2))
            embeddings = text_embedding_model_client.get_embeddings([text_content])
            well_formed = (
                embeddings
                and len(embeddings) > 0
                and hasattr(embeddings[0], 'values')
                and isinstance(embeddings[0].values, list)
                and len(embeddings[0].values) == EMBEDDING_DIMENSION
            )
            if well_formed:
                return embeddings[0].values
            # A malformed response is not retried: bail out right away.
            print(f"    Tentative {attempt+1}/{max_retries}: Format d'embedding Vertex AI inattendu. Réponse : {embeddings}")
            return None
        except Exception as err:
            message = str(err)
            lowered = message.lower()
            print(f"  Tentative {attempt+1}/{max_retries} échouée pour l'embedding Vertex AI : {message}")
            transient = "429" in message or any(
                marker in lowered
                for marker in ("quota", "rate limit", "unavailable", "internal error")
            )
            if not transient:
                # Non-transient error: report it and give up immediately.
                print(f"    Erreur d'API Vertex AI non retryable détectée : {message}")
                traceback.print_exc()
                return None
            if attempt < max_retries - 1:
                # Exponential backoff with jitter before the next attempt.
                backoff = delay * (2 ** attempt) + random.uniform(1, 5)
                print(f"    Erreur d'API Vertex AI retryable détectée. Réessai dans {backoff:.2f}s...")
                time.sleep(backoff)
    print(f"  Toutes les {max_retries} tentatives ont échoué pour l'embedding Vertex AI (fin de boucle).")
    return None