import asyncio
from aiohttp import web, WSMsgType
from google import genai
from google.genai import types
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import os
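# Assumed dependencies (not pinned in the source; a likely requirements.txt sketch):
#   aiohttp, google-genai, PyPDF2, sentence-transformers, faiss-cpu, numpy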
def load_pdf_chunks(pdf_path, max_chunk_length=500):
    # Extract text from every page of the PDF
    reader = PdfReader(pdf_path)
    full_text = ""
    for page in reader.pages:
        text = page.extract_text()
        if text:
            full_text += text + "\n"
    # Split into smaller fixed-size character chunks
    chunks = [full_text[i:i + max_chunk_length] for i in range(0, len(full_text), max_chunk_length)]
    return chunks
# Load and chunk the resume once at startup
PDF_PATH = "resume.pdf"
chunks = load_pdf_chunks(PDF_PATH)

# Embed the chunks and build an in-memory FAISS index (exact L2 search)
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
chunk_embeddings = embedding_model.encode(chunks, convert_to_numpy=True)
dimension = chunk_embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(chunk_embeddings)
def retrieve_context(query, top_k=7):
    # Embed the query and return the top_k chunks nearest to it in the index
    query_embedding = embedding_model.encode([query], convert_to_numpy=True)
    distances, indices = index.search(query_embedding, top_k)
    return [chunks[i] for i in indices[0]]
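# Illustration only (hypothetical query):
#   retrieve_context("What are Soumyajit's skills?")
# returns up to top_k resume chunks closest to the query in embedding space.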
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
if not GEMINI_API_KEY:
    raise ValueError("GEMINI_API_KEY environment variable not set!")

# Initialize Gemini client
client = genai.Client(api_key=GEMINI_API_KEY)
model = "gemini-2.0-flash-live-001"
config = types.LiveConnectConfig(
    response_modalities=["TEXT"],
    system_instruction=types.Content(
        parts=[
            types.Part.from_text(
                text=(
                    "You are Soumyajit's AI Assistant, a chatbot on Soumyajit's "
                    "profile website. Act like a professional AI assistant and "
                    "answer precisely in at most 2-3 lines."
                )
            )
        ],
        role="user"
    )
)
# WebSocket handler
async def ws_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # One Gemini Live session per WebSocket connection
    async with client.aio.live.connect(model=model, config=config) as session:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                user_query = msg.data

                # RAG retrieval: fetch the resume chunks most relevant to the query
                retrieved_docs = retrieve_context(user_query)
                context_text = "\n".join(retrieved_docs)

                # Combine retrieved context and the question into the final prompt
                final_prompt = f"Context:\n{context_text}\n\nQuestion:\n{user_query}"

                await session.send_client_content(
                    turns={"role": "user", "parts": [{"text": final_prompt}]},
                    turn_complete=True
                )

                # Collect the streamed reply and send it back as a single message
                full_response = ""
                async for response in session.receive():
                    if response.text:
                        full_response += response.text
                await ws.send_str(full_response)
            elif msg.type == WSMsgType.ERROR:
                print("WebSocket connection closed with exception:", ws.exception())

    return ws
# Health check
async def health_check(request):
    return web.Response(text="OK")

# Main app setup
app = web.Application()
app.router.add_get("/", health_check)
app.router.add_get("/ws", ws_handler)  # WebSocket endpoint
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))  # Hugging Face uses 7860
    web.run_app(app, port=port, host="0.0.0.0")
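# Minimal sketch of a local test client, assuming the third-party `websockets`
# package is installed (it is not used by the app above); the question text is
# purely illustrative:
#
#   import asyncio
#   import websockets
#
#   async def main():
#       # Connect to the /ws endpoint and ask one question
#       async with websockets.connect("ws://localhost:7860/ws") as ws:
#           await ws.send("What is Soumyajit's current role?")
#           print(await ws.recv())
#
#   asyncio.run(main())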