# services/huggingface_service.py
# (removed Hugging Face Spaces page residue: "Spaces: / Running / Running")
from __future__ import annotations

import base64
import imghdr  # NOTE(review): deprecated stdlib module, removed in Python 3.13 (PEP 594)
import json
import os
import re
import time
from typing import Dict, Any, List, Optional

import aiohttp

from .vlm_service import VLMService, ModelType, ServiceStatus
def _env_token() -> Optional[str]: | |
return ( | |
os.getenv("HF_API_KEY") | |
or os.getenv("HF_TOKEN") | |
or os.getenv("HUGGINGFACEHUB_API_TOKEN") | |
) | |
def _providers_url_default() -> str: | |
# OpenAI-compatible gateway on HF Inference Providers | |
return os.getenv("HF_PROVIDERS_URL", "https://api-inference.huggingface.co/providers/openai") | |
class HuggingFaceService(VLMService): | |
""" | |
HuggingFace Inference Providers service implementation (OpenAI-compatible). | |
- No network in __init__ | |
- Short, safe probe() | |
- Lazy use during generate_* | |
""" | |
def __init__(self, api_key: str, model_id: str, providers_url: str, public_name: str | None = None): | |
super().__init__( | |
public_name or (model_id or "HUGGINGFACE"), | |
ModelType.CUSTOM, | |
provider="huggingface", | |
lazy_init=True, | |
) | |
self.api_key = api_key | |
self.model_id = model_id | |
self.providers_url = providers_url | |
# also keep model_name aligned | |
self.model_name = public_name or (model_id or "HUGGINGFACE") | |
if not self.api_key or not self.model_id: | |
self.is_available = False | |
self.status = ServiceStatus.DEGRADED | |
# ---------- helpers ---------- | |
def _guess_mime(self, image_bytes: bytes) -> str: | |
kind = imghdr.what(None, h=image_bytes) | |
if kind == "jpeg": | |
return "image/jpeg" | |
if kind == "png": | |
return "image/png" | |
if kind == "gif": | |
return "image/gif" | |
if kind == "webp": | |
return "image/webp" | |
return "image/jpeg" | |
# ---------- lifecycle ---------- | |
async def probe(self) -> bool: | |
""" | |
Lightweight reachability check. | |
- Validates token with whoami | |
- Checks model endpoint exists/reachable | |
Never raises, returns bool. | |
""" | |
if not self.api_key or not self.model_id: | |
return False | |
try: | |
timeout = aiohttp.ClientTimeout(total=5) | |
headers_auth = {"Authorization": f"Bearer {self.api_key}"} | |
async with aiohttp.ClientSession(timeout=timeout) as session: | |
# Token check | |
r1 = await session.get("https://huggingface.co/api/whoami-v2", headers=headers_auth) | |
if r1.status != 200: | |
return False | |
# Model reachability (Inference API — GET is fine) | |
r2 = await session.get(f"https://api-inference.huggingface.co/models/{self.model_id}", headers=headers_auth) | |
# Consider 200, 503 (loading), 403/404 (exists but gated/private) as "reachable" | |
if r2.status in (200, 503, 403, 404): | |
return True | |
return False | |
except Exception: | |
return False | |
async def ensure_ready(self) -> bool: | |
# Nothing to warm here; we keep it trivial. | |
self._initialized = True | |
return True | |
# ---------- caption APIs ---------- | |
async def generate_caption( | |
self, | |
image_bytes: bytes, | |
prompt: str, | |
metadata_instructions: str = "", | |
) -> Dict[str, Any]: | |
""" | |
Generate caption using HF Inference Providers (OpenAI-style chat). | |
""" | |
if not self.api_key or not self.model_id: | |
raise Exception("MODEL_UNAVAILABLE: HuggingFace credentials or model_id missing.") | |
start_time = time.time() | |
instruction = (prompt or "").strip() | |
if metadata_instructions: | |
instruction += "\n\n" + metadata_instructions.strip() | |
mime = self._guess_mime(image_bytes) | |
data_url = f"data:{mime};base64,{base64.b64encode(image_bytes).decode('utf-8')}" | |
headers = { | |
"Authorization": f"Bearer {self.api_key}", | |
"Content-Type": "application/json", | |
} | |
payload = { | |
"model": self.model_id, | |
"messages": [ | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "text", "text": instruction}, | |
{"type": "image_url", "image_url": {"url": data_url}}, | |
], | |
} | |
], | |
"max_tokens": 512, | |
"temperature": 0.2, | |
} | |
try: | |
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60)) as session: | |
async with session.post( | |
self.providers_url, | |
headers=headers, | |
json=payload, | |
) as resp: | |
raw_text = await resp.text() | |
if resp.status != 200: | |
# Surface a consistent, catchable error for fallback | |
raise Exception(f"MODEL_UNAVAILABLE: {self.model_name} unavailable (HTTP {resp.status}).") | |
result = await resp.json() | |
except Exception as e: | |
# Never leak aiohttp exceptions outward as-is; normalize to your fallback signal | |
if "MODEL_UNAVAILABLE" not in str(e): | |
raise Exception(f"MODEL_UNAVAILABLE: {self.model_name} is unavailable due to a network/error.") | |
raise | |
# ----- Parse response ----- | |
message = (result.get("choices") or [{}])[0].get("message", {}) | |
content = message.get("content", "") | |
# GLM models sometimes put content in reasoning_content | |
if not content and message.get("reasoning_content"): | |
content = message.get("reasoning_content", "") | |
if isinstance(content, list): | |
parts = [] | |
for block in content: | |
if isinstance(block, dict): | |
parts.append(block.get("text") or block.get("content") or "") | |
else: | |
parts.append(str(block)) | |
content = "\n".join([p for p in parts if p]) | |
caption = (content or "").strip() | |
# Strip accidental fenced JSON | |
if caption.startswith("```json"): | |
caption = re.sub(r"^```json\s*", "", caption) | |
caption = re.sub(r"\s*```$", "", caption) | |
metadata = {} | |
description = "" | |
analysis = caption | |
recommended_actions = "" | |
try: | |
parsed = json.loads(caption) | |
description = parsed.get("description", "") | |
analysis = parsed.get("analysis", caption) | |
recommended_actions = parsed.get("recommended_actions", "") | |
metadata = parsed.get("metadata", {}) | |
caption_text = f"Description: {description}\n\nAnalysis: {analysis}\n\nRecommended Actions: {recommended_actions}" | |
except json.JSONDecodeError: | |
parsed = None | |
caption_text = caption | |
elapsed = time.time() - start_time | |
return { | |
"caption": caption_text, | |
"metadata": metadata, | |
"confidence": None, | |
"processing_time": elapsed, | |
"raw_response": { | |
"model": self.model_id, | |
"content": content, | |
"parsed": parsed, | |
}, | |
"description": description, | |
"analysis": analysis, | |
"recommended_actions": recommended_actions, | |
} | |
async def generate_multi_image_caption( | |
self, | |
image_bytes_list: List[bytes], | |
prompt: str, | |
metadata_instructions: str = "", | |
) -> Dict[str, Any]: | |
""" | |
Generate caption for multiple images using HF Inference Providers (OpenAI-style chat). | |
""" | |
if not self.api_key or not self.model_id: | |
raise Exception("MODEL_UNAVAILABLE: HuggingFace credentials or model_id missing.") | |
start_time = time.time() | |
instruction = (prompt or "").strip() | |
if metadata_instructions: | |
instruction += "\n\n" + metadata_instructions.strip() | |
headers = { | |
"Authorization": f"Bearer {self.api_key}", | |
"Content-Type": "application/json", | |
} | |
content = [{"type": "text", "text": instruction}] | |
for image_bytes in image_bytes_list: | |
mime = self._guess_mime(image_bytes) | |
data_url = f"data:{mime};base64,{base64.b64encode(image_bytes).decode('utf-8')}" | |
content.append({"type": "image_url", "image_url": {"url": data_url}}) | |
payload = { | |
"model": self.model_id, | |
"messages": [{"role": "user", "content": content}], | |
"max_tokens": 800, | |
"temperature": 0.2, | |
} | |
try: | |
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60)) as session: | |
async with session.post( | |
self.providers_url, | |
headers=headers, | |
json=payload, | |
) as resp: | |
raw_text = await resp.text() | |
if resp.status != 200: | |
raise Exception(f"MODEL_UNAVAILABLE: {self.model_name} unavailable (HTTP {resp.status}).") | |
result = await resp.json() | |
except Exception as e: | |
if "MODEL_UNAVAILABLE" not in str(e): | |
raise Exception(f"MODEL_UNAVAILABLE: {self.model_name} is unavailable due to a network/error.") | |
raise | |
message = (result.get("choices") or [{}])[0].get("message", {}) | |
content_out = message.get("content", "") | |
if not content_out and message.get("reasoning_content"): | |
content_out = message.get("reasoning_content", "") | |
if isinstance(content_out, list): | |
parts = [] | |
for block in content_out: | |
if isinstance(block, dict): | |
parts.append(block.get("text") or block.get("content") or "") | |
else: | |
parts.append(str(block)) | |
content_out = "\n".join([p for p in parts if p]) | |
caption = (content_out or "").strip() | |
if caption.startswith("```json"): | |
caption = re.sub(r"^```json\s*", "", caption) | |
caption = re.sub(r"\s*```$", "", caption) | |
metadata = {} | |
description = "" | |
analysis = caption | |
recommended_actions = "" | |
try: | |
parsed = json.loads(caption) | |
description = parsed.get("description", "") | |
analysis = parsed.get("analysis", caption) | |
recommended_actions = parsed.get("recommended_actions", "") | |
metadata = parsed.get("metadata", {}) | |
caption_text = f"Description: {description}\n\nAnalysis: {analysis}\n\nRecommended Actions: {recommended_actions}" | |
except json.JSONDecodeError: | |
parsed = None | |
caption_text = caption | |
elapsed = time.time() - start_time | |
return { | |
"caption": caption_text, | |
"metadata": metadata, | |
"confidence": None, | |
"processing_time": elapsed, | |
"raw_response": { | |
"model": self.model_id, | |
"content": content_out, | |
"parsed": parsed, | |
"image_count": len(image_bytes_list), | |
}, | |
"description": description, | |
"analysis": analysis, | |
"recommended_actions": recommended_actions, | |
} | |
# --- Generic wrapper for easy dynamic registration --- | |
class ProvidersGenericVLMService(HuggingFaceService): | |
""" | |
Generic wrapper so you can register ANY Providers VLM by model_id from config/DB. | |
Example: | |
ProvidersGenericVLMService(None, "Qwen/Qwen2.5-VL-32B-Instruct", "QWEN2_5_VL_32B") | |
""" | |
def __init__(self, api_key: str, model_id: str, public_name: str | None = None): | |
providers_url = "https://api-inference.huggingface.co/providers/openai" | |
super().__init__( | |
api_key=api_key, | |
model_id=model_id, | |
providers_url=providers_url, | |
public_name=public_name or model_id.replace("/", "_").upper(), | |
) | |
if not self.api_key or not self.model_id: | |
self.is_available = False | |
self.status = ServiceStatus.DEGRADED | |