import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import euclidean_distances
from typing import List, Dict, Tuple


class NearestNeighborGrouping:
    def __init__(self):
        self.scaler = StandardScaler()
        self.feature_weights = {
            'depth_mid': 0.05,            # Depth position (less important for similarity)
            'thickness': 0.05,            # Layer thickness (less important)
            'soil_type_encoded': 0.35,    # Soil type (most important)
            'consistency_encoded': 0.30,  # Consistency/density (very important)
            'strength_value': 0.15,       # Strength parameter
            'moisture_encoded': 0.05,     # Moisture content
            'color_encoded': 0.05         # Color
        }

    def encode_categorical_features(self, layers: List[Dict]) -> pd.DataFrame:
        """Convert categorical features to numerical values for clustering."""
        # Build a DataFrame from the raw layer dictionaries
        df_data = []
        for i, layer in enumerate(layers):
            layer_data = {
                'layer_index': i,
                'layer_id': layer.get('layer_id', i + 1),
                'depth_from': layer.get('depth_from', 0),
                'depth_to': layer.get('depth_to', 0),
                'depth_mid': (layer.get('depth_from', 0) + layer.get('depth_to', 0)) / 2,
                'thickness': layer.get('depth_to', 0) - layer.get('depth_from', 0),
                'soil_type': layer.get('soil_type', 'unknown').lower(),
                'consistency': layer.get('consistency', 'unknown').lower(),
                'strength_value': layer.get('strength_value', 0) or layer.get('calculated_su', 0) or 0,
                'moisture': layer.get('moisture', 'unknown').lower(),
                'color': layer.get('color', 'unknown').lower(),
                'description': layer.get('description', '')
            }
            df_data.append(layer_data)
        df = pd.DataFrame(df_data)

        # Encode soil types
        soil_type_mapping = {
            'clay': 1, 'silt': 2, 'sand': 3, 'gravel': 4, 'rock': 5, 'unknown': 0
        }
        df['soil_type_encoded'] = df['soil_type'].map(soil_type_mapping).fillna(0)

        # Encode consistency/density (cohesive and granular scales share codes)
        consistency_mapping = {
            'very soft': 1, 'soft': 2, 'medium': 3, 'stiff': 4, 'very stiff': 5, 'hard': 6,
            'very loose': 1, 'loose': 2, 'medium dense': 3, 'dense': 4, 'very dense': 5,
            'unknown': 0
        }
        df['consistency_encoded'] = df['consistency'].map(consistency_mapping).fillna(0)

        # Encode moisture
        moisture_mapping = {'dry': 1, 'moist': 2, 'wet': 3, 'saturated': 4, 'unknown': 0}
        df['moisture_encoded'] = df['moisture'].map(moisture_mapping).fillna(0)

        # Encode colors (simplified)
        color_mapping = {
            'brown': 1, 'gray': 2, 'black': 3, 'red': 4, 'yellow': 5, 'white': 6, 'unknown': 0
        }
        df['color_encoded'] = df['color'].map(color_mapping).fillna(0)

        return df

    def calculate_layer_similarity(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]:
        """Calculate a similarity matrix between layers using weighted features."""
        # Features used for the similarity calculation
        feature_columns = [
            'depth_mid', 'thickness', 'soil_type_encoded', 'consistency_encoded',
            'strength_value', 'moisture_encoded', 'color_encoded'
        ]

        # Prepare the feature matrix and handle missing values
        features = df[feature_columns].copy().fillna(0)

        # Standardize first, then apply the feature weights. (Weighting before
        # standardization would be a no-op, because StandardScaler rescales
        # every column back to unit variance.)
        features_scaled = self.scaler.fit_transform(features)
        weights = np.array([self.feature_weights.get(col, 1.0) for col in feature_columns])
        features_scaled = features_scaled * weights

        # Convert Euclidean distance to a similarity score in (0, 1]
        distance_matrix = euclidean_distances(features_scaled)
        similarity_matrix = 1 / (1 + distance_matrix)

        return similarity_matrix, features_scaled
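    # Worked example of the similarity score above (assumed numbers, purely
    # illustrative): two layers with identical weighted, scaled feature
    # vectors have distance 0, so similarity = 1 / (1 + 0) = 1.0, while a
    # pair at scaled distance 3 scores 1 / (1 + 3) = 0.25. Scores therefore
    # always fall in (0, 1], with larger values meaning more similar layers.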
"""Find k nearest neighbors for each soil layer""" similarity_matrix, features_scaled = self.calculate_layer_similarity(df) # Use NearestNeighbors to find k nearest neighbors nn_model = NearestNeighbors(n_neighbors=min(k+1, len(df)), metric='euclidean') nn_model.fit(features_scaled) distances, indices = nn_model.kneighbors(features_scaled) nearest_neighbors = [] for i, (layer_distances, layer_indices) in enumerate(zip(distances, indices)): neighbors = [] for j, (dist, idx) in enumerate(zip(layer_distances[1:], layer_indices[1:])): # Skip self neighbor_info = { 'neighbor_index': int(idx), 'neighbor_id': df.iloc[idx]['layer_id'], 'distance': float(dist), 'similarity_score': float(similarity_matrix[i, idx]), 'soil_type': df.iloc[idx]['soil_type'], 'consistency': df.iloc[idx]['consistency'], 'depth_range': f"{df.iloc[idx]['depth_from']:.1f}-{df.iloc[idx]['depth_to']:.1f}m" } neighbors.append(neighbor_info) layer_nn = { 'layer_index': i, 'layer_id': df.iloc[i]['layer_id'], 'soil_type': df.iloc[i]['soil_type'], 'consistency': df.iloc[i]['consistency'], 'depth_range': f"{df.iloc[i]['depth_from']:.1f}-{df.iloc[i]['depth_to']:.1f}m", 'nearest_neighbors': neighbors } nearest_neighbors.append(layer_nn) return nearest_neighbors def group_similar_layers(self, df: pd.DataFrame, similarity_threshold: float = 0.7) -> List[List[int]]: """Group layers using DBSCAN clustering based on similarity""" similarity_matrix, features_scaled = self.calculate_layer_similarity(df) # Convert similarity to distance for DBSCAN distance_matrix = 1 - similarity_matrix # Use DBSCAN for clustering eps = 1 - similarity_threshold # Convert similarity threshold to distance clustering = DBSCAN(eps=eps, min_samples=1, metric='precomputed') cluster_labels = clustering.fit_predict(distance_matrix) # Group layers by cluster clusters = {} for i, label in enumerate(cluster_labels): if label not in clusters: clusters[label] = [] clusters[label].append(i) # Convert to list of groups, filter out single-layer groups layer_groups = [] for cluster_id, layer_indices in clusters.items(): if len(layer_indices) > 1: # Only groups with multiple layers layer_groups.append(layer_indices) return layer_groups, cluster_labels def analyze_group_properties(self, df: pd.DataFrame, group_indices: List[int]) -> Dict: """Analyze properties of a group of similar layers""" group_layers = df.iloc[group_indices] analysis = { 'group_size': len(group_indices), 'depth_range': { 'min': group_layers['depth_from'].min(), 'max': group_layers['depth_to'].max(), 'total_thickness': group_layers['thickness'].sum() }, 'soil_types': group_layers['soil_type'].value_counts().to_dict(), 'consistencies': group_layers['consistency'].value_counts().to_dict(), 'strength_stats': { 'mean': group_layers['strength_value'].mean(), 'min': group_layers['strength_value'].min(), 'max': group_layers['strength_value'].max(), 'std': group_layers['strength_value'].std() }, 'layer_ids': group_layers['layer_id'].tolist(), 'depth_ranges': [f"{row['depth_from']:.1f}-{row['depth_to']:.1f}m" for _, row in group_layers.iterrows()] } return analysis def suggest_layer_merging(self, layers: List[Dict], similarity_threshold: float = 0.8) -> Dict: """Suggest which layers should be merged based on nearest neighbor analysis""" if len(layers) < 2: return {"groups": [], "recommendations": []} # Encode features df = self.encode_categorical_features(layers) # Find similar layer groups layer_groups, cluster_labels = self.group_similar_layers(df, similarity_threshold) # Analyze each group group_analyses = [] 
    def suggest_layer_merging(self, layers: List[Dict], similarity_threshold: float = 0.8) -> Dict:
        """Suggest which layers should be merged based on nearest neighbor analysis."""
        if len(layers) < 2:
            return {"groups": [], "recommendations": []}

        # Encode features
        df = self.encode_categorical_features(layers)

        # Find groups of similar layers
        layer_groups, cluster_labels = self.group_similar_layers(df, similarity_threshold)

        # Analyze each group and build merge recommendations
        group_analyses = []
        recommendations = []
        for i, group_indices in enumerate(layer_groups):
            group_analysis = self.analyze_group_properties(df, group_indices)
            group_analysis['group_id'] = i + 1
            group_analyses.append(group_analysis)

            # Only recommend merging when the layers are adjacent or nearly so
            group_df = df.iloc[group_indices].sort_values('depth_from')
            is_adjacent = self._check_adjacency(group_df)
            if is_adjacent:
                dominant_soil_type = max(group_analysis['soil_types'].items(), key=lambda x: x[1])[0]
                dominant_consistency = max(group_analysis['consistencies'].items(), key=lambda x: x[1])[0]
                recommendation = {
                    'group_id': i + 1,
                    'action': 'merge',
                    'reason': f'Similar {dominant_consistency} {dominant_soil_type} layers at adjacent depths',
                    'layer_ids': group_analysis['layer_ids'],
                    'depth_ranges': group_analysis['depth_ranges'],
                    'merged_properties': {
                        'soil_type': dominant_soil_type,
                        'consistency': dominant_consistency,
                        'depth_from': group_analysis['depth_range']['min'],
                        'depth_to': group_analysis['depth_range']['max'],
                        'thickness': group_analysis['depth_range']['total_thickness'],
                        'avg_strength': group_analysis['strength_stats']['mean']
                    }
                }
                recommendations.append(recommendation)

        return {
            'groups': group_analyses,
            'recommendations': recommendations,
            'cluster_labels': cluster_labels.tolist()
        }

    def _check_adjacency(self, group_df: pd.DataFrame, max_gap: float = 0.5) -> bool:
        """Check whether the layers in a group are adjacent or nearly adjacent."""
        if len(group_df) <= 1:
            return True

        # Sort by depth and check the gap between consecutive layers
        sorted_df = group_df.sort_values('depth_from')
        for i in range(len(sorted_df) - 1):
            current_end = sorted_df.iloc[i]['depth_to']
            next_start = sorted_df.iloc[i + 1]['depth_from']
            if next_start - current_end > max_gap:
                return False
        return True

    def get_layer_neighbors_report(self, layers: List[Dict], k: int = 3) -> str:
        """Generate a detailed report of the nearest neighbors for each layer."""
        if len(layers) < 2:
            return "Insufficient layers for neighbor analysis."

        df = self.encode_categorical_features(layers)
        nearest_neighbors = self.find_nearest_neighbors(df, k)

        report_lines = [
            "NEAREST NEIGHBOR ANALYSIS REPORT",
            "=" * 50,
            ""
        ]
        for layer_info in nearest_neighbors:
            report_lines.append(
                f"Layer {layer_info['layer_id']}: {layer_info['consistency']} "
                f"{layer_info['soil_type']} ({layer_info['depth_range']})"
            )
            report_lines.append("  Nearest Neighbors:")
            for i, neighbor in enumerate(layer_info['nearest_neighbors'][:k], 1):
                similarity_pct = neighbor['similarity_score'] * 100
                report_lines.append(
                    f"    {i}. Layer {neighbor['neighbor_id']}: {neighbor['consistency']} "
                    f"{neighbor['soil_type']} ({neighbor['depth_range']}) - "
                    f"Similarity: {similarity_pct:.1f}%"
                )
            report_lines.append("")
        return "\n".join(report_lines)
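

# Minimal usage sketch. The layer dictionaries below are assumed,
# illustrative data, not real borehole records; field names match what
# encode_categorical_features expects.
if __name__ == "__main__":
    sample_layers = [
        {'layer_id': 1, 'depth_from': 0.0, 'depth_to': 2.0, 'soil_type': 'clay',
         'consistency': 'soft', 'strength_value': 25, 'moisture': 'moist', 'color': 'brown'},
        {'layer_id': 2, 'depth_from': 2.0, 'depth_to': 4.5, 'soil_type': 'clay',
         'consistency': 'soft', 'strength_value': 30, 'moisture': 'moist', 'color': 'brown'},
        {'layer_id': 3, 'depth_from': 4.5, 'depth_to': 8.0, 'soil_type': 'sand',
         'consistency': 'dense', 'strength_value': 0, 'moisture': 'wet', 'color': 'gray'},
    ]

    grouping = NearestNeighborGrouping()

    # Per-layer neighbor report
    print(grouping.get_layer_neighbors_report(sample_layers, k=2))

    # Merge suggestions for adjacent, similar layers
    suggestions = grouping.suggest_layer_merging(sample_layers, similarity_threshold=0.8)
    for rec in suggestions['recommendations']:
        print(f"Merge layers {rec['layer_ids']}: {rec['reason']}")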