Spaces:
Running
Running
# pws_service.py (Robust Version) | |
import requests | |
import json | |
from datetime import datetime | |
# ✨ CORRECTED IMPORT | |
from config.settings import PWS_API_URL, CWA_PWS_EARTHQUAKE_API | |
def fetch_latest_pws_info() -> str: | |
""" | |
從 MCP PWS 伺服器擷取最新的 PWS (Public Weather Service) 發布情形。 | |
此版本經過優化,能穩定處理 Gradio API 的 Server-Sent Events (SSE) 串流。 | |
""" | |
final_report = None | |
try: | |
# 使用 stream=True 保持連線,接收伺服器持續發送的事件 | |
with requests.get(PWS_API_URL, timeout=20, stream=True) as r: | |
r.raise_for_status() # 確保 HTTP 連線成功 | |
# 迭代處理從伺服器接收到的每一行資料 | |
for line in r.iter_lines(): | |
if line: | |
decoded_line = line.decode('utf-8') | |
# SSE 事件的資料以 "data: " 開頭 | |
if decoded_line.startswith('data:'): | |
json_data_str = decoded_line[len('data:'):].strip() | |
try: | |
data = json.loads(json_data_str) | |
# Gradio API 在完成時會發送 'process_completed' 訊息 | |
if data.get("msg") == "process_completed": | |
# 最終結果通常在 output['data'][0] 中 | |
output_data = data.get("output", {}).get("data", []) | |
if output_data: | |
final_report = output_data[0] | |
break # 收到最終結果,跳出迴圈 | |
except (json.JSONDecodeError, KeyError, IndexError): | |
# 如果中途的訊息格式不符或不含所需資料,靜默忽略並繼續等待 | |
continue | |
# 迴圈結束後,檢查是否成功取得最終報告 | |
if final_report: | |
header = "🛰️ 最新 PWS 發布情形:" | |
separator = "-" * 20 | |
return f"{header}\n{separator}\n{final_report}" | |
else: | |
return "❌ PWS 查詢失敗:已連接伺服器,但未收到有效的最終報告資料。" | |
except requests.exceptions.Timeout: | |
return f"❌ PWS 查詢失敗:連線超時,伺服器沒有在 20 秒內回應。" | |
except requests.exceptions.RequestException as e: | |
return f"❌ PWS 查詢失敗:網路連線錯誤。\n錯誤訊息:{e}" | |
except Exception as e: | |
return f"❌ PWS 查詢失敗:發生未預期的錯誤。\n錯誤訊息:{e}" | |
def fetch_cwa_pws_earthquake_info() -> str: | |
"""從指定的 API 端點擷取最新的 CWA PWS 地震訊息並格式化輸出。""" | |
try: | |
response = requests.get(CWA_PWS_EARTHQUAKE_API, timeout=15) | |
response.raise_for_status() | |
data = response.json() | |
source = data.get("source", "未知來源") | |
last_updated_raw = data.get("last_updated") | |
# 格式化資料更新時間 | |
if last_updated_raw: | |
try: | |
last_updated_dt = datetime.fromisoformat(last_updated_raw) | |
last_updated_str = last_updated_dt.strftime('%Y-%m-%d %H:%M') | |
except (ValueError, TypeError): | |
last_updated_str = last_updated_raw | |
else: | |
last_updated_str = "未知" | |
# 建立訊息標頭 | |
lines = [ | |
f"📢 最新地震PWS警報 ({source})", | |
f" (資料更新時間: {last_updated_str})", | |
"─" * 25 | |
] | |
alerts = data.get("alerts", []) | |
if not alerts: | |
lines.append("\n✅ 目前無最新的地震PWS警報。") | |
return "\n".join(lines) | |
# 逐條解析並格式化警報 | |
for i, alert in enumerate(alerts): | |
title = alert.get("title", "無標題") | |
publish_time_raw = alert.get("publish_time") | |
# 格式化警報發布時間 | |
if publish_time_raw: | |
try: | |
publish_time_dt = datetime.fromisoformat(publish_time_raw) | |
publish_time_str = publish_time_dt.strftime('%Y-%m-%d %H:%M') | |
except (ValueError, TypeError): | |
publish_time_str = publish_time_raw | |
else: | |
publish_time_str = "未知" | |
summary = alert.get("summary", "無摘要。").strip() | |
affected_counties = alert.get("affected_counties", []) | |
affected_str = ", ".join(affected_counties) if affected_counties else "未提供" | |
# 組合單條警報訊息 | |
alert_lines = [ | |
f"🚨 {title} ({publish_time_str})", | |
f"🗺️ 影響地區: {affected_str}", | |
f"💬 內容: {summary}" | |
] | |
# 將單條警報加入主訊息列表 | |
lines.append("\n".join(alert_lines)) | |
# 在警報之間加入分隔線 | |
if i < len(alerts) - 1: | |
lines.append("-" * 15) | |
return "\n\n".join(lines) | |
except requests.exceptions.Timeout: | |
return "❌ PWS 地震訊息查詢失敗:連線超時。" | |
except requests.exceptions.RequestException as e: | |
return f"❌ PWS 地震訊息查詢失敗:網路連線錯誤。\n錯誤訊息:{e}" | |
except json.JSONDecodeError: | |
return "❌ PWS 地震訊息查詢失敗:無法解析伺服器回傳的資料格式。" | |
except Exception as e: | |
return f"❌ PWS 地震訊息查詢失敗:發生未預期的錯誤。\n錯誤訊息:{e}" |