Spaces:
Sleeping
Sleeping
File size: 3,000 Bytes
ee3c704 f931fbf 3eeeacd dc09e66 b658df6 f931fbf ee3c704 3eeeacd 038d77e ee3c704 f5f278d ee3c704 dc09e66 da5366e dc09e66 ee3c704 95608ed 3cbb380 da5366e 618e662 bf9f427 618e662 d3f8ef6 5564f37 618e662 4256e8a 5edff06 78746bc 6a2b4c4 5564f37 95608ed bb00cfa d3f8ef6 16fd0dd 95608ed 1be15e1 95608ed 3eeeacd d3f8ef6 b658df6 ee3c704 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
import requests
from database_manager import *
from dotenv import load_dotenv
import os
import state
import asyncio
import threading
# Load environment variables from the local ``keys.env`` file before reading them.
load_dotenv(dotenv_path="keys.env")
# Endpoint that get_traffic_in_container sends its Overpass QL query to.
# os.getenv returns None when URL_Traffic is not set.
overpass_url = os.getenv("URL_Traffic")
import time
def get_traffic_in_container(coordinates, timeout=60):
    """Return the traffic-signal nodes inside the bounding box of ``coordinates``.

    Builds an Overpass QL query for nodes tagged ``highway=traffic_signals``
    inside the rectangle computed by ``get_rectangle_container`` and sends it
    to the endpoint stored in the module-level ``overpass_url``.

    Args:
        coordinates: ``(lat, lon)`` pairs whose bounding box is searched.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block forever on an unresponsive server.

    Returns:
        A list of ``(lat, lon)`` tuples, one per element of the response.

    Raises:
        ValueError: ``coordinates`` is empty (raised while computing the
            bounding box).
        requests.RequestException: The request timed out, the connection
            failed, or the server answered with an HTTP error status.
    """
    min_lat, max_lat, min_lon, max_lon = get_rectangle_container(coordinates)
    # Overpass bounding boxes are ordered (south, west, north, east).
    query = f"""
    [out:json];
    node["highway"="traffic_signals"]({min_lat},{min_lon},{max_lat},{max_lon});
    out body;
    """
    response = requests.get(overpass_url, params={'data': query}, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page as JSON.
    response.raise_for_status()
    data = response.json()
    return [(el['lat'], el['lon']) for el in data['elements']]
def get_rectangle_container(coordinates):
    """Return the axis-aligned bounding box of a set of coordinate pairs.

    Args:
        coordinates: Iterable of ``(lat, lon)`` pairs. Latitude comes first
            because that is the order the mobile app sends. Any iterable is
            accepted, including one-shot iterators.

    Returns:
        A tuple ``(min_lat, max_lat, min_lon, max_lon)``.

    Raises:
        ValueError: ``coordinates`` is empty, or an item is not a 2-item pair.
    """
    # Materialize once so generators are not exhausted after the first pass.
    points = list(coordinates)
    if not points:
        raise ValueError("coordinates must contain at least one (lat, lon) pair")
    lats = [lat for lat, _ in points]
    lons = [lon for _, lon in points]
    return min(lats), max(lats), min(lons), max(lons)
async def wait_check_conection(tl_id, check_time, poll_interval=0.2):
    """Wait until a signal leaves the "checking" status or the time runs out.

    Polls ``state.singlas_state[tl_id]`` and returns as soon as it differs
    from ``state.status_chk``, or once ``check_time`` seconds have elapsed,
    whichever happens first. The coroutine returns ``None`` either way; the
    caller inspects the shared state afterwards.

    Args:
        tl_id: Key of the traffic signal in ``state.singlas_state``.
        check_time: Maximum number of seconds to wait. A value of zero or
            less returns immediately without reading the state.
        poll_interval: Seconds to sleep between two polls.
    """
    # A monotonic clock keeps the wait correct if the wall clock is adjusted.
    deadline = time.monotonic() + check_time
    while time.monotonic() < deadline:
        if state.singlas_state[tl_id] != state.status_chk:
            break
        await asyncio.sleep(poll_interval)
async def open_signal(tl_id, state_, duration, delay):
    """Try to reserve a traffic signal and report the outcome as a message.

    The request is refused immediately when the signal is not in
    ``state.status_free``. Otherwise the signal is marked ``state.status_chk``
    and the coroutine waits up to 4 seconds for that status to change. If
    ``state.request[tl_id]['accepted']`` is then true, a background thread
    running ``handle_request_in_thread`` is started; if not, the pending
    request is removed with ``state.delete_last_request``.

    Args:
        tl_id: Key of the traffic signal in the shared ``state`` module.
        state_: Value forwarded to the worker thread for ``state.set_request``.
        duration: Seconds the worker keeps the request active.
        delay: Seconds the worker waits before activating the request.

    Returns:
        A human-readable string describing whether the request was accepted.
    """
    current_status = state.singlas_state[tl_id]
    if current_status != state.status_free:
        # Another request already owns this signal; say which kind.
        if current_status == state.status_acc:
            request_type = "Accident to stop road "
        else:
            request_type = "Exist Emergency request"
        return "Request Refused , Exist old request : \n" + request_type

    confirmation_window = 4  # seconds to wait for the status to change

    # Flag the signal as "checking"; presumably another component reacts to
    # this flag and fills in state.request[tl_id] - not visible in this file.
    state.singlas_state[tl_id] = state.status_chk
    print(state.request[tl_id])
    await wait_check_conection(tl_id, confirmation_window)
    print(state.request[tl_id])

    # Explicit comparison kept on purpose: only values equal to True accept.
    if state.request[tl_id]['accepted'] == True:  # noqa: E712
        # The delay is applied inside the worker so this coroutine returns at once.
        worker = threading.Thread(
            target=handle_request_in_thread,
            args=(tl_id, state_, duration, delay),
        )
        worker.start()
        return "Request Accepted"

    state.delete_last_request(tl_id)
    return "Request Refused , No Internet Connection or Very High Queue Length"
def handle_request_in_thread(tl_id, state_, duration, delay):
    """Activate a signal request after ``delay`` and remove it after ``duration``.

    Intended to run in a worker thread because it blocks with ``time.sleep``.

    Args:
        tl_id: Key of the traffic signal in the shared ``state`` module.
        state_: Value passed to ``state.set_request`` for this signal.
        duration: Seconds the request stays active once it has been set.
        delay: Seconds to wait before the request is set.
    """
    time.sleep(delay)
    state.set_request(tl_id, state_, duration)
    try:
        print("✅ Request Set:", state.request)
        time.sleep(duration)
    finally:
        # Always remove the request once it has been set, even if the hold
        # phase fails (e.g. a negative duration makes time.sleep raise), so
        # the request cannot stay active forever.
        state.delete_last_request(tl_id)
        print("🧹 Request Deleted:", state.request)
    print("✅ Request Accepted")
def check_signals(coords):
    """Look up the known traffic signals located at the given coordinates.

    For every pair in ``coords`` the ``traffic_signals`` table is searched for
    a row whose ``lat`` and ``lon`` columns are each within 0.0001 of the
    bound values. Only the first matching row per pair is used.

    Args:
        coords: Iterable of 2-item coordinate pairs.

    Returns:
        A list with the ``tl_id_sumo`` value of each pair that had a match,
        in input order. Pairs without a match contribute nothing.
    """
    lookup_sql = (
        "\nSELECT tl_id_sumo FROM traffic_signals\n"
        "WHERE ABS(lat - %s) < 0.0001 AND ABS(lon - %s) < 0.0001\n"
    )
    matched_ids = []
    db = get_db()
    try:
        with db.cursor() as cur:
            for first, second in coords:
                # NOTE(review): the SECOND item of each pair is compared with
                # the `lat` column and the FIRST with `lon`. The original code
                # named the items (lat, lon) yet bound (lon, lat). Confirm
                # whether callers pass (lon, lat) pairs or this is a
                # swapped-argument bug; behaviour is kept unchanged here.
                cur.execute(lookup_sql, (second, first))
                row = cur.fetchone()
                if not row:
                    continue
                matched_ids.append(row['tl_id_sumo'])
    finally:
        # Release the connection whether or not a query failed.
        db.close()
    return matched_ids
|