Spaces:
Configuration error
Configuration error
import os | |
import json | |
import websocket | |
import uuid | |
import random | |
import httpx | |
import time | |
import numpy as np | |
import ldm_patched.modules.model_management as model_management | |
from io import BytesIO | |
from PIL import Image | |
from . import utils | |
def upload_mask(mask): | |
with BytesIO() as output: | |
mask.save(output) | |
output.seek(0) | |
files = {'mask': ('mask.jpg', output)} | |
data = {'overwrite': 'true', 'type': 'example_type'} | |
response = httpx.post("http://{}/upload/mask".format(server_address), files=files, data=data) | |
return response.json() | |
def queue_prompt(prompt): | |
p = {"prompt": prompt, "client_id": client_id} | |
data = json.dumps(p).encode('utf-8') | |
try: | |
with httpx.Client() as client: | |
response = client.post("http://{}/prompt".format(server_address), data=data) | |
return json.loads(response.read()) | |
except httpx.RequestError as e: | |
print(f"httpx.RequestError: {e}") | |
return None | |
def get_image(filename, subfolder, folder_type): | |
params = httpx.QueryParams({ | |
"filename": filename, | |
"subfolder": subfolder, | |
"type": folder_type | |
}) | |
with httpx.Client() as client: | |
response = client.get(f"http://{server_address}/view", params=params) | |
return response.read() | |
def get_history(prompt_id): | |
with httpx.Client() as client: | |
response = client.get("http://{}/history/{}".format(server_address, prompt_id)) | |
return json.loads(response.read()) | |
def get_images(ws, prompt, callback=None): | |
prompt_id = queue_prompt(prompt)['prompt_id'] | |
print('[ComfyClient] Request and get ComfyTask_id:{}'.format(prompt_id)) | |
output_images = {} | |
current_node = '' | |
last_node = None | |
preview_image = [] | |
last_step = None | |
current_step = None | |
current_total_steps = None | |
while True: | |
model_management.throw_exception_if_processing_interrupted() | |
try: | |
out = ws.recv() | |
except ConnectionResetError as e: | |
print(f'[ComfyClient] The connect was exception, restart and try again: {e}') | |
ws = websocket.WebSocket() | |
ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id)) | |
out = ws.recv() | |
if isinstance(out, str): | |
message = json.loads(out) | |
current_type = message['type'] | |
#print(f'current_message={message}') | |
if message['type'] == 'executing': | |
data = message['data'] | |
if data['node'] is None and data['prompt_id'] == prompt_id: | |
break | |
else: | |
current_node = data['node'] | |
elif message['type'] == 'progress': | |
current_step = message["data"]["value"] | |
current_total_steps = message["data"]["max"] | |
else: | |
if current_type == 'progress': | |
if prompt[current_node]['class_type'] in ['KSampler', 'SamplerCustomAdvanced', 'TiledKSampler'] and callback is not None: | |
if current_step == last_step: | |
preview_image.append(out[8:]) | |
else: | |
if last_step is not None: | |
callback(last_step, current_total_steps, Image.open(BytesIO(preview_image[0]))) | |
preview_image = [] | |
preview_image.append(out[8:]) | |
last_step = current_step | |
if prompt[current_node]['class_type'] == 'SaveImageWebsocket': | |
images_output = output_images.get(prompt[current_node]['_meta']['title'], []) | |
images_output.append(out[8:]) | |
output_images[prompt[current_node]['_meta']['title']] = images_output[0] | |
continue | |
output_images = {k: np.array(Image.open(BytesIO(v))) for k, v in output_images.items()} | |
print(f'[ComfyClient] The ComfyTask:{prompt_id} has finished: {len(output_images)}') | |
return output_images | |
def images_upload(images): | |
result = {} | |
if images is None: | |
return result | |
for k,np_image in images.items(): | |
pil_image = Image.fromarray(np_image) | |
with BytesIO() as output: | |
pil_image.save(output, format="PNG") | |
output.seek(0) | |
files = {'image': (f'image_{client_id}_{random.randint(1000, 9999)}.png', output)} | |
data = {'overwrite': 'true', 'type': 'input'} | |
response = httpx.post("http://{}/upload/image".format(server_address), files=files, data=data) | |
result.update({k: response.json()["name"]}) | |
print(f'[ComfyClient] The ComfyTask:upload_input_images has finished: {len(result)}') | |
return result | |
def process_flow(flow_name, params, images, callback=None): | |
global ws | |
flow_file = os.path.join(WORKFLOW_DIR, f'{flow_name}_api.json') | |
if ws is None or ws.status != 101: | |
if ws is not None: | |
print(f'[ComfyClient] websocket status: {ws.status}, timeout:{ws.timeout}s.') | |
ws.close() | |
try: | |
ws = websocket.WebSocket() | |
ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id)) | |
except ConnectionRefusedError as e: | |
print(f'[ComfyClient] The connect_to_server has failed, sleep and try again: {e}') | |
time.sleep(8) | |
try: | |
ws = websocket.WebSocket() | |
ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id)) | |
except ConnectionRefusedError as e: | |
print(f'[ComfyClient] The connect_to_server has failed, restart and try again: {e}') | |
time.sleep(12) | |
ws = websocket.WebSocket() | |
ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id)) | |
images_map = images_upload(images) | |
params.update_params(images_map) | |
with open(flow_file, 'r', encoding="utf-8") as workflow_api_file: | |
flowdata = json.load(workflow_api_file) | |
print(f'[ComfyClient] Ready ComfyTask to process: workflow={flow_name}') | |
for k,v in params.params.items(): | |
print(f' {k} = {v}') | |
try: | |
prompt_str = params.convert2comfy(flowdata) | |
if not utils.echo_off: | |
print(f'[ComfyClient] ComfyTask prompt: {prompt_str}') | |
images = get_images(ws, prompt_str, callback=callback) | |
except websocket.WebSocketException as e: | |
print(f'[ComfyClient] The connection has been closed, restart and try again: {e}') | |
ws = None | |
images_keys = sorted(images.keys(), reverse=True) | |
imgs = [images[key] for key in images_keys] | |
return imgs | |
def interrupt(): | |
try: | |
with httpx.Client() as client: | |
response = client.post("http://{}/interrupt".format(server_address)) | |
return | |
except httpx.RequestError as e: | |
print(f"httpx.RequestError: {e}") | |
return | |
def free(all=False): | |
p = {"unload_models": all==True, "free_memory": True} | |
data = json.dumps(p).encode('utf-8') | |
try: | |
with httpx.Client() as client: | |
response = client.post("http://{}/free".format(server_address), data=data) | |
return | |
except httpx.RequestError as e: | |
print(f"httpx.RequestError: {e}") | |
return | |
WORKFLOW_DIR = 'workflows' | |
COMFYUI_ENDPOINT_IP = '127.0.0.1' | |
COMFYUI_ENDPOINT_PORT = '8187' | |
server_address = f'{COMFYUI_ENDPOINT_IP}:{COMFYUI_ENDPOINT_PORT}' | |
client_id = str(uuid.uuid4()) | |
ws = None | |