db_query / utils /utils_functions.py
DavMelchi's picture
Improve multi distance calculator
588f1c0
import warnings
import pandas as pd
from geopy.distance import geodesic
# Function to calculate distances while preserving all original columns
# def calculate_distances(
# df1: pd.DataFrame,
# df2: pd.DataFrame,
# code_col1,
# lat_col1,
# long_col1,
# code_col2,
# lat_col2,
# long_col2,
# min_distance: int = 1,
# ):
# distances = []
# for _, row1 in df1.iterrows():
# for _, row2 in df2.iterrows():
# coord1 = (row1[lat_col1], row1[long_col1])
# coord2 = (row2[lat_col2], row2[long_col2])
# distance_km = geodesic(coord1, coord2).kilometers # Compute distance
# # Combine all original columns + distance
# combined_row = {
# **row1.to_dict(), # Keep all columns from Dataset1
# **{
# f"{col}_Dataset2": row2[col] for col in df2.columns
# }, # Keep all columns from Dataset2
# "Distance_km": distance_km,
# }
# distances.append(combined_row)
# df_distances = pd.DataFrame(distances)
# # Find the closest point for each Point1
# df_closest: pd.DataFrame = df_distances.loc[
# df_distances.groupby(code_col1)["Distance_km"].idxmin()
# ]
# # Find the distnce below min_distance
# df_closest_min_distance = df_distances[df_distances["Distance_km"] < min_distance]
# return df_distances, df_closest, df_closest_min_distance
def calculate_distances(
df1: pd.DataFrame,
df2: pd.DataFrame,
code_col1: str,
lat_col1: str,
long_col1: str,
code_col2: str,
lat_col2: str,
long_col2: str,
min_distance: float = 1.0,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""
Calculate distances between points in two datasets and find closest matches.
Args:
df1: First DataFrame containing reference points
df2: Second DataFrame containing points to compare
code_col1: Column name in df1 containing point identifiers
lat_col1: Column name in df1 containing latitude
long_col1: Column name in df1 containing longitude
code_col2: Column name in df2 containing point identifiers
lat_col2: Column name in df2 containing latitude
long_col2: Column name in df2 containing longitude
min_distance: Minimum distance threshold in kilometers
Returns:
tuple: (all_distances, closest_matches, matches_below_threshold)
"""
# Validate input columns
required_cols_1 = {code_col1, lat_col1, long_col1}
required_cols_2 = {code_col2, lat_col2, long_col2}
if not required_cols_1.issubset(df1.columns):
raise ValueError(
f"df1 is missing required columns: {required_cols_1 - set(df1.columns)}"
)
if not required_cols_2.issubset(df2.columns):
raise ValueError(
f"df2 is missing required columns: {required_cols_2 - set(df2.columns)}"
)
# Convert to list of tuples for vectorized operations
coords1 = df1[[lat_col1, long_col1]].apply(tuple, axis=1).tolist()
coords2 = df2[[lat_col2, long_col2]].apply(tuple, axis=1).tolist()
# Calculate all pairwise distances
distances = []
for i, coord1 in enumerate(coords1):
for j, coord2 in enumerate(coords2):
try:
distance_km = geodesic(coord1, coord2).kilometers
distances.append(
{
**df1.iloc[i].to_dict(),
**{f"{col}_Dataset2": df2.iloc[j][col] for col in df2.columns},
"Distance_km": distance_km,
}
)
except ValueError as e:
warnings.warn(
f"Skipping invalid coordinates: {coord1} or {coord2}: {e}"
)
continue
if not distances:
raise ValueError("No valid coordinate pairs were processed")
df_distances = pd.DataFrame(distances)
# Find closest matches
df_closest = df_distances.loc[
df_distances.groupby(code_col1)["Distance_km"].idxmin()
]
# Filter by minimum distance
df_closest_min_distance = df_distances[df_distances["Distance_km"] < min_distance]
return df_distances, df_closest, df_closest_min_distance