import base64
from typing import Optional, Tuple

import cv2
import numpy as np

import navi
from nodes.base_output import BaseOutput, OutputKind

from ...impl.image_utils import normalize, to_uint8
from ...impl.pil_utils import InterpolationMethod, resize
from ...utils.format import format_image_with_channels
from ...utils.utils import get_h_w_c, round_half_up


class NumPyOutput(BaseOutput):
    """Output a NumPy array"""

    def __init__(
        self,
        output_type: navi.ExpressionJson,
        label: str,
        kind: OutputKind = "generic",
        has_handle: bool = True,
    ):
        super().__init__(
            output_type,
            label,
            kind=kind,
            has_handle=has_handle,
            associated_type=np.ndarray,
        )

    def enforce(self, value) -> np.ndarray:
        assert isinstance(value, np.ndarray)
        return value


def AudioOutput():
    """Output a 1D Audio NumPy array"""
    return NumPyOutput("Audio", "Audio")


class ImageOutput(NumPyOutput):
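    """
    Output for a single image, represented as a NumPy array.

    `channels` optionally restricts how many channels the image may have, and
    `assume_normalized` skips the `normalize` call in `enforce` for nodes that
    already guarantee normalized float32 output.
    """
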
    def __init__(
        self,
        label: str = "Image",
        image_type: navi.ExpressionJson = "Image",
        kind: OutputKind = "image",
        has_handle: bool = True,
        channels: Optional[int] = None,
        assume_normalized: bool = False,
    ):
        super().__init__(
            navi.intersect(image_type, navi.Image(channels=channels)),
            label,
            kind=kind,
            has_handle=has_handle,
        )
        self.channels: Optional[int] = channels
        self.assume_normalized: bool = assume_normalized

    def get_broadcast_data(self, value: np.ndarray):
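        """Report the height, width, and channel count of the computed image."""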
        h, w, c = get_h_w_c(value)
        return {
            "height": h,
            "width": w,
            "channels": c,
        }

    def get_broadcast_type(self, value: np.ndarray):
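        """Narrow the navi image type to the exact dimensions of the computed image."""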
        h, w, c = get_h_w_c(value)
        return navi.Image(width=w, height=h, channels=c)

    def enforce(self, value) -> np.ndarray:
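        """
        Validate the image returned by a node.

        Rejects empty images and channel-count mismatches, flattens single-channel
        3D arrays to 2D, normalizes the image unless `assume_normalized` is set,
        and marks the result read-only.
        """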
        assert isinstance(value, np.ndarray)

        h, w, c = get_h_w_c(value)

        if h == 0 or w == 0:
            raise ValueError(
                f"The output {self.label} returned an empty image (w={w} h={h})."
                f" This is a bug in the implementation of the node."
                f" Please report this bug."
            )

        if self.channels is not None and c != self.channels:
            expected = format_image_with_channels([self.channels])
            actual = format_image_with_channels([c])
            raise ValueError(
                f"The output {self.label} was supposed to return {expected} but actually returned {actual}."
                f" This is a bug in the implementation of the node."
                f" Please report this bug."
            )

        # flatten 3D single-channel images to 2D
        if c == 1 and value.ndim == 3:
            value = value[:, :, 0]

        if not self.assume_normalized:
            value = normalize(value)

        assert value.dtype == np.float32, (
            f"The output {self.label} did not return a normalized image."
            f" This is a bug in the implementation of the node."
            f" Please report this bug."
            f"\n\nTo the author of this node: Either use `normalize` or remove `assume_normalized=True` from this output."
        )

        # make the image read-only
        value.setflags(write=False)

        return value


def preview_encode(
    img: np.ndarray,
    target_size: int = 512,
    grace: float = 1.2,
    lossless: bool = False,
) -> Tuple[str, np.ndarray]:
    """
    Resize the image so the preview loads faster and doesn't lag the UI.

    512 was chosen as the default target because a 512x512 RGBA 8-bit PNG is at most 1MB in size.
    """
    h, w, c = get_h_w_c(img)
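
    # Only downscale when the image exceeds the target size by more than the
    # grace factor; images only slightly larger than the target are left as-is.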
    max_size = target_size * grace
    if w > max_size or h > max_size:
        f = max(w / target_size, h / target_size)
        t = (max(1, round_half_up(w / f)), max(1, round_half_up(h / f)))
        if c == 4:
            # https://github.com/chaiNNer-org/chaiNNer/issues/1321
            img = resize(img, t, InterpolationMethod.BOX)
        else:
            img = cv2.resize(img, t, interpolation=cv2.INTER_AREA)

    image_format = "png" if c > 3 or lossless else "jpg"

    _, encoded_img = cv2.imencode(f".{image_format}", to_uint8(img, normalized=True))  # type: ignore
    base64_img = base64.b64encode(encoded_img).decode("utf8")  # type: ignore

    return f"data:image/{image_format};base64,{base64_img}", img


class LargeImageOutput(ImageOutput):
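    """
    Image output that additionally broadcasts downscaled previews of the image.

    `get_broadcast_data` encodes the image at several preview sizes so a preview
    can be shown without handling the full-resolution image.
    """
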
    def __init__(
        self,
        label: str = "Image",
        image_type: navi.ExpressionJson = "Image",
        kind: OutputKind = "large-image",
        has_handle: bool = True,
        assume_normalized: bool = False,
    ):
        super().__init__(
            label,
            image_type,
            kind=kind,
            has_handle=has_handle,
            assume_normalized=assume_normalized,
        )

    def get_broadcast_data(self, value: np.ndarray):
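        """
        Build the broadcast payload: the image's dimensions plus base64-encoded
        previews at decreasing sizes, starting from the largest preview size the
        image actually warrants.
        """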
        img = value
        h, w, c = get_h_w_c(img)
        image_size = max(h, w)

        preview_sizes = [2048, 1024, 512, 256]
        preview_size_grace = 1.2

        start_index = len(preview_sizes) - 1
        for i, size in enumerate(preview_sizes):
            if size <= image_size and image_size <= size * preview_size_grace:
                # this preview size will perfectly fit the image
                start_index = i
                break
            if image_size > size:
                # the image size is larger than the preview size, so try to pick the previous size
                start_index = max(0, i - 1)
                break

        previews = []

        # Encode for multiple scales. Use the preceding scale to save time encoding the smaller sizes.
        last_encoded = img
        for size in preview_sizes[start_index:]:
            largest_preview = size == preview_sizes[start_index]

            url, last_encoded = preview_encode(
                last_encoded,
                target_size=size,
                grace=preview_size_grace,
                lossless=largest_preview,
            )

            le_h, le_w, _ = get_h_w_c(last_encoded)
            previews.append({"width": le_w, "height": le_h, "url": url})

        return {
            "previews": previews,
            "height": h,
            "width": w,
            "channels": c,
        }


def VideoOutput():
    """Output a 3D Video NumPy array"""
    return NumPyOutput("Video", "Video")