# pws_service.py (Robust Version) import requests import json from datetime import datetime # ✨ CORRECTED IMPORT from config.settings import PWS_API_URL, CWA_PWS_EARTHQUAKE_API def fetch_latest_pws_info() -> str: """ 從 MCP PWS 伺服器擷取最新的 PWS (Public Weather Service) 發布情形。 此版本經過優化,能穩定處理 Gradio API 的 Server-Sent Events (SSE) 串流。 """ final_report = None try: # 使用 stream=True 保持連線,接收伺服器持續發送的事件 with requests.get(PWS_API_URL, timeout=20, stream=True) as r: r.raise_for_status() # 確保 HTTP 連線成功 # 迭代處理從伺服器接收到的每一行資料 for line in r.iter_lines(): if line: decoded_line = line.decode('utf-8') # SSE 事件的資料以 "data: " 開頭 if decoded_line.startswith('data:'): json_data_str = decoded_line[len('data:'):].strip() try: data = json.loads(json_data_str) # Gradio API 在完成時會發送 'process_completed' 訊息 if data.get("msg") == "process_completed": # 最終結果通常在 output['data'][0] 中 output_data = data.get("output", {}).get("data", []) if output_data: final_report = output_data[0] break # 收到最終結果,跳出迴圈 except (json.JSONDecodeError, KeyError, IndexError): # 如果中途的訊息格式不符或不含所需資料,靜默忽略並繼續等待 continue # 迴圈結束後,檢查是否成功取得最終報告 if final_report: header = "🛰️ 最新 PWS 發布情形:" separator = "-" * 20 return f"{header}\n{separator}\n{final_report}" else: return "❌ PWS 查詢失敗:已連接伺服器,但未收到有效的最終報告資料。" except requests.exceptions.Timeout: return f"❌ PWS 查詢失敗:連線超時,伺服器沒有在 20 秒內回應。" except requests.exceptions.RequestException as e: return f"❌ PWS 查詢失敗:網路連線錯誤。\n錯誤訊息:{e}" except Exception as e: return f"❌ PWS 查詢失敗:發生未預期的錯誤。\n錯誤訊息:{e}" def fetch_cwa_pws_earthquake_info() -> str: """從指定的 API 端點擷取最新的 CWA PWS 地震訊息並格式化輸出。""" try: response = requests.get(CWA_PWS_EARTHQUAKE_API, timeout=15) response.raise_for_status() data = response.json() source = data.get("source", "未知來源") last_updated_raw = data.get("last_updated") # 格式化資料更新時間 if last_updated_raw: try: last_updated_dt = datetime.fromisoformat(last_updated_raw) last_updated_str = last_updated_dt.strftime('%Y-%m-%d %H:%M') except (ValueError, TypeError): last_updated_str = last_updated_raw else: last_updated_str = "未知" # 建立訊息標頭 lines = [ f"📢 最新地震PWS警報 ({source})", f" (資料更新時間: {last_updated_str})", "─" * 25 ] alerts = data.get("alerts", []) if not alerts: lines.append("\n✅ 目前無最新的地震PWS警報。") return "\n".join(lines) # 逐條解析並格式化警報 for i, alert in enumerate(alerts): title = alert.get("title", "無標題") publish_time_raw = alert.get("publish_time") # 格式化警報發布時間 if publish_time_raw: try: publish_time_dt = datetime.fromisoformat(publish_time_raw) publish_time_str = publish_time_dt.strftime('%Y-%m-%d %H:%M') except (ValueError, TypeError): publish_time_str = publish_time_raw else: publish_time_str = "未知" summary = alert.get("summary", "無摘要。").strip() affected_counties = alert.get("affected_counties", []) affected_str = ", ".join(affected_counties) if affected_counties else "未提供" # 組合單條警報訊息 alert_lines = [ f"🚨 {title} ({publish_time_str})", f"🗺️ 影響地區: {affected_str}", f"💬 內容: {summary}" ] # 將單條警報加入主訊息列表 lines.append("\n".join(alert_lines)) # 在警報之間加入分隔線 if i < len(alerts) - 1: lines.append("-" * 15) return "\n\n".join(lines) except requests.exceptions.Timeout: return "❌ PWS 地震訊息查詢失敗:連線超時。" except requests.exceptions.RequestException as e: return f"❌ PWS 地震訊息查詢失敗:網路連線錯誤。\n錯誤訊息:{e}" except json.JSONDecodeError: return "❌ PWS 地震訊息查詢失敗:無法解析伺服器回傳的資料格式。" except Exception as e: return f"❌ PWS 地震訊息查詢失敗:發生未預期的錯誤。\n錯誤訊息:{e}"