Spaces:
Sleeping
Sleeping
from fastapi import APIRouter, Depends, HTTPException, Request | |
from sqlalchemy.orm import Session | |
from typing import List | |
from datetime import datetime, timezone | |
from ..database import get_db, Table as TableModel, Order, get_session_db | |
from ..models.table import Table, TableCreate, TableUpdate, TableStatus | |
from ..middleware import get_session_id | |
router = APIRouter( | |
prefix="/tables", | |
tags=["tables"], | |
responses={404: {"description": "Not found"}}, | |
) | |
# Dependency to get session-aware database | |
def get_session_database(request: Request): | |
session_id = get_session_id(request) | |
return next(get_session_db(session_id)) | |
# Get all tables | |
def get_all_tables(request: Request, db: Session = Depends(get_session_database)): | |
return db.query(TableModel).order_by(TableModel.table_number).all() | |
# Get table by ID | |
def get_table(table_id: int, request: Request, db: Session = Depends(get_session_database)): | |
db_table = db.query(TableModel).filter(TableModel.id == table_id).first() | |
if not db_table: | |
raise HTTPException(status_code=404, detail="Table not found") | |
return db_table | |
# Get table by table number | |
def get_table_by_number(table_number: int, request: Request, db: Session = Depends(get_session_database)): | |
db_table = ( | |
db.query(TableModel).filter(TableModel.table_number == table_number).first() | |
) | |
if not db_table: | |
raise HTTPException(status_code=404, detail="Table not found") | |
return db_table | |
# Create new table | |
def create_table(table: TableCreate, request: Request, db: Session = Depends(get_session_database)): | |
# Check if a table with this number already exists | |
existing_table = ( | |
db.query(TableModel) | |
.filter(TableModel.table_number == table.table_number) | |
.first() | |
) | |
if existing_table: | |
raise HTTPException( | |
status_code=400, | |
detail=f"Table with number {table.table_number} already exists", | |
) | |
# Create new table | |
db_table = TableModel( | |
table_number=table.table_number, | |
is_occupied=table.is_occupied, | |
current_order_id=table.current_order_id, | |
last_occupied_at=table.last_occupied_at, | |
created_at=datetime.now(timezone.utc), | |
updated_at=datetime.now(timezone.utc), | |
) | |
db.add(db_table) | |
db.commit() | |
db.refresh(db_table) | |
return db_table | |
# Update table | |
def update_table( | |
table_id: int, table_update: TableUpdate, request: Request, db: Session = Depends(get_session_database) | |
): | |
db_table = db.query(TableModel).filter(TableModel.id == table_id).first() | |
if not db_table: | |
raise HTTPException(status_code=404, detail="Table not found") | |
# Update fields if provided | |
if table_update.is_occupied is not None: | |
db_table.is_occupied = table_update.is_occupied | |
if table_update.current_order_id is not None: | |
db_table.current_order_id = table_update.current_order_id | |
db_table.updated_at = datetime.now(timezone.utc) | |
db.commit() | |
db.refresh(db_table) | |
return db_table | |
# Delete table | |
def delete_table(table_id: int, request: Request, db: Session = Depends(get_session_database)): | |
db_table = db.query(TableModel).filter(TableModel.id == table_id).first() | |
if not db_table: | |
raise HTTPException(status_code=404, detail="Table not found") | |
# Check if table is currently occupied | |
if db_table.is_occupied: | |
raise HTTPException( | |
status_code=400, detail="Cannot delete a table that is currently occupied" | |
) | |
db.delete(db_table) | |
db.commit() | |
return {"message": "Table deleted successfully"} | |
# Get table status (total, occupied, free) | |
def get_table_status(request: Request, db: Session = Depends(get_session_database)): | |
total_tables = db.query(TableModel).count() | |
occupied_tables = ( | |
db.query(TableModel).filter(TableModel.is_occupied == True).count() | |
) | |
free_tables = total_tables - occupied_tables | |
return { | |
"total_tables": total_tables, | |
"occupied_tables": occupied_tables, | |
"free_tables": free_tables, | |
} | |
# Set table as occupied | |
def set_table_occupied( | |
table_id: int, order_id: int = None, request: Request = None, db: Session = Depends(get_session_database) | |
): | |
db_table = db.query(TableModel).filter(TableModel.id == table_id).first() | |
if not db_table: | |
raise HTTPException(status_code=404, detail="Table not found") | |
# Check if table is already occupied | |
if db_table.is_occupied: | |
raise HTTPException(status_code=400, detail="Table is already occupied") | |
# Update table status | |
db_table.is_occupied = True | |
# Link to order if provided | |
if order_id: | |
# Verify order exists | |
order = db.query(Order).filter(Order.id == order_id).first() | |
if not order: | |
raise HTTPException(status_code=404, detail="Order not found") | |
db_table.current_order_id = order_id | |
db_table.updated_at = datetime.now(timezone.utc) | |
db.commit() | |
db.refresh(db_table) | |
return db_table | |
# Set table as free | |
def set_table_free(table_id: int, request: Request, db: Session = Depends(get_session_database)): | |
db_table = db.query(TableModel).filter(TableModel.id == table_id).first() | |
if not db_table: | |
raise HTTPException(status_code=404, detail="Table not found") | |
# Check if table is already free | |
if not db_table.is_occupied: | |
raise HTTPException(status_code=400, detail="Table is already free") | |
# Update table status | |
db_table.is_occupied = False | |
db_table.current_order_id = None | |
db_table.updated_at = datetime.now(timezone.utc) | |
db.commit() | |
db.refresh(db_table) | |
return db_table | |
# Set table as occupied by table number | |
def set_table_occupied_by_number(table_number: int, request: Request, db: Session = Depends(get_session_database)): | |
db_table = ( | |
db.query(TableModel).filter(TableModel.table_number == table_number).first() | |
) | |
if not db_table: | |
raise HTTPException(status_code=404, detail="Table not found") | |
# Update table status (even if already occupied, just update the timestamp) | |
db_table.is_occupied = True | |
db_table.last_occupied_at = datetime.now(timezone.utc) | |
db_table.updated_at = datetime.now(timezone.utc) | |
db.commit() | |
db.refresh(db_table) | |
return db_table | |
# Set table as free by table number | |
def set_table_free_by_number(table_number: int, request: Request, db: Session = Depends(get_session_database)): | |
db_table = ( | |
db.query(TableModel).filter(TableModel.table_number == table_number).first() | |
) | |
if not db_table: | |
raise HTTPException(status_code=404, detail="Table not found") | |
# Update table status to free (don't check if already free, just update) | |
db_table.is_occupied = False | |
db_table.current_order_id = None | |
db_table.updated_at = datetime.now(timezone.utc) | |
db.commit() | |
db.refresh(db_table) | |
return db_table | |
# Create multiple tables at once | |
def create_tables_batch(num_tables: int, request: Request, db: Session = Depends(get_session_database)): | |
if num_tables <= 0: | |
raise HTTPException( | |
status_code=400, detail="Number of tables must be greater than 0" | |
) | |
# Get the highest existing table number | |
highest_table = ( | |
db.query(TableModel).order_by(TableModel.table_number.desc()).first() | |
) | |
start_number = 1 | |
if highest_table: | |
start_number = highest_table.table_number + 1 | |
# Create tables | |
new_tables = [] | |
for i in range(start_number, start_number + num_tables): | |
db_table = TableModel( | |
table_number=i, | |
is_occupied=False, | |
created_at=datetime.now(timezone.utc), | |
updated_at=datetime.now(timezone.utc), | |
) | |
db.add(db_table) | |
new_tables.append(db_table) | |
db.commit() | |
# Refresh all tables | |
for table in new_tables: | |
db.refresh(table) | |
return new_tables | |