# ingest/ingest_forms.py
"""
Ingest every PDF in data/ ➜ local Qdrant store (run once)
Also exposes load_raw_docs() for Stage‑5 synthetic generation.
"""
from pathlib import Path
from typing import Optional

from dotenv import load_dotenv
from qdrant_client import QdrantClient, models
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
load_dotenv() # OPENAI_API_KEY
DATA_DIR = Path("data")
QDRANT_PATH = "qdrant_data"
COLL = "formpilot_docs"
EMB_SIZE = 1536 # text‑embedding‑3‑small
# ------------------------------------------------------------------
# helpers
# ------------------------------------------------------------------
def _get_client(readonly: bool = False) -> QdrantClient:
"""
• readonly=False – opens once for ingest (caller is __main__)
    • readonly=True  – opens for read-only consumers (e.g. load_raw_docs()).
      Uses the same on-disk folder but relaxes the write lock so the two
      can coexist.
"""
return QdrantClient(
path=QDRANT_PATH,
force_disable_check_same_thread=True,
prefer_grpc=False, # local, pure‑python
read_only=readonly,
)
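
# Note (assumption, not part of the original deployment): the same helper could
# point at a Qdrant server instead of the embedded on-disk store, e.g.
#
#   return QdrantClient(url="http://localhost:6333", prefer_grpc=False)
#
# with everything else in this module unchanged.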
def load_raw_docs(limit: Optional[int] = None) -> list[str]:
    """
    Fetch text chunks back from the local Qdrant collection so that
    downstream scripts (e.g. make_synthetic.py) can work without
    re-ingesting PDF files.

    Parameters
    ----------
    limit : int | None
        Max number of chunks to return (handy for quick tests).

    Returns
    -------
    list[str]
        The raw text strings originally inserted.
    """
    cli = QdrantClient(path=QDRANT_PATH, force_disable_check_same_thread=True)
    texts: list[str] = []
    offset = None
    # scroll() returns (points, next_offset); page through until exhausted,
    # since a single call only returns one page of points.
    while True:
        points, offset = cli.scroll(
            collection_name=COLL,
            limit=256,
            offset=offset,
            with_payload=True,
        )
        texts.extend(p.payload["text"] for p in points)
        if offset is None or (limit is not None and len(texts) >= limit):
            break
    return texts[:limit] if limit else texts
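
# Usage sketch (hypothetical consumer; make_synthetic.py is referenced above but
# not shown here, so the sampling below is purely illustrative):
#
#   import random
#   chunks = load_raw_docs(limit=200)            # cap for a quick test run
#   random.seed(0)
#   seeds = random.sample(chunks, k=min(20, len(chunks)))
#   # feed `seeds` into an LLM prompt to generate synthetic Q/A pairs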
# ------------------------------------------------------------------
# main ingest routine – only runs if script executed directly
# ------------------------------------------------------------------
def _ingest_all() -> None:
cli = _get_client()
# create collection once
if COLL not in {c.name for c in cli.get_collections().collections}:
cli.create_collection(
collection_name=COLL,
vectors_config=models.VectorParams(
size=EMB_SIZE, distance=models.Distance.COSINE
),
)
print(f"✅ Created collection {COLL}")
embedder = OpenAIEmbeddings(model="text-embedding-3-small")
splitter = RecursiveCharacterTextSplitter(chunk_size=350, chunk_overlap=50)
vecs, payloads, ids = [], [], []
    next_id = cli.count(COLL).count  # continue IDs after any points already stored
for pdf in DATA_DIR.glob("*.pdf"):
        # derive the form code from the part of the filename before "instr"
        form_code = pdf.stem.split("instr")[0].upper()
        for doc in PyPDFLoader(str(pdf)).load_and_split(splitter):
            # one embedding request per chunk (embed_documents() would batch)
            vecs.append(embedder.embed_query(doc.page_content))
payloads.append(
dict(
text=doc.page_content,
source=f"{pdf.name}:page-{doc.metadata.get('page',0)}",
form=form_code,
)
)
ids.append(next_id)
next_id += 1
if vecs:
cli.upload_collection(COLL, vecs, payloads, ids, batch_size=64)
print(f"✅ Upserted {len(vecs):,} vectors into {COLL}")
else:
print("ℹ️ Nothing new to ingest.")
if __name__ == "__main__":
_ingest_all()
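
# Post-ingest sanity check (illustrative, not part of the original script); run
# from a Python shell after `python ingest/ingest_forms.py`:
#
#   from ingest.ingest_forms import load_raw_docs
#   chunks = load_raw_docs()
#   print(len(chunks), "chunks stored")
#   print(chunks[0][:200])                       # peek at one stored chunk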