|
import warnings |
|
|
|
import pandas as pd |
|
from geopy.distance import geodesic |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_distances(
    df1: pd.DataFrame,
    df2: pd.DataFrame,
    code_col1: str,
    lat_col1: str,
    long_col1: str,
    code_col2: str,
    lat_col2: str,
    long_col2: str,
    min_distance: float = 1.0,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Compute the geodesic distance between every point of df1 and every point of df2.

    Each output row holds all columns of one df1 row, all columns of one df2
    row (renamed with a ``_Dataset2`` suffix) and the distance between the two
    points in a ``Distance_km`` column.

    Args:
        df1: First DataFrame containing reference points
        df2: Second DataFrame containing points to compare
        code_col1: Column name in df1 containing point identifiers
        lat_col1: Column name in df1 containing latitude
        long_col1: Column name in df1 containing longitude
        code_col2: Column name in df2 containing point identifiers
        lat_col2: Column name in df2 containing latitude
        long_col2: Column name in df2 containing longitude
        min_distance: Distance threshold in kilometers; pairs whose distance
            is strictly below this value are returned in the third frame

    Returns:
        tuple: (all_distances, closest_matches, matches_below_threshold)
            - all_distances: one row per (df1 row, df2 row) pair that could
              be measured
            - closest_matches: for each distinct value of ``code_col1``, the
              row of all_distances with the smallest ``Distance_km``
            - matches_below_threshold: every row of all_distances with
              ``Distance_km < min_distance``

    Raises:
        ValueError: If a required column is missing from df1 or df2, or if no
            coordinate pair could be measured (including empty inputs).
    """
    required_cols_1 = {code_col1, lat_col1, long_col1}
    required_cols_2 = {code_col2, lat_col2, long_col2}

    if not required_cols_1.issubset(df1.columns):
        raise ValueError(
            f"df1 is missing required columns: {required_cols_1 - set(df1.columns)}"
        )
    if not required_cols_2.issubset(df2.columns):
        raise ValueError(
            f"df2 is missing required columns: {required_cols_2 - set(df2.columns)}"
        )

    # (latitude, longitude) tuples, in row order, for each dataset.
    coords1 = df1[[lat_col1, long_col1]].apply(tuple, axis=1).tolist()
    coords2 = df2[[lat_col2, long_col2]].apply(tuple, axis=1).tolist()

    # The df2 side of every output row depends only on the df2 row, so build
    # it once per row instead of once per (df1 row, df2 row) pair. Values are
    # still read through a positional row lookup so they are unchanged.
    suffixed_rows2 = []
    for j in range(len(coords2)):
        row2 = df2.iloc[j]
        suffixed_rows2.append({f"{col}_Dataset2": row2[col] for col in df2.columns})

    distances = []
    for i, coord1 in enumerate(coords1):
        # Likewise the df1 side is invariant across the inner loop.
        row1 = df1.iloc[i].to_dict()
        for coord2, row2_values in zip(coords2, suffixed_rows2):
            # Only the distance computation is guarded; a ValueError here
            # means the pair is skipped with a warning rather than aborting.
            try:
                distance_km = geodesic(coord1, coord2).kilometers
            except ValueError as e:
                warnings.warn(
                    f"Skipping invalid coordinates: {coord1} or {coord2}: {e}",
                    stacklevel=2,
                )
                continue
            distances.append({**row1, **row2_values, "Distance_km": distance_km})

    if not distances:
        raise ValueError("No valid coordinate pairs were processed")

    df_distances = pd.DataFrame(distances)

    # For each df1 identifier, keep the single row with the smallest distance.
    df_closest = df_distances.loc[
        df_distances.groupby(code_col1)["Distance_km"].idxmin()
    ]

    # Taken from all pairs, not only the closest ones, so one df1 point can
    # appear several times here.
    df_closest_min_distance = df_distances[df_distances["Distance_km"] < min_distance]

    return df_distances, df_closest, df_closest_min_distance
|
|