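"""
Ingestion pipeline for a PDF question-answering system: download (or accept)
a policy PDF, extract its text with PyMuPDF, split it into overlapping
chunks, embed the chunks with the Gemini embedding API, and upsert them into
a per-document Pinecone namespace.
"""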
import requests
import fitz  # PyMuPDF
import textwrap
import os
import google.generativeai as genai
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
import hashlib
import time

# Load environment variables from .env file
load_dotenv()
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
# Not required by the serverless Pinecone client below; kept for compatibility.
PINECONE_ENVIRONMENT = os.environ.get("PINECONE_ENVIRONMENT")

# Initialize clients
genai.configure(api_key=GOOGLE_API_KEY)
pc = Pinecone(api_key=PINECONE_API_KEY)
# --- Handles both URLs and raw binary file content ---
def get_document_text(source) -> str:
    """
    Extracts text from a PDF document, handling either a URL or raw binary content.
    """
    document_content = None
    if isinstance(source, str):  # The source is a URL string
        print(f"Downloading document from {source}...")
        try:
            response = requests.get(source, timeout=60)
            response.raise_for_status()
            document_content = response.content
        except requests.exceptions.RequestException as e:
            print(f"Error downloading the document: {e}")
            return ""
    elif isinstance(source, bytes):  # The source is raw file content (e.g. from an upload)
        print("Processing uploaded document content...")
        document_content = source
    else:
        print("Invalid source type provided to get_document_text.")
        return ""

    if not document_content:
        return ""

    print("Extracting text from the document...")
    document_text = ""
    try:
        with fitz.open(stream=document_content, filetype="pdf") as pdf_document:
            for page in pdf_document:
                document_text += page.get_text()
    except Exception as e:
        print(f"Error extracting text: {e}")
        return ""
    return document_text
def create_document_id(source: str) -> str:
    """Creates a stable SHA-256 hash of the URL to use as a document ID."""
    return hashlib.sha256(source.encode()).hexdigest()
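# Because the hash is deterministic, re-processing the same URL maps to the
# same namespace, so its chunks are overwritten on upsert rather than duplicated.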
def split_text_into_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    """
    Splits a large text document into smaller, overlapping chunks using a recursive strategy.
    """
    def _recursive_split(t, separators, size, overlap):
        # No separators left: fall back to hard wrapping at the size limit.
        if not separators:
            return textwrap.wrap(t, size)
        current_sep = separators[0]
        other_seps = separators[1:]
        parts = t.split(current_sep)
        chunks = []
        for part in parts:
            if len(part) > size:
                # Oversized part: split it further with the finer separators.
                chunks.extend(_recursive_split(part, other_seps, size, overlap))
            else:
                chunks.append(part)
        # Merge adjacent small pieces back together up to size + overlap.
        final_chunks = []
        if chunks:
            current_chunk = chunks[0]
            for i in range(1, len(chunks)):
                if len(current_chunk) + len(chunks[i]) <= size + overlap:
                    current_chunk += current_sep + chunks[i]
                else:
                    final_chunks.append(current_chunk)
                    current_chunk = chunks[i]
            final_chunks.append(current_chunk)
        return [c for c in final_chunks if c.strip()]

    # Try coarse separators first (paragraphs), then finer ones (lines, sentences, words).
    separators = ["\n\n", "\n", ". ", " "]
    return _recursive_split(text, separators, chunk_size, chunk_overlap)
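# Note on the chunker above: with the defaults (chunk_size=1000,
# chunk_overlap=200), adjacent splits are merged back together until a merged
# chunk would exceed chunk_size + chunk_overlap = 1200 characters.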
def generate_embeddings(text_chunks: list[str]) -> list:
    """
    Generates vector embeddings for a list of text chunks using the Gemini
    embedding model (models/embedding-001).
    """
    print(f"Generating embeddings for {len(text_chunks)} chunks...")
    embeddings = []
    try:
        # The embedding API caps how many items a single batch request may
        # contain (100 at the time of writing), so embed in batches.
        batch_size = 100
        for i in range(0, len(text_chunks), batch_size):
            batch = text_chunks[i:i + batch_size]
            response = genai.embed_content(
                model="models/embedding-001",
                content=batch
            )
            embeddings.extend(response['embedding'])
        print("Embeddings generated successfully.")
    except Exception as e:
        print(f"Error generating embeddings: {e}")
    return embeddings
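# Note: models/embedding-001 returns 768-dimensional vectors; the Pinecone
# index dimension below is derived from the first embedding rather than hardcoded.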
def index_chunks_in_pinecone(chunks: list[str], embeddings: list, index_name: str, namespace: str):
    """
    Indexes the text chunks and their embeddings in a specific Pinecone namespace.
    """
    print(f"Indexing {len(chunks)} chunks in Pinecone index '{index_name}' under namespace '{namespace}'...")
    try:
        # Check if the index exists, and create it if it doesn't
        if index_name not in pc.list_indexes().names():
            print(f"Creating new Pinecone index: '{index_name}'")
            pc.create_index(
                name=index_name,
                dimension=len(embeddings[0]),
                metric='cosine',
                spec=ServerlessSpec(cloud='aws', region='us-east-1')
            )
            print("Index created successfully. Waiting for it to become ready...")
            # Wait for the index to be ready
            while not pc.describe_index(index_name).status.ready:
                time.sleep(1)

        index = pc.Index(index_name)

        # Prepare data for upsert
        vectors_to_upsert = []
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            vectors_to_upsert.append({
                "id": f"chunk-{namespace}-{i}",  # Make the ID unique across namespaces
                "values": embedding,
                "metadata": {"text": chunk}
            })

        # Upsert in batches, into the per-document namespace
        batch_size = 100
        for i in range(0, len(vectors_to_upsert), batch_size):
            batch = vectors_to_upsert[i:i + batch_size]
            index.upsert(vectors=batch, namespace=namespace)
            print(f"Upserted batch {i // batch_size + 1} into namespace '{namespace}'")

        print(f"Successfully indexed {len(chunks)} chunks in namespace '{namespace}'.")
        # Give the index a moment to become queryable
        time.sleep(5)
    except Exception as e:
        print(f"Error indexing in Pinecone: {e}")
if __name__ == "__main__":
    print("--- Running standalone script test ---")
    sample_url = "https://hackrx.blob.core.windows.net/assets/hackrx_6/policies/BAJHLIP23020V012223.pdf?sv=2023-01-03&st=2025-07-30T06%3A46%3A49Z&se=2025-09-01T06%3A46%3A00Z&sr=c&sp=rl&sig=9szykRKdGYj0BVm1skP%2BX8N9%2FRENEn2k7MQPUp33jyQ%3D"
    index_name = "hackrx-policy-index"

    document_text = get_document_text(sample_url)
    if document_text:
        chunks = split_text_into_chunks(document_text)
        print(f"\n--- Document Split into {len(chunks)} Chunks ---")
        embeddings = generate_embeddings(chunks)
        if embeddings:
            print(f"Generated {len(embeddings)} embeddings.")
            print(f"Size of each embedding vector: {len(embeddings[0])}")
            # Index the chunks in Pinecone under a namespace derived from the URL
            test_namespace = create_document_id(sample_url)
            index_chunks_in_pinecone(chunks, embeddings, index_name, namespace=test_namespace)
        else:
            print("Failed to generate embeddings. Pinecone indexing skipped.")
    else:
        print("Failed to process document content.")