# VisionScout / pattern_analyzer.py
# DawnC's picture
# Upload 14 files
# 12d9ea9 verified
import logging
import traceback
import numpy as np
from typing import Dict, List, Any, Optional
# Module-level logger named after this module; PatternAnalyzer reports
# initialization and analysis errors through it.
logger = logging.getLogger(__name__)
class PatternAnalyzer:
    """
    Pattern analysis over detected objects.

    Groups detections into descriptive zones: pedestrian crossings, vehicle
    traffic areas, aerial traffic flows, park recreation areas and parking
    areas. Every public method returns a dict mapping a zone name to a dict
    with the keys 'region', 'objects' and 'description'. On any exception the
    error is logged and an empty dict is returned, so callers never have to
    handle failures from the analysis itself.
    """

    def __init__(self):
        """Initialize the analyzer. It holds no state; it only logs readiness."""
        try:
            logger.info("PatternAnalyzer initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize PatternAnalyzer: {str(e)}")
            logger.error(traceback.format_exc())
            raise

    @staticmethod
    def _unique_in_order(items: List[Any]) -> List[Any]:
        """Return *items* without duplicates, keeping first-seen order."""
        # dict preserves insertion order, unlike set, so the result (and any
        # text built from it) is identical from run to run.
        return list(dict.fromkeys(items))

    def analyze_crossing_patterns(self, pedestrians: List[Dict], traffic_lights: List[Dict]) -> Dict:
        """
        Analyze pedestrian crossing patterns to identify crossing zones.

        Pedestrians are grouped by their 'region'. The (at most) two regions
        holding the most pedestrians each become a crossing zone. Traffic
        lights located in the same region are added to that zone's objects.

        Args:
            pedestrians: Pedestrian detections; each dict must have 'region'.
            traffic_lights: Traffic-light detections; each dict must have
                'region'. None is treated as "no traffic lights".

        Returns:
            Dict keyed by zone name; each value holds 'region', 'objects'
            (a list of "pedestrian" / "traffic light" labels) and
            'description'. Empty when there are no pedestrians or on error.
        """
        try:
            crossing_zones = {}

            # Without pedestrians there is no crossing zone to report.
            if not pedestrians:
                return crossing_zones

            signals = traffic_lights or []

            # (1) Group pedestrians by region.
            pedestrians_by_region = {}
            for ped in pedestrians:
                pedestrians_by_region.setdefault(ped["region"], []).append(ped)

            # (2) Rank regions by pedestrian count. sorted() is stable, so
            #     ties keep the order in which regions were first seen.
            ranked_regions = sorted(
                pedestrians_by_region.items(),
                key=lambda item: len(item[1]),
                reverse=True,
            )

            # (3) Build a crossing zone for the two busiest regions.
            for idx, (region, peds) in enumerate(ranked_regions[:2]):
                lights = [t for t in signals if t["region"] == region]

                # Name the zone after its compass direction; regions without
                # a distinct direction fall back to main/secondary by rank.
                direction = self._get_directional_description_local(region)
                if direction and direction != "central":
                    zone_name = f"{direction} crossing area"
                elif idx == 0:
                    zone_name = "main crossing area"
                else:
                    zone_name = "secondary crossing area"

                noun = "person" if len(peds) == 1 else "people"
                description = f"Pedestrian crossing area with {len(peds)} {noun}"
                if direction:
                    description += f" in {direction} direction"
                if lights:
                    description += " near traffic signals"

                # Pedestrians first, then the traffic lights sharing the region.
                crossing_zones[zone_name] = {
                    "region": region,
                    "objects": ["pedestrian"] * len(peds) + ["traffic light"] * len(lights),
                    "description": description,
                }

            return crossing_zones

        except Exception as e:
            logger.error(f"Error in analyze_crossing_patterns: {str(e)}")
            logger.error(traceback.format_exc())
            return {}

    def analyze_traffic_zones(self, vehicles: List[Dict]) -> Dict:
        """
        Analyze vehicle distribution to identify direction-aware traffic zones.

        Vehicles are grouped by 'region'. The region with the most vehicles
        becomes "vehicle_zone"; if vehicles occupy more than one region, the
        second-busiest becomes "secondary_vehicle_zone".

        Args:
            vehicles: Vehicle detections; each dict must have 'region' and
                'class_name'.

        Returns:
            Dict of identified traffic zones. Empty when there are no
            vehicles or on error.
        """
        try:
            traffic_zones = {}

            if not vehicles:
                return traffic_zones

            # Group vehicles by region.
            vehicles_by_region = {}
            for vehicle in vehicles:
                vehicles_by_region.setdefault(vehicle["region"], []).append(vehicle)

            # One stable sort yields both the busiest and the runner-up
            # region; ties resolve to the region seen first.
            ranked_regions = sorted(
                vehicles_by_region.items(),
                key=lambda item: len(item[1]),
                reverse=True,
            )

            region, vehicles_in_region = ranked_regions[0]
            # A None region is not reported as the primary zone.
            if region is not None:
                vehicle_types = [v["class_name"] for v in vehicles_in_region]
                unique_types = self._unique_in_order(vehicle_types)
                direction = self._get_directional_description_local(region)

                traffic_zones["vehicle_zone"] = {
                    "region": region,
                    "objects": vehicle_types,
                    "description": f"Vehicle traffic area with {', '.join(unique_types[:3])}"
                    + (f" in {direction} area" if direction else ""),
                }

            # Vehicles spread over several regions: report the runner-up too.
            if len(ranked_regions) > 1:
                second_region, second_vehicles = ranked_regions[1]
                direction = self._get_directional_description_local(second_region)
                vehicle_types = [v["class_name"] for v in second_vehicles]
                unique_types = self._unique_in_order(vehicle_types)

                traffic_zones["secondary_vehicle_zone"] = {
                    "region": second_region,
                    "objects": vehicle_types,
                    "description": f"Secondary traffic area with {', '.join(unique_types[:2])}"
                    + (f" in {direction} direction" if direction else ""),
                }

            return traffic_zones

        except Exception as e:
            logger.error(f"Error analyzing traffic zones: {str(e)}")
            logger.error(traceback.format_exc())
            return {}

    def analyze_aerial_traffic_patterns(self, vehicle_objs: List[Dict]) -> Dict:
        """
        Analyze vehicle traffic patterns as seen from an aerial viewpoint.

        Compares the spread (standard deviation) of vehicle centers along x
        and y. A spread along one axis less than half of the other is read as
        a single traffic flow; otherwise the scene is reported as an
        intersection. Fewer than two vehicles yields no zone.

        Args:
            vehicle_objs: Vehicle detections; each dict must have
                'normalized_center' (an (x, y) pair) and 'class_name'.

        Returns:
            Dict with at most one traffic-pattern zone. Empty on error.
        """
        try:
            zones = {}

            if not vehicle_objs:
                return zones

            positions = np.array([obj["normalized_center"] for obj in vehicle_objs])

            # A spread needs at least two points to be meaningful.
            if len(positions) < 2:
                return zones

            x_std = np.std(positions[:, 0])
            y_std = np.std(positions[:, 1])

            if x_std < y_std * 0.5:
                # Vehicles line up vertically: north-south traffic.
                zone_name = "vertical_traffic_flow"
                region = "central_vertical"
                description = "North-south traffic flow visible from aerial view"
            elif y_std < x_std * 0.5:
                # Vehicles line up horizontally: east-west traffic.
                zone_name = "horizontal_traffic_flow"
                region = "central_horizontal"
                description = "East-west traffic flow visible from aerial view"
            else:
                # No dominant axis: treated as an intersection.
                zone_name = "intersection_traffic"
                region = "central"
                description = "Multi-directional traffic at intersection visible from aerial view"

            zones[zone_name] = {
                "region": region,
                # Only the first five vehicles are listed.
                "objects": [obj["class_name"] for obj in vehicle_objs[:5]],
                "description": description,
            }

            return zones

        except Exception as e:
            logger.error(f"Error analyzing aerial traffic patterns: {str(e)}")
            logger.error(traceback.format_exc())
            return {}

    def identify_park_recreational_zones(self, detected_objects: List[Dict]) -> Dict:
        """
        Identify the recreational activity zone of a park scene.

        Collects recreational items and reports the region that holds the
        most of them, together with the distinct item names found anywhere
        in the scene (in first-seen order).

        Args:
            detected_objects: Detections; each dict must have 'class_id', and
                recreational ones also 'region' and 'class_name'.

        Returns:
            Dict with a "recreational_zone" entry when any recreational item
            is present; otherwise (or on error) an empty dict.
        """
        try:
            zones = {}

            # Class ids treated as recreational items: sports ball, kite,
            # baseball bat, glove, tennis racket.
            recreational_ids = (32, 33, 34, 35, 38)

            item_names = []
            items_by_region = {}
            for obj in detected_objects:
                if obj["class_id"] in recreational_ids:
                    items_by_region.setdefault(obj["region"], []).append(obj)
                    item_names.append(obj["class_name"])

            if item_names:
                main_region, _ = max(
                    items_by_region.items(),
                    key=lambda item: len(item[1]),
                    default=(None, []),
                )
                if main_region is not None:
                    unique_names = self._unique_in_order(item_names)
                    zones["recreational_zone"] = {
                        "region": main_region,
                        "objects": unique_names,
                        "description": f"Recreational area with {', '.join(unique_names)}",
                    }

            return zones

        except Exception as e:
            logger.error(f"Error identifying park recreational zones: {str(e)}")
            logger.error(traceback.format_exc())
            return {}

    def identify_parking_zones(self, detected_objects: List[Dict]) -> Dict:
        """
        Identify parking zones in a parking-lot scene.

        With at least three cars present, cars are clustered by vertical
        position. If any cluster holds two or more cars the scene is reported
        as an organized "parking_row"; otherwise as a generic "parking_area".

        Args:
            detected_objects: Detections; each dict must have 'class_id', and
                cars also 'normalized_center' (an (x, y) pair).

        Returns:
            Dict with one parking zone, or an empty dict when fewer than
            three cars are present or on error.
        """
        try:
            zones = {}

            # class_id 2 is treated as "car".
            car_objs = [obj for obj in detected_objects if obj["class_id"] == 2]

            if len(car_objs) < 3:
                return zones

            y_coords = [obj["normalized_center"][1] for obj in car_objs]

            # Greedy row clustering: a car joins the first existing cluster
            # whose anchor y is within 0.1 of its own y (10% of the image
            # height, assuming centers are normalized to [0, 1]); otherwise
            # it starts a new cluster anchored at its own y.
            y_clusters = {}
            for i, y in enumerate(y_coords):
                for cluster_y in y_clusters:
                    if abs(y - cluster_y) < 0.1:
                        y_clusters[cluster_y].append(i)
                        break
                else:
                    y_clusters[y] = [i]

            if max(len(indices) for indices in y_clusters.values()) >= 2:
                # At least two cars share a row.
                zones["parking_row"] = {
                    "region": "central",
                    "objects": ["car"] * len(car_objs),
                    "description": "Organized parking area with vehicles arranged in rows",
                }
            else:
                zones["parking_area"] = {
                    "region": "wide",
                    "objects": ["car"] * len(car_objs),
                    "description": f"Parking area with {len(car_objs)} vehicles",
                }

            return zones

        except Exception as e:
            logger.error(f"Error identifying parking zones: {str(e)}")
            logger.error(traceback.format_exc())
            return {}

    def _get_directional_description_local(self, region: str) -> str:
        """
        Convert a region name into a compass-direction description.

        The region name is searched case-insensitively for "top"/"bottom"
        (north/south) and "left"/"right" (west/east); the two parts are
        combined, e.g. "top_left" -> "northwest". "top" takes precedence over
        "bottom" and "left" over "right".

        Args:
            region: Region name.

        Returns:
            Direction string, or "central" when no keyword matches or the
            region cannot be processed (e.g. it is not a string).
        """
        try:
            region_lower = region.lower()

            if "top" in region_lower:
                vertical = "north"
            elif "bottom" in region_lower:
                vertical = "south"
            else:
                vertical = ""

            if "left" in region_lower:
                horizontal = "west"
            elif "right" in region_lower:
                horizontal = "east"
            else:
                horizontal = ""

            return (vertical + horizontal) or "central"

        except Exception as e:
            logger.error(f"Error getting directional description for region '{region}': {str(e)}")
            return "central"