File size: 7,801 Bytes
711bc31 6578635 711bc31 ac49be7 711bc31 ac49be7 711bc31 6578635 711bc31 6578635 711bc31 ac49be7 711bc31 ac49be7 711bc31 ac49be7 711bc31 6578635 711bc31 ac49be7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
from typing import TypedDict, Optional
from climateqa.engine.talk_to_data.ipcc.config import HUGE_MACRO_COUNTRIES, MACRO_COUNTRIES
from climateqa.engine.talk_to_data.config import IPCC_DATASET_URL
class IndicatorPerYearAtLocationQueryParams(TypedDict, total=False):
"""
Parameters for querying the evolution of an indicator per year at a specific location.
Attributes:
indicator_column (str): Name of the climate indicator column.
latitude (str): Latitude of the location.
longitude (str): Longitude of the location.
country_code (str): Country code.
admin1 (str): Administrative region (optional).
"""
indicator_column: str
latitude: str
longitude: str
country_code: str
admin1: Optional[str]
def indicator_per_year_at_location_query(
table: str, params: IndicatorPerYearAtLocationQueryParams
) -> str:
"""
Builds an SQL query to get the evolution of an indicator per year at a specific location.
Args:
table (str): SQL table of the indicator.
params (IndicatorPerYearAtLocationQueryParams): Dictionary with the required params for the query.
Returns:
str: The SQL query string, or an empty string if required parameters are missing.
"""
indicator_column = params.get("indicator_column")
latitude = params.get("latitude")
longitude = params.get("longitude")
country_code = params.get("country_code")
admin1 = params.get("admin1")
if not all([indicator_column, latitude, longitude, country_code]):
return ""
if country_code in MACRO_COUNTRIES:
table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}_monthly_macro.parquet'"
sql_query = f"""
SELECT year, scenario, AVG({indicator_column}) as {indicator_column}
FROM {table_path}
WHERE latitude = {latitude} AND longitude = {longitude} AND year >= 1950
GROUP BY scenario, year
ORDER BY year, scenario
"""
elif country_code in HUGE_MACRO_COUNTRIES:
table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}_annualy_macro.parquet'"
sql_query = f"""
SELECT year, scenario, {indicator_column}
FROM {table_path}
WHERE latitude = {latitude} AND longitude = {longitude} AND year >= 1950
ORDER BY year, scenario
"""
else:
table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}.parquet'"
sql_query = f"""
WITH medians_per_month AS (
SELECT year, scenario, month, MEDIAN({indicator_column}) AS median_value
FROM {table_path}
WHERE latitude = {latitude} AND longitude = {longitude} AND year >= 1950
GROUP BY scenario, year, month
)
SELECT year, scenario, AVG(median_value) AS {indicator_column}
FROM medians_per_month
GROUP BY scenario, year
ORDER BY year, scenario
"""
return sql_query.strip()
class IndicatorPerYearAndSpecificMonthAtLocationQueryParams(TypedDict, total=False):
"""
Parameters for querying the evolution of an indicator per year for a specific month at a specific location.
Attributes:
indicator_column (str): Name of the climate indicator column.
latitude (str): Latitude of the location.
longitude (str): Longitude of the location.
country_code (str): Country code.
month (str): Month targeted
"""
indicator_column: str
latitude: str
longitude: str
country_code: str
month: str
def indicator_per_year_and_specific_month_at_location_query(
table: str, params: IndicatorPerYearAndSpecificMonthAtLocationQueryParams
) -> str:
"""
Builds an SQL query to get the evolution of an indicator per year for a specific month at a specific location.
Args:
table (str): SQL table of the indicator.
params (dict): Dictionary with required params:
- indicator_column (str)
- latitude (str or float)
- longitude (str or float)
- month (int)
Returns:
str: The SQL query string.
"""
indicator_column = params.get("indicator_column")
latitude = params.get("latitude")
longitude = params.get("longitude")
country_code = params.get("country_code")
month = params.get('month_number')
if not all([indicator_column, latitude, longitude, country_code, month]):
return ""
if country_code in (MACRO_COUNTRIES+HUGE_MACRO_COUNTRIES):
table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}_monthly_macro.parquet'"
sql_query = f"""
SELECT year, scenario, {indicator_column}
FROM {table_path}
WHERE latitude = {latitude} AND longitude = {longitude} AND year >= 1950 AND month={month}
ORDER BY year, scenario
"""
else:
table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}.parquet'"
sql_query = f"""
SELECT year, scenario, MEDIAN({indicator_column}) AS {indicator_column}
FROM {table_path}
WHERE latitude = {latitude} AND longitude = {longitude} AND year >= 1950 AND month={month}
GROUP BY scenario, year
"""
return sql_query.strip()
class IndicatorForGivenYearQueryParams(TypedDict, total=False):
"""
Parameters for querying an indicator's values across locations for a specific year.
Attributes:
indicator_column (str): The column name for the climate indicator.
year (str): The year to query.
country_code (str): The country code.
"""
indicator_column: str
year: str
country_code: str
def indicator_for_given_year_query(
table: str, params: IndicatorForGivenYearQueryParams
) -> str:
"""
Builds an SQL query to get the values of an indicator with their latitudes, longitudes,
and scenarios for a given year.
Args:
table (str): SQL table of the indicator.
params (IndicatorForGivenYearQueryParams): Dictionary with the required params for the query.
Returns:
str: The SQL query string, or an empty string if required parameters are missing.
"""
indicator_column = params.get("indicator_column")
year = params.get("year") or 2050
country_code = params.get("country_code")
if not all([indicator_column, year, country_code]):
return ""
if country_code in MACRO_COUNTRIES:
table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}_monthly_macro.parquet'"
sql_query = f"""
SELECT latitude, longitude, scenario, AVG({indicator_column}) as {indicator_column}
FROM {table_path}
WHERE year = {year}
GROUP BY latitude, longitude, scenario
ORDER BY latitude, longitude, scenario
"""
elif country_code in HUGE_MACRO_COUNTRIES:
table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}_annualy_macro.parquet'"
sql_query = f"""
SELECT latitude, longitude, scenario, {indicator_column}
FROM {table_path}
WHERE year = {year}
ORDER BY latitude, longitude, scenario
"""
else:
table_path = f"'{IPCC_DATASET_URL}/{table.lower()}/{country_code}.parquet'"
sql_query = f"""
WITH medians_per_month AS (
SELECT latitude, longitude, scenario, month, MEDIAN({indicator_column}) AS median_value
FROM {table_path}
WHERE year = {year}
GROUP BY latitude, longitude, scenario, month
)
SELECT latitude, longitude, scenario, AVG(median_value) AS {indicator_column}
FROM medians_per_month
GROUP BY latitude, longitude, scenario
ORDER BY latitude, longitude, scenario
"""
return sql_query.strip()
|