# trafficManager / mqtt_manager.py
# LapStore
# corrected bug in queue length check
# 80dfab2
import threading
import time
import os
import paho.mqtt.client as mqtt
from dotenv import load_dotenv
import state  # contains `singlas_state`
# ------------------ Load .env ------------------ #
# Read broker settings from keys.env.
load_dotenv(dotenv_path="keys.env")

mqtt_broker = os.getenv("MQTT_BROKER")
# MQTT_PORT may be missing or empty; int(None) would abort the import with an
# opaque TypeError, so fall back to 1883, the standard unencrypted MQTT port.
mqtt_port = int(os.getenv("MQTT_PORT") or 1883)

# Topic pieces; topic_msg() and topic_rep() combine them as
# "<MQTT_PARTOPIC_state>/<tl_id>/<subtopic>".
MQTT_PARTOPIC_state = os.getenv("MQTT_PARTOPIC_state")
MQTT_SUBTOPIC_reply = os.getenv("MQTT_SUBTOPIC_reply")
MQTT_SUBTOPIC_msg = os.getenv("MQTT_SUBTOPIC_msg")
def topic_msg(tl_id):
    """Build the topic on which messages are sent to traffic light *tl_id*.

    The result has the form
    ``<MQTT_PARTOPIC_state>/<tl_id>/<MQTT_SUBTOPIC_msg>``.
    """
    return "{}/{}/{}".format(MQTT_PARTOPIC_state, tl_id, MQTT_SUBTOPIC_msg)
def topic_rep(tl_id):
    """Build the topic on which traffic light *tl_id* sends its replies.

    The result has the form
    ``<MQTT_PARTOPIC_state>/<tl_id>/<MQTT_SUBTOPIC_reply>``.
    """
    return "{}/{}/{}".format(MQTT_PARTOPIC_state, tl_id, MQTT_SUBTOPIC_reply)
# ------------------ Get state ------------------ #
def get_State(tl_id):
    """Return the current state recorded for *tl_id*.

    Looks the id up in ``state.singlas_state`` and yields ``"FREE"`` when
    there is no entry for it.
    """
    known_states = state.singlas_state
    return known_states.get(tl_id, "FREE")
def checked_ql(ql, max_queue_length=16):
    """Return True when the reported queue length is below the road's limit.

    Args:
        ql: Queue length to check; anything ``int()`` accepts, typically the
            numeric text taken from a ``"QL <n>"`` reply.
        max_queue_length: Exclusive upper bound for an acceptable queue
            length. The default of 16 accepts lengths up to 15.

    Returns:
        True if *ql* converts to an integer strictly below
        *max_queue_length*; False otherwise, including when *ql* cannot be
        converted at all.
    """
    try:
        queue_length = int(ql)
    except (TypeError, ValueError):
        # The value arrives in an MQTT payload, so it may be malformed;
        # report it as not acceptable instead of raising in the caller.
        return False
    return queue_length < max_queue_length
# ------------------ Listener ------------------ #
def mqtt_listener():
    """Listen for replies from every traffic light and react to them, forever.

    Connects one MQTT client, subscribes to ``topic_rep(tl_id)`` for each id
    in ``state.singlas_state`` and then blocks in ``loop_forever()``; it is
    therefore meant to run in its own thread.

    Replies handled:
        ``AVBL``    The pending request for that light is marked accepted,
                    its requested state becomes the light's current state and
                    ``DONE QL?`` is published to ask for the queue length.
        ``QL <n>``  If ``checked_ql(n)`` passes, the light's current state
                    followed by the request's duration is published to it.

    Any other reply is only logged. Malformed replies are logged and ignored:
    an exception escaping the callback would otherwise propagate out of the
    network loop and end the listener.
    """

    def on_connect(client, userdata, *_connect_info):
        # Subscribe to all reply topics. This is done on every (re)connect,
        # not just once at start-up, so the subscriptions are renewed after a
        # dropped connection. The extra positional arguments differ between
        # paho callback API versions and are not needed here.
        for tl_id in list(state.singlas_state):
            reply_topic = topic_rep(tl_id)
            client.subscribe(reply_topic)
            print(f"โœ… Subscribed to {reply_topic}")

    def on_message(client, userdata, message):
        topic = message.topic
        try:
            msg = message.payload.decode()
        except UnicodeDecodeError:
            print(f"Ignoring undecodable payload on {topic}")
            return

        topic_levels = topic.split('/')
        if len(topic_levels) < 2:
            print(f"Ignoring reply on unexpected topic {topic}")
            return
        # NOTE(review): the light id is taken from the second topic level,
        # which assumes MQTT_PARTOPIC_state itself contains no '/'.
        tl_id = topic_levels[1]

        print(f"{state.request}-----{tl_id}")
        print(f"๐Ÿ“ฅ Reply Received on {topic}: {msg}")

        is_available = msg == 'AVBL'
        is_queue_report = msg.startswith("QL")
        if not (is_available or is_queue_report):
            return

        # Both reply kinds refer to the request currently pending for the light.
        try:
            pending = state.request[tl_id]
        except KeyError:
            print(f"Ignoring '{msg}' from {tl_id}: no pending request")
            return

        if is_available:
            pending['accepted'] = True
            state.singlas_state[tl_id] = pending['State']
            # Ask for the queue length before the requested state is sent out.
            client.publish(topic_msg(tl_id), 'DONE QL?', qos=1)
            return

        fields = msg.split()
        if len(fields) < 2:
            print(f"Ignoring malformed queue length reply from {tl_id}: {msg}")
            return
        try:
            queue_ok = checked_ql(fields[1])
        except (TypeError, ValueError):
            print(f"Ignoring non-numeric queue length from {tl_id}: {msg}")
            return
        if queue_ok:
            payload = state.singlas_state[tl_id] + " " + str(pending['Duration'])
            client.publish(topic_msg(tl_id), payload, qos=1)

    client = mqtt.Client()
    # Callbacks are installed before connecting so none can be missed.
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(mqtt_broker, mqtt_port)
    client.loop_forever()
# ------------------ Publisher ------------------ #
def mqtt_publisher_loop(tl_id):
    """Publish the state of traffic light *tl_id* every time it changes.

    Opens a dedicated MQTT connection with a background network loop, then
    polls ``state.singlas_state[tl_id]`` every 10 ms. Whenever the value
    differs from the one sent last, it is published on ``topic_msg(tl_id)``
    with QoS 1. This function never returns.
    """
    publisher = mqtt.Client()
    publisher.connect(mqtt_broker, mqtt_port)
    publisher.loop_start()

    previously_sent = None  # nothing sent yet, so the first state always goes out
    while True:
        latest = state.singlas_state[tl_id]
        changed = latest != previously_sent
        if changed:
            destination = topic_msg(tl_id)
            print(f"๐Ÿ“ก state {latest} on {destination} ,state")
            publisher.publish(destination, latest, qos=1)
            previously_sent = latest
        time.sleep(.01)
# ------------------ Startup ------------------ #
def start_mqtt():
    """Launch the MQTT worker threads.

    Starts one daemon thread running ``mqtt_listener`` and one daemon thread
    running ``mqtt_publisher_loop`` for every id in ``state.singlas_state``.
    """
    # A single listener serves all traffic lights.
    listener_thread = threading.Thread(target=mqtt_listener, daemon=True)
    listener_thread.start()

    # Each traffic light gets its own publisher.
    for light_id in state.singlas_state.keys():
        publisher_thread = threading.Thread(
            target=mqtt_publisher_loop,
            args=(light_id,),
            daemon=True,
        )
        publisher_thread.start()
'''
from fastapi import WebSocket, WebSocketDisconnect
import asyncio
@app.websocket("/ws/state")
async def websocket_get_state(websocket: WebSocket):
await websocket.accept()
try:
tl_id = await websocket.receive_text()
print(f"โœ… Client subscribed to {tl_id}")
last_state = None
while True:
current_state = get_State(tl_id)
if current_state != last_state:
await websocket.send_text(current_state)
last_state = current_state
await asyncio.sleep(1) # check for a state change every second
except WebSocketDisconnect:
print("โŒ Client disconnected")
@app.get("/test-db")
def test_db():
try:
connection = get_db()
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
result = cursor.fetchone()
connection.close()
return {"status": "connected", "result": result}
except Exception as e:
return {"status": "error", "detail": str(e)}
'''