# rajkhanke's picture
# Create app.py
# f51f3ca verified
import datetime
import os

import requests
from flask import Flask, render_template, request, Response
from geopy.exc import GeocoderTimedOut, GeocoderServiceError
from geopy.geocoders import Photon
from twilio.rest import Client # For Twilio integration
# Flask application object; the route handler(s) in this file are registered on it.
app = Flask(__name__)
# Initialize Photon geocoder (no API key required)
# Shared module-level instance used by reverse_geocode(); timeout=10 is passed
# straight to geopy (presumably seconds -- confirm against the geopy docs).
photon_geolocator = Photon(user_agent="MyWeatherApp", timeout=10)
def parse_iso_datetime(timestr):
    """
    Turn an ISO 8601 string into a ``datetime.datetime``.

    One trailing 'Z' (if present) is dropped first; whatever remains is
    handed to ``datetime.datetime.fromisoformat``.
    """
    cleaned = timestr[:-1] if timestr.endswith("Z") else timestr
    return datetime.datetime.fromisoformat(cleaned)
def find_closest_hour_index(hour_times, current_time_str):
    """
    Return the position in hour_times whose timestamp lies nearest to
    current_time_str.

    Both the list entries and current_time_str are parsed with
    parse_iso_datetime. Returns None when hour_times is empty. When several
    entries are equally close, the earliest position is returned.
    """
    if not hour_times:
        return None

    reference = parse_iso_datetime(current_time_str)

    def seconds_away(position):
        # Absolute distance, in seconds, between this entry and the reference.
        moment = parse_iso_datetime(hour_times[position])
        return abs((moment - reference).total_seconds())

    # min() keeps the first of equal minima, matching a strict "<" scan.
    return min(range(len(hour_times)), key=seconds_away)
def get_weather_icon(code):
    """Return the emoji used to represent an Open-Meteo weathercode."""
    # Each entry pairs a group of weather codes with the emoji shown for them.
    icon_groups = (
        ((0,), "☀️"),  # Clear sky
        ((1, 2, 3), "⛅"),
        ((45, 48), "🌫️"),
        ((51, 53, 55), "🌦️"),
        ((56, 57), "🌧️"),
        ((61, 63, 65), "🌧️"),
        ((66, 67), "🌧️"),
        ((71, 73, 75, 77), "❄️"),
        ((80, 81, 82), "🌦️"),
        ((85, 86), "❄️"),
        ((95, 96, 99), "⛈️"),
    )
    for codes, icon in icon_groups:
        if code in codes:
            return icon
    # Any code not listed above (None included) gets the placeholder icon.
    return "❓"
def get_weather_description(code):
    """Return a brief text label for a weathercode, or "Unknown" if unlisted."""
    labels = {
        # Sky condition
        0: "Clear sky", 1: "Mainly clear", 2: "Partly cloudy", 3: "Overcast",
        # Fog
        45: "Fog", 48: "Depositing rime fog",
        # Drizzle
        51: "Light drizzle", 53: "Moderate drizzle", 55: "Dense drizzle",
        56: "Freezing drizzle", 57: "Freezing drizzle",
        # Rain
        61: "Slight rain", 63: "Moderate rain", 65: "Heavy rain",
        66: "Freezing rain", 67: "Freezing rain",
        # Snow
        71: "Slight snow fall", 73: "Moderate snow fall", 75: "Heavy snow fall",
        77: "Snow grains",
        # Showers
        80: "Slight rain showers", 81: "Moderate rain showers",
        82: "Violent rain showers",
        85: "Slight snow showers", 86: "Heavy snow showers",
        # Thunderstorms
        95: "Thunderstorm",
        96: "Thunderstorm w/ slight hail",
        99: "Thunderstorm w/ heavy hail",
    }
    if code in labels:
        return labels[code]
    return "Unknown"
def reverse_geocode(lat, lon):
    """
    Look up a human-readable address for a latitude/longitude pair using the
    module-level Photon geocoder.

    Returns the address reported for the match. When there is no match, or
    the lookup raises GeocoderTimedOut / GeocoderServiceError (the error is
    printed), a "Lat: ..., Lon: ..." string built from the inputs is returned.
    """
    match = None
    try:
        match = photon_geolocator.reverse((lat, lon), exactly_one=True)
    except (GeocoderTimedOut, GeocoderServiceError) as err:
        print("Photon reverse geocode error:", err)

    if not match:
        # Nothing usable came back: fall back to the raw coordinates.
        return f"Lat: {lat}, Lon: {lon}"
    return match.address
# -----------------------------
# Twilio WhatsApp Integration
# -----------------------------
def check_and_collect_alerts(forecast_list):
    """
    Build the alert lines for forecast days with hazardous conditions.

    A day whose "tmax" is above 40 gets an extreme-heat alert. Otherwise, a
    day whose "desc" mentions rain, drizzle, overcast or thunderstorm
    (case-insensitive) gets an alert quoting that description. Each day
    contributes at most one alert; the heat check takes precedence.
    """
    hazard_words = ("rain", "drizzle", "overcast", "thunderstorm")
    collected = []
    for entry in forecast_list:
        peak = entry.get("tmax")
        if peak and peak > 40:
            collected.append(f"{entry['day_name']} ({entry['date_str']}): Extreme heat. Please take necessary precautions.")
            continue

        summary = entry.get("desc")
        if not summary:
            continue

        lowered = summary.lower()
        if any(word in lowered for word in hazard_words):
            collected.append(f"{entry['day_name']} ({entry['date_str']}): {entry['desc']}. Please take necessary precautions.")
    return collected
def send_whatsapp_message(message, location, location_address):
    """
    Send a WhatsApp weather-alert message using the Twilio API.

    Args:
        message: Alert text placed in the body of the message.
        location: (lat, lon) pair; used for the coordinates line and for the
            Google Maps link.
        location_address: Human-readable address included in the message.

    Configuration (environment variables):
        TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN: Twilio credentials (required).
        TWILIO_WHATSAPP_FROM, TWILIO_WHATSAPP_TO: optional overrides for the
            sender and recipient; the previously hard-coded numbers are the
            defaults.

    If either credential is missing, nothing is sent and a note is printed.

    Returns:
        None.
    """
    # SECURITY: credentials must never live in source control. Any SID/token
    # that was previously committed in this file should be treated as leaked
    # and rotated in the Twilio console.
    account_sid = os.environ.get("TWILIO_ACCOUNT_SID")
    auth_token = os.environ.get("TWILIO_AUTH_TOKEN")
    if not account_sid or not auth_token:
        print("Twilio credentials not configured; WhatsApp alert not sent.")
        return

    google_maps_url = f"https://www.google.com/maps?q={location[0]},{location[1]}"
    message_content = (
        f"Weather Alerts for Your Farm :\n"
        f"{message}\n"
        f"Location (lat, lon): {location[0]}, {location[1]}\n"
        f"Location: {location_address}\n"
        f"Map: {google_maps_url}"
    )

    sender = os.environ.get("TWILIO_WHATSAPP_FROM", "whatsapp:+14155238886")
    recipient = os.environ.get("TWILIO_WHATSAPP_TO", "whatsapp:+917559355282")

    client = Client(account_sid, auth_token)
    msg = client.messages.create(
        from_=sender,
        body=message_content,
        to=recipient,
    )
    print("Twilio Message SID:", msg.sid)
def _read_coordinates(default_lat, default_lon):
    """Return (lat, lon) from the form (POST) or query string (GET).

    Falls back to the defaults when a value is missing, is not a number, or
    lies outside the valid latitude/longitude range.
    """
    source = request.form if request.method == "POST" else request.args
    try:
        lat = float(source.get("lat", default_lat))
        lon = float(source.get("lon", default_lon))
    except (TypeError, ValueError):
        return default_lat, default_lon
    # The chained comparisons are also False for NaN, so NaN falls back too.
    if not (-90 <= lat <= 90 and -180 <= lon <= 180):
        return default_lat, default_lon
    return lat, lon


def _item_or_none(values, idx):
    """Return values[idx], or None when idx is None or past the end."""
    if idx is None or idx >= len(values):
        return None
    return values[idx]


def _clock_time(values, idx):
    """Return characters 11..15 (the HH:MM part) of values[idx], or None."""
    stamp = _item_or_none(values, idx)
    return stamp[11:16] if stamp else None


@app.route("/", methods=["GET", "POST"])
def index():
    """
    Render the weather page for the requested coordinates.

    Reads "lat"/"lon" from the form (POST) or query string (GET), reverse
    geocodes them, fetches a 14-day forecast from Open-Meteo, derives the
    current conditions, today's highlights and a per-day forecast list, sends
    a WhatsApp message when check_and_collect_alerts reports any alerts, and
    renders "index.html".

    If the Open-Meteo request fails, the error is printed and the page is
    rendered with empty weather data instead of raising.
    """
    # Default coordinates
    default_lat = 18.5196
    default_lon = 73.8553
    lat, lon = _read_coordinates(default_lat, default_lon)
    location_address = reverse_geocode(lat, lon)

    # Call Open-Meteo API for forecast data
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": (
            "temperature_2m,relative_humidity_2m,precipitation,"
            "cloudcover,windspeed_10m,pressure_msl,soil_moisture_3_to_9cm,uv_index"
        ),
        "daily": (
            "weathercode,temperature_2m_max,temperature_2m_min,"
            "sunrise,sunset,uv_index_max"
        ),
        "current_weather": True,
        "forecast_days": 14,
        "timezone": "auto",
    }
    try:
        # A timeout keeps a stalled upstream from hanging the request forever.
        resp = requests.get(url, params=params, timeout=10)
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        print("Open-Meteo request error:", e)
        data = {}
    if not isinstance(data, dict):
        data = {}

    timezone = data.get("timezone", "Local")

    # ---- Current conditions ----
    current_weather = data.get("current_weather") or {}
    current_temp = current_weather.get("temperature")
    current_time = current_weather.get("time")
    current_code = current_weather.get("weathercode")
    current_icon = get_weather_icon(current_code)
    current_desc = get_weather_description(current_code)
    current_wind_speed = current_weather.get("windspeed", 0.0)
    current_wind_dir = current_weather.get("winddirection", 0)
    if current_time:
        dt_current = parse_iso_datetime(current_time)
        current_time_formatted = dt_current.strftime("%A, %b %d, %Y %I:%M %p")
    else:
        current_time_formatted = ""

    # ---- Hourly series ----
    hourly_data = data.get("hourly") or {}
    hour_times = hourly_data.get("time", [])
    hour_temp = hourly_data.get("temperature_2m", [])
    hour_humidity = hourly_data.get("relative_humidity_2m", [])
    hour_precip = hourly_data.get("precipitation", [])
    hour_clouds = hourly_data.get("cloudcover", [])
    hour_wind = hourly_data.get("windspeed_10m", [])
    hour_pressure = hourly_data.get("pressure_msl", [])
    hour_soil = hourly_data.get("soil_moisture_3_to_9cm", [])
    hour_uv = hourly_data.get("uv_index", [])

    current_index = None
    if current_time:
        current_index = find_closest_hour_index(hour_times, current_time)

    # "Feels like" lowers the temperature by 0.02 degrees per percentage point
    # of humidity below 100; skipped when either input is unavailable.
    feels_like = current_temp
    humidity_now = _item_or_none(hour_humidity, current_index)
    if current_temp is not None and humidity_now is not None:
        feels_like = round(current_temp - 0.2 * (100 - humidity_now) / 10, 1)

    # ---- Today's highlights ----
    # The same keys are always present (None when no value is available), so
    # the template sees one consistent schema.
    today_highlights = {
        "humidity": humidity_now,
        "precipitation": _item_or_none(hour_precip, current_index),
        "clouds": _item_or_none(hour_clouds, current_index),
        "windspeed": _item_or_none(hour_wind, current_index),
        "pressure": _item_or_none(hour_pressure, current_index),
        "soil_moisture": _item_or_none(hour_soil, current_index),
        "uv_index": _item_or_none(hour_uv, current_index),
    }

    daily_data = data.get("daily") or {}
    daily_sunrise = daily_data.get("sunrise", [])
    daily_sunset = daily_data.get("sunset", [])
    today_highlights["sunrise"] = _clock_time(daily_sunrise, 0)
    today_highlights["sunset"] = _clock_time(daily_sunset, 0)

    # ---- Daily forecast ----
    daily_times = daily_data.get("time", [])
    daily_codes = daily_data.get("weathercode", [])
    daily_tmax = daily_data.get("temperature_2m_max", [])
    daily_tmin = daily_data.get("temperature_2m_min", [])

    def get_hour_temp(date_str, hour_str):
        # Temperature of the hourly sample closest to date_str at hour_str:00.
        target = date_str + "T" + hour_str + ":00"
        return _item_or_none(hour_temp, find_closest_hour_index(hour_times, target))

    forecast_list = []
    for i, date_str in enumerate(daily_times):
        dt_obj = parse_iso_datetime(date_str)
        code = _item_or_none(daily_codes, i)
        tmax = _item_or_none(daily_tmax, i)
        tmin = _item_or_none(daily_tmin, i)
        avg_temp = round((tmax + tmin) / 2, 1) if tmax is not None and tmin is not None else None
        forecast_list.append({
            "day_name": dt_obj.strftime("%A"),
            "date_str": dt_obj.strftime("%b %d"),
            "icon": get_weather_icon(code),
            "desc": get_weather_description(code),
            "avg_temp": avg_temp,
            "morning_temp": get_hour_temp(date_str, "09"),
            "evening_temp": get_hour_temp(date_str, "21"),
            "sunrise": _clock_time(daily_sunrise, i),
            "sunset": _clock_time(daily_sunset, i),
            "tmax": tmax,
            "tmin": tmin,
        })

    # ---- Alerts ----
    # NOTE(review): a message is sent on every page load that has alerts;
    # consider rate limiting or de-duplicating.
    alerts = check_and_collect_alerts(forecast_list)
    alerts_sent = False
    if alerts:
        alert_message = "\n".join(alerts)
        try:
            send_whatsapp_message(alert_message, (lat, lon), location_address)
            alerts_sent = True
        except Exception as e:
            # Request boundary: a failed alert delivery must not prevent the
            # weather page itself from rendering.
            print("WhatsApp alert error:", e)

    return render_template(
        "index.html",
        lat=lat,
        lon=lon,
        location_address=location_address,
        current_temp=current_temp,
        current_icon=current_icon,
        current_desc=current_desc,
        current_time=current_time_formatted,
        current_wind_speed=current_wind_speed,
        current_wind_dir=current_wind_dir,
        feels_like=feels_like,
        today_highlights=today_highlights,
        forecast_list=forecast_list,
        timezone=timezone,
        alerts_sent=alerts_sent,
    )
if __name__ == "__main__":
    # Debug mode enables the interactive Werkzeug debugger, which allows
    # arbitrary code execution, so it is opt-in via FLASK_DEBUG instead of
    # being always on.
    app.run(debug=os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes"))