from fastapi import FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse from pydantic import BaseModel from typing import List, Optional import uvicorn import torch from scripts.model import Net from scripts.training.train import train, start_comparison_training from pathlib import Path from fastapi import BackgroundTasks import warnings import asyncio import json import numpy as np warnings.filterwarnings("ignore", category=UserWarning, module="torchvision.transforms") app = FastAPI() # Mount static files with a name parameter app.mount("/static", StaticFiles(directory="static"), name="static") templates = Jinja2Templates(directory="templates") # Model configurations class TrainingConfig(BaseModel): block1: int block2: int block3: int optimizer: str batch_size: int epochs: int = 1 class ComparisonConfig(BaseModel): model1: TrainingConfig model2: TrainingConfig def get_available_models(): models_dir = Path("scripts/training/models") if not models_dir.exists(): models_dir.mkdir(exist_ok=True, parents=True) return [f.stem for f in models_dir.glob("*.pth")] # Add a global variable to store training task training_task = None @app.get("/", response_class=HTMLResponse) async def home(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.get("/train", response_class=HTMLResponse) async def train_page(request: Request): return templates.TemplateResponse("train.html", {"request": request}) @app.get("/inference", response_class=HTMLResponse) async def inference_page(request: Request): available_models = get_available_models() return templates.TemplateResponse( "inference.html", { "request": request, "available_models": available_models } ) @app.post("/train") async def train_model(config: TrainingConfig, background_tasks: BackgroundTasks): try: # Create model instance with the configuration model = Net( kernels=[config.block1, config.block2, config.block3] ) # Store training configuration training_config = { "optimizer": config.optimizer, "batch_size": config.batch_size } return {"status": "success", "message": "Training configuration received"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.websocket("/ws/train") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() try: print("WebSocket connection accepted for single model training") config_data = await websocket.receive_json() print(f"Received config data: {config_data}") model = Net( kernels=[ config_data['block1'], config_data['block2'], config_data['block3'] ] ) # Create TrainingConfig object for single model using **kwargs config = TrainingConfig(**{ 'block1': config_data['block1'], 'block2': config_data['block2'], 'block3': config_data['block3'], 'optimizer': config_data['optimizer'], 'batch_size': config_data['batch_size'], 'epochs': config_data['epochs'] }) print(f"Starting training with config: {config_data}") try: await train(model, config, websocket, model_type="single") except Exception as e: print(f"Training error: {str(e)}") await websocket.send_json({ "type": "training_error", "data": { "message": f"Training failed: {str(e)}" } }) except WebSocketDisconnect: print("WebSocket disconnected") except Exception as e: print(f"WebSocket error: {str(e)}") await websocket.send_json({ "type": "training_error", "data": { "message": f"WebSocket error: {str(e)}" } }) finally: print("WebSocket connection closed") @app.websocket("/ws/compare") async def websocket_endpoint(websocket: WebSocket): print("\n=== New WebSocket Connection ===") print("New WebSocket connection attempt") try: await websocket.accept() print("WebSocket connection accepted") print("Waiting for initial message...") data = await websocket.receive_json() print(f"Received initial message: {data}") if 'action' not in data: print("Error: Missing 'action' in message") await websocket.send_json({ 'status': 'error', 'message': 'Missing action in request' }) return if data['action'] == 'start_training': if 'parameters' not in data: print("Error: Missing 'parameters' in message") await websocket.send_json({ 'status': 'error', 'message': 'Missing parameters in request' }) return print("Starting training task") try: training_task = asyncio.create_task(start_comparison_training( websocket, data['parameters'] )) print("Training task created, awaiting completion...") await training_task print("Training task completed") except Exception as e: print(f"Error during training task: {str(e)}") await websocket.send_json({ 'status': 'error', 'message': f'Training error: {str(e)}' }) else: print(f"Unknown action received: {data['action']}") except WebSocketDisconnect: print("WebSocket disconnected") except json.JSONDecodeError as e: print(f"JSON decode error: {str(e)}") except Exception as e: print(f"Unexpected error in websocket handler: {str(e)}") finally: print("=== WebSocket Connection Closed ===\n") # @app.post("/api/train_single") # async def train_single_model(config: TrainingConfig): # try: # model = Net(kernels=config.kernels) # # Start training without passing the websocket # await train(model, config) # return {"status": "success"} # except Exception as e: # # Log the error for debugging # print(f"Error during training: {str(e)}") # # Return a JSON response with the error message # raise HTTPException(status_code=500, detail=f"Error during training: {str(e)}") @app.post("/api/train_compare") async def train_compare_models(config: ComparisonConfig): try: # Train both models model1 = Net(kernels=config.model1.kernels) model2 = Net(kernels=config.model2.kernels) results1 = train(model1, config.model1) results2 = train(model2, config.model2) return { "status": "success", "model1_results": results1, "model2_results": results2 } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) def parse_model_filename(filename): """Extract configuration from model filename""" # Example filename: single_arch_32_64_128_opt_adam_batch_64_20240322_123456.pth try: parts = filename.split('_') # Find architecture values arch_index = parts.index('arch') block1 = int(parts[arch_index + 1]) block2 = int(parts[arch_index + 2]) block3 = int(parts[arch_index + 3]) # Find optimizer opt_index = parts.index('opt') optimizer = parts[opt_index + 1] # Find batch size batch_index = parts.index('batch') batch_size = int(parts[batch_index + 1]) return { 'block1': block1, 'block2': block2, 'block3': block3, 'optimizer': optimizer, 'batch_size': batch_size } except Exception as e: print(f"Error parsing model filename: {e}") return None @app.post("/api/inference") async def perform_inference(data: dict): try: model_name = data.get("model_name") if not model_name: raise HTTPException(status_code=400, detail="No model selected") model_path = Path("scripts/training/models") / f"{model_name}.pth" if not model_path.exists(): raise HTTPException(status_code=404, detail=f"Model not found: {model_path}") # Parse model configuration from filename config = parse_model_filename(model_name) if not config: raise HTTPException(status_code=500, detail="Could not parse model configuration") # Create model with the correct configuration model = Net( kernels=[ config['block1'], config['block2'], config['block3'] ] ) # Load model weights model.load_state_dict(torch.load(str(model_path), map_location=torch.device('cpu'), weights_only=True)) model.eval() # Process image data and get prediction image_data = data.get("image") if not image_data: raise HTTPException(status_code=400, detail="No image data provided") # Convert base64 image to tensor and process try: # Remove the data URL prefix image_data = image_data.split(',')[1] import base64 import io from PIL import Image import torchvision.transforms as transforms # Decode base64 to image image_bytes = base64.b64decode(image_data) image = Image.open(io.BytesIO(image_bytes)).convert('L') # Convert to grayscale # Resize using PIL directly with LANCZOS image = image.resize((28, 28), Image.LANCZOS) # Invert the image (subtract from 255 to invert grayscale) image = Image.fromarray(255 - np.array(image)) # Preprocess image transform = transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)) ]) # Convert to tensor and add batch dimension image_tensor = transform(image).unsqueeze(0) # Get prediction with torch.no_grad(): output = model(image_tensor) prediction = output.argmax(dim=1).item() # Add configuration info to response return { "prediction": prediction, "model_config": { "architecture": f"{config['block1']}-{config['block2']}-{config['block3']}", "optimizer": config['optimizer'], "batch_size": config['batch_size'] } } except Exception as e: raise HTTPException(status_code=500, detail=f"Error processing image: {str(e)}") except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/train/single", response_class=HTMLResponse) async def train_single_page(request: Request): return templates.TemplateResponse("train_single.html", {"request": request}) @app.get("/train/compare", response_class=HTMLResponse) async def train_compare_page(request: Request): return templates.TemplateResponse("train_compare.html", {"request": request}) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)