# Insurance_DocAI / data_processor.py
import requests
import fitz
import textwrap
import os
import google.generativeai as genai
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
import hashlib
import time
# Load environment variables from .env file
load_dotenv()
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.environ.get("PINECONE_ENVIRONMENT")
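# Hypothetical fail-fast guard (a minimal sketch, not part of the original
# pipeline): surface a clear error for a missing key here, before the client
# initialization below fails with a less obvious one.
for _name, _key in [("GOOGLE_API_KEY", GOOGLE_API_KEY), ("PINECONE_API_KEY", PINECONE_API_KEY)]:
    if not _key:
        raise RuntimeError(f"{_name} is not set; add it to your .env file.")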
# Initialize clients
genai.configure(api_key=GOOGLE_API_KEY)
pc = Pinecone(api_key=PINECONE_API_KEY)
# --- Handles both URLs and raw binary file content ---
def get_document_text(source) -> str:
"""
Extracts text from a document, handling either a URL or raw binary content.
"""
document_content = None
if isinstance(source, str): # If the source is a URL string
print(f"Downloading document from {source}...")
try:
            response = requests.get(source, timeout=60)  # avoid hanging forever on a dead link
response.raise_for_status()
document_content = response.content
except requests.exceptions.RequestException as e:
print(f"Error downloading the document: {e}")
return ""
elif isinstance(source, bytes): # If the source is raw file content (from upload)
print("Processing uploaded document content...")
document_content = source
else:
print("Invalid source type provided to get_document_text.")
return ""
if not document_content:
return ""
print("Extracting text from the document...")
document_text = ""
try:
pdf_document = fitz.open(stream=document_content, filetype="pdf")
for page_num in range(len(pdf_document)):
page = pdf_document.load_page(page_num)
document_text += page.get_text()
except Exception as e:
print(f"Error extracting text: {e}")
return ""
return document_text
def create_document_id(source: str) -> str:
    """Creates a stable SHA-256 hash of the source string (e.g. a URL) to use as a document ID."""
    return hashlib.sha256(source.encode()).hexdigest()
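# Hashing is deterministic, so processing the same URL twice yields the same
# ID; an already-indexed document can therefore be found again under its
# original Pinecone namespace instead of being re-embedded from scratch.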
def split_text_into_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    """
    Splits a large text document into smaller chunks using a recursive strategy.
    Note: chunk_overlap acts as slack when merging adjacent pieces back up to
    chunk_size, not as a literal character overlap between chunks.
    """
def _recursive_split(t, separators, size, overlap):
if not separators:
return textwrap.wrap(t, size)
current_sep = separators[0]
other_seps = separators[1:]
parts = t.split(current_sep)
chunks = []
for part in parts:
if len(part) > size:
chunks.extend(_recursive_split(part, other_seps, size, overlap))
else:
chunks.append(part)
final_chunks = []
if chunks:
current_chunk = chunks[0]
for i in range(1, len(chunks)):
if len(current_chunk) + len(chunks[i]) <= size + overlap:
current_chunk += current_sep + chunks[i]
else:
final_chunks.append(current_chunk)
current_chunk = chunks[i]
final_chunks.append(current_chunk)
return [c for c in final_chunks if c.strip()]
separators = ["\n\n", "\n", ". ", " "]
chunks = _recursive_split(text, separators, chunk_size, chunk_overlap)
return chunks
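# For example, with the defaults a 2,500-character section is first split on
# paragraph breaks ("\n\n"); any piece still longer than 1,000 characters falls
# through to line breaks, sentence boundaries, and finally single spaces, and
# the resulting pieces are then merged back together up to chunk_size plus the
# overlap slack.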
def generate_embeddings(text_chunks: list[str]) -> list:
    """
    Generates vector embeddings for a list of text chunks using Google's
    embedding-001 model.
    """
    print(f"Generating embeddings for {len(text_chunks)} chunks with embedding-001...")
    embeddings = []
    try:
        # Embed in batches to stay within the API's per-call item limit
        batch_size = 100
        for i in range(0, len(text_chunks), batch_size):
            batch = text_chunks[i:i + batch_size]
            response = genai.embed_content(
                model="models/embedding-001",
                content=batch
            )
            embeddings.extend(response['embedding'])
        print("Embeddings generated successfully.")
    except Exception as e:
        print(f"Error generating embeddings: {e}")
    return embeddings
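# embedding-001 returns 768-dimensional vectors; the Pinecone index created
# below derives its dimension from len(embeddings[0]) so the two stay in sync.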
def index_chunks_in_pinecone(chunks: list[str], embeddings: list, index_name: str, namespace: str):
"""
Indexes the text chunks and their embeddings in a specific Pinecone namespace.
"""
print(f"Indexing {len(chunks)} chunks in Pinecone index '{index_name}' under namespace '{namespace}'...")
try:
# Check if index exists, and create if it doesn't
if index_name not in pc.list_indexes().names():
print(f"Creating new Pinecone index: '{index_name}'")
pc.create_index(
name=index_name,
dimension=len(embeddings[0]),
metric='cosine',
spec=ServerlessSpec(cloud='aws', region='us-east-1')
)
print("Index created successfully. Waiting for it to become ready...")
# Wait for index to be ready
while not pc.describe_index(index_name).status.ready:
time.sleep(1)
index = pc.Index(index_name)
# Prepare data for upsert
vectors_to_upsert = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
vectors_to_upsert.append({
"id": f"chunk-{namespace}-{i}", # Make ID unique across namespaces
"values": embedding,
"metadata": {"text": chunk}
})
# Upsert in batches
batch_size = 100
for i in range(0, len(vectors_to_upsert), batch_size):
batch = vectors_to_upsert[i:i + batch_size]
index.upsert(vectors=batch, namespace=namespace) # <-- USE THE NAMESPACE
print(f"Upserted batch {i // batch_size + 1} into namespace '{namespace}'")
print(f"Successfully indexed {len(chunks)} chunks in namespace '{namespace}'.")
# Give a moment for the index to become queryable
time.sleep(5)
except Exception as e:
print(f"Error indexing in Pinecone: {e}")
if __name__ == "__main__":
    print("--- Running standalone script test ---")
    sample_url = "https://hackrx.blob.core.windows.net/assets/hackrx_6/policies/BAJHLIP23020V012223.pdf?sv=2023-01-03&st=2025-07-30T06%3A46%3A49Z&se=2025-09-01T06%3A46%3A00Z&sr=c&sp=rl&sig=9szykRKdGYj0BVm1skP%2BX8N9%2FRENEn2k7MQPUp33jyQ%3D"
    index_name = "hackrx-policy-index"
    document_text = get_document_text(sample_url)
    if document_text:
        chunks = split_text_into_chunks(document_text)
        print(f"\n--- Document Split into {len(chunks)} Chunks ---")
        embeddings = generate_embeddings(chunks)
        if embeddings:
            print(f"Generated {len(embeddings)} embeddings.")
            print(f"Size of each embedding vector: {len(embeddings[0])}")
            # Index the chunks in Pinecone, namespaced by the URL's hash
            test_namespace = create_document_id(sample_url)
            index_chunks_in_pinecone(chunks, embeddings, index_name, namespace=test_namespace)
        else:
            print("Failed to generate embeddings. Pinecone indexing skipped.")
    else:
        print("Failed to process document content.")