Spaces:
Running
Running
File size: 7,091 Bytes
ec85693 |
|
import os
import tempfile
import json
import sqlite3
import threading
from typing import Any, List, Optional
from datetime import datetime
from pydantic import BaseModel, Field
from fastapi import APIRouter, Query, HTTPException
# ===== Config =====
def _pick_writable_dir():
candidates = [
os.getenv("DB_DIR"), # <- recommended: set this in env
"/data", # HF Spaces/container convention
"/app/data", # container-friendly
os.path.join(os.getcwd(), "data"),
tempfile.gettempdir(),
]
for d in candidates:
if not d:
continue
try:
os.makedirs(d, exist_ok=True)
test = os.path.join(d, ".rwcheck")
with open(test, "w") as f:
f.write("ok")
os.remove(test)
return d
except Exception:
continue
# absolute fallback: current working directory
return os.getcwd()
DB_DIR = _pick_writable_dir()
DB_PATH = os.path.join(DB_DIR, os.getenv("DB_FILE", "app.db"))
# ===== SQLite bootstrap =====
_conn = sqlite3.connect(DB_PATH, check_same_thread=False)
_conn.execute(
"""
CREATE TABLE IF NOT EXISTS reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
report_date TEXT,
ocr_text TEXT,
anomalies TEXT,
measurements TEXT,
created_at TEXT DEFAULT (datetime('now'))
)
"""
)
_conn.execute(
"CREATE INDEX IF NOT EXISTS idx_reports_user_created ON reports(user_id, created_at DESC)"
)
_conn.commit()
_db_lock = threading.Lock()
def _safe_parse_json(text):
if text is None:
return None
s = text.strip()
if not s:
return None
# Try real JSON first
try:
return json.loads(s)
except json.JSONDecodeError:
pass
# Common legacy cases: Python repr (single quotes, True/False/None)
try:
return ast.literal_eval(s)
except Exception:
return None # or return s if you prefer to surface the raw text
def _row_to_dict(row: sqlite3.Row) -> dict:
return {
"id": row[0],
"user_id": row[1],
"report_date": row[2],
"ocr_text": row[3],
"anomalies": _safe_parse_json(row[4]),
"measurements": _safe_parse_json(row[5]),
"created_at": row[6],
}
def db_insert_report(payload: dict) -> int:
with _db_lock:
cur = _conn.cursor()
cur.execute(
"""
INSERT INTO reports (
user_id, report_date, ocr_text,
anomalies, measurements
)
VALUES (?, ?, ?, ?, ?)
""",
(
payload.get("user_id"),
payload.get("report_date"),
payload.get("ocr_text"),
payload.get("anomalies"),
payload.get("measurements"),
),
)
_conn.commit()
return cur.lastrowid
def db_fetch_reports(user_id: str, limit: int = 50, offset: int = 0) -> List[dict]:
_conn.row_factory = sqlite3.Row
cur = _conn.cursor()
cur.execute(
"""
SELECT id, user_id, report_date, ocr_text,
anomalies, measurements, created_at
FROM reports
WHERE user_id = ?
ORDER BY datetime(created_at) DESC, id DESC
LIMIT ? OFFSET ?
""",
(user_id, limit, offset),
)
return [_row_to_dict(r) for r in cur.fetchall()]
def db_get_report(report_id: int) -> Optional[dict]:
_conn.row_factory = sqlite3.Row
cur = _conn.cursor()
cur.execute(
"""
SELECT id, user_id, report_date, ocr_text,
anomalies, measurements, created_at
FROM reports
WHERE id = ?
""",
(report_id,),
)
row = cur.fetchone()
return _row_to_dict(row) if row else None
def db_delete_report(report_id: int) -> bool:
with _db_lock:
cur = _conn.cursor()
cur.execute("DELETE FROM reports WHERE id = ?", (report_id,))
_conn.commit()
return cur.rowcount > 0
# ===== Pydantic Schemas =====
class ReportIn(BaseModel):
user_id: str = Field(..., example="patient@example.com")
report_date: Optional[str] = Field(None, example="2025-09-07")
ocr_text: Optional[str] = None
anomalies: Optional[Any] = None
measurements: Optional[Any] = None
class ReportOut(BaseModel):
id: int
user_id: str
report_date: Optional[str]
ocr_text: Optional[str]
anomalies: Optional[Any]
measurements: Optional[Any]
created_at: str
# ===== Router =====
router = APIRouter(tags=["Reports"])
@router.post("/save_report/", response_model=dict)
def save_report(body: ReportIn):
if not body.user_id:
raise HTTPException(status_code=400, detail="user_id is required")
print(f"Saving report for user: {body}")
rid = db_insert_report(body.dict())
return {"ok": True, "id": rid}
@router.get("/reports/", response_model=List[ReportOut])
def list_reports(
user_id: str = Query(..., description="User email to filter by"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
):
return db_fetch_reports(user_id=user_id, limit=limit, offset=offset)
@router.get("/reports/{report_id}", response_model=ReportOut)
def get_report(report_id: int):
row = db_get_report(report_id)
if not row:
raise HTTPException(status_code=404, detail="Report not found")
return row
@router.delete("/reports/{report_id}", response_model=dict)
def delete_report(report_id: int):
if db_delete_report(report_id):
return {"ok": True, "deleted": report_id}
raise HTTPException(status_code=404, detail="Report not found")
if __name__ == "__main__":
#req = ReportIn(user_id="test.hd@gmail.com", report_date="2025-09-08", ocr_text="Medical Report - Cancer Patient Name: Carol Davis Age: 55 Gender: Female Clinical History: Recent biopsy confirms breast cancer (invasive ductal carcinoma). No lymph node involvement. PET scan negative for metastasis.", anomalies=[{"findings":"BREAST CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"},{"findings":"CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"},{"findings":"BRAIN CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"}], measurements=[])
#save_report(req)
print(db_fetch_reports(user_id="test.hd@gmail.com")) |