trafficManager / traffic_utils.py
LapStore
corrected bug in state change
d3f8ef6
import requests
from database_manager import *
from dotenv import load_dotenv
import os
import state
import asyncio
import threading
load_dotenv(dotenv_path="keys.env")
overpass_url = os.getenv("URL_Traffic")
import time
def get_traffic_in_container(coordinates, timeout=60):
    """Return the OSM traffic-signal nodes inside the bounding box of a route.

    The bounding box comes from ``get_rectangle_container`` and is sent as an
    Overpass QL query to the endpoint held in ``overpass_url`` (read from the
    ``URL_Traffic`` environment variable).

    Args:
        coordinates: Iterable of ``(lat, lon)`` pairs.
        timeout: Seconds to wait for the Overpass server before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        A list of ``(lat, lon)`` tuples, one per element in the response.

    Raises:
        requests.RequestException: On connection failure, timeout, or a
            non-2xx HTTP status.
    """
    min_lat, max_lat, min_lon, max_lon = get_rectangle_container(coordinates)
    # Overpass bounding boxes are ordered (south, west, north, east).
    query = f"""
    [out:json];
    node["highway"="traffic_signals"]({min_lat},{min_lon},{max_lat},{max_lon});
    out body;
    """
    response = requests.get(overpass_url, params={'data': query}, timeout=timeout)
    # Fail loudly on HTTP errors instead of trying to parse an error page as JSON.
    response.raise_for_status()
    data = response.json()
    return [(el['lat'], el['lon']) for el in data['elements']]
def get_rectangle_container(coordinates):
    """Return the axis-aligned bounding box of ``(lat, lon)`` pairs.

    Args:
        coordinates: Iterable of ``(lat, lon)`` pairs. Latitude comes first
            because that is the order the mobile app sends.

    Returns:
        The tuple ``(min_lat, max_lat, min_lon, max_lon)``.

    Raises:
        ValueError: If ``coordinates`` is empty.
    """
    # Materialise once so one-shot iterables (generators, iterators) work;
    # iterating the argument twice would leave the second pass empty.
    points = [(lat, lon) for lat, lon in coordinates]
    if not points:
        raise ValueError("coordinates must contain at least one (lat, lon) pair")
    lats, lons = zip(*points)
    return min(lats), max(lats), min(lons), max(lons)
async def wait_check_conection(tl_id, check_time):
    """Wait until signal ``tl_id`` leaves the checking state or time runs out.

    Polls ``state.singlas_state[tl_id]`` every 0.2 seconds and returns as soon
    as it differs from ``state.status_chk``, or once ``check_time`` seconds
    have elapsed, whichever comes first.

    Args:
        tl_id: Key of the traffic light in ``state.singlas_state``.
        check_time: Maximum time to wait, in seconds.
    """
    # A monotonic clock keeps the timeout correct even if the wall clock is
    # adjusted while we are waiting.
    deadline = time.monotonic() + check_time
    while time.monotonic() < deadline:
        if state.singlas_state[tl_id] != state.status_chk:
            break
        await asyncio.sleep(0.2)
async def open_signal(tl_id, state_, duration, delay):
    """Try to reserve traffic light ``tl_id`` and schedule the requested state.

    The request is refused immediately when the signal is not in
    ``state.status_free``. Otherwise the signal is put into
    ``state.status_chk`` and the coroutine waits up to 4 seconds (via
    ``wait_check_conection``) for that status to change. If
    ``state.request[tl_id]['accepted']`` is then true, a background thread
    running ``handle_request_in_thread`` is started; if not, the pending
    request is deleted.

    Args:
        tl_id: Key of the traffic light in ``state.singlas_state`` and
            ``state.request``.
        state_: Value forwarded to ``handle_request_in_thread``.
        duration: Seconds forwarded to ``handle_request_in_thread``.
        delay: Seconds forwarded to ``handle_request_in_thread``.

    Returns:
        A human-readable status string describing the outcome.
    """
    current_status = state.singlas_state[tl_id]
    if current_status != state.status_free:
        # Signal already holds a request: tell the caller which kind.
        if current_status == state.status_acc:
            holder = "Accident to stop road "
        else:
            holder = "Exist Emergency request"
        return "Request Refused , Exist old request : \n" + holder

    wait_limit = 4
    state.singlas_state[tl_id] = state.status_chk
    print(state.request[tl_id])
    await wait_check_conection(tl_id, wait_limit)
    print(state.request[tl_id])

    # Kept as an equality test against True to match the existing contract.
    was_accepted = state.request[tl_id]['accepted'] == True
    if not was_accepted:
        state.delete_last_request(tl_id)
        return "Request Refused , No Internet Connection or Very High Queue Length"

    # The worker sleeps for delay + duration, so it runs off the event loop.
    worker = threading.Thread(
        target=handle_request_in_thread,
        args=(tl_id, state_, duration, delay),
    )
    worker.start()
    return "Request Accepted"
def handle_request_in_thread(tl_id, state_, duration, delay):
    """Apply a signal request after ``delay`` and clear it after ``duration``.

    Blocks the calling thread with ``time.sleep``; ``open_signal`` runs this
    function in its own ``threading.Thread``.

    Args:
        tl_id: Traffic-light key passed to ``state.set_request`` and
            ``state.delete_last_request``.
        state_: Requested signal state, forwarded to ``state.set_request``.
        duration: Seconds to keep the request active.
        delay: Seconds to wait before activating the request.
    """
    try:
        time.sleep(delay)
        state.set_request(tl_id, state_, duration)
        print("✅ Request Set:", state.request)
        time.sleep(duration)
    finally:
        # Always clear the request, even if a sleep or set_request raised
        # (e.g. a negative delay); otherwise it would never be deleted.
        state.delete_last_request(tl_id)
        print("🧹 Request Deleted:", state.request)
    print("✅ Request Accepted")
def check_signals(coords):
    """Collect the ``tl_id_sumo`` of the stored signal near each coordinate.

    For every pair in ``coords`` one query is run against the
    ``traffic_signals`` table with a tolerance of 0.0001 on each axis; the
    first matching row (if any) contributes its ``tl_id_sumo``.

    Args:
        coords: Iterable of two-element coordinate pairs.

    Returns:
        A list of ``tl_id_sumo`` values in the order the matches were found.
        Pairs with no matching row are skipped.
    """
    lookup_sql = (
        "\n"
        "SELECT tl_id_sumo FROM traffic_signals\n"
        "WHERE ABS(lat - %s) < 0.0001 AND ABS(lon - %s) < 0.0001\n"
    )
    conn = get_db()
    matched_ids = []
    try:
        with conn.cursor() as cur:
            for lat, lon in coords:
                # NOTE(review): the values are bound as (lon, lat), so the
                # `lat` column is compared with the second element of the pair
                # as unpacked here. Confirm this matches how the table and the
                # callers order coordinates; if not, the arguments are swapped.
                cur.execute(lookup_sql, (lon, lat))
                row = cur.fetchone()
                if row:
                    matched_ids.append(row['tl_id_sumo'])
    finally:
        # Release the connection whether or not a query failed.
        conn.close()
    return matched_ids