Spaces:
Running
Running
File size: 5,727 Bytes
3aab70e cc337b3 3aab70e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# pws_service.py (Robust Version)
import requests
import json
from datetime import datetime
# ✨ CORRECTED IMPORT
from config.settings import PWS_API_URL, CWA_PWS_EARTHQUAKE_API
def fetch_latest_pws_info() -> str:
"""
從 MCP PWS 伺服器擷取最新的 PWS (Public Weather Service) 發布情形。
此版本經過優化,能穩定處理 Gradio API 的 Server-Sent Events (SSE) 串流。
"""
final_report = None
try:
# 使用 stream=True 保持連線,接收伺服器持續發送的事件
with requests.get(PWS_API_URL, timeout=20, stream=True) as r:
r.raise_for_status() # 確保 HTTP 連線成功
# 迭代處理從伺服器接收到的每一行資料
for line in r.iter_lines():
if line:
decoded_line = line.decode('utf-8')
# SSE 事件的資料以 "data: " 開頭
if decoded_line.startswith('data:'):
json_data_str = decoded_line[len('data:'):].strip()
try:
data = json.loads(json_data_str)
# Gradio API 在完成時會發送 'process_completed' 訊息
if data.get("msg") == "process_completed":
# 最終結果通常在 output['data'][0] 中
output_data = data.get("output", {}).get("data", [])
if output_data:
final_report = output_data[0]
break # 收到最終結果,跳出迴圈
except (json.JSONDecodeError, KeyError, IndexError):
# 如果中途的訊息格式不符或不含所需資料,靜默忽略並繼續等待
continue
# 迴圈結束後,檢查是否成功取得最終報告
if final_report:
header = "🛰️ 最新 PWS 發布情形:"
separator = "-" * 20
return f"{header}\n{separator}\n{final_report}"
else:
return "❌ PWS 查詢失敗:已連接伺服器,但未收到有效的最終報告資料。"
except requests.exceptions.Timeout:
return f"❌ PWS 查詢失敗:連線超時,伺服器沒有在 20 秒內回應。"
except requests.exceptions.RequestException as e:
return f"❌ PWS 查詢失敗:網路連線錯誤。\n錯誤訊息:{e}"
except Exception as e:
return f"❌ PWS 查詢失敗:發生未預期的錯誤。\n錯誤訊息:{e}"
def fetch_cwa_pws_earthquake_info() -> str:
"""從指定的 API 端點擷取最新的 CWA PWS 地震訊息並格式化輸出。"""
try:
response = requests.get(CWA_PWS_EARTHQUAKE_API, timeout=15)
response.raise_for_status()
data = response.json()
source = data.get("source", "未知來源")
last_updated_raw = data.get("last_updated")
# 格式化資料更新時間
if last_updated_raw:
try:
last_updated_dt = datetime.fromisoformat(last_updated_raw)
last_updated_str = last_updated_dt.strftime('%Y-%m-%d %H:%M')
except (ValueError, TypeError):
last_updated_str = last_updated_raw
else:
last_updated_str = "未知"
# 建立訊息標頭
lines = [
f"📢 最新地震PWS警報 ({source})",
f" (資料更新時間: {last_updated_str})",
"─" * 25
]
alerts = data.get("alerts", [])
if not alerts:
lines.append("\n✅ 目前無最新的地震PWS警報。")
return "\n".join(lines)
# 逐條解析並格式化警報
for i, alert in enumerate(alerts):
title = alert.get("title", "無標題")
publish_time_raw = alert.get("publish_time")
# 格式化警報發布時間
if publish_time_raw:
try:
publish_time_dt = datetime.fromisoformat(publish_time_raw)
publish_time_str = publish_time_dt.strftime('%Y-%m-%d %H:%M')
except (ValueError, TypeError):
publish_time_str = publish_time_raw
else:
publish_time_str = "未知"
summary = alert.get("summary", "無摘要。").strip()
affected_counties = alert.get("affected_counties", [])
affected_str = ", ".join(affected_counties) if affected_counties else "未提供"
# 組合單條警報訊息
alert_lines = [
f"🚨 {title} ({publish_time_str})",
f"🗺️ 影響地區: {affected_str}",
f"💬 內容: {summary}"
]
# 將單條警報加入主訊息列表
lines.append("\n".join(alert_lines))
# 在警報之間加入分隔線
if i < len(alerts) - 1:
lines.append("-" * 15)
return "\n\n".join(lines)
except requests.exceptions.Timeout:
return "❌ PWS 地震訊息查詢失敗:連線超時。"
except requests.exceptions.RequestException as e:
return f"❌ PWS 地震訊息查詢失敗:網路連線錯誤。\n錯誤訊息:{e}"
except json.JSONDecodeError:
return "❌ PWS 地震訊息查詢失敗:無法解析伺服器回傳的資料格式。"
except Exception as e:
return f"❌ PWS 地震訊息查詢失敗:發生未預期的錯誤。\n錯誤訊息:{e}" |