# oa_server.py from __future__ import annotations import json, time, uuid, logging from typing import Any, Dict, List, AsyncIterable, Optional from backends_base import ChatBackend, ImagesBackend from rabbit_repo import RabbitRepo logger = logging.getLogger(__name__) # ------------------ helpers ------------------ def _now() -> int: return int(time.time()) def _chunk_text(s: str, sz: int = 140) -> List[str]: return [s[i:i+sz] for i in range(0, len(s or ""), sz)] if s else [] def _last_user_text(messages: List[Dict[str, Any]]) -> str: for m in reversed(messages or []): if (m or {}).get("role") == "user": c = m.get("content", "") if isinstance(c, str): return c if isinstance(c, list): texts = [p.get("text","") for p in c if p.get("type") == "text"] return " ".join([t for t in texts if t]) return "" # ------------------ handler class ------------------ class OpenAIServers: """ Handlers you can register in RabbitListenerBase: - 'oaChatCreate' -> handle_chat_create - 'oaImagesGenerate' -> handle_images_generate Uses RabbitRepo.publish(...) to emit CloudEvent-wrapped OpenAI JSON. """ def __init__(self, publisher: RabbitRepo, *, chat_backend: Optional[ChatBackend] = None, images_backend: Optional[ImagesBackend] = None): self._pub = publisher self._chat = chat_backend self._img = images_backend # -------- Chat Completions -------- async def handle_chat_create(self, data: Dict[str, Any]) -> None: """ data: OpenAI chat request + 'reply_key' (string) Server publishes to exchange 'oa.chat.reply' with routing_key = reply_key. """ if not isinstance(data, dict): logger.warning("oaChatCreate: data is not a dict") return reply_key = data.get("reply_key") if not reply_key: logger.error("oaChatCreate: missing reply_key") return try: async for chunk in self._chat.stream(data): try: await self._pub.publish("oa.chat.reply", chunk, routing_key=reply_key) except Exception: logger.exception("oaChatCreate: publish failed") break # stop streaming on publish failure # Optional sentinel try: await self._pub.publish("oa.chat.reply", {"object": "stream.end"}, routing_key=reply_key) except Exception: logger.exception("oaChatCreate: publish sentinel failed") except Exception: logger.exception("oaChatCreate: streaming failed") # -------- Images (generations) -------- async def handle_images_generate(self, data: Dict[str, Any]) -> None: """ data: OpenAI images.generate request + 'reply_key' (string) """ if not isinstance(data, dict): logger.warning("oaImagesGenerate: data is not a dict") return reply_key = data.get("reply_key") if not reply_key: logger.error("oaImagesGenerate: missing reply_key") return try: b64 = await self._img.generate_b64(data) resp = {"created": _now(), "data":[{"b64_json": b64}]} try: await self._pub.publish("oa.images.reply", resp, routing_key=reply_key) except Exception: logger.exception("oaImagesGenerate: publish failed") except Exception: logger.exception("oaImagesGenerate: generation failed") # --- at the bottom of oa_server.py --- # Provide aliases expected by vllm_backend.py try: ChatBackend # type: ignore[name-defined] except NameError: try: from typing import TYPE_CHECKING # If your actual names differ, map them here: ChatBackend = ChatCompletionsBackend # noqa: F821 except Exception: pass try: ImagesBackend # type: ignore[name-defined] except NameError: try: ImagesBackend = ImageGenerationsBackend # noqa: F821 except Exception: pass