File size: 7,091 Bytes
ec85693
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import ast
import json
import os
import sqlite3
import tempfile
import threading
from datetime import datetime
from typing import Any, List, Optional

from pydantic import BaseModel, Field
from fastapi import APIRouter, Query, HTTPException

# ===== Config =====
def _pick_writable_dir():
    """Return the first candidate directory that accepts a test write.

    Candidates are tried in order: the ``DB_DIR`` environment variable,
    ``/data``, ``/app/data``, ``<cwd>/data`` and the system temp directory.
    Each one is created if missing and probed by writing and then removing
    a ``.rwcheck`` file. If every candidate fails, the current working
    directory is returned without being probed.
    """
    search_order = (
        os.getenv("DB_DIR"),           # <- recommended: set this in env
        "/data",                       # HF Spaces/container convention
        "/app/data",                   # container-friendly
        os.path.join(os.getcwd(), "data"),
        tempfile.gettempdir(),
    )
    # filter(None, ...) drops unset/empty entries such as a missing DB_DIR.
    for folder in filter(None, search_order):
        try:
            os.makedirs(folder, exist_ok=True)
            probe = os.path.join(folder, ".rwcheck")
            with open(probe, "w") as handle:
                handle.write("ok")
            os.remove(probe)
        except Exception:
            # Any failure means this location is unusable; try the next one.
            continue
        return folder
    # absolute fallback: current working directory
    return os.getcwd()

# Directory for the database file, chosen at import time by _pick_writable_dir().
DB_DIR = _pick_writable_dir()
# Full path of the SQLite file; the file name comes from the DB_FILE
# environment variable and defaults to "app.db".
DB_PATH = os.path.join(DB_DIR, os.getenv("DB_FILE", "app.db"))

# ===== SQLite bootstrap =====
# A single module-wide connection is opened at import time.
# check_same_thread=False turns off sqlite3's same-thread check so the
# connection can be used from threads other than the one that created it.
_conn = sqlite3.connect(DB_PATH, check_same_thread=False)
# Create the reports table if it does not exist yet. anomalies and
# measurements are TEXT columns; readers decode them via _safe_parse_json.
_conn.execute(
    """
    CREATE TABLE IF NOT EXISTS reports (
        id                INTEGER PRIMARY KEY AUTOINCREMENT,
        user_id        TEXT NOT NULL,
        report_date       TEXT,
        ocr_text          TEXT,
        anomalies    TEXT,
        measurements TEXT,
        created_at        TEXT DEFAULT (datetime('now'))
    )
    """
)
# Index on (user_id, created_at DESC), matching the per-user listing query.
_conn.execute(
    "CREATE INDEX IF NOT EXISTS idx_reports_user_created ON reports(user_id, created_at DESC)"
)
_conn.commit()
# Lock held by the write helpers (insert/delete) around execute + commit.
_db_lock = threading.Lock()


def _safe_parse_json(text):
    """Decode a stored JSON (or legacy Python-repr) string.

    Args:
        text: Raw column value, either ``None`` or a string.

    Returns:
        The decoded Python object, or ``None`` when ``text`` is ``None``,
        blank, or cannot be decoded as JSON or as a Python literal.
    """
    if text is None:
        return None
    stripped = text.strip()
    if not stripped:
        return None
    # Try real JSON first
    try:
        return json.loads(stripped)
    except json.JSONDecodeError:
        pass
    # Common legacy cases: Python repr (single quotes, True/False/None).
    # Only the exceptions literal_eval documents for malformed input are
    # caught, so a genuine programming error (such as a missing import)
    # is no longer silently turned into None.
    try:
        return ast.literal_eval(stripped)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return None  # or return the raw text if you prefer to surface it

def _row_to_dict(row: sqlite3.Row) -> dict:
    """Convert a positional ``reports`` row into a plain dict.

    The row is read by index in the column order id, user_id, report_date,
    ocr_text, anomalies, measurements, created_at. The anomalies and
    measurements values are decoded with ``_safe_parse_json``.
    """
    columns = (
        "id",
        "user_id",
        "report_date",
        "ocr_text",
        "anomalies",
        "measurements",
        "created_at",
    )
    record = {name: row[position] for position, name in enumerate(columns)}
    # These two columns hold serialized text; replace it with the decoded value.
    for encoded_key in ("anomalies", "measurements"):
        record[encoded_key] = _safe_parse_json(record[encoded_key])
    return record

def _to_db_json(value):
    """Return *value* in a form sqlite3 can bind to a TEXT column.

    ``None``, ``str`` and ``bytes`` pass through unchanged, so callers that
    already hand in serialized JSON keep working; anything else (lists,
    dicts, numbers, booleans) is JSON-encoded.
    """
    if value is None or isinstance(value, (str, bytes)):
        return value
    return json.dumps(value)


def db_insert_report(payload: dict) -> int:
    """Insert one report row and return its new id.

    Args:
        payload: Mapping with the keys ``user_id``, ``report_date``,
            ``ocr_text``, ``anomalies`` and ``measurements``; missing keys
            are stored as NULL. ``anomalies`` and ``measurements`` may be
            structured values and are JSON-encoded before being stored.

    Returns:
        The ``lastrowid`` of the inserted row.
    """
    with _db_lock:
        cur = _conn.cursor()
        cur.execute(
            """
            INSERT INTO reports (
                user_id, report_date, ocr_text,
                anomalies, measurements
            )
            VALUES (?, ?, ?, ?, ?)
            """,
            (
                payload.get("user_id"),
                payload.get("report_date"),
                payload.get("ocr_text"),
                # sqlite3 cannot bind list/dict parameters directly.
                _to_db_json(payload.get("anomalies")),
                _to_db_json(payload.get("measurements")),
            ),
        )
        _conn.commit()
        return cur.lastrowid

def db_fetch_reports(user_id: str, limit: int = 50, offset: int = 0) -> List[dict]:
    """Return one page of a user's reports, newest first.

    Args:
        user_id: Value matched exactly against the ``user_id`` column.
        limit: Maximum number of rows to return.
        offset: Number of matching rows to skip before the page starts.

    Returns:
        A list of report dicts built by ``_row_to_dict``, ordered by
        ``created_at`` descending and then ``id`` descending.
    """
    # Sets the shared connection's row factory before the cursor is created.
    _conn.row_factory = sqlite3.Row
    query = """
        SELECT id, user_id, report_date, ocr_text,
               anomalies, measurements, created_at
        FROM reports
        WHERE user_id = ?
        ORDER BY datetime(created_at) DESC, id DESC
        LIMIT ? OFFSET ?
        """
    matches = _conn.execute(query, (user_id, limit, offset)).fetchall()
    return list(map(_row_to_dict, matches))

def db_get_report(report_id: int) -> Optional[dict]:
    """Look up a single report by primary key.

    Args:
        report_id: Value matched against the ``id`` column.

    Returns:
        The report as a dict built by ``_row_to_dict``, or ``None`` when no
        row has that id.
    """
    # Sets the shared connection's row factory before the cursor is created.
    _conn.row_factory = sqlite3.Row
    query = """
        SELECT id, user_id, report_date, ocr_text,
               anomalies, measurements, created_at
        FROM reports
        WHERE id = ?
        """
    found = _conn.execute(query, (report_id,)).fetchone()
    if not found:
        return None
    return _row_to_dict(found)

def db_delete_report(report_id: int) -> bool:
    """Delete the report with the given id.

    Args:
        report_id: Value matched against the ``id`` column.

    Returns:
        ``True`` if a row was deleted, ``False`` if no row had that id.
    """
    with _db_lock:
        cursor = _conn.execute("DELETE FROM reports WHERE id = ?", (report_id,))
        _conn.commit()
        removed = cursor.rowcount
    return removed > 0

# ===== Pydantic Schemas =====
class ReportIn(BaseModel):
    """Request body accepted by the ``save_report`` endpoint."""
    # Owner of the report; the only required field. The example value
    # suggests an email address is expected.
    user_id: str = Field(..., example="patient@example.com")
    # Date of the report as a string; the example uses YYYY-MM-DD.
    report_date: Optional[str] = Field(None, example="2025-09-07")
    # Free text of the report (named for OCR output).
    ocr_text: Optional[str] = None
    # Free-form values (typed Any); no structure is enforced by this model.
    anomalies: Optional[Any] = None
    measurements: Optional[Any] = None

class ReportOut(BaseModel):
    """Response shape used by the report read endpoints."""
    # Primary key of the stored row.
    id: int
    user_id: str
    report_date: Optional[str]
    ocr_text: Optional[str]
    # Decoded from the stored text by _safe_parse_json; None when the stored
    # value is missing or could not be decoded.
    anomalies: Optional[Any]
    measurements: Optional[Any]
    # Timestamp string as stored in the created_at column.
    created_at: str

# ===== Router =====
# Router on which the report endpoints below are registered; its routes
# carry the "Reports" tag.
router = APIRouter(tags=["Reports"])

@router.post("/save_report/", response_model=dict)
def save_report(body: ReportIn):
    """Store a new report and return its id.

    Args:
        body: Validated request payload.

    Returns:
        ``{"ok": True, "id": <new row id>}``.

    Raises:
        HTTPException: 400 when ``user_id`` is empty.
    """
    if not body.user_id:
        raise HTTPException(status_code=400, detail="user_id is required")
    # Log only the user id: the full body contains the report text and
    # findings, which should not be dumped to stdout.
    print(f"Saving report for user: {body.user_id}")
    rid = db_insert_report(body.dict())
    return {"ok": True, "id": rid}

@router.get("/reports/", response_model=List[ReportOut])
def list_reports(
    user_id: str = Query(..., description="User email to filter by"),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
):
    """Return one page of reports belonging to ``user_id``.

    ``limit`` is constrained to 1..200 (default 50) and ``offset`` must be
    non-negative (default 0); both are passed through to
    ``db_fetch_reports``.
    """
    page = db_fetch_reports(user_id=user_id, limit=limit, offset=offset)
    return page

@router.get("/reports/{report_id}", response_model=ReportOut)
def get_report(report_id: int):
    """Return the report with the given id.

    Raises:
        HTTPException: 404 when ``db_get_report`` finds no such report.
    """
    report = db_get_report(report_id)
    if report:
        return report
    raise HTTPException(status_code=404, detail="Report not found")

@router.delete("/reports/{report_id}", response_model=dict)
def delete_report(report_id: int):
    """Delete the report with the given id.

    Returns:
        ``{"ok": True, "deleted": report_id}`` when a row was removed.

    Raises:
        HTTPException: 404 when no report has that id.
    """
    was_deleted = db_delete_report(report_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Report not found")
    return {"ok": True, "deleted": report_id}


if __name__ == "__main__":
    # Manual smoke test: prints the stored reports for a sample user.
    # The two commented-out lines below build a sample payload and save it
    # first; uncomment them to seed the database before listing.
    #req = ReportIn(user_id="test.hd@gmail.com", report_date="2025-09-08", ocr_text="Medical Report - Cancer Patient Name: Carol Davis Age: 55 Gender: Female Clinical History: Recent biopsy confirms breast cancer (invasive ductal carcinoma). No lymph node involvement. PET scan negative for metastasis.", anomalies=[{"findings":"BREAST CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"},{"findings":"CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"},{"findings":"BRAIN CANCER(DETECTED AS HISTORICAL CONDITION, BUT STILL UNDER RISK.)","severity":"Severe Risk","recommendations":["Consult a doctor."],"treatment_suggestions":"Consult a specialist: General Practitioner","home_care_guidance":[],"info_link":"https://www.webmd.com/"}], measurements=[])
    #save_report(req)
    print(db_fetch_reports(user_id="test.hd@gmail.com"))