import asyncio from aiohttp import web, WSMsgType from google import genai from google.genai import types from PyPDF2 import PdfReader from sentence_transformers import SentenceTransformer import faiss import numpy as np import os def load_pdf_chunks(pdf_path, max_chunk_length=500): reader = PdfReader(pdf_path) full_text = "" for page in reader.pages: text = page.extract_text() if text: full_text += text + "\n" # Split into smaller chunks chunks = [full_text[i:i + max_chunk_length] for i in range(0, len(full_text), max_chunk_length)] return chunks PDF_PATH = "resume.pdf" chunks = load_pdf_chunks(PDF_PATH) embedding_model = SentenceTransformer('all-MiniLM-L6-v2') chunk_embeddings = embedding_model.encode(chunks, convert_to_numpy=True) dimension = chunk_embeddings.shape[1] index = faiss.IndexFlatL2(dimension) index.add(chunk_embeddings) def retrieve_context(query, top_k=7): query_embedding = embedding_model.encode([query], convert_to_numpy=True) distances, indices = index.search(query_embedding, top_k) return [chunks[i] for i in indices[0]] GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") if not GEMINI_API_KEY: raise ValueError("GEMINI_API_KEY environment variable not set!") # Initialize Gemini client client = genai.Client(api_key=GEMINI_API_KEY) model = "gemini-2.0-flash-live-001" config = types.LiveConnectConfig( response_modalities=["TEXT"], system_instruction=types.Content( parts=[ types.Part.from_text( text="You are Soumyajit's AI Assistant which is chatbot on Soumyajit's profile website, you have to act like a professional AI Assistant, and answer precisely in maximum 2-3 lines." ) ], role="user" ) ) # WebSocket handler async def ws_handler(request): ws = web.WebSocketResponse() await ws.prepare(request) async with client.aio.live.connect(model=model, config=config) as session: async for msg in ws: if msg.type == WSMsgType.TEXT: user_query = msg.data # 🔍 RAG Retrieval retrieved_docs = retrieve_context(user_query) context_text = "\n".join(retrieved_docs) # Combine into final prompt final_prompt = f"Context:\n{context_text}\n\nQuestion:\n{user_query}" await session.send_client_content( turns={"role": "user", "parts": [{"text": final_prompt}]}, turn_complete=True ) full_response = "" async for response in session.receive(): if response.text: full_response += response.text await ws.send_str(full_response) elif msg.type == WSMsgType.ERROR: print('WebSocket connection closed with exception:', ws.exception()) return ws # Health check async def health_check(request): return web.Response(text="OK") # Main app setup app = web.Application() app.router.add_get("/", health_check) app.router.add_get("/ws", ws_handler) # WebSocket endpoint if __name__ == "__main__": port = int(os.environ.get("PORT", 7860)) # Hugging Face uses 7860 web.run_app(app, port=port, host="0.0.0.0")