Geophysics_class / services /pws_service.py
cwadayi's picture
Upload 15 files
cc337b3 verified
# pws_service.py (Robust Version)
import requests
import json
from datetime import datetime
# ✨ CORRECTED IMPORT
from config.settings import PWS_API_URL, CWA_PWS_EARTHQUAKE_API
def fetch_latest_pws_info() -> str:
"""
從 MCP PWS 伺服器擷取最新的 PWS (Public Weather Service) 發布情形。
此版本經過優化,能穩定處理 Gradio API 的 Server-Sent Events (SSE) 串流。
"""
final_report = None
try:
# 使用 stream=True 保持連線,接收伺服器持續發送的事件
with requests.get(PWS_API_URL, timeout=20, stream=True) as r:
r.raise_for_status() # 確保 HTTP 連線成功
# 迭代處理從伺服器接收到的每一行資料
for line in r.iter_lines():
if line:
decoded_line = line.decode('utf-8')
# SSE 事件的資料以 "data: " 開頭
if decoded_line.startswith('data:'):
json_data_str = decoded_line[len('data:'):].strip()
try:
data = json.loads(json_data_str)
# Gradio API 在完成時會發送 'process_completed' 訊息
if data.get("msg") == "process_completed":
# 最終結果通常在 output['data'][0] 中
output_data = data.get("output", {}).get("data", [])
if output_data:
final_report = output_data[0]
break # 收到最終結果,跳出迴圈
except (json.JSONDecodeError, KeyError, IndexError):
# 如果中途的訊息格式不符或不含所需資料,靜默忽略並繼續等待
continue
# 迴圈結束後,檢查是否成功取得最終報告
if final_report:
header = "🛰️ 最新 PWS 發布情形:"
separator = "-" * 20
return f"{header}\n{separator}\n{final_report}"
else:
return "❌ PWS 查詢失敗:已連接伺服器,但未收到有效的最終報告資料。"
except requests.exceptions.Timeout:
return f"❌ PWS 查詢失敗:連線超時,伺服器沒有在 20 秒內回應。"
except requests.exceptions.RequestException as e:
return f"❌ PWS 查詢失敗:網路連線錯誤。\n錯誤訊息:{e}"
except Exception as e:
return f"❌ PWS 查詢失敗:發生未預期的錯誤。\n錯誤訊息:{e}"
def fetch_cwa_pws_earthquake_info() -> str:
"""從指定的 API 端點擷取最新的 CWA PWS 地震訊息並格式化輸出。"""
try:
response = requests.get(CWA_PWS_EARTHQUAKE_API, timeout=15)
response.raise_for_status()
data = response.json()
source = data.get("source", "未知來源")
last_updated_raw = data.get("last_updated")
# 格式化資料更新時間
if last_updated_raw:
try:
last_updated_dt = datetime.fromisoformat(last_updated_raw)
last_updated_str = last_updated_dt.strftime('%Y-%m-%d %H:%M')
except (ValueError, TypeError):
last_updated_str = last_updated_raw
else:
last_updated_str = "未知"
# 建立訊息標頭
lines = [
f"📢 最新地震PWS警報 ({source})",
f" (資料更新時間: {last_updated_str})",
"─" * 25
]
alerts = data.get("alerts", [])
if not alerts:
lines.append("\n✅ 目前無最新的地震PWS警報。")
return "\n".join(lines)
# 逐條解析並格式化警報
for i, alert in enumerate(alerts):
title = alert.get("title", "無標題")
publish_time_raw = alert.get("publish_time")
# 格式化警報發布時間
if publish_time_raw:
try:
publish_time_dt = datetime.fromisoformat(publish_time_raw)
publish_time_str = publish_time_dt.strftime('%Y-%m-%d %H:%M')
except (ValueError, TypeError):
publish_time_str = publish_time_raw
else:
publish_time_str = "未知"
summary = alert.get("summary", "無摘要。").strip()
affected_counties = alert.get("affected_counties", [])
affected_str = ", ".join(affected_counties) if affected_counties else "未提供"
# 組合單條警報訊息
alert_lines = [
f"🚨 {title} ({publish_time_str})",
f"🗺️ 影響地區: {affected_str}",
f"💬 內容: {summary}"
]
# 將單條警報加入主訊息列表
lines.append("\n".join(alert_lines))
# 在警報之間加入分隔線
if i < len(alerts) - 1:
lines.append("-" * 15)
return "\n\n".join(lines)
except requests.exceptions.Timeout:
return "❌ PWS 地震訊息查詢失敗:連線超時。"
except requests.exceptions.RequestException as e:
return f"❌ PWS 地震訊息查詢失敗:網路連線錯誤。\n錯誤訊息:{e}"
except json.JSONDecodeError:
return "❌ PWS 地震訊息查詢失敗:無法解析伺服器回傳的資料格式。"
except Exception as e:
return f"❌ PWS 地震訊息查詢失敗:發生未預期的錯誤。\n錯誤訊息:{e}"