liuyuelintop's picture
Upload folder using huggingface_hub
8e7f687 verified
# Pushover notification service extracted from app.py
import time
import requests
from collections import deque
from config.settings import PUSH_WINDOW_SECONDS, PUSH_MAX_IN_WINDOW, PUSH_DEDUPE_SECONDS
class PushoverService:
"""Rate-limited, de-duplicated Pushover notification service"""
def __init__(self, token: str, user: str):
self.token = token
self.user = user
# Rate limiting and deduplication state
self._recent_pushes = deque() # (timestamp, message)
self._last_seen = {} # message -> last_ts
def _should_push(self, message: str) -> bool:
"""Check if message should be sent based on rate limits and deduplication"""
now = time.time()
# De-dupe identical messages
last = self._last_seen.get(message)
if last and now - last < PUSH_DEDUPE_SECONDS:
return False
# Windowed rate limit
while self._recent_pushes and now - self._recent_pushes[0][0] > PUSH_WINDOW_SECONDS:
self._recent_pushes.popleft()
if len(self._recent_pushes) >= PUSH_MAX_IN_WINDOW:
return False
self._recent_pushes.append((now, message))
self._last_seen[message] = now
return True
def send(self, message: str) -> bool:
"""Send notification if rate limits allow. Returns True if sent, False if skipped."""
if not self._should_push(message):
return False
try:
response = requests.post(
"https://api.pushover.net/1/messages.json",
data={
"token": self.token,
"user": self.user,
"message": message[:1024], # Pushover message limit
},
timeout=10,
)
return response.status_code == 200
except Exception:
# Never crash chat due to notification errors
return False