import re
from datetime import datetime, date
from typing import Tuple, List, Dict, Any

import pandas as pd
import streamlit as st
from datasets import load_dataset

# Constants
DEFAULT_SAMPLE_SIZE = 1000
DATE_FORMAT = "%Y%m%d"
FULL_DATE_FORMAT = f"{DATE_FORMAT}%H%M%S"  # GDELT DATE column: YYYYMMDDHHMMSS

# Load dataset with caching and basic validation
@st.cache_data(ttl=3600, show_spinner="Loading dataset...")
def load_data(sample_size: int = DEFAULT_SAMPLE_SIZE) -> pd.DataFrame:
    """
    Load and validate the dataset with error handling.

    Args:
        sample_size (int): Maximum number of records to keep

    Returns:
        pd.DataFrame: Loaded and validated dataframe
    """
    try:
        dataset = load_dataset(
            "dwb2023/gdelt-gkg-2025-v2",
            data_files={
                "train": [
                    "gdelt_gkg_20250210.parquet",
                    "gdelt_gkg_20250211.parquet",
                ]
            },
            split="train",
        )
        df = pd.DataFrame(dataset)

        # Basic data validation
        if df.empty:
            st.error("Loaded dataset is empty")
            return pd.DataFrame()
        if "DATE" not in df.columns:
            st.error("Dataset missing required DATE column")
            return pd.DataFrame()

        # Cap the result at sample_size records
        return df.head(sample_size)
    except Exception as e:
        st.error(f"Error loading dataset: {str(e)}")
        st.stop()  # halts script execution; nothing below this line runs

def initialize_app() -> None:
    """Initialize the Streamlit app interface."""
    st.title("GDELT GKG 2025 Dataset Explorer")
    with st.sidebar:
        st.header("Search Criteria")
        st.markdown("🔍 Filter dataset using the controls below")

def extract_unique_themes(df: pd.DataFrame, column: str) -> List[str]:
    """
    Extract and clean unique themes from a semicolon-separated column.

    Each segment is a "THEME,charOffset" pair, so everything after the
    comma is stripped before deduplication.

    Args:
        df (pd.DataFrame): Input dataframe
        column (str): Column name containing themes

    Returns:
        List[str]: Sorted list of unique themes
    """
    if df.empty:
        return []
    return sorted({
        theme.split(",")[0].strip()  # drop the ",charOffset" suffix
        for themes in df[column].dropna().str.split(";")
        for theme in themes if theme.strip()
    })

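# Illustrative example (hypothetical cell values, assuming GDELT's
# "THEME,charOffset;THEME,charOffset" encoding): rows containing
# "TAX_FNCACT,1056;EPU_POLICY,203" and "EPU_POLICY,9" would yield
# extract_unique_themes(df, "V2EnhancedThemes") == ["EPU_POLICY", "TAX_FNCACT"].
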
def get_date_range(df: pd.DataFrame, date_col: str) -> Tuple[date, date]:
    """
    Get min/max dates from the dataset, with fallback defaults.

    Args:
        df (pd.DataFrame): Input dataframe
        date_col (str): Column name containing dates

    Returns:
        Tuple[date, date]: (min_date, max_date) as date objects
    """
    try:
        # Convert YYYYMMDDHHMMSS strings to datetimes using the shared constant
        dates = pd.to_datetime(df[date_col], format=FULL_DATE_FORMAT)
        return dates.min().date(), dates.max().date()
    except Exception as e:
        st.warning(f"Date range detection failed: {str(e)}")
        # Fall back to the known range of the two bundled parquet files
        return datetime(2025, 2, 10).date(), datetime(2025, 2, 11).date()

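# Quick sanity check (hypothetical value): "20250210143000" parses under
# FULL_DATE_FORMAT ("%Y%m%d%H%M%S") to 2025-02-10 14:30:00, so .date()
# yields date(2025, 2, 10).
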
def create_filters(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Generate sidebar filters and return the filter state.

    Args:
        df (pd.DataFrame): Input dataframe

    Returns:
        Dict[str, Any]: Dictionary of filter settings
    """
    filters = {}
    with st.sidebar:
        # Theme multi-select
        filters["themes"] = st.multiselect(
            "V2EnhancedThemes (exact match)",
            options=extract_unique_themes(df, "V2EnhancedThemes"),
            help="Select exact themes to include (supports multiple selection)",
        )

        # Text-based filters: filter key -> (column label, help text)
        text_filters = {
            "source_common_name": ("SourceCommonName", "partial name match"),
            "document_identifier": ("DocumentIdentifier", "partial identifier match"),
            "sharing_image": ("V2.1SharingImage", "partial image URL match"),
        }
        for key, (label, help_text) in text_filters.items():
            filters[key] = st.text_input(
                f"{label} ({help_text})",
                placeholder=f"Enter {help_text}...",
                help=f"Case-insensitive {help_text}",
            )

        # Date range with dataset-based defaults
        date_col = "DATE"
        min_date, max_date = get_date_range(df, date_col)
        filters["date_range"] = st.date_input(
            "Date range",
            value=(min_date, max_date),
            min_value=min_date,
            max_value=max_date,
        )

        # Record limit
        filters["record_limit"] = st.number_input(
            "Max records to display",
            min_value=100,
            max_value=5000,
            value=1000,
            step=100,
            help="Limit results for better performance",
        )
    return filters

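# The returned dict has this shape (hypothetical values):
# {
#     "themes": ["EPU_POLICY"],
#     "source_common_name": "bbc",
#     "document_identifier": "",
#     "sharing_image": "",
#     "date_range": (date(2025, 2, 10), date(2025, 2, 11)),
#     "record_limit": 1000,
# }
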
def apply_filters(df: pd.DataFrame, filters: Dict[str, Any]) -> pd.DataFrame:
    """
    Apply all filters to the dataframe using vectorized operations.

    Args:
        df (pd.DataFrame): Input dataframe to filter
        filters (Dict[str, Any]): Dictionary containing filter parameters:
            - themes (list): List of themes to match exactly
            - source_common_name (str): Partial match for source name
            - document_identifier (str): Partial match for document ID
            - sharing_image (str): Partial match for image URL
            - date_range (tuple): (start_date, end_date) tuple
            - record_limit (int): Maximum number of records to return

    Returns:
        pd.DataFrame: Filtered dataframe
    """
    filtered_df = df.copy()

    # Theme exact-match filter; groups are non-capturing ((?:...)) so
    # str.contains does not warn about match groups
    if filters["themes"]:
        pattern = r"(?:^|;)(?:{})(?:$|,|;)".format(
            "|".join(map(re.escape, filters["themes"]))
        )
        filtered_df = filtered_df[
            filtered_df["V2EnhancedThemes"].str.contains(pattern, na=False)
        ]

    # Text partial-match filters: filter key -> dataframe column
    text_columns = {
        "source_common_name": "SourceCommonName",
        "document_identifier": "DocumentIdentifier",
        "sharing_image": "V2.1SharingImage",
    }
    for filter_key, col_name in text_columns.items():
        if value := filters.get(filter_key):
            filtered_df = filtered_df[
                filtered_df[col_name]
                .str.contains(re.escape(value), case=False, na=False)
            ]

    # Date range filter with validation
    if len(filters["date_range"]) == 2:
        start_date, end_date = filters["date_range"]
        if start_date > end_date:
            st.error("Start date must be before end date")
            return filtered_df
        date_col = "DATE"
        try:
            # Convert full datetime strings to datetimes using the shared constant
            date_series = pd.to_datetime(filtered_df[date_col], format=FULL_DATE_FORMAT)
            # Keep rows in [start of start_date, start of the day after end_date)
            start_timestamp = pd.Timestamp(start_date).normalize()
            end_timestamp = pd.Timestamp(end_date) + pd.Timedelta(days=1)
            filtered_df = filtered_df[
                (date_series >= start_timestamp) & (date_series < end_timestamp)
            ]
        except Exception as e:
            st.error(f"Error applying date filter: {str(e)}")
            return filtered_df

    # Apply record limit
    return filtered_df.head(filters["record_limit"])

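# For a hypothetical selection ["EPU_POLICY", "TAX_FNCACT"], the theme filter
# builds the pattern r'(?:^|;)(?:EPU_POLICY|TAX_FNCACT)(?:$|,|;)', which
# matches a whole theme name at a segment boundary (start of cell or after a
# ';', followed by ',', ';', or end) but not substrings such as
# "EPU_POLICY_FOO".
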
def main():
    """Main application entry point."""
    df = load_data()
    if df.empty:
        st.warning("No data available - check data source")
        return
    initialize_app()
    filters = create_filters(df)
    filtered_df = apply_filters(df, filters)

    # Display results
    st.subheader(f"Results: {len(filtered_df)} records")
    st.dataframe(filtered_df, use_container_width=True)
    st.download_button(
        label="Download CSV",
        data=filtered_df.to_csv(index=False).encode(),
        file_name="filtered_results.csv",
        mime="text/csv",
        help="Download filtered results as CSV",
    )


if __name__ == "__main__":
    main()
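# To launch the app (the filename is illustrative; use whatever this file is
# saved as): streamlit run app.py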