# ingest/ingest_forms.py
"""
Ingest every PDF in data/ ➜ local Qdrant store (run once)

Also exposes load_raw_docs() for Stage‑5 synthetic generation.
"""
from pathlib import Path
from typing import List
from dotenv import load_dotenv
from qdrant_client import QdrantClient, models
from qdrant_client.http import models as rest
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from tqdm import trange
from typing import Optional
import pathlib

load_dotenv()  # OPENAI_API_KEY

DATA_DIR = Path("data")
QDRANT_PATH = "qdrant_data"
COLL = "formpilot_docs"
EMB_SIZE = 1536  # text‑embedding‑3‑small


# ------------------------------------------------------------------
# helpers
# ------------------------------------------------------------------
def _get_client(readonly: bool = False) -> QdrantClient:
    """
    Open the embedded (on-disk) Qdrant client.

    Parameters
    ----------
    readonly : bool
        False – open once for ingest (caller is __main__).
        True  – open for read access from other scripts.
        NOTE(review): ``read_only`` is forwarded to QdrantClient as a
        keyword argument; confirm the installed qdrant-client version
        actually accepts it for local (path=) mode.
    """
    return QdrantClient(
        path=QDRANT_PATH,
        force_disable_check_same_thread=True,
        prefer_grpc=False,  # local, pure-python
        read_only=readonly,
    )


def load_raw_docs(limit: Optional[int] = None) -> list[str]:
    """
    Fetch text chunks back from the local Qdrant collection so that
    downstream scripts (e.g. make_synthetic.py) can work without
    re-ingesting PDF files.

    Parameters
    ----------
    limit : int | None
        Max number of chunks to return (handy for quick tests).

    Returns
    -------
    list[str]
        The raw text strings you originally inserted.
    """
    cli = QdrantClient(path=QDRANT_PATH, force_disable_check_same_thread=True)

    # BUG FIX: scroll() paginates (default page size is 10), so a single
    # call silently returned only the first page of the collection.
    # Walk every page via the next-page offset until exhausted or until
    # `limit` chunks have been collected.
    texts: list[str] = []
    offset = None
    while True:
        points, offset = cli.scroll(
            collection_name=COLL,
            limit=256,  # page size for pagination, NOT the caller's `limit`
            offset=offset,
            with_payload=True,
        )
        texts.extend(p.payload["text"] for p in points)
        # `is not None` so limit=0 means "zero chunks", per the contract.
        if limit is not None and len(texts) >= limit:
            return texts[:limit]
        if offset is None:  # no more pages
            return texts


# ------------------------------------------------------------------
# main ingest routine – only runs if script executed directly
# ------------------------------------------------------------------
def _ingest_all() -> None:
    """Embed every PDF under DATA_DIR and upsert the chunks into Qdrant."""
    cli = _get_client()

    # create collection once (idempotent across re-runs)
    if COLL not in {c.name for c in cli.get_collections().collections}:
        cli.create_collection(
            collection_name=COLL,
            vectors_config=models.VectorParams(
                size=EMB_SIZE, distance=models.Distance.COSINE
            ),
        )
        print(f"✅ Created collection {COLL}")

    embedder = OpenAIEmbeddings(model="text-embedding-3-small")
    splitter = RecursiveCharacterTextSplitter(chunk_size=350, chunk_overlap=50)

    texts, payloads, ids = [], [], []
    next_id = cli.count(COLL).count  # continue ID numbering after prior runs

    for pdf in DATA_DIR.glob("*.pdf"):
        # e.g. "f1040instr.pdf" -> form code "F1040"
        form_code = pdf.stem.split("instr")[0].upper()
        for doc in PyPDFLoader(str(pdf)).load_and_split(splitter):
            texts.append(doc.page_content)
            payloads.append(
                dict(
                    text=doc.page_content,
                    source=f"{pdf.name}:page-{doc.metadata.get('page',0)}",
                    form=form_code,
                )
            )
            ids.append(next_id)
            next_id += 1

    if texts:
        # PERF: one batched embeddings request instead of one HTTP
        # round-trip per chunk (the old code called embed_query in a loop);
        # embed_documents returns the same vectors in the same order.
        vecs = embedder.embed_documents(texts)
        cli.upload_collection(COLL, vecs, payloads, ids, batch_size=64)
        print(f"✅ Upserted {len(vecs):,} vectors into {COLL}")
    else:
        print("ℹ️ Nothing new to ingest.")


if __name__ == "__main__":
    _ingest_all()