# Hugging Face Space (status when captured: Sleeping)
# EUDR INGESTOR
import gradio as gr | |
import os | |
import logging | |
from datetime import datetime | |
from pathlib import Path | |
from gradio_client import Client, handle_file | |
import pandas as pd | |
# Local imports | |
from .utils import getconfig | |
# Load the module configuration from params.cfg.
config = getconfig("params.cfg")

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# The token is passed to the WHISP client below; fail at import time when it
# is missing rather than on the first request.
hf_token = os.getenv('HF_TOKEN')
if not hf_token:
    raise ValueError("HF_TOKEN environment variable not found")

# WHISP API configuration (falls back to the public Space URL when the
# [whisp] section does not define WHISP_API_URL)
WHISP_API_URL = config.get('whisp', 'WHISP_API_URL', fallback="https://giz-chatfed-whisp.hf.space/")
def get_value(df, colname, default="No disponible"):
    """Return the first ``Value`` whose ``Column`` entry equals *colname*.

    Args:
        df: DataFrame in the WhispAPI-style Column/Value layout, i.e. one
            row per statistic with a ``Column`` label and a ``Value`` cell.
        colname: Label to look up in the ``Column`` column.
        default: Returned when the frame lacks either column, when no row
            matches, or when the matched cell is missing (None/NaN/NA).

    Returns:
        The first matching value, or *default*.
    """
    if "Column" not in df.columns or "Value" not in df.columns:
        return default

    match = df.loc[df["Column"] == colname, "Value"]
    if match.empty:
        return default

    value = match.values[0]
    # A missing cell would otherwise be rendered downstream as "None"/"nan".
    # is_scalar guards pd.isna, which returns an array for list-like cells.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return default
    return value
def _with_hectares(value):
    """Append the hectare unit only when *value* is numeric."""
    try:
        float(value)
    except (TypeError, ValueError):
        # Placeholders such as "No disponible" must not get a unit suffix.
        return str(value)
    return f"{value} hectáreas"


def format_whisp_statistics(df):
    """Format WhispAPI statistics into readable text for RAG context.

    Args:
        df: Column/Value DataFrame as consumed by ``get_value``.

    Returns:
        A multi-line Spanish summary with country, administrative region,
        area, the three risk fields and the two deforestation fields. If
        anything raises while building it, an error string is returned
        instead of propagating the exception.
    """
    try:
        # Country code mapping; unknown codes are shown as received.
        country_codes = {
            'HND': 'Honduras', 'GTM': 'Guatemala', 'ECU': 'Ecuador',
            'COL': 'Colombia', 'PER': 'Peru', 'BRA': 'Brasil',
            'BOL': 'Bolivia', 'CRI': 'Costa Rica', 'PAN': 'Panamá',
            'NIC': 'Nicaragua',
        }
        country_raw = get_value(df, "Country")
        country = country_codes.get(country_raw, country_raw)
        admin_level = get_value(df, "Admin_Level_1")
        area_raw = get_value(df, "Area")

        # Format area: more decimals for small plots, thousands separator
        # for large ones.
        try:
            area_num = float(area_raw)
        except (TypeError, ValueError):
            # Non-numeric area: show it verbatim, mapping the English
            # placeholder onto the Spanish one used elsewhere.
            area_text = str(area_raw) if area_raw != "Not available" else "No disponible"
        else:
            if area_num < 1:
                area_text = f"{area_num:.3f} hectáreas"
            elif area_num < 100:
                area_text = f"{area_num:.2f} hectáreas"
            else:
                area_text = f"{area_num:,.1f} hectáreas"

        # Risk assessments
        risk_pcrop = get_value(df, "risk_pcrop")
        risk_acrop = get_value(df, "risk_acrop")
        risk_timber = get_value(df, "risk_timber")
        def_after_2020_raw = get_value(df, "TMF_def_after_2020")
        def_before_2020_raw = get_value(df, "TMF_def_before_2020")

        # Format for RAG context
        lines = [
            "=== ANÁLISIS GEOGRÁFICO WHISP API ===",
            f"País: {country}",
            f"Región administrativa: {admin_level}",
            f"Área total: {area_text}",
            "EVALUACIÓN DE RIESGO DE DEFORESTACIÓN:",
            f"- Cultivos permanentes (Café, cacao, aceite de palma): {risk_pcrop}",
            f"- Cultivos anuales (Soja, maíz, arroz): {risk_acrop}",
            f"- Extracción de madera: {risk_timber}",
            "DATOS DE DEFORESTACIÓN:",
            f"- Deforestación antes de 2020: {_with_hectares(def_before_2020_raw)}",
            f"- Deforestación después de 2020: {_with_hectares(def_after_2020_raw)}",
            "Fuente: Forest Data Partnership (FDaP) WhispAPI",
            f"Fecha de análisis: {datetime.now().isoformat()}",
        ]
        return "\n".join(lines)
    except Exception as e:
        # Deliberate best-effort: the caller receives a readable message as
        # the context instead of an exception.
        return f"Error en el análisis geográfico: {str(e)}"
def process_geojson_whisp(file_content: bytes, filename: str) -> tuple[str, dict]:
    """Process GeoJSON file through WHISP API and return formatted context.

    Args:
        file_content: Raw bytes of the GeoJSON document.
        filename: Name of the uploaded file. Not used by this function; kept
            so the signature stays stable for callers.

    Returns:
        ``(formatted_context, metadata)``: the text produced by
        ``format_whisp_statistics`` and a dict with the analysis type,
        country, admin level, area and the three risk levels.

    Raises:
        Exception: Any failure (temp-file handling, API call, result
            parsing) is logged and re-raised with a
            "Failed to process GeoJSON through WHISP API" message; the
            original error is attached as ``__cause__``.
    """
    import tempfile

    try:
        # The WHISP client takes a file path, so the bytes are spooled to a
        # temporary .geojson file first.
        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.geojson')
        tmp_file_path = tmp_file.name
        try:
            # Close the handle before the client reads the path; the write
            # sits inside the try so a failed write still removes the file.
            with tmp_file:
                tmp_file.write(file_content)

            # Call WHISP API with authentication
            client = Client(WHISP_API_URL, hf_token=hf_token)
            result = client.predict(
                file=handle_file(tmp_file_path),
                api_name="/get_statistics",
            )

            # Convert result to DataFrame
            df = pd.DataFrame(result['data'], columns=result['headers'])

            # Format for RAG context
            formatted_context = format_whisp_statistics(df)
            metadata = {
                "analysis_type": "whisp_geojson",
                "country": get_value(df, "Country"),
                "admin_level": get_value(df, "Admin_Level_1"),
                "area": get_value(df, "Area"),
                "risk_levels": {
                    "pcrop": get_value(df, "risk_pcrop"),
                    "acrop": get_value(df, "risk_acrop"),
                    "timber": get_value(df, "risk_timber"),
                },
            }
            return formatted_context, metadata
        finally:
            # Clean up temporary file
            os.unlink(tmp_file_path)
    except Exception as e:
        logger.error(f"WHISP API error: {str(e)}")
        raise Exception(f"Failed to process GeoJSON through WHISP API: {str(e)}") from e
def ingest(file):
    """Main ingestion function - processes GeoJSON file and returns WHISP analysis context.

    Args:
        file: The upload handed over by the UI: either a filesystem path
            (``str``/``os.PathLike``) or an object exposing the path as
            ``.name``. ``None`` means nothing was uploaded.

    Returns:
        The WHISP analysis context string, or ``"No file uploaded"`` when
        *file* is ``None``. Always a single string, matching the single
        Textbox output of the interface.

    Raises:
        Exception: Any failure (unsupported extension, unreadable file,
            WHISP processing error) is logged and re-raised with a
            "Processing failed" message; the original error is attached as
            ``__cause__``.
    """
    if file is None:
        return "No file uploaded"

    try:
        file_path = file if isinstance(file, (str, os.PathLike)) else file.name
        filename = os.path.basename(file_path)

        # Check file extension before reading anything from disk.
        file_extension = os.path.splitext(filename)[1].lower()
        if file_extension not in ('.geojson', '.json'):
            raise ValueError(f"Unsupported file type: {file_extension}. Only GeoJSON files are supported.")

        with open(file_path, 'rb') as f:
            file_content = f.read()

        # Process through WHISP API; only the text context is returned.
        context, _metadata = process_geojson_whisp(file_content, filename)
        logger.info(f"Successfully processed GeoJSON {filename} through WHISP API")
        return context
    except Exception as e:
        logger.error(f"GeoJSON processing failed: {str(e)}")
        raise Exception(f"Processing failed: {str(e)}") from e
# Script entry point: expose ingest() through a single-input, single-output
# Gradio interface and serve it on all interfaces, port 7860.
if __name__ == "__main__":
    ui = gr.Interface(
        fn=ingest,
        inputs=gr.File(
            label="GeoJSON Upload",
            file_types=[".geojson", ".json"]
        ),
        outputs=gr.Textbox(
            label="WHISP Analysis Context",
            lines=15,
            show_copy_button=True
        ),
        title="EUDR Ingestion Module - WHISP API",
        description="Processes GeoJSON files through WHISP API and returns geographic analysis context for RAG pipelines.",
        api_name="ingest"
    )
    ui.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )