import logging import traceback import numpy as np from typing import Dict, List, Any, Optional logger = logging.getLogger(__name__) class PatternAnalyzer: """ 負責各種模式分析,包含交通流動、行人穿越、車輛分佈等的辨識 專門處理動態區域和移動相關的區域分析 """ def __init__(self): """初始化模式分析器""" try: logger.info("PatternAnalyzer initialized successfully") except Exception as e: logger.error(f"Failed to initialize PatternAnalyzer: {str(e)}") logger.error(traceback.format_exc()) raise def analyze_crossing_patterns(self, pedestrians: List[Dict], traffic_lights: List[Dict]) -> Dict: """ Analyze pedestrian crossing patterns to identify crossing zones. 若同一 region 中同時有行人與紅綠燈,則將兩者都放入該區域的 objects。 Args: pedestrians: 行人物件列表(每個 obj 應包含 'class_id', 'region', 'confidence' 等) traffic_lights: 紅綠燈物件列表(每個 obj 應包含 'class_id', 'region', 'confidence' 等) Returns: crossing_zones: 字典,key 為 zone 名稱,value 包含 'region', 'objects', 'description' """ try: crossing_zones = {} # 如果沒有任何行人,就不辨識任何 crossing zone if not pedestrians: return crossing_zones # (1) 按照 region 分組行人 pedestrian_regions = {} for p in pedestrians: region = p["region"] pedestrian_regions.setdefault(region, []).append(p) # (2) 針對每個 region,看是否同時有紅綠燈 # 建立一個對照表 mapping: region -> { "pedestrians": [...], "traffic_lights": [...] } combined_regions = {} for region, peds in pedestrian_regions.items(): # 取得該 region 下所有紅綠燈 tls_in_region = [t for t in traffic_lights if t["region"] == region] combined_regions[region] = { "pedestrians": peds, "traffic_lights": tls_in_region } # (3) 按照行人數量排序,找出前兩個需要建立 crossing zone 的 region sorted_regions = sorted( combined_regions.items(), key=lambda x: len(x[1]["pedestrians"]), reverse=True ) # (4) 將前兩個 region 建立 Crossing Zone,objects 同時包含行人與紅綠燈 for idx, (region, group) in enumerate(sorted_regions[:2]): peds = group["pedestrians"] tls = group["traffic_lights"] has_nearby_signals = len(tls) > 0 # 生成 zone_name(基於 region 方向 + idx 決定主/次 crossing) direction = self._get_directional_description_local(region) if direction and direction != "central": zone_name = f"{direction} crossing area" else: zone_name = "main crossing area" if idx == 0 else "secondary crossing area" # 組合 description description = f"Pedestrian crossing area with {len(peds)} " description += "person" if len(peds) == 1 else "people" if direction: description += f" in {direction} direction" if has_nearby_signals: description += " near traffic signals" # 將行人 + 同區紅綠燈一併放入 objects obj_list = ["pedestrian"] * len(peds) if has_nearby_signals: obj_list += ["traffic light"] * len(tls) crossing_zones[zone_name] = { "region": region, "objects": obj_list, "description": description } return crossing_zones except Exception as e: logger.error(f"Error in analyze_crossing_patterns: {str(e)}") logger.error(traceback.format_exc()) return {} def analyze_traffic_zones(self, vehicles: List[Dict]) -> Dict: """ 分析車輛分布以識別具有方向感知的交通區域 Args: vehicles: 車輛物件列表 Returns: 識別出的交通區域字典 """ try: traffic_zones = {} if not vehicles: return traffic_zones # 按區域分組車輛 vehicle_regions = {} for v in vehicles: region = v["region"] if region not in vehicle_regions: vehicle_regions[region] = [] vehicle_regions[region].append(v) # 為有車輛的區域創建交通區域 main_traffic_region = max(vehicle_regions.items(), key=lambda x: len(x[1]), default=(None, [])) if main_traffic_region[0] is not None: region = main_traffic_region[0] vehicles_in_region = main_traffic_region[1] # 獲取車輛類型列表用於描述 vehicle_types = [v["class_name"] for v in vehicles_in_region] unique_types = list(set(vehicle_types)) # 獲取方向描述 direction = self._get_directional_description_local(region) # 創建描述性區域 traffic_zones["vehicle_zone"] = { "region": region, "objects": vehicle_types, "description": f"Vehicle traffic area with {', '.join(unique_types[:3])}" + (f" in {direction} area" if direction else "") } # 如果車輛分布在多個區域,創建次要區域 if len(vehicle_regions) > 1: # 獲取第二大車輛聚集區域 sorted_regions = sorted(vehicle_regions.items(), key=lambda x: len(x[1]), reverse=True) if len(sorted_regions) > 1: second_region, second_vehicles = sorted_regions[1] direction = self._get_directional_description_local(second_region) vehicle_types = [v["class_name"] for v in second_vehicles] unique_types = list(set(vehicle_types)) traffic_zones["secondary_vehicle_zone"] = { "region": second_region, "objects": vehicle_types, "description": f"Secondary traffic area with {', '.join(unique_types[:2])}" + (f" in {direction} direction" if direction else "") } return traffic_zones except Exception as e: logger.error(f"Error analyzing traffic zones: {str(e)}") logger.error(traceback.format_exc()) return {} def analyze_aerial_traffic_patterns(self, vehicle_objs: List[Dict]) -> Dict: """ 分析空中視角的車輛交通模式 Args: vehicle_objs: 車輛物件列表 Returns: 交通模式區域字典 """ try: zones = {} if not vehicle_objs: return zones # 將位置轉換為數組進行模式分析 positions = np.array([obj["normalized_center"] for obj in vehicle_objs]) if len(positions) >= 2: # 計算分布指標 x_coords = positions[:, 0] y_coords = positions[:, 1] x_mean = np.mean(x_coords) y_mean = np.mean(y_coords) x_std = np.std(x_coords) y_std = np.std(y_coords) # 判斷車輛是否組織成車道 if x_std < y_std * 0.5: # 車輛垂直對齊 - 代表南北交通 zones["vertical_traffic_flow"] = { "region": "central_vertical", "objects": [obj["class_name"] for obj in vehicle_objs[:5]], "description": "North-south traffic flow visible from aerial view" } elif y_std < x_std * 0.5: # 車輛水平對齊 - 代表東西交通 zones["horizontal_traffic_flow"] = { "region": "central_horizontal", "objects": [obj["class_name"] for obj in vehicle_objs[:5]], "description": "East-west traffic flow visible from aerial view" } else: # 車輛多方向 - 代表十字路口 zones["intersection_traffic"] = { "region": "central", "objects": [obj["class_name"] for obj in vehicle_objs[:5]], "description": "Multi-directional traffic at intersection visible from aerial view" } return zones except Exception as e: logger.error(f"Error analyzing aerial traffic patterns: {str(e)}") logger.error(traceback.format_exc()) return {} def identify_park_recreational_zones(self, detected_objects: List[Dict]) -> Dict: """ 識別公園的休閒活動區域 Args: detected_objects: 檢測到的物件列表 Returns: 休閒區域字典 """ try: zones = {} # 尋找休閒物件(運動球、風箏等) rec_items = [] rec_regions = {} for obj in detected_objects: if obj["class_id"] in [32, 33, 34, 35, 38]: # sports ball, kite, baseball bat, glove, tennis racket region = obj["region"] if region not in rec_regions: rec_regions[region] = [] rec_regions[region].append(obj) rec_items.append(obj["class_name"]) if rec_items: main_rec_region = max(rec_regions.items(), key=lambda x: len(x[1]), default=(None, [])) if main_rec_region[0] is not None: zones["recreational_zone"] = { "region": main_rec_region[0], "objects": list(set(rec_items)), "description": f"Recreational area with {', '.join(list(set(rec_items)))}" } return zones except Exception as e: logger.error(f"Error identifying park recreational zones: {str(e)}") logger.error(traceback.format_exc()) return {} def identify_parking_zones(self, detected_objects: List[Dict]) -> Dict: """ 停車場的停車區域 Args: detected_objects: 檢測到的物件列表 Returns: 停車區域字典 """ try: zones = {} # 尋找停放的汽車 car_objs = [obj for obj in detected_objects if obj["class_id"] == 2] # cars if len(car_objs) >= 3: # 檢查汽車是否按模式排列 car_positions = [obj["normalized_center"] for obj in car_objs] # 通過分析垂直位置檢查行模式 y_coords = [pos[1] for pos in car_positions] y_clusters = {} # 按相似y坐標分組汽車 for i, y in enumerate(y_coords): assigned = False for cluster_y in y_clusters.keys(): if abs(y - cluster_y) < 0.1: # 圖像高度的10%內 y_clusters[cluster_y].append(i) assigned = True break if not assigned: y_clusters[y] = [i] # 如果有行模式 if max(len(indices) for indices in y_clusters.values()) >= 2: zones["parking_row"] = { "region": "central", "objects": ["car"] * len(car_objs), "description": f"Organized parking area with vehicles arranged in rows" } else: zones["parking_area"] = { "region": "wide", "objects": ["car"] * len(car_objs), "description": f"Parking area with {len(car_objs)} vehicles" } return zones except Exception as e: logger.error(f"Error identifying parking zones: {str(e)}") logger.error(traceback.format_exc()) return {} def _get_directional_description_local(self, region: str) -> str: """ 本地方向描述方法 將區域名稱轉換為方位描述(東西南北) Args: region: 區域名稱 Returns: 方位描述字串 """ try: region_lower = region.lower() if "top" in region_lower and "left" in region_lower: return "northwest" elif "top" in region_lower and "right" in region_lower: return "northeast" elif "bottom" in region_lower and "left" in region_lower: return "southwest" elif "bottom" in region_lower and "right" in region_lower: return "southeast" elif "top" in region_lower: return "north" elif "bottom" in region_lower: return "south" elif "left" in region_lower: return "west" elif "right" in region_lower: return "east" else: return "central" except Exception as e: logger.error(f"Error getting directional description for region '{region}': {str(e)}") return "central"