import json import logging from typing import Callable, Awaitable, Dict, Any, List import aio_pika from aiormq.exceptions import ChannelInvalidStateError Handler = Callable[[Any], Awaitable[None]] logger = logging.getLogger(__name__) class RabbitListenerBase: def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]): self._base = base self._instance_name = instance_name self._handlers = handlers self._consumers: List[aio_pika.abc.AbstractRobustQueue] = [] def _qname(self, exchange: str, routing_keys: List[str]) -> str: rk_part = "-".join(sorted([rk for rk in (routing_keys or [""]) if rk])) or "" suffix = f"-{rk_part}" if rk_part else "" return f"{self._instance_name}-{exchange}{suffix}" async def start(self, declarations: List[dict]) -> None: for d in declarations: exch = d["ExchangeName"] ttl = d.get("MessageTimeout") or None rks = d.get("RoutingKeys") or [""] qname = self._qname(exch, rks) q = await self._base.declare_queue_bind( exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl ) # manual ack, parity with .NET (autoAck: false) await q.consume(self._make_consumer(d["FuncName"]), no_ack=False) self._consumers.append(q) def _make_consumer(self, func_name: str): handler = self._handlers.get(func_name) async def _on_msg(msg: aio_pika.IncomingMessage): # decode try: raw_body = msg.body.decode("utf-8", errors="replace") logger.info("Received message for handler '%s': %s", func_name, raw_body) try: envelope = json.loads(raw_body) except Exception: logger.exception("Invalid JSON for '%s'", func_name) envelope = {"data": None} data = envelope.get("data", None) except Exception: # if we cannot decode, ack to drop (matches .NET non-requeue behavior) try: await msg.ack() except Exception: pass return # ACK FIRST (like C#) try: await msg.ack() except ChannelInvalidStateError: # channel died; message may be redelivered; avoid loops logger.warning("Ack failed: channel invalid for '%s'. Skipping ack.", func_name) return except Exception: # swallow ack errors to avoid crash; mirrors resilient .NET behavior logger.exception("Ack error for '%s'", func_name) return # run handler after ack; if it fails, caller handles own idempotency if handler: try: await handler(data) except Exception: logger.exception("Handler error for '%s'", func_name) else: logger.error("No handler bound for '%s'", func_name) return _on_msg