Spaces:
Sleeping
Sleeping
import requests | |
from database_manager import * | |
from dotenv import load_dotenv | |
import os | |
import state | |
import asyncio | |
import threading | |
load_dotenv(dotenv_path="keys.env") | |
overpass_url = os.getenv("URL_Traffic") | |
import time | |
def get_traffic_in_container(coordinates, timeout=30):
    """Return the traffic-signal nodes inside the bounding box of *coordinates*.

    The bounding box is computed by ``get_rectangle_container`` and sent as an
    Overpass QL query to the endpoint held in the module-level ``overpass_url``.

    Args:
        coordinates: Iterable of ``(lat, lon)`` pairs (latitude first).
        timeout: Seconds to wait for the HTTP request; forwarded to
            ``requests.get``. Without a timeout the call could block forever
            on an unresponsive server.

    Returns:
        A list of ``(lat, lon)`` tuples, one per element in the response.

    Raises:
        requests.exceptions.RequestException: On timeout, connection failure,
            or an HTTP error status.
        KeyError: If the JSON response has no ``"elements"`` key.
    """
    min_lat, max_lat, min_lon, max_lon = get_rectangle_container(coordinates)

    # Overpass bounding boxes are ordered (south, west, north, east).
    bbox = f"{min_lat},{min_lon},{max_lat},{max_lon}"
    query = f"""
    [out:json];
    node["highway"="traffic_signals"]({bbox});
    out body;
    """

    response = requests.get(overpass_url, params={"data": query}, timeout=timeout)
    # Fail with a clear HTTP error instead of a confusing JSON decode error
    # when the server answers with an error page.
    response.raise_for_status()
    payload = response.json()
    return [(node["lat"], node["lon"]) for node in payload["elements"]]
def get_rectangle_container(coordinates):
    """Return the axis-aligned bounding box of a set of coordinates.

    Args:
        coordinates: Iterable of ``(lat, lon)`` pairs. Latitude comes first,
            matching the order sent by the mobile app. One-shot iterables
            such as generators are accepted.

    Returns:
        A tuple ``(min_lat, max_lat, min_lon, max_lon)``.

    Raises:
        ValueError: If *coordinates* is empty, or an item is not a pair.
    """
    # Materialize once so the input is never iterated twice; a generator
    # would otherwise be exhausted before the longitudes are collected.
    points = list(coordinates)
    if not points:
        raise ValueError("coordinates must contain at least one (lat, lon) pair")

    lats = [lat for lat, _ in points]
    lons = [lon for _, lon in points]
    return min(lats), max(lats), min(lons), max(lons)
async def wait_check_conection(tl_id, check_time):
    """Wait until signal *tl_id* leaves the "checking" state or time runs out.

    Polls ``state.singlas_state[tl_id]`` every 0.2 seconds and returns as soon
    as it differs from ``state.status_chk``, or once *check_time* seconds have
    elapsed, whichever comes first. The caller inspects the shared state
    afterwards; nothing is returned.

    Args:
        tl_id: Key of the traffic signal in ``state.singlas_state``.
        check_time: Maximum number of seconds to wait.
    """
    # Use the monotonic clock: wall-clock time can jump (NTP sync, manual
    # changes), which would shorten or stretch the wait.
    deadline = time.monotonic() + check_time
    while time.monotonic() < deadline:
        if state.singlas_state[tl_id] != state.status_chk:
            break
        # Yield to the event loop between polls.
        await asyncio.sleep(0.2)
async def open_signal(tl_id, state_, duration, delay):
    """Try to reserve traffic signal *tl_id* and schedule a state change.

    The signal must currently be free. It is then marked as "checking" and
    the coroutine waits up to 4 seconds (via ``wait_check_conection``) for
    that status to change. If ``state.request[tl_id]['accepted']`` is set
    afterwards, a worker thread running ``handle_request_in_thread`` is
    started; otherwise the pending request is deleted.

    Args:
        tl_id: Key of the traffic signal in the shared ``state`` module.
        state_: Requested signal state, forwarded to the worker thread.
        duration: Seconds the request is held, forwarded to the worker thread.
        delay: Seconds the worker waits before applying the request.

    Returns:
        A human-readable status message describing the outcome.
    """
    # Refuse right away when the signal is already reserved by something else.
    if state.singlas_state[tl_id] != state.status_free:
        if state.singlas_state[tl_id] == state.status_acc:
            reason = "Accident to stop road "
        else:
            reason = "Exist Emergency request"
        return "Request Refused , Exist old request : \n" + reason

    max_wait_seconds = 4

    # Flag the signal as being checked.
    # NOTE(review): presumably another component reacts to this flag and
    # fills in state.request[tl_id]; confirm against the state module.
    state.singlas_state[tl_id] = state.status_chk
    print(state.request[tl_id])
    await wait_check_conection(tl_id, max_wait_seconds)
    print(state.request[tl_id])

    accepted = state.request[tl_id]['accepted'] == True  # noqa: E712
    if not accepted:
        state.delete_last_request(tl_id)
        return "Request Refused , No Internet Connection or Very High Queue Length"

    # Blocking sleeps happen in a separate thread so the event loop stays free.
    worker = threading.Thread(
        target=handle_request_in_thread,
        args=(tl_id, state_, duration, delay),
    )
    worker.start()
    return "Request Accepted"
def handle_request_in_thread(tl_id, state_, duration, delay):
    """Apply a signal request after *delay* seconds and clear it after *duration*.

    Intended to run in its own thread (it blocks in ``time.sleep``).

    Args:
        tl_id: Key of the traffic signal in the shared ``state`` module.
        state_: Requested signal state, passed to ``state.set_request``.
        duration: Seconds to keep the request active.
        delay: Seconds to wait before activating the request.
    """
    time.sleep(delay)
    state.set_request(tl_id, state_, duration)
    try:
        print("β Request Set:", state.request)
        time.sleep(duration)
    finally:
        # Always clear the request once it has been set. Otherwise an error
        # while logging or sleeping would leave the signal reserved forever.
        state.delete_last_request(tl_id)
    print("π§Ή Request Deleted:", state.request)
    # NOTE(review): source indentation was lost; this final print is assumed
    # to belong to this function rather than to module level. Confirm.
    print("β Request Accepted")
def check_signals(coords):
    """Look up stored traffic signals close to each coordinate pair.

    For every pair in *coords* the ``traffic_signals`` table is queried for a
    row whose ``lat`` and ``lon`` are both within 0.0001 of the given values.
    Only the first matching row per pair is used.

    Args:
        coords: Iterable of two-element coordinate pairs.

    Returns:
        A list of ``tl_id_sumo`` values, one per pair that had a match, in
        input order.
    """
    lookup_sql = """
    SELECT tl_id_sumo FROM traffic_signals
    WHERE ABS(lat - %s) < 0.0001 AND ABS(lon - %s) < 0.0001
    """
    matches = []
    db = get_db()
    try:
        with db.cursor() as cur:
            for first, second in coords:
                # NOTE(review): the SECOND element of each pair is compared
                # with the `lat` column and the FIRST with `lon`. This is
                # right if coords arrive as (lon, lat) and wrong if they
                # arrive as (lat, lon). Verify against the caller.
                cur.execute(lookup_sql, (second, first))
                row = cur.fetchone()
                if row:
                    matches.append(row['tl_id_sumo'])
    finally:
        # Release the connection even if a query fails.
        db.close()
    return matches