Spaces:
Running
on
Zero
Running
on
Zero
import logging | |
import traceback | |
import numpy as np | |
from typing import Dict, List, Any, Optional | |
logger = logging.getLogger(__name__) | |
class PatternAnalyzer: | |
""" | |
負責各種模式分析,包含交通流動、行人穿越、車輛分佈等的辨識 | |
專門處理動態區域和移動相關的區域分析 | |
""" | |
def __init__(self): | |
"""初始化模式分析器""" | |
try: | |
logger.info("PatternAnalyzer initialized successfully") | |
except Exception as e: | |
logger.error(f"Failed to initialize PatternAnalyzer: {str(e)}") | |
logger.error(traceback.format_exc()) | |
raise | |
def analyze_crossing_patterns(self, pedestrians: List[Dict], traffic_lights: List[Dict]) -> Dict: | |
""" | |
Analyze pedestrian crossing patterns to identify crossing zones. | |
若同一 region 中同時有行人與紅綠燈,則將兩者都放入該區域的 objects。 | |
Args: | |
pedestrians: 行人物件列表(每個 obj 應包含 'class_id', 'region', 'confidence' 等) | |
traffic_lights: 紅綠燈物件列表(每個 obj 應包含 'class_id', 'region', 'confidence' 等) | |
Returns: | |
crossing_zones: 字典,key 為 zone 名稱,value 包含 'region', 'objects', 'description' | |
""" | |
try: | |
crossing_zones = {} | |
# 如果沒有任何行人,就不辨識任何 crossing zone | |
if not pedestrians: | |
return crossing_zones | |
# (1) 按照 region 分組行人 | |
pedestrian_regions = {} | |
for p in pedestrians: | |
region = p["region"] | |
pedestrian_regions.setdefault(region, []).append(p) | |
# (2) 針對每個 region,看是否同時有紅綠燈 | |
# 建立一個對照表 mapping: region -> { "pedestrians": [...], "traffic_lights": [...] } | |
combined_regions = {} | |
for region, peds in pedestrian_regions.items(): | |
# 取得該 region 下所有紅綠燈 | |
tls_in_region = [t for t in traffic_lights if t["region"] == region] | |
combined_regions[region] = { | |
"pedestrians": peds, | |
"traffic_lights": tls_in_region | |
} | |
# (3) 按照行人數量排序,找出前兩個需要建立 crossing zone 的 region | |
sorted_regions = sorted( | |
combined_regions.items(), | |
key=lambda x: len(x[1]["pedestrians"]), | |
reverse=True | |
) | |
# (4) 將前兩個 region 建立 Crossing Zone,objects 同時包含行人與紅綠燈 | |
for idx, (region, group) in enumerate(sorted_regions[:2]): | |
peds = group["pedestrians"] | |
tls = group["traffic_lights"] | |
has_nearby_signals = len(tls) > 0 | |
# 生成 zone_name(基於 region 方向 + idx 決定主/次 crossing) | |
direction = self._get_directional_description_local(region) | |
if direction and direction != "central": | |
zone_name = f"{direction} crossing area" | |
else: | |
zone_name = "main crossing area" if idx == 0 else "secondary crossing area" | |
# 組合 description | |
description = f"Pedestrian crossing area with {len(peds)} " | |
description += "person" if len(peds) == 1 else "people" | |
if direction: | |
description += f" in {direction} direction" | |
if has_nearby_signals: | |
description += " near traffic signals" | |
# 將行人 + 同區紅綠燈一併放入 objects | |
obj_list = ["pedestrian"] * len(peds) | |
if has_nearby_signals: | |
obj_list += ["traffic light"] * len(tls) | |
crossing_zones[zone_name] = { | |
"region": region, | |
"objects": obj_list, | |
"description": description | |
} | |
return crossing_zones | |
except Exception as e: | |
logger.error(f"Error in analyze_crossing_patterns: {str(e)}") | |
logger.error(traceback.format_exc()) | |
return {} | |
def analyze_traffic_zones(self, vehicles: List[Dict]) -> Dict: | |
""" | |
分析車輛分布以識別具有方向感知的交通區域 | |
Args: | |
vehicles: 車輛物件列表 | |
Returns: | |
識別出的交通區域字典 | |
""" | |
try: | |
traffic_zones = {} | |
if not vehicles: | |
return traffic_zones | |
# 按區域分組車輛 | |
vehicle_regions = {} | |
for v in vehicles: | |
region = v["region"] | |
if region not in vehicle_regions: | |
vehicle_regions[region] = [] | |
vehicle_regions[region].append(v) | |
# 為有車輛的區域創建交通區域 | |
main_traffic_region = max(vehicle_regions.items(), key=lambda x: len(x[1]), default=(None, [])) | |
if main_traffic_region[0] is not None: | |
region = main_traffic_region[0] | |
vehicles_in_region = main_traffic_region[1] | |
# 獲取車輛類型列表用於描述 | |
vehicle_types = [v["class_name"] for v in vehicles_in_region] | |
unique_types = list(set(vehicle_types)) | |
# 獲取方向描述 | |
direction = self._get_directional_description_local(region) | |
# 創建描述性區域 | |
traffic_zones["vehicle_zone"] = { | |
"region": region, | |
"objects": vehicle_types, | |
"description": f"Vehicle traffic area with {', '.join(unique_types[:3])}" + | |
(f" in {direction} area" if direction else "") | |
} | |
# 如果車輛分布在多個區域,創建次要區域 | |
if len(vehicle_regions) > 1: | |
# 獲取第二大車輛聚集區域 | |
sorted_regions = sorted(vehicle_regions.items(), key=lambda x: len(x[1]), reverse=True) | |
if len(sorted_regions) > 1: | |
second_region, second_vehicles = sorted_regions[1] | |
direction = self._get_directional_description_local(second_region) | |
vehicle_types = [v["class_name"] for v in second_vehicles] | |
unique_types = list(set(vehicle_types)) | |
traffic_zones["secondary_vehicle_zone"] = { | |
"region": second_region, | |
"objects": vehicle_types, | |
"description": f"Secondary traffic area with {', '.join(unique_types[:2])}" + | |
(f" in {direction} direction" if direction else "") | |
} | |
return traffic_zones | |
except Exception as e: | |
logger.error(f"Error analyzing traffic zones: {str(e)}") | |
logger.error(traceback.format_exc()) | |
return {} | |
def analyze_aerial_traffic_patterns(self, vehicle_objs: List[Dict]) -> Dict: | |
""" | |
分析空中視角的車輛交通模式 | |
Args: | |
vehicle_objs: 車輛物件列表 | |
Returns: | |
交通模式區域字典 | |
""" | |
try: | |
zones = {} | |
if not vehicle_objs: | |
return zones | |
# 將位置轉換為數組進行模式分析 | |
positions = np.array([obj["normalized_center"] for obj in vehicle_objs]) | |
if len(positions) >= 2: | |
# 計算分布指標 | |
x_coords = positions[:, 0] | |
y_coords = positions[:, 1] | |
x_mean = np.mean(x_coords) | |
y_mean = np.mean(y_coords) | |
x_std = np.std(x_coords) | |
y_std = np.std(y_coords) | |
# 判斷車輛是否組織成車道 | |
if x_std < y_std * 0.5: | |
# 車輛垂直對齊 - 代表南北交通 | |
zones["vertical_traffic_flow"] = { | |
"region": "central_vertical", | |
"objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
"description": "North-south traffic flow visible from aerial view" | |
} | |
elif y_std < x_std * 0.5: | |
# 車輛水平對齊 - 代表東西交通 | |
zones["horizontal_traffic_flow"] = { | |
"region": "central_horizontal", | |
"objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
"description": "East-west traffic flow visible from aerial view" | |
} | |
else: | |
# 車輛多方向 - 代表十字路口 | |
zones["intersection_traffic"] = { | |
"region": "central", | |
"objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
"description": "Multi-directional traffic at intersection visible from aerial view" | |
} | |
return zones | |
except Exception as e: | |
logger.error(f"Error analyzing aerial traffic patterns: {str(e)}") | |
logger.error(traceback.format_exc()) | |
return {} | |
def identify_park_recreational_zones(self, detected_objects: List[Dict]) -> Dict: | |
""" | |
識別公園的休閒活動區域 | |
Args: | |
detected_objects: 檢測到的物件列表 | |
Returns: | |
休閒區域字典 | |
""" | |
try: | |
zones = {} | |
# 尋找休閒物件(運動球、風箏等) | |
rec_items = [] | |
rec_regions = {} | |
for obj in detected_objects: | |
if obj["class_id"] in [32, 33, 34, 35, 38]: # sports ball, kite, baseball bat, glove, tennis racket | |
region = obj["region"] | |
if region not in rec_regions: | |
rec_regions[region] = [] | |
rec_regions[region].append(obj) | |
rec_items.append(obj["class_name"]) | |
if rec_items: | |
main_rec_region = max(rec_regions.items(), | |
key=lambda x: len(x[1]), | |
default=(None, [])) | |
if main_rec_region[0] is not None: | |
zones["recreational_zone"] = { | |
"region": main_rec_region[0], | |
"objects": list(set(rec_items)), | |
"description": f"Recreational area with {', '.join(list(set(rec_items)))}" | |
} | |
return zones | |
except Exception as e: | |
logger.error(f"Error identifying park recreational zones: {str(e)}") | |
logger.error(traceback.format_exc()) | |
return {} | |
def identify_parking_zones(self, detected_objects: List[Dict]) -> Dict: | |
""" | |
停車場的停車區域 | |
Args: | |
detected_objects: 檢測到的物件列表 | |
Returns: | |
停車區域字典 | |
""" | |
try: | |
zones = {} | |
# 尋找停放的汽車 | |
car_objs = [obj for obj in detected_objects if obj["class_id"] == 2] # cars | |
if len(car_objs) >= 3: | |
# 檢查汽車是否按模式排列 | |
car_positions = [obj["normalized_center"] for obj in car_objs] | |
# 通過分析垂直位置檢查行模式 | |
y_coords = [pos[1] for pos in car_positions] | |
y_clusters = {} | |
# 按相似y坐標分組汽車 | |
for i, y in enumerate(y_coords): | |
assigned = False | |
for cluster_y in y_clusters.keys(): | |
if abs(y - cluster_y) < 0.1: # 圖像高度的10%內 | |
y_clusters[cluster_y].append(i) | |
assigned = True | |
break | |
if not assigned: | |
y_clusters[y] = [i] | |
# 如果有行模式 | |
if max(len(indices) for indices in y_clusters.values()) >= 2: | |
zones["parking_row"] = { | |
"region": "central", | |
"objects": ["car"] * len(car_objs), | |
"description": f"Organized parking area with vehicles arranged in rows" | |
} | |
else: | |
zones["parking_area"] = { | |
"region": "wide", | |
"objects": ["car"] * len(car_objs), | |
"description": f"Parking area with {len(car_objs)} vehicles" | |
} | |
return zones | |
except Exception as e: | |
logger.error(f"Error identifying parking zones: {str(e)}") | |
logger.error(traceback.format_exc()) | |
return {} | |
def _get_directional_description_local(self, region: str) -> str: | |
""" | |
本地方向描述方法 | |
將區域名稱轉換為方位描述(東西南北) | |
Args: | |
region: 區域名稱 | |
Returns: | |
方位描述字串 | |
""" | |
try: | |
region_lower = region.lower() | |
if "top" in region_lower and "left" in region_lower: | |
return "northwest" | |
elif "top" in region_lower and "right" in region_lower: | |
return "northeast" | |
elif "bottom" in region_lower and "left" in region_lower: | |
return "southwest" | |
elif "bottom" in region_lower and "right" in region_lower: | |
return "southeast" | |
elif "top" in region_lower: | |
return "north" | |
elif "bottom" in region_lower: | |
return "south" | |
elif "left" in region_lower: | |
return "west" | |
elif "right" in region_lower: | |
return "east" | |
else: | |
return "central" | |
except Exception as e: | |
logger.error(f"Error getting directional description for region '{region}': {str(e)}") | |
return "central" | |