# hackrxsubmission/main.py
import os
import json
import tempfile
import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict
from dotenv import load_dotenv
from processing_utility import download_and_parse_document, extract_schema_from_file
# Import functions and constants from rag_utils.py.
# Make sure rag_utils.py is in the same directory or accessible via PYTHONPATH.
from rag_utils import (
process_markdown_with_manual_sections,
perform_vector_search,
generate_answer_with_groq,
CHUNK_SIZE,
CHUNK_OVERLAP,
TOP_K_CHUNKS,
GROQ_MODEL_NAME
)
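# Assumed rag_utils contract, inferred from how these names are used below:
#   - process_markdown_with_manual_sections(md_path, headings_json, size, overlap)
#     -> list of document chunks tagged with their section headings
#   - perform_vector_search(chunks, question, top_k) -> top-k relevant chunks
#   - generate_answer_with_groq(question, chunks, api_key) -> answer string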
load_dotenv()
# --- FastAPI App Initialization ---
app = FastAPI(
title="HackRX RAG API",
description="API for Retrieval-Augmented Generation from PDF documents.",
version="1.0.0",
)
# --- Groq API Key Setup ---
# It's highly recommended to set this as an environment variable in production.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
if not GROQ_API_KEY:
    print("WARNING: GROQ_API_KEY environment variable is not set; answer generation will fail without it.")
# --- Pydantic Models for Request and Response ---
class RunRequest(BaseModel):
documents: str # URL to the PDF document
questions: List[str]
class Answer(BaseModel):
answer: str
class RunResponse(BaseModel):
answers: List[Answer]
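# Illustrative request/response for the models above (the URL and question are
# placeholder values, not from this repo):
#
#   POST /hackrx/run
#   {
#     "documents": "https://example.com/policy.pdf",
#     "questions": ["What is the waiting period for pre-existing diseases?"]
#   }
#
#   -> {"answers": [{"answer": "..."}]}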
# --- Pseudo-functions (unused fallbacks; the endpoint below uses the real
# implementations from processing_utility instead) ---
def convert_to_markdown(pdf_url: str) -> str:
"""
PSEUDO-FUNCTION: Downloads the PDF from the URL and returns its local path.
In a real scenario, this might involve converting PDF to Markdown,
but for process_pdf_with_manual_sections, we just need the local PDF path.
"""
print(f"Downloading PDF from: {pdf_url}")
try:
response = requests.get(pdf_url, stream=True)
response.raise_for_status() # Raise an HTTPError for bad responses (4xx or 5xx)
        # Stream the response body into a temporary file so large PDFs
        # are not held entirely in memory
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf_file:
            for chunk in response.iter_content(chunk_size=8192):
                temp_pdf_file.write(chunk)
        print(f"PDF downloaded to temporary path: {temp_pdf_file.name}")
        return temp_pdf_file.name
except requests.exceptions.RequestException as e:
raise HTTPException(status_code=500, detail=f"Failed to download PDF from URL: {e}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"An unexpected error occurred during PDF download: {e}")
def fetch_headings_json(pdf_url: str) -> Dict:
"""
PSEUDO-FUNCTION: Fetches section headings for the PDF.
In a real scenario, this would involve a more sophisticated service
or logic to extract headings from the PDF.
For this example, we return a hardcoded dummy JSON.
"""
print(f"Fetching headings for PDF URL (pseudo-function): {pdf_url}")
    # This dummy JSON should match the schema expected by
    # process_markdown_with_manual_sections: {"data": {"headings": ["Your Heading"]}}
dummy_headings = {
"run_id": "dummy-run-id",
"extraction_agent_id": "dummy-agent-id",
"data": {
"headings": [
"Policy Wordings",
"SECTION A) PREAMBLE",
"SECTION B) DEFINITIONS - STANDARD DEFINITIONS",
"SECTION B) DEFINITIONS - SPECIFIC DEFINITIONS",
"SECTION C) BENEFITS COVERED UNDER THE POLICY",
"PART A- COVERAGE- Domestic (Within India Only, for Imperial and Imperial Plus Plans)",
"PART B- COVERAGE- International",
"SECTION D) EXCLUSIONS- STANDARD EXCLUSIONS APPLICABLE TO PART A- DOMESTIC COVER UNDER SECTION C) BENEFITS COVERED UNDER THE POLICY",
"SECTION D) EXCLUSIONS– SPECIFIC EXCLUSIONS APPLICABLE TO PART A- DOMESTIC COVER UNDER SECTION C) BENEFITS COVERED UNDER THE POLICY",
"SECTION D) EXCLUSIONS- STANDARD EXCLUSIONS APPLICABLE TO PART B- INTERNATIONAL COVER UNDER SECTION C) BENEFITS COVERED UNDER THE POLICY",
"SECTION D) EXCLUSIONS– SPECIFIC EXCLUSIONS APPLICABLE TO INTERNATIONAL COVER UNDER SECTION C) BENEFITS COVERED UNDER THE POLICY",
"SECTION E) GENERAL TERMS AND CONDITIONS - STANDARD GENERAL TERMS AND CONDITIONS",
"SECTION E) GENERAL TERMS AND CONDITIONS - SPECIFIC TERMS AND CONDITIONS",
"SECTION E) GENERAL TERMS AND CLAUSES - STANDARD GENERAL TERMS AND CLAUSES"
]
},
"extraction_metadata": {
"field_metadata": {},
"usage": {
"num_pages_extracted": 49,
"num_document_tokens": 48701,
"num_output_tokens": 1229
}
}
}
return dummy_headings
# --- API Endpoint ---
@app.post("/hackrx/run", response_model=RunResponse)
async def run_rag_pipeline(request: RunRequest):
"""
Runs the RAG pipeline for a given PDF document and a list of questions.
"""
pdf_url = request.documents
questions = request.questions
    local_markdown_path = None
try:
        # Step 1: Download the document and parse it into Markdown
        local_markdown_path = await download_and_parse_document(pdf_url)
        # Step 2: Extract the section-heading schema from the parsed Markdown
        headings_json = extract_schema_from_file(local_markdown_path)
        # Persist the extracted schema for debugging/inspection
        with open("output.json", 'w', encoding='utf-8') as f:
            json.dump(headings_json, f, indent=4, ensure_ascii=False)
        # extract_schema_from_file is expected to return a top-level "headings"
        # list (unlike the nested {"data": {"headings": [...]}} dummy above)
        if not headings_json or not headings_json.get("headings"):
            raise HTTPException(status_code=400, detail="Could not retrieve valid headings from the provided PDF URL.")
        # Step 3: Split the Markdown into chunks tagged with their section headings
        print("Processing document into chunks with manual sections...")
processed_documents = process_markdown_with_manual_sections(
local_markdown_path,
headings_json,
CHUNK_SIZE,
CHUNK_OVERLAP
)
if not processed_documents:
raise HTTPException(status_code=500, detail="Failed to process PDF into document chunks.")
all_answers = []
# Step 4: Iterate through questions, perform search, and generate answers
for i, question in enumerate(questions):
print(f"Processing question {i+1}/{len(questions)}: '{question}'")
# Perform vector search
retrieved_results = perform_vector_search(processed_documents, question, TOP_K_CHUNKS)
if retrieved_results:
# Generate answer using Groq
answer_text = generate_answer_with_groq(question, retrieved_results, GROQ_API_KEY)
else:
answer_text = "No relevant information found in the document to answer this question."
all_answers.append(Answer(answer=answer_text))
return RunResponse(answers=all_answers)
    except HTTPException:
        raise
except Exception as e:
print(f"An unhandled error occurred: {e}")
raise HTTPException(status_code=500, detail=f"An internal server error occurred: {e}")
    finally:
        # Clean up the temporary parsed document file
        if local_markdown_path and os.path.exists(local_markdown_path):
            os.unlink(local_markdown_path)
            print(f"Cleaned up temporary file: {local_markdown_path}")