"""Semantic document retrieval API backed by ChromaDB and RedMindGPT.

Endpoints:
    GET  /                    health/banner message
    POST /upload-pdf/         embed every page of a PDF and store it
    POST /search/             return the top-K most similar stored pages
    POST /search-and-query/   send the best page plus a question to RedMindGPT
    GET  /list-docs/          peek at stored entries (development aid)

Handlers report failures as ``{"error": ...}`` JSON bodies rather than HTTP
error statuses; that contract is kept as-is for existing clients.
"""

import io
import logging
import os
import uuid

import chromadb
import httpx
import pdfplumber
from chromadb.config import Settings
from fastapi import FastAPI, File, Form, UploadFile
from sentence_transformers import SentenceTransformer

logger = logging.getLogger(__name__)

# Writable Hugging Face cache directory. The environment variable is kept for
# backward compatibility, but it is assigned after the libraries above were
# imported, so the directory is also handed to the model loader explicitly.
CACHE_DIR = "/app/cache"
os.environ["TRANSFORMERS_CACHE"] = CACHE_DIR
os.makedirs(CACHE_DIR, exist_ok=True)

# Maximum number of characters of a document echoed back in responses.
SNIPPET_LENGTH = 200

# Initialize FastAPI
app = FastAPI()

# Load SentenceTransformer model for document embeddings
model = SentenceTransformer(
    "sentence-transformers/all-MiniLM-L6-v2",
    cache_folder=CACHE_DIR,
)

# Initialize ChromaDB
# NOTE(review): ``chroma_db_impl="duckdb+parquet"`` is the legacy configuration
# style; confirm it matches the installed chromadb version before upgrading.
chroma_client = chromadb.Client(
    Settings(
        chroma_db_impl="duckdb+parquet",
        persist_directory="./chroma_storage",
    )
)
collection = chroma_client.get_or_create_collection(name="documents")

# RedMindGPT API details
# NOTE(review): the endpoint is plain HTTP, so the bearer token travels
# unencrypted - confirm whether an HTTPS endpoint is available.
REDMIND_API_URL = "http://redmindgpt.redmindtechnologies.com/v1"
# SECURITY: prefer supplying the key through the REDMIND_API_KEY environment
# variable. The literal fallback only preserves existing deployments; it has
# been committed to source and should be rotated and then removed.
REDMIND_API_KEY = os.environ.get(
    "REDMIND_API_KEY", "dataset-feqz5KrqHkFRdWbh2DInt58L"
)


def _snippet(text: str, limit: int = SNIPPET_LENGTH) -> str:
    """Return *text* cut to *limit* characters, adding "..." only if truncated."""
    return text[:limit] + "..." if len(text) > limit else text


# Function to process PDF and store each page
def process_pdf_and_store(file_bytes: bytes, filename: str) -> None:
    """Embed every page of a PDF that has text and add it to the collection.

    Args:
        file_bytes: Raw PDF content. A path or binary file object is passed
            through to pdfplumber unchanged.
        filename: Name recorded in each stored page's metadata.

    Raises:
        Exception: Whatever pdfplumber, the embedding model or ChromaDB raise;
            pages added before the failure remain stored.
    """
    # pdfplumber.open expects a path or a file-like object, not raw bytes.
    if isinstance(file_bytes, (bytes, bytearray)):
        source = io.BytesIO(file_bytes)
    else:
        source = file_bytes

    with pdfplumber.open(source) as pdf:
        for page_number, page in enumerate(pdf.pages, start=1):
            text = page.extract_text()
            if not text:
                # Pages without extractable text are skipped.
                continue
            embedding = model.encode(text, normalize_embeddings=True).tolist()
            collection.add(
                documents=[text],
                embeddings=[embedding],
                ids=[str(uuid.uuid4())],
                metadatas=[{"filename": filename, "page": page_number}],
            )


# Home route
@app.get("/")
def root():
    """Return a static message showing that the service is up."""
    return {"message": "Semantic Document Retrieval API with RedMindGPT is running!"}


# Upload PDF and store embeddings
@app.post("/upload-pdf/")
async def upload_pdf(file: UploadFile = File(...)):
    """Accept an uploaded PDF, embed its pages and store them.

    Returns a ``message`` payload on success, or an ``error`` payload when the
    upload is not a ``.pdf`` file or processing fails.
    """
    # The filename may be missing, and the extension may be upper-case.
    filename = file.filename or ""
    if not filename.lower().endswith(".pdf"):
        return {"error": "Only PDF files are supported."}

    contents = await file.read()
    try:
        process_pdf_and_store(file_bytes=contents, filename=filename)
    except Exception as e:
        logger.exception("Failed to process PDF %r", filename)
        return {"error": f"Failed to process PDF: {str(e)}"}
    return {"message": f"Successfully processed and stored '{filename}'"}


# Search top K results
@app.post("/search/")
async def search_text(query: str = Form(...), top_k: int = 3):
    """Return the *top_k* stored pages closest to *query*.

    Each result carries the source filename, page number, a snippet of the
    page text and the distance value returned by the collection query under
    the key ``score``.
    """
    try:
        embedding = model.encode(query, normalize_embeddings=True).tolist()
        results = collection.query(query_embeddings=[embedding], n_results=top_k)
        # A single query embedding was sent, so index 0 holds its matches.
        matches = zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0],
        )
        return {
            "query": query,
            "results": [
                {
                    "filename": metadata["filename"],
                    "page": metadata["page"],
                    "snippet": _snippet(doc),
                    "score": score,
                }
                for doc, metadata, score in matches
            ],
        }
    except Exception as e:
        logger.exception("Search failed")
        return {"error": f"Search failed: {str(e)}"}


# Search + send top result to RedMind API
@app.post("/search-and-query/")
async def search_and_query_redmind(question: str = Form(...)):
    """Answer *question* using the best-matching stored page as context.

    The closest page is looked up in the collection and posted together with
    the question to the RedMind API; the decoded JSON reply is returned under
    ``redmind_response``.
    """
    try:
        # Get document embedding
        embedding = model.encode(question, normalize_embeddings=True).tolist()
        results = collection.query(query_embeddings=[embedding], n_results=1)

        if not results["documents"][0]:
            return {"error": "No relevant document found."}
        top_doc = results["documents"][0][0]

        # Send top doc + question to RedMind
        headers = {
            "Authorization": f"Bearer {REDMIND_API_KEY}",
            "Content-Type": "application/json",
        }
        payload = {"input": f"Context: {top_doc}\n\nQuestion: {question}"}

        async with httpx.AsyncClient() as client:
            response = await client.post(
                REDMIND_API_URL, headers=headers, json=payload
            )
            response.raise_for_status()
            answer = response.json()

        return {
            "question": question,
            # Same truncation rule as /search/: ellipsis only when cut short.
            "top_document_snippet": _snippet(top_doc),
            "redmind_response": answer,
        }
    except Exception as e:
        logger.exception("RedMind integration failed")
        return {"error": f"RedMind integration failed: {str(e)}"}


# List all stored documents (for dev use)
@app.get("/list-docs/")
def list_documents():
    """Return the result of ``collection.peek()`` for inspection during development."""
    try:
        return collection.peek()
    except Exception as e:
        logger.exception("Failed to list documents")
        return {"error": f"Failed to list documents: {str(e)}"}