foodwise-remote-mcp / scripts /verify_schema.py
LeoWalker's picture
enhance(dev): improve development tooling and configuration
c0cfae9
#!/usr/bin/env python3
"""Verify Notion DB schemas (inventory and shopping) against Pydantic models.
Derives expected properties from `db_models.py` metadata, not a manual dict.
Run with: uv run python scripts/verify_schema.py [--db inventory|shopping|all] [DB_ID]
"""
import sys
import os
from pathlib import Path
# Add src to path so we can import our modules
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from dotenv import load_dotenv
load_dotenv()
from notion_client import Client
from typing import Any, Dict, cast, Type
import argparse
# Derive expected schema from Pydantic models rather than hardcoding
from foodwise.database.db_models import FoodInventoryItem, ShoppingListItem
import re
def _expected_properties_from_model(model: Type[Any]) -> dict:
    """Build the expected Notion property schema from a Pydantic model.

    Property names and types are derived from ``model.model_fields`` so the
    schema does not have to be maintained by hand. The Notion type is taken
    from a ``(Notion: <hint>)`` marker in each field's description; when no
    known hint is present it is inferred from the field's type annotation.

    Args:
        model: Pydantic model class exposing ``model_fields``.

    Returns:
        Mapping of Notion property name to
        ``{"type": <notion type>, "required": <bool>}``. Only properties of
        type ``"title"`` are marked required. The ``id`` and ``url`` model
        fields are skipped because they are not database properties.
    """
    fields = model.model_fields

    # Field names whose Notion label is not a plain word-by-word
    # capitalisation (e.g. labels containing "/").
    special_labels = {
        "best_by_date": "Best By Date",
        "purchase_date": "Purchase Date",
        "opened_date": "Opened Date",
        "freeze_date": "Freeze Date",
        "location_shelf": "Location/Shelf",
        "fridge_zone": "Fridge Zone",
        "storage_type": "Storage Type",
        "cooked_raw_status": "Cooked/Raw Status",
        "prep_notes": "Prep Notes",
    }

    # Hints as written in the Field descriptions -> Notion API type names.
    hint_to_type = {
        "Text": "rich_text",
        "Rich text": "rich_text",
        "Title": "title",
        "Select": "select",
        "Multi-select": "multi_select",
        "Number": "number",
        "Date": "date",
        "Checkbox": "checkbox",
        "Checkbox/Boolean": "checkbox",
        "Boolean": "checkbox",
        "id": "id",
        "URL": "url",
    }

    # Annotation-based fallbacks, checked in this order. Each entry lists the
    # identifiers that must appear as a *whole word* in str(annotation).
    annotation_fallbacks = (
        ({"date", "datetime"}, "date"),
        ({"float", "int"}, "number"),
        ({"List", "list"}, "multi_select"),
        ({"bool"}, "checkbox"),
    )

    def to_title(label: str) -> str:
        """Convert a snake_case field name into its Notion property label."""
        if label in special_labels:
            return special_labels[label]
        # Generic path; this also yields "Name" for the inventory title field.
        return " ".join(word.capitalize() for word in label.split("_"))

    def parse_notion_type(description: str | None, field_name: str) -> str:
        """Resolve the Notion type for one field from its description/annotation."""
        if not description:
            return "rich_text"  # safe default for free text

        match = re.search(r"Notion:\s*([^\)]+)\)", description)
        notion_hint = match.group(1).strip() if match else ""

        # Some descriptions carry several hints such as "Text/Relation";
        # the first one we recognise wins.
        for part in re.split(r"[,/]", notion_hint):
            part = part.strip()
            if part in hint_to_type:
                return hint_to_type[part]

        # Fall back to the type annotation. Compare whole identifiers rather
        # than substrings so that names which merely contain "int", "date",
        # "list" or "bool" (e.g. Point, UpdateStatus, ShoppingListItem) are
        # not misclassified.
        annotation = fields[field_name].annotation
        identifiers = set(re.findall(r"[A-Za-z_][A-Za-z0-9_]*", str(annotation)))
        for names, notion_type in annotation_fallbacks:
            if names & identifiers:
                return notion_type
        return "rich_text"

    # Fields that must be treated as the Notion title even when their
    # description does not say so.
    forced_title_fields = {"name"} if model is FoodInventoryItem else set()

    expected: dict = {}
    for fname, finfo in fields.items():
        if fname in {"id", "url"}:
            continue  # not database properties

        notion_type = parse_notion_type(getattr(finfo, "description", None), fname)
        if fname in forced_title_fields:
            notion_type = "title"

        # A property is required exactly when it is the title.
        expected[to_title(fname)] = {
            "type": notion_type,
            "required": notion_type == "title",
        }
    return expected
def verify_database_schema(database_id: str | None = None, db_kind: str = "inventory"):
"""Verify the database schema matches FoodWise expectations."""
# Resolve database id from env if not provided
env_key = "NOTION_INVENTORY_DB_ID" if db_kind == "inventory" else "NOTION_SHOPPING_DB_ID"
database_id = database_id or os.getenv(env_key)
if not database_id:
print(f"❌ {env_key} not found in environment and no ID was provided")
return False
model: Type[Any] = FoodInventoryItem if db_kind == "inventory" else ShoppingListItem
expected_properties = _expected_properties_from_model(model)
try:
notion_secret = os.getenv("NOTION_SECRET")
if not notion_secret:
print("❌ NOTION_SECRET not found in environment")
return False
client = Client(auth=notion_secret)
print(f"πŸ” Retrieving database schema for ID: {database_id}")
database = cast(Dict[str, Any], client.databases.retrieve(database_id))
database_title = "Unknown"
if database.get("title") and len(database["title"]) > 0:
database_title = database["title"][0]["text"]["content"]
print(f"πŸ“Š Database: {database_title}")
print(f"πŸ”— URL: {database['url']}")
actual_properties = database.get("properties", {})
print(f"\nπŸ“‹ Found {len(actual_properties)} properties in database")
# Normalize names so minor punctuation/case diffs don't flag as missing/unexpected
def _norm(label: str) -> str:
return "".join(ch for ch in label.lower() if ch.isalnum())
actual_norm_to_name: Dict[str, str] = {_norm(n): n for n in actual_properties.keys()}
# Check each expected property
print("\n" + "=" * 60)
print(f"πŸ” {db_kind.upper()} SCHEMA CHECK")
print("=" * 60)
missing_required = []
missing_optional = []
type_mismatches = []
correct_properties = []
for prop_name, expected in expected_properties.items():
actual_key = actual_norm_to_name.get(_norm(prop_name))
if actual_key is None:
if expected.get("required"):
missing_required.append(prop_name)
else:
missing_optional.append(prop_name)
print(f"❌ MISSING: {prop_name} ({expected['type']})")
continue
actual_prop = actual_properties[actual_key]
actual_type = actual_prop["type"]
expected_type = expected["type"]
if actual_type == expected_type:
correct_properties.append(prop_name)
else:
type_mismatches.append((prop_name, expected_type, actual_type))
print(f"❌ TYPE MISMATCH: {prop_name} (expected: {expected_type}, actual: {actual_type})")
# Check for unexpected properties
expected_norms = {_norm(n) for n in expected_properties.keys()}
unexpected_properties = [prop for prop in actual_properties if _norm(prop) not in expected_norms]
print("\n" + "-" * 60)
print("πŸ“Š SUMMARY")
print(
f" Correct: {len(correct_properties)} | Missing required: {len(missing_required)} | "
f"Missing optional: {len(missing_optional)} | Mismatches: {len(type_mismatches)} | Unexpected: {len(unexpected_properties)}"
)
if unexpected_properties:
print(f"\nπŸ” Unexpected properties found:")
for prop in unexpected_properties[:5]: # Show first 5
actual_type = actual_properties[prop]["type"]
print(f" β€’ {prop} ({actual_type})")
if len(unexpected_properties) > 5:
print(f" ... and {len(unexpected_properties) - 5} more")
# Overall assessment
if not missing_required and not type_mismatches:
print(f"\nπŸŽ‰ SCHEMA VERIFICATION PASSED!")
print(f" Your database is fully compatible with FoodWise!")
print(f"\nπŸ’‘ Next step: Add this to your .env file:")
print(f" {env_key}={database_id}")
return True
else:
print(f"\n⚠️ SCHEMA ISSUES DETECTED")
if missing_required:
print(f" Missing {len(missing_required)} required properties")
if missing_optional:
print(f" Missing {len(missing_optional)} optional properties")
if type_mismatches:
print(f" {len(type_mismatches)} properties have incorrect types")
print(f"\nπŸ”§ You may need to update your Notion database schema")
return False
except Exception as e:
print(f"❌ Failed to verify schema: {e}")
return False
def main() -> int:
    """Parse the command line, run the requested schema checks, return an exit code.

    Returns:
        ``0`` when every requested check passed, ``1`` otherwise.
    """
    print("🍽️ FoodWise Schema Verification")
    print("=" * 40)

    parser = argparse.ArgumentParser(description="Verify Notion DB schema against FoodWise models")
    parser.add_argument("database_id", nargs="?", help="Override database ID for the selected DB kind")
    parser.add_argument("--db", choices=["inventory", "shopping", "all"], default="all", help="Which schema to verify")
    args = parser.parse_args()

    check_all = args.db == "all"
    if check_all and args.database_id:
        # A single positional ID cannot apply to two databases.
        print("⚠️ Ignoring DB_ID: --db all reads both database IDs from the environment")

    def run_one(kind: str) -> bool:
        """Verify one database kind, resolving its ID from CLI or environment."""
        env_key_local = "NOTION_INVENTORY_DB_ID" if kind == "inventory" else "NOTION_SHOPPING_DB_ID"
        # The positional ID only overrides when a single kind was selected;
        # otherwise (or when it was omitted) fall back to the environment so
        # the hint below is printed only when no ID is available at all.
        override = None if check_all else args.database_id
        target_local = override or os.getenv(env_key_local)
        if not target_local:
            print(f"🎯 {kind}: set {env_key_local} or pass DB_ID when using --db {kind}")
        return verify_database_schema(target_local, db_kind=kind)

    if check_all:
        # Run both checks unconditionally so each prints its own report.
        ok_inv = run_one("inventory")
        ok_shop = run_one("shopping")
        if ok_inv and ok_shop:
            print("\n✅ Both schemas look compatible.")
            return 0
        print("\n❌ One or more schema checks failed.")
        return 1

    if run_one(args.db):
        print("\n✅ Schema looks compatible.")
        return 0
    print("\n❌ Schema check failed.")
    return 1


if __name__ == "__main__":
    sys.exit(main())