import warnings import pandas as pd from geopy.distance import geodesic # Function to calculate distances while preserving all original columns # def calculate_distances( # df1: pd.DataFrame, # df2: pd.DataFrame, # code_col1, # lat_col1, # long_col1, # code_col2, # lat_col2, # long_col2, # min_distance: int = 1, # ): # distances = [] # for _, row1 in df1.iterrows(): # for _, row2 in df2.iterrows(): # coord1 = (row1[lat_col1], row1[long_col1]) # coord2 = (row2[lat_col2], row2[long_col2]) # distance_km = geodesic(coord1, coord2).kilometers # Compute distance # # Combine all original columns + distance # combined_row = { # **row1.to_dict(), # Keep all columns from Dataset1 # **{ # f"{col}_Dataset2": row2[col] for col in df2.columns # }, # Keep all columns from Dataset2 # "Distance_km": distance_km, # } # distances.append(combined_row) # df_distances = pd.DataFrame(distances) # # Find the closest point for each Point1 # df_closest: pd.DataFrame = df_distances.loc[ # df_distances.groupby(code_col1)["Distance_km"].idxmin() # ] # # Find the distnce below min_distance # df_closest_min_distance = df_distances[df_distances["Distance_km"] < min_distance] # return df_distances, df_closest, df_closest_min_distance def calculate_distances( df1: pd.DataFrame, df2: pd.DataFrame, code_col1: str, lat_col1: str, long_col1: str, code_col2: str, lat_col2: str, long_col2: str, min_distance: float = 1.0, ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: """ Calculate distances between points in two datasets and find closest matches. Args: df1: First DataFrame containing reference points df2: Second DataFrame containing points to compare code_col1: Column name in df1 containing point identifiers lat_col1: Column name in df1 containing latitude long_col1: Column name in df1 containing longitude code_col2: Column name in df2 containing point identifiers lat_col2: Column name in df2 containing latitude long_col2: Column name in df2 containing longitude min_distance: Minimum distance threshold in kilometers Returns: tuple: (all_distances, closest_matches, matches_below_threshold) """ # Validate input columns required_cols_1 = {code_col1, lat_col1, long_col1} required_cols_2 = {code_col2, lat_col2, long_col2} if not required_cols_1.issubset(df1.columns): raise ValueError( f"df1 is missing required columns: {required_cols_1 - set(df1.columns)}" ) if not required_cols_2.issubset(df2.columns): raise ValueError( f"df2 is missing required columns: {required_cols_2 - set(df2.columns)}" ) # Convert to list of tuples for vectorized operations coords1 = df1[[lat_col1, long_col1]].apply(tuple, axis=1).tolist() coords2 = df2[[lat_col2, long_col2]].apply(tuple, axis=1).tolist() # Calculate all pairwise distances distances = [] for i, coord1 in enumerate(coords1): for j, coord2 in enumerate(coords2): try: distance_km = geodesic(coord1, coord2).kilometers distances.append( { **df1.iloc[i].to_dict(), **{f"{col}_Dataset2": df2.iloc[j][col] for col in df2.columns}, "Distance_km": distance_km, } ) except ValueError as e: warnings.warn( f"Skipping invalid coordinates: {coord1} or {coord2}: {e}" ) continue if not distances: raise ValueError("No valid coordinate pairs were processed") df_distances = pd.DataFrame(distances) # Find closest matches df_closest = df_distances.loc[ df_distances.groupby(code_col1)["Distance_km"].idxmin() ] # Filter by minimum distance df_closest_min_distance = df_distances[df_distances["Distance_km"] < min_distance] return df_distances, df_closest, df_closest_min_distance