|
from typing import TypedDict, Optional |
|
|
|
from climateqa.engine.talk_to_data.ipcc.config import HUGE_MACRO_COUNTRIES, MACRO_COUNTRIES |
|
from climateqa.engine.talk_to_data.config import IPCC_DATASET_URL |
|
|
|
class IndicatorPerYearAtLocationQueryParams(TypedDict, total=False):
    """Inputs for the query giving an indicator's yearly evolution at one location.

    The dictionary is declared with ``total=False``, so every key may be omitted.

    Attributes:
        indicator_column (str): Name of the climate indicator column.
        latitude (str): Latitude of the location.
        longitude (str): Longitude of the location.
        country_code (str): Country code.
        admin1 (Optional[str]): Administrative region (optional).
    """

    indicator_column: str
    latitude: str
    longitude: str
    country_code: str
    admin1: Optional[str]
|
|
|
def indicator_per_year_at_location_query(
    table: str, params: IndicatorPerYearAtLocationQueryParams
) -> str:
    """
    Builds an SQL query to get the evolution of an indicator per year at a specific location.

    The parquet file that is read and the aggregation applied depend on the country:

    - countries listed in ``MACRO_COUNTRIES`` read ``<country_code>_monthly_macro.parquet``
      and average the indicator per scenario and year;
    - countries listed in ``HUGE_MACRO_COUNTRIES`` read ``<country_code>_annualy_macro.parquet``
      and select the indicator as stored;
    - every other country reads ``<country_code>.parquet``, takes the median of the
      indicator per scenario, year and month, then averages those medians per
      scenario and year.

    In all cases only rows with ``year >= 1950`` are kept.

    Args:
        table (str): SQL table of the indicator; it is lower-cased to build the dataset path.
        params (IndicatorPerYearAtLocationQueryParams): Dictionary with the params for the query.
            ``indicator_column``, ``latitude``, ``longitude`` and ``country_code`` are
            required. ``admin1`` is not used by this query.

    Returns:
        str: The SQL query string, or an empty string if required parameters are missing.
    """
    indicator_column = params.get("indicator_column")
    latitude = params.get("latitude")
    longitude = params.get("longitude")
    country_code = params.get("country_code")

    if not indicator_column or not country_code:
        return ""

    # A coordinate is missing only when it is absent or an empty string. A plain
    # truthiness test would also reject a numeric 0 (equator / prime meridian).
    if any(
        coord is None or (isinstance(coord, str) and not coord)
        for coord in (latitude, longitude)
    ):
        return ""

    # NOTE(review): the values below are interpolated into the SQL text as-is, so
    # callers must only pass trusted column names, coordinates and country codes.
    if country_code in MACRO_COUNTRIES:
        table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}_monthly_macro.parquet'"
        sql_query = f"""
        SELECT year, scenario, AVG({indicator_column}) as {indicator_column}
        FROM {table_path}
        WHERE latitude = {latitude} AND longitude = {longitude} AND year >= 1950
        GROUP BY scenario, year
        ORDER BY year, scenario
        """
    elif country_code in HUGE_MACRO_COUNTRIES:
        # "annualy" (sic) is part of the dataset file name and must stay unchanged.
        table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}_annualy_macro.parquet'"
        sql_query = f"""
        SELECT year, scenario, {indicator_column}
        FROM {table_path}
        WHERE latitude = {latitude} AND longitude = {longitude} AND year >= 1950
        ORDER BY year, scenario
        """
    else:
        table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}.parquet'"
        sql_query = f"""
        WITH medians_per_month AS (
            SELECT year, scenario, month, MEDIAN({indicator_column}) AS median_value
            FROM {table_path}
            WHERE latitude = {latitude} AND longitude = {longitude} AND year >= 1950
            GROUP BY scenario, year, month
        )
        SELECT year, scenario, AVG(median_value) AS {indicator_column}
        FROM medians_per_month
        GROUP BY scenario, year
        ORDER BY year, scenario
        """
    return sql_query.strip()
|
|
|
class IndicatorPerYearAndSpecificMonthAtLocationQueryParams(TypedDict, total=False):
    """
    Parameters for querying the evolution of an indicator per year for a specific month at a specific location.

    The dictionary is declared with ``total=False``, so every key may be omitted.

    Attributes:
        indicator_column (str): Name of the climate indicator column.
        latitude (str): Latitude of the location.
        longitude (str): Longitude of the location.
        country_code (str): Country code.
        month (str): Month targeted.
        month_number (int): Month targeted, under the key that
            ``indicator_per_year_and_specific_month_at_location_query`` reads.
    """

    indicator_column: str
    latitude: str
    longitude: str
    country_code: str
    month: str
    # The query builder looks up "month_number", so that key is declared here as
    # well; "month" is kept so existing annotations and callers stay valid.
    month_number: int
|
|
|
def indicator_per_year_and_specific_month_at_location_query(
    table: str, params: IndicatorPerYearAndSpecificMonthAtLocationQueryParams
) -> str:
    """
    Builds an SQL query to get the evolution of an indicator per year for a specific month at a specific location.

    Countries listed in ``MACRO_COUNTRIES`` or ``HUGE_MACRO_COUNTRIES`` read
    ``<country_code>_monthly_macro.parquet`` and select the indicator as stored.
    Every other country reads ``<country_code>.parquet`` and takes the median of
    the indicator per scenario and year. In both cases only rows with
    ``year >= 1950`` and the requested month are kept, ordered by year and scenario.

    Args:
        table (str): SQL table of the indicator; it is lower-cased to build the dataset path.
        params (dict): Dictionary with required params:
            - indicator_column (str)
            - latitude (str or float)
            - longitude (str or float)
            - country_code (str)
            - month_number (int), or ``month`` as a fallback key

    Returns:
        str: The SQL query string, or an empty string if required parameters are missing.
    """
    indicator_column = params.get("indicator_column")
    latitude = params.get("latitude")
    longitude = params.get("longitude")
    country_code = params.get("country_code")
    # "month_number" is the key this builder has always read, while the params
    # TypedDict declares "month". Accept both so a caller following either one
    # gets a query instead of a silent empty string.
    month = params.get("month_number") or params.get("month")

    if not indicator_column or not country_code or not month:
        return ""

    # A coordinate is missing only when it is absent or an empty string. A plain
    # truthiness test would also reject a numeric 0 (equator / prime meridian).
    if any(
        coord is None or (isinstance(coord, str) and not coord)
        for coord in (latitude, longitude)
    ):
        return ""

    # NOTE(review): the values below are interpolated into the SQL text as-is, so
    # callers must only pass trusted column names, coordinates and country codes.
    if country_code in MACRO_COUNTRIES or country_code in HUGE_MACRO_COUNTRIES:
        table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}_monthly_macro.parquet'"
        sql_query = f"""
        SELECT year, scenario, {indicator_column}
        FROM {table_path}
        WHERE latitude = {latitude} AND longitude = {longitude} AND year >= 1950 AND month={month}
        ORDER BY year, scenario
        """
    else:
        table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}.parquet'"
        # ORDER BY added: without it the row order of the grouped result is
        # unspecified, unlike the macro branch above.
        sql_query = f"""
        SELECT year, scenario, MEDIAN({indicator_column}) AS {indicator_column}
        FROM {table_path}
        WHERE latitude = {latitude} AND longitude = {longitude} AND year >= 1950 AND month={month}
        GROUP BY scenario, year
        ORDER BY year, scenario
        """
    return sql_query.strip()
|
class IndicatorForGivenYearQueryParams(TypedDict, total=False):
    """Inputs for the query giving an indicator's values across locations in one year.

    The dictionary is declared with ``total=False``, so every key may be omitted.

    Attributes:
        indicator_column (str): The column name for the climate indicator.
        year (str): The year to query.
        country_code (str): The country code.
    """

    indicator_column: str
    year: str
    country_code: str
|
|
|
def indicator_for_given_year_query(
    table: str, params: IndicatorForGivenYearQueryParams
) -> str:
    """
    Builds an SQL query returning an indicator's values per latitude, longitude
    and scenario for a single year.

    The parquet file that is read and the aggregation applied depend on the country:

    - countries listed in ``MACRO_COUNTRIES`` read ``<country_code>_monthly_macro.parquet``
      and average the indicator per latitude, longitude and scenario;
    - countries listed in ``HUGE_MACRO_COUNTRIES`` read ``<country_code>_annualy_macro.parquet``
      and select the indicator as stored;
    - every other country reads ``<country_code>.parquet``, takes the median per
      latitude, longitude, scenario and month, then averages those medians per
      latitude, longitude and scenario.

    Args:
        table (str): SQL table of the indicator; it is lower-cased to build the dataset path.
        params (IndicatorForGivenYearQueryParams): Dictionary with the params for the query.
            ``indicator_column`` and ``country_code`` are required; a missing or
            falsy ``year`` falls back to 2050.

    Returns:
        str: The SQL query string, or an empty string if required parameters are missing.
    """
    column = params.get("indicator_column")
    target_year = params.get("year") or 2050
    country = params.get("country_code")

    # target_year is always truthy after the fallback above, so only the column
    # and the country can actually be missing.
    if not (column and country):
        return ""

    if country in MACRO_COUNTRIES:
        source = f"'{IPCC_DATASET_URL}/{table.lower()}/{country}_monthly_macro.parquet'"
        return f"""
        SELECT latitude, longitude, scenario, AVG({column}) as {column}
        FROM {source}
        WHERE year = {target_year}
        GROUP BY latitude, longitude, scenario
        ORDER BY latitude, longitude, scenario
        """.strip()

    if country in HUGE_MACRO_COUNTRIES:
        # "annualy" (sic) is part of the dataset file name.
        source = f"'{IPCC_DATASET_URL}/{table.lower()}/{country}_annualy_macro.parquet'"
        return f"""
        SELECT latitude, longitude, scenario, {column}
        FROM {source}
        WHERE year = {target_year}
        ORDER BY latitude, longitude, scenario
        """.strip()

    source = f"'{IPCC_DATASET_URL}/{table.lower()}/{country}.parquet'"
    return f"""
        WITH medians_per_month AS (
            SELECT latitude, longitude, scenario, month, MEDIAN({column}) AS median_value
            FROM {source}
            WHERE year = {target_year}
            GROUP BY latitude, longitude, scenario, month
        )
        SELECT latitude, longitude, scenario, AVG(median_value) AS {column}
        FROM medians_per_month
        GROUP BY latitude, longitude, scenario
        ORDER BY latitude, longitude, scenario
        """.strip()
|
|