trafficManager / app.py
LapStore
modified api resonse
6069409
from fastapi import FastAPI, HTTPException,BackgroundTasks
import requests
import ast
import pymysql
import time
from database_manager import *
from mqtt_manager import *
from traffic_utils import *
from Utils import *
from threading import Thread
import state
from contextlib import asynccontextmanager
# -------------------------
# App Initialization
# -------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
# Starting code
start_mqtt()
yield
# At End
print("πŸ”» Shutting down...")
app = FastAPI(lifespan=lifespan)
#--------------------------
# Routes
# -------------------------
@app.get("/")
def main():
return {"message": "Hello from FastAPI on Hugging Face Spaces!"}
@app.post("/send-coordinates")
async def receive_coordinates(payload: CoordinatesPayload,background_tasks: BackgroundTasks):
coords= payload.coords
state_= payload.state
duration= payload.duration
delay= payload.delay
if isinstance(coords, str):
coords = ast.literal_eval(coords)
traffic_lights = get_traffic_in_container(coords)
tl_ids = check_signals(traffic_lights)
if tl_ids:
for tl_id in tl_ids:
#background_tasks.add_task(open_signal, tl_id, state_, duration,delay) #"EMR" # "ACC"
answer = await open_signal(tl_id, state_, duration,delay)
print(f"Signal {tl_id} opened with state {state_}, duration {duration}, delay {delay} ,answer: {answer}")
return {"answer": answer} # temp here
else:
print(f"Request failed with coords{coords} state {state_}, duration {duration}, delay {delay} ,answer: No traffic found found")
return {"answer": "No traffic signals found in database"+str(traffic_lights)}