Spaces:
Running
Running
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
Helpers for encoding/decoding server_message_data values. | |
These are Base64URL-encoded proto3 messages with shape: | |
- field 1: string UUID (36 chars) | |
- field 3: google.protobuf.Timestamp (1=seconds, 2=nanos) | |
Supports UUID_ONLY, TIMESTAMP_ONLY, and UUID_AND_TIMESTAMP. | |
""" | |
from typing import Dict, Optional, Tuple | |
import base64 | |
from datetime import datetime, timezone | |
try: | |
from zoneinfo import ZoneInfo # Python 3.9+ | |
except Exception: | |
ZoneInfo = None # type: ignore | |
def _b64url_decode_padded(s: str) -> bytes: | |
t = s.replace("-", "+").replace("_", "/") | |
pad = (-len(t)) % 4 | |
if pad: | |
t += "=" * pad | |
return base64.b64decode(t) | |
def _b64url_encode_nopad(b: bytes) -> str: | |
return base64.urlsafe_b64encode(b).decode("ascii").rstrip("=") | |
def _read_varint(buf: bytes, i: int) -> Tuple[int, int]: | |
shift = 0 | |
val = 0 | |
while i < len(buf): | |
b = buf[i] | |
i += 1 | |
val |= (b & 0x7F) << shift | |
if not (b & 0x80): | |
return val, i | |
shift += 7 | |
if shift > 63: | |
break | |
raise ValueError("invalid varint") | |
def _write_varint(v: int) -> bytes: | |
out = bytearray() | |
vv = int(v) | |
while True: | |
to_write = vv & 0x7F | |
vv >>= 7 | |
if vv: | |
out.append(to_write | 0x80) | |
else: | |
out.append(to_write) | |
break | |
return bytes(out) | |
def _make_key(field_no: int, wire_type: int) -> bytes: | |
return _write_varint((field_no << 3) | wire_type) | |
def _decode_timestamp(buf: bytes) -> Tuple[Optional[int], Optional[int]]: | |
i = 0 | |
seconds: Optional[int] = None | |
nanos: Optional[int] = None | |
while i < len(buf): | |
key, i = _read_varint(buf, i) | |
field_no = key >> 3 | |
wt = key & 0x07 | |
if wt == 0: | |
val, i = _read_varint(buf, i) | |
if field_no == 1: | |
seconds = int(val) | |
elif field_no == 2: | |
nanos = int(val) | |
elif wt == 2: | |
ln, i2 = _read_varint(buf, i) | |
i = i2 + ln | |
elif wt == 1: | |
i += 8 | |
elif wt == 5: | |
i += 4 | |
else: | |
break | |
return seconds, nanos | |
def _encode_timestamp(seconds: Optional[int], nanos: Optional[int]) -> bytes: | |
parts = bytearray() | |
if seconds is not None: | |
parts += _make_key(1, 0) | |
parts += _write_varint(int(seconds)) | |
if nanos is not None: | |
parts += _make_key(2, 0) | |
parts += _write_varint(int(nanos)) | |
return bytes(parts) | |
def decode_server_message_data(b64url: str) -> Dict: | |
try: | |
raw = _b64url_decode_padded(b64url) | |
except Exception as e: | |
return {"error": f"base64url decode failed: {e}"} | |
i = 0 | |
uuid: Optional[str] = None | |
seconds: Optional[int] = None | |
nanos: Optional[int] = None | |
while i < len(raw): | |
key, i = _read_varint(raw, i) | |
field_no = key >> 3 | |
wt = key & 0x07 | |
if wt == 2: | |
ln, i2 = _read_varint(raw, i) | |
i = i2 | |
data = raw[i:i+ln] | |
i += ln | |
if field_no == 1: | |
try: | |
uuid = data.decode("utf-8") | |
except Exception: | |
uuid = None | |
elif field_no == 3: | |
s, n = _decode_timestamp(data) | |
if s is not None: | |
seconds = s | |
if n is not None: | |
nanos = n | |
elif wt == 0: | |
_, i = _read_varint(raw, i) | |
elif wt == 1: | |
i += 8 | |
elif wt == 5: | |
i += 4 | |
else: | |
break | |
iso_utc: Optional[str] = None | |
iso_ny: Optional[str] = None | |
if seconds is not None: | |
micros = int((nanos or 0) / 1000) | |
dt = datetime.fromtimestamp(int(seconds), tz=timezone.utc).replace(microsecond=micros) | |
iso_utc = dt.isoformat().replace("+00:00", "Z") | |
if ZoneInfo is not None: | |
try: | |
iso_ny = dt.astimezone(ZoneInfo("America/New_York")).isoformat() | |
except Exception: | |
iso_ny = None | |
if uuid and (seconds is not None or nanos is not None): | |
t = "UUID_AND_TIMESTAMP" | |
elif uuid: | |
t = "UUID_ONLY" | |
elif seconds is not None or nanos is not None: | |
t = "TIMESTAMP_ONLY" | |
else: | |
t = "UNKNOWN" | |
return { | |
"uuid": uuid, | |
"seconds": seconds, | |
"nanos": nanos, | |
"iso_utc": iso_utc, | |
"iso_ny": iso_ny, | |
"type": t, | |
} | |
def encode_server_message_data(uuid: Optional[str] = None, | |
seconds: Optional[int] = None, | |
nanos: Optional[int] = None) -> str: | |
parts = bytearray() | |
if uuid: | |
b = uuid.encode("utf-8") | |
parts += _make_key(1, 2) | |
parts += _write_varint(len(b)) | |
parts += b | |
if seconds is not None or nanos is not None: | |
ts = _encode_timestamp(seconds, nanos) | |
parts += _make_key(3, 2) | |
parts += _write_varint(len(ts)) | |
parts += ts | |
return _b64url_encode_nopad(bytes(parts)) |