Spaces:
Sleeping
Sleeping
AUTO-FRANCE-PARTS
refactor: Update nearby parcel retrieval to return structured response and improve error handling
e6ce7f9
from shapely.geometry import Point | |
from shapely.ops import transform | |
from pyproj import Transformer | |
from typing import List, Set | |
import math | |
import asyncio | |
class GetNearbyParcelsBySamplingUseCase: | |
def __init__(self, parcel_repository, buffer_radius_m: float, step_m: float = 100): | |
self.repo = parcel_repository | |
self.radius = buffer_radius_m | |
self.step = step_m # espacement des points (densité) | |
self.to_projected = Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True).transform | |
self.to_geographic = Transformer.from_crs("EPSG:3857", "EPSG:4326", always_xy=True).transform | |
async def __call__(self, lat: float, lon: float) -> dict: | |
center = transform(self.to_projected, Point(lon, lat)) | |
sampled_points = self._generate_sampling_points(center) | |
seen_ids: Set[str] = set() | |
nearby_parcels: List[dict] = [] | |
# Traitement asynchrone en lot pour optimiser les performances | |
tasks = [] | |
for pt in sampled_points: | |
lon_, lat_ = transform(self.to_geographic, pt).coords[0] | |
tasks.append(self.repo.get_parcel_from_point(lat_, lon_)) | |
print(f"len(tasks): {len(tasks)}") | |
# Exécution parallèle des requêtes | |
parcel_results = await asyncio.gather(*tasks, return_exceptions=True) | |
for parcel in parcel_results: | |
if isinstance(parcel, Exception): | |
continue # Ignorer les erreurs et continuer | |
if parcel: | |
parcel_id = parcel.get("cadastre", {}).get("id", {}).get("value") | |
if parcel_id not in seen_ids: | |
nearby_parcels.append(parcel) | |
seen_ids.add(parcel_id) | |
return { | |
"parcels": nearby_parcels, | |
"number_of_points": len(sampled_points) | |
} | |
def _generate_sampling_points(self, center: Point) -> List[Point]: | |
points = [] | |
step = self.step | |
radius = self.radius | |
min_x, min_y = center.x - radius, center.y - radius | |
max_x, max_y = center.x + radius, center.y + radius | |
for x in range(int(min_x), int(max_x), int(step)): | |
for y in range(int(min_y), int(max_y), int(step)): | |
pt = Point(x, y) | |
if pt.distance(center) <= radius: | |
points.append(pt) | |
return points | |