"""Tools for starting and stopping Docker containers on Kubernetes.

A test setup for this feature:

```bash
# Start minikube with GPU support:
minikube start --driver docker --container-runtime docker --gpus all
# Make the services accessible:
minikube tunnel
```

Use `k8s.needs()` to declare a Kubernetes dependency for an operation. For example:

```python
@op("Ask LLM", slow=True)
@k8s.needs(
    name="vllm-for-ask-llm-op",
    image="vllm/vllm-openai:latest",
    port=8000,
    args=["--model", "google/gemma-3-1b-it"],
    health_probe="/health",
    forward_env=["HUGGING_FACE_HUB_TOKEN"],
    storage_path="/root/.cache/huggingface",
    storage_size="10Gi",
)
def ask_llm(df: pd.DataFrame, *, question: ops.LongStr):
    ip = k8s.get_ip("vllm-for-ask-llm-op")
    client = openai.OpenAI(api_key="EMPTY", base_url=f"http://{ip}/v1")
    # ...
```
"""
import functools | |
import os | |
import queue | |
import threading | |
import time | |
import httpx | |
from kubernetes import client, config | |
from kubernetes.client.rest import ApiException | |
def _run(
    *,
    name,
    image,
    port,
    namespace,
    storage_size,
    storage_path,
    health_probe,
    forward_env,
    **kwargs,
):
    """Create a Deployment and a LoadBalancer Service running the given image.

    When ``storage_size`` is set, a PersistentVolumeClaim is provisioned (if it
    does not already exist) and mounted at ``storage_path``. Any extra keyword
    arguments (e.g. ``args``, ``env``) are passed through to ``client.V1Container``.

    Raises:
        KeyError: if a variable listed in ``forward_env`` is not set in the
            local environment.
    """
    print(f"Starting {name} in namespace {namespace}...")
    volume_mounts = []
    volumes = []
    if storage_size:
        pvc_name = f"{name}-data-volume"
        if not _pvc_exists(pvc_name, namespace):
            _create_pvc(pvc_name, size=storage_size, namespace=namespace)
        volume_mounts.append(
            client.V1VolumeMount(
                name=pvc_name,
                mount_path=storage_path,
            )
        )
        volumes.append(
            client.V1Volume(
                name=pvc_name,
                persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
                    claim_name=pvc_name,
                ),
            )
        )
    # Forward local environment variables to the container.
    # (Loop variable renamed so it does not shadow the "name" parameter.)
    kwargs.setdefault("env", []).extend(
        [{"name": var, "value": os.environ[var]} for var in forward_env]
    )
    container = client.V1Container(
        name=name,
        image=image,
        ports=[client.V1ContainerPort(container_port=port)],
        volume_mounts=volume_mounts,
        **kwargs,
    )
    if health_probe:
        # The pod is only marked Ready once this HTTP endpoint responds.
        container.readiness_probe = client.V1Probe(
            http_get=client.V1HTTPGetAction(path=health_probe, port=port, scheme="HTTP"),
        )
    deployment = client.V1Deployment(
        metadata=client.V1ObjectMeta(name=name),
        spec=client.V1DeploymentSpec(
            replicas=1,
            selector=client.V1LabelSelector(match_labels={"app": name}),
            template=client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": name}),
                spec=client.V1PodSpec(
                    volumes=volumes,
                    containers=[container],
                ),
            ),
        ),
    )
    apps_v1 = client.AppsV1Api()
    apps_v1.create_namespaced_deployment(namespace=namespace, body=deployment)
    # Expose the container port on port 80 of an external load balancer IP.
    # get_ip() relies on this: it builds URLs without an explicit port.
    service_name = f"{name}-service"
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name=service_name, labels={"app": name}),
        spec=client.V1ServiceSpec(
            selector={"app": name},
            ports=[client.V1ServicePort(protocol="TCP", port=80, target_port=port)],
            type="LoadBalancer",
        ),
    )
    core_v1 = client.CoreV1Api()
    core_v1.create_namespaced_service(namespace=namespace, body=service)
def _stop(name, namespace="default"):
    """Delete the Deployment and the Service that were created for `name`."""
    print(f"Stopping {name} in namespace {namespace}...")
    client.AppsV1Api().delete_namespaced_deployment(name, namespace)
    client.CoreV1Api().delete_namespaced_service(f"{name}-service", namespace)
def get_ip(name: str, namespace: str = "default", timeout: int = 3600, interval: int = 1) -> str:
    """Look up the IP address where the operation can access the service.

    Polls the Kubernetes API every ``interval`` seconds until the LoadBalancer
    service backing ``name`` has an external IP (or hostname) that accepts
    HTTP connections.

    Raises:
        TimeoutError: if no reachable address appears within ``timeout`` seconds.
    """
    service_name = f"{name}-service"
    core_v1 = client.CoreV1Api()
    # Use a monotonic clock for the deadline so wall-clock adjustments
    # (NTP corrections, DST) cannot shorten or extend the wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            svc = core_v1.read_namespaced_service(service_name, namespace)
            ingress = svc.status.load_balancer.ingress
            if ingress:
                ip = ingress[0].ip or ingress[0].hostname
                if ip and _can_connect(ip):
                    return ip
        except ApiException as e:
            # 404 means the service is not created yet; keep polling.
            if e.status != 404:
                raise
        time.sleep(interval)
    raise TimeoutError(f"Timed out waiting for external IP of service '{service_name}'")
def _can_connect(ip: str) -> bool:
    """Return True if an HTTP request to the address completes (any status code)."""
    try:
        httpx.get(f"http://{ip}/")
    except httpx.RequestError:
        return False
    return True
def _is_running(name: str, namespace: str = "default") -> bool:
    """Return True if a Deployment called `name` exists in the namespace."""
    try:
        client.AppsV1Api().read_namespaced_deployment(name, namespace)
    except ApiException as e:
        if e.status == 404:
            return False
        raise
    return True
def _stop_if_running(name, namespace="default"):
    """Stop the service, but only if its Deployment currently exists."""
    if not _is_running(name, namespace):
        return
    _stop(name, namespace)
def _create_pvc(name, size="1Gi", namespace="default"):
    """Create a ReadWriteOnce PersistentVolumeClaim of the requested size."""
    spec = client.V1PersistentVolumeClaimSpec(
        access_modes=["ReadWriteOnce"],
        resources=client.V1ResourceRequirements(requests={"storage": size}),
    )
    body = client.V1PersistentVolumeClaim(
        metadata=client.V1ObjectMeta(name=name),
        spec=spec,
    )
    client.CoreV1Api().create_namespaced_persistent_volume_claim(namespace=namespace, body=body)
def _pvc_exists(name: str, namespace: str = "default") -> bool:
    """Return True if a PersistentVolumeClaim called `name` exists in the namespace."""
    api = client.CoreV1Api()
    try:
        api.read_namespaced_persistent_volume_claim(name=name, namespace=namespace)
    except ApiException as e:
        if e.status == 404:
            return False
        raise
    return True
def needs(
    name: str,
    image: str,
    port: int,
    args: list = None,
    env: list = None,
    forward_env: list = None,
    health_probe: str = None,
    storage_size: str = None,
    storage_path: str = "/data",
    namespace: str = "default",
):
    """Use this decorator to configure a microservice that the operation depends on.

    LynxKite will manage the lifecycle of the microservice for you: it is
    started (if not already running) before the wrapped function executes,
    and released afterwards — even if the function raises.
    """

    def decorator(func):
        # functools.wraps preserves the wrapped function's __name__, __doc__,
        # etc., so introspection and debugging see the real operation.
        @functools.wraps(func)
        def wrapper(*func_args, **func_kwargs):
            config.load_kube_config()
            _using(
                name=name,
                image=image,
                port=port,
                args=args or [],
                env=env or [],
                forward_env=forward_env or [],
                health_probe=health_probe,
                storage_size=storage_size,
                storage_path=storage_path,
                namespace=namespace,
            )
            try:
                return func(*func_args, **func_kwargs)
            finally:
                # Always release our claim, even if the operation failed.
                _stop_using(name, namespace)

        return wrapper

    return decorator
# Per-service reference counters, keyed by service name. Each active user of a
# service holds one token in the service's queue; the queue being empty means
# the service has no users and may be stopped.
_USER_COUNTERS = {}
def _using(name, **kwargs):
    """Register a user of the named service and start it if it is not running.

    A token is enqueued per user; `_stop_using` dequeues it. If startup fails,
    the registration is rolled back before the exception propagates.
    """
    q = _USER_COUNTERS.setdefault(name, queue.Queue(-1))
    q.put(1)
    try:
        if not _is_running(name):
            _run(name=name, **kwargs)
    except Exception:
        q.get()
        # Bare raise preserves the original traceback.
        raise
def _stop_using(name, namespace):
    """Unregister one user of the service; schedule a stop if none remain."""
    q = _USER_COUNTERS[name]
    q.get()
    if not q.empty():
        return
    _stop_later(name, namespace)
def _stop_later(name, namespace):
    """Stop the service after a grace period, unless it gains a new user meanwhile."""
    q = _USER_COUNTERS[name]

    def deferred_stop():
        # Grace period before shutdown. NOTE(review): 6000 s (100 min) looks
        # unusually long for a "stop soon" delay — confirm it is intentional.
        time.sleep(6000)
        if q.empty():
            # Nobody started the service in the meantime.
            _stop(name, namespace)

    # Daemon thread: never blocks interpreter shutdown.
    threading.Thread(target=deferred_stop, daemon=True).start()