Spaces:
Running
Running
File size: 7,091 Bytes
ec85693 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
import os
import tempfile
import json
import sqlite3
import threading
from typing import Any, List, Optional
from datetime import datetime
from pydantic import BaseModel, Field
from fastapi import APIRouter, Query, HTTPException
# ===== Config =====
def _pick_writable_dir():
candidates = [
os.getenv("DB_DIR"), # <- recommended: set this in env
"/data", # HF Spaces/container convention
"/app/data", # container-friendly
os.path.join(os.getcwd(), "data"),
tempfile.gettempdir(),
]
for d in candidates:
if not d:
continue
try:
os.makedirs(d, exist_ok=True)
test = os.path.join(d, ".rwcheck")
with open(test, "w") as f:
f.write("ok")
os.remove(test)
return d
except Exception:
continue
# absolute fallback: current working directory
return os.getcwd()
DB_DIR = _pick_writable_dir()
DB_PATH = os.path.join(DB_DIR, os.getenv("DB_FILE", "app.db"))
# ===== SQLite bootstrap =====
_conn = sqlite3.connect(DB_PATH, check_same_thread=False)
_conn.execute(
"""
CREATE TABLE IF NOT EXISTS reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
report_date TEXT,
ocr_text TEXT,
anomalies TEXT,
measurements TEXT,
created_at TEXT DEFAULT (datetime('now'))
)
"""
)
_conn.execute(
"CREATE INDEX IF NOT EXISTS idx_reports_user_created ON reports(user_id, created_at DESC)"
)
_conn.commit()
_db_lock = threading.Lock()
def _safe_parse_json(text):
if text is None:
return None
s = text.strip()
if not s:
return None
# Try real JSON first
try:
return json.loads(s)
except json.JSONDecodeError:
pass
# Common legacy cases: Python repr (single quotes, True/False/None)
try:
return ast.literal_eval(s)
except Exception:
return None # or return s if you prefer to surface the raw text
def _row_to_dict(row: sqlite3.Row) -> dict:
return {
"id": row[0],
"user_id": row[1],
"report_date": row[2],
"ocr_text": row[3],
"anomalies": _safe_parse_json(row[4]),
"measurements": _safe_parse_json(row[5]),
"created_at": row[6],
}
def db_insert_report(payload: dict) -> int:
with _db_lock:
cur = _conn.cursor()
cur.execute(
"""
INSERT INTO reports (
user_id, report_date, ocr_text,
anomalies, measurements
)
VALUES (?, ?, ?, ?, ?)
""",
(
payload.get("user_id"),
payload.get("report_date"),
payload.get("ocr_text"),
payload.get("anomalies"),
payload.get("measurements"),
),
)
_conn.commit()
return cur.lastrowid
def db_fetch_reports(user_id: str, limit: int = 50, offset: int = 0) -> List[dict]:
_conn.row_factory = sqlite3.Row
cur = _conn.cursor()
cur.execute(
"""
SELECT id, user_id, report_date, ocr_text,
anomalies, measurements, created_at
FROM reports
WHERE user_id = ?
ORDER BY datetime(created_at) DESC, id DESC
LIMIT ? OFFSET ?
""",
(user_id, limit, offset),
)
return [_row_to_dict(r) for r in cur.fetchall()]
def db_get_report(report_id: int) -> Optional[dict]:
_conn.row_factory = sqlite3.Row
cur = _conn.cursor()
cur.execute(
"""
SELECT id, user_id, report_date, ocr_text,
anomalies, measurements, created_at
FROM reports
WHERE id = ?
""",
(report_id,),
)
row = cur.fetchone()
return _row_to_dict(row) if row else None
def db_delete_report(report_id: int) -> bool:
with _db_lock:
cur = _conn.cursor()
cur.execute("DELETE FROM reports WHERE id = ?", (report_id,))
_conn.commit()
return cur.rowcount > 0
# ===== Pydantic Schemas =====
class ReportIn(BaseModel):
user_id: str = Field(..., example="patient@example.com")
report_date: Optional[str] = Field(None, example="2025-09-07")
ocr_text: Optional[str] = None
anomalies: Optional[Any] = None
measurements: Optional[Any] = None
class ReportOut(BaseModel):
id: int
user_id: str
report_date: Optional[str]
ocr_text: Optional[str]
anomalies: Optional[Any]
measurements: Optional[Any]
created_at: str
# ===== Router =====
router = APIRouter(tags=["Reports"])
@router.post("/save_report/", response_model=dict)
def save_report(body: ReportIn):
if not body.user_id:
raise HTTPException(status_code=400, detail="user_id is required")
print(f"Saving report for user: {body}")
rid = db_insert_report(body.dict())
return {"ok": True, "id": rid}
@router.get("/reports/", response_model=List[ReportOut])
def list_reports(
user_id: str = Query(..., description="User email to filter by"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
):
return db_fetch_reports(user_id=user_id, limit=limit, offset=offset)
@router.get("/reports/{report_id}", response_model=ReportOut)
def get_report(report_id: int):
row = db_get_report(report_id)
if not row:
raise HTTPException(status_code=404, detail="Report not found")
return row
@router.delete("/reports/{report_id}", response_model=dict)
def delete_report(report_id: int):
if db_delete_report(report_id):
return {"ok": True, "deleted": report_id}
raise HTTPException(status_code=404, detail="Report not found")
if __name__ == "__main__":
#req = ReportIn(user_id="test.hd@gmail.com", report_date="2025-09-08", ocr_text="Medical Report - Cancer Patient Name: Carol Davis Age: 55 Gender: Female Clinical History: Recent biopsy confirms breast cancer (invasive ductal carcinoma). No lymph node involvement. PET scan negative for metastasis.", anomalies=[{"findings":"BREAST CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"},{"findings":"CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"},{"findings":"BRAIN CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"}], measurements=[])
#save_report(req)
print(db_fetch_reports(user_id="test.hd@gmail.com")) |