Spaces:
Sleeping
Sleeping
import threading | |
import time | |
import os | |
import paho.mqtt.client as mqtt | |
from dotenv import load_dotenv | |
import state  # contains `singlas_state`
# ------------------ Load .env ------------------ #
# Broker address, port and topic fragments are read from keys.env
# (values already present in the process environment are also visible via os.getenv).
load_dotenv(dotenv_path="keys.env")
mqtt_broker = os.getenv("MQTT_BROKER")
# Fall back to 1883 (the standard unencrypted MQTT port) when MQTT_PORT is
# unset or empty, instead of failing at import time on int(None).
mqtt_port = int(os.getenv("MQTT_PORT") or 1883)
# Topics are assembled as <MQTT_PARTOPIC_state>/<tl_id>/<subtopic>.
MQTT_PARTOPIC_state = os.getenv("MQTT_PARTOPIC_state")
MQTT_SUBTOPIC_reply = os.getenv("MQTT_SUBTOPIC_reply")
MQTT_SUBTOPIC_msg = os.getenv("MQTT_SUBTOPIC_msg")
def topic_msg(tl_id):
    """Build the outgoing-message topic for ``tl_id``.

    The result has the form
    ``<MQTT_PARTOPIC_state>/<tl_id>/<MQTT_SUBTOPIC_msg>``.
    """
    return "{}/{}/{}".format(MQTT_PARTOPIC_state, tl_id, MQTT_SUBTOPIC_msg)
def topic_rep(tl_id):
    """Build the reply topic for ``tl_id``.

    The result has the form
    ``<MQTT_PARTOPIC_state>/<tl_id>/<MQTT_SUBTOPIC_reply>``.
    """
    return "{}/{}/{}".format(MQTT_PARTOPIC_state, tl_id, MQTT_SUBTOPIC_reply)
# ------------------ Get state ------------------ # | |
def get_State(tl_id):
    """Return the state stored for ``tl_id`` in ``state.singlas_state``.

    When no entry exists for ``tl_id`` the string ``"FREE"`` is returned.
    """
    fallback = "FREE"
    known_states = state.singlas_state
    return known_states.get(tl_id, fallback)
def checked_ql(ql, max_queue_length=16):
    """Return True when a reported queue length is below the allowed maximum.

    Args:
        ql: Queue length as an int or a numeric string (e.g. the token that
            follows "QL" in a reply message).
        max_queue_length: Exclusive upper bound for the queue length.
            Defaults to 16, the limit previously hard-coded here.

    Returns:
        True if ``int(ql) < max_queue_length``. False otherwise, including
        when ``ql`` cannot be parsed as an integer.
    """
    try:
        queue_length = int(ql)
    except (TypeError, ValueError):
        # The value arrives from a message payload; a malformed one is
        # reported as invalid rather than raising inside the caller.
        return False
    return queue_length < max_queue_length
# ------------------ Listener ------------------ # | |
def mqtt_listener():
    """Connect to the broker and handle replies until the process exits.

    Subscribes to the reply topic of every id in ``state.singlas_state`` and
    reacts to two kinds of payload:

    * ``"AVBL"``: marks the pending request in ``state.request`` as accepted,
      copies its ``'State'`` into ``state.singlas_state`` and publishes
      ``"DONE QL?"`` to ask for the queue length.
    * a payload starting with ``"QL"``: if the reported queue length passes
      ``checked_ql``, publishes ``"<state> <Duration>"`` for that id.

    Malformed payloads and replies without a matching entry in
    ``state.request`` are logged and ignored so that one bad message cannot
    raise out of the callback. This function blocks in
    ``client.loop_forever()`` and does not return.
    """

    def on_connect(client, userdata, flags, rc, properties=None):
        # Subscribing from the connect callback renews the subscriptions
        # whenever the client reconnects. ``properties`` is optional so the
        # callback accepts both the 4- and 5-argument call forms.
        for tl_id in state.singlas_state.keys():
            reply_topic = topic_rep(tl_id)
            client.subscribe(reply_topic)
            print(f"โ Subscribed to {reply_topic}")

    def on_message(client, userdata, message):
        # Undecodable bytes are replaced rather than raising; such a payload
        # will simply not match any of the expected messages below.
        msg = message.payload.decode(errors="replace")
        topic = message.topic
        # Reply topics have the form <parent>/<tl_id>/<reply-subtopic>.
        tl_id = topic.split('/')[1]
        print(f"{state.request}-----{tl_id}")
        print(f"๐ฅ Reply Received on {topic}: {msg}")

        if msg == 'AVBL':
            try:
                request = state.request[tl_id]
            except KeyError:
                print(f"No pending request for {tl_id}; ignoring {msg!r}")
                return
            request['accepted'] = True
            state.singlas_state[tl_id] = request['State']
            # Ask for the queue length (used when opening the road in an emergency).
            next_request = 'DONE QL?'
            client.publish(topic_msg(tl_id), next_request, qos=1)
        elif msg.startswith("QL"):
            parts = msg.split()
            if len(parts) < 2:
                print(f"Malformed queue-length reply on {topic}: {msg!r}")
                return
            try:
                ql_ok = checked_ql(parts[1])
            except (TypeError, ValueError):
                # A non-numeric queue length is treated as failing the check.
                ql_ok = False
            if not ql_ok:
                return
            try:
                request = state.request[tl_id]
                current_state = state.singlas_state[tl_id]
            except KeyError:
                print(f"No pending request for {tl_id}; ignoring {msg!r}")
                return
            client.publish(
                topic_msg(tl_id),
                current_state + " " + str(request['Duration']),
                qos=1,
            )

    client = mqtt.Client()
    # Register callbacks before connecting so no event is missed.
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(mqtt_broker, mqtt_port)
    client.loop_forever()
# ------------------ Publisher ------------------ # | |
def mqtt_publisher_loop(tl_id):
    """Publish the state of ``tl_id`` each time it changes.

    Opens a dedicated MQTT client with a background network loop, then polls
    ``state.singlas_state[tl_id]`` every 10 ms. Whenever the value differs
    from the last one published, it is sent to ``topic_msg(tl_id)`` with
    QoS 1. The first observed value is always published. Never returns.
    """
    publisher = mqtt.Client()
    publisher.connect(mqtt_broker, mqtt_port)
    publisher.loop_start()

    previous = None
    while True:
        observed = state.singlas_state[tl_id]
        if observed != previous:
            destination = topic_msg(tl_id)
            print(f"๐ก state {observed} on {destination} ,state")
            publisher.publish(destination, observed, qos=1)
            previous = observed
        time.sleep(0.01)
# ------------------ Startup ------------------ # | |
def start_mqtt():
    """Launch the MQTT worker threads.

    Starts one daemon thread running ``mqtt_listener`` and then one daemon
    thread running ``mqtt_publisher_loop`` for every id currently present in
    ``state.singlas_state``. Returns immediately after starting them.
    """
    # A single listener handles the replies for all ids.
    listener_thread = threading.Thread(target=mqtt_listener, daemon=True)
    listener_thread.start()

    # Each id gets its own publisher thread.
    for tl_id in state.singlas_state.keys():
        publisher_thread = threading.Thread(
            target=mqtt_publisher_loop,
            args=(tl_id,),
            daemon=True,
        )
        publisher_thread.start()
''' | |
from fastapi import WebSocket, WebSocketDisconnect | |
import asyncio | |
@app.websocket("/ws/state") | |
async def websocket_get_state(websocket: WebSocket): | |
await websocket.accept() | |
try: | |
tl_id = await websocket.receive_text() | |
print(f"โ Client subscribed to {tl_id}") | |
last_state = None | |
while True: | |
current_state = get_State(tl_id) | |
if current_state != last_state: | |
await websocket.send_text(current_state) | |
last_state = current_state | |
await asyncio.sleep(1)  # check for a state change once per second
except WebSocketDisconnect: | |
print("โ Client disconnected") | |
@app.get("/test-db") | |
def test_db(): | |
try: | |
connection = get_db() | |
with connection.cursor() as cursor: | |
cursor.execute("SELECT 1") | |
result = cursor.fetchone() | |
connection.close() | |
return {"status": "connected", "result": result} | |
except Exception as e: | |
return {"status": "error", "detail": str(e)} | |
''' | |