# Spaces: Running -- status text that appears to have been captured from the hosting page; not part of the program.
import os | |
import cv2 | |
import shutil | |
from PIL import Image | |
import numpy as np | |
import torch | |
from torch.autograd import Variable | |
from torchvision import transforms | |
import torch.nn.functional as F | |
from flask import Flask, request, jsonify, render_template, send_from_directory | |
import warnings | |
warnings.filterwarnings("ignore") | |
app = Flask(__name__)

# Directories used to store temporary files: uploaded inputs, generated
# results and sample images.
UPLOAD_FOLDER = 'uploads'
RESULT_FOLDER = 'results'
EXAMPLES_FOLDER = 'examples'

# Create every working directory up front; existing ones are left untouched.
for _folder in (UPLOAD_FOLDER, RESULT_FOLDER, EXAMPLES_FOLDER):
    os.makedirs(_folder, exist_ok=True)
# Model-related imports and initialisation
def _fetch_isnet_sources():
    """Clone the DIS repository and move its IS-Net files into the working directory."""
    # Function-scope import: only this helper needs subprocess.
    import subprocess

    # Drop any stale checkout so that `git clone` does not fail on an
    # already-existing target directory.
    if os.path.exists("DIS"):
        shutil.rmtree("DIS")
    # List-form invocation avoids the shell, and check=True surfaces a failed
    # clone immediately instead of as a confusing ImportError later on.
    subprocess.run(["git", "clone", "https://github.com/xuebinqin/DIS"], check=True)

    source_dir = os.path.join("DIS", "IS-Net")
    for entry in os.listdir(source_dir):
        # Skip dot-files (the shell glob `*` never matched them) and anything
        # that already exists locally, so existing files are never clobbered.
        if entry.startswith(".") or os.path.exists(entry):
            continue
        shutil.move(os.path.join(source_dir, entry), entry)


def initialize_model():
    """Make the IS-Net code and weights available, then build the network.

    The IS-Net sources are fetched from GitHub only when they cannot be
    imported yet, and the weights file is moved into ``saved_models`` only
    when it is still in the working directory. The function is therefore
    safe to call repeatedly: earlier versions deleted ``saved_models`` (and
    with it the weights moved there by the previous call) on every call.

    Returns:
        tuple: ``(net, hypar, device)`` where ``net`` is the model returned by
        ``build_model``, ``hypar`` is the parameter dictionary it was built
        from and ``device`` is ``'cuda'`` or ``'cpu'``.

    Raises:
        subprocess.CalledProcessError: if ``git clone`` fails.
        ImportError: if the IS-Net modules still cannot be imported after
            the sources were fetched.
    """
    # Function-scope import: only this function needs importlib.
    import importlib

    # GOSNormalize and load_image look these helpers up as module globals,
    # so they must not stay local to this function.
    global normalize, im_reader, im_preprocess

    try:
        from data_loader_cache import normalize, im_reader, im_preprocess
        from models import ISNetDIS
    except ImportError:
        _fetch_isnet_sources()
        # The files appeared after the import system scanned the directory.
        importlib.invalidate_caches()
        from data_loader_cache import normalize, im_reader, im_preprocess
        from models import ISNetDIS

    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # Setup model directories. The weights are only moved when they are still
    # in the working directory; on later calls they are already in place.
    os.makedirs("saved_models", exist_ok=True)
    if os.path.exists("isnet.pth"):
        shutil.move("isnet.pth", os.path.join("saved_models", "isnet.pth"))

    # Set parameters
    hypar = {
        "model_path": "./saved_models",
        "restore_model": "isnet.pth",
        "interm_sup": False,
        "model_digit": "full",
        "seed": 0,
        "cache_size": [1024, 1024],
        "input_size": [1024, 1024],
        "crop_size": [1024, 1024],
        "model": ISNetDIS(),
    }

    # Build model
    net = build_model(hypar, device)
    return net, hypar, device
class GOSNormalize(object):
    """Callable transform that normalises an image with a fixed mean and std.

    Args:
        mean: per-channel mean values forwarded to ``normalize``.
        std: per-channel standard deviations forwarded to ``normalize``.
    """

    # Tuples instead of lists: default arguments are shared between calls,
    # so they should not be mutable.
    def __init__(self, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
        self.mean = mean
        self.std = std

    def __call__(self, image):
        """Return ``image`` normalised with this instance's mean and std."""
        # `normalize` comes from data_loader_cache, which only becomes
        # importable once initialize_model() has fetched the IS-Net sources,
        # and it is never bound at module scope. Import it lazily here.
        from data_loader_cache import normalize

        return normalize(image, self.mean, self.std)
# Preprocessing pipeline applied by load_image: a single normalisation step
# with mean 0.5 and std 1.0 on each of the three channels.
transform = transforms.Compose(
    [GOSNormalize(mean=[0.5, 0.5, 0.5], std=[1.0, 1.0, 1.0])]
)
def load_image(im_path, hypar):
    """Read an image file and turn it into a normalised, batched model input.

    Args:
        im_path: path of the image file to read.
        hypar: parameter dictionary; only ``hypar["cache_size"]`` is read and
            passed to ``im_preprocess``.

    Returns:
        tuple: ``(image, shape)``. ``image`` is the preprocessed image divided
        by 255, passed through ``transform`` and given a leading batch
        dimension. ``shape`` is the shape reported by ``im_preprocess`` as a
        tensor with a leading batch dimension (presumably the original
        height and width; confirm against data_loader_cache).
    """
    # These helpers live in data_loader_cache, which only becomes importable
    # once initialize_model() has fetched the IS-Net sources, and they are
    # never bound at module scope. Import them lazily here.
    from data_loader_cache import im_preprocess, im_reader

    im = im_reader(im_path)
    im, im_shp = im_preprocess(im, hypar["cache_size"])
    im = torch.divide(im, 255.0)
    shape = torch.from_numpy(np.array(im_shp))
    return transform(im).unsqueeze(0), shape.unsqueeze(0)
def build_model(hypar, device):
    """Prepare the network stored in ``hypar["model"]`` for inference.

    Args:
        hypar: parameter dictionary. Reads ``"model"`` (the network),
            ``"model_digit"`` (``"half"`` selects half precision),
            ``"restore_model"`` (weights file name, ``""`` to skip loading)
            and ``"model_path"`` (directory containing that file).
        device: device the network is moved to and the weights are mapped to.

    Returns:
        The same network object, moved to ``device`` and set to eval mode.
    """
    net = hypar["model"]

    if hypar["model_digit"] == "half":
        net.half()
        # Batch-norm layers are kept in float32 while the rest runs in half.
        # The file never imports `nn`, so reference the class through `torch`.
        for layer in net.modules():
            if isinstance(layer, torch.nn.BatchNorm2d):
                layer.float()

    net.to(device)

    if hypar["restore_model"] != "":
        weights_path = os.path.join(hypar["model_path"], hypar["restore_model"])
        net.load_state_dict(torch.load(weights_path, map_location=device))

    net.eval()
    return net
def predict(net, inputs_val, shapes_val, hypar, device):
    """Run the network on one input and return an 8-bit mask.

    Args:
        net: the model; ``net(x)[0][0]`` is taken as the prediction batch.
        inputs_val: input tensor for the network.
        shapes_val: target sizes; ``shapes_val[0][0]`` and
            ``shapes_val[0][1]`` give the output height and width.
        hypar: parameter dictionary; ``"model_digit"`` equal to ``"full"``
            selects float32 input, anything else float16.
        device: device the input is moved to; ``'cuda'`` also triggers a
            CUDA cache flush after inference.

    Returns:
        numpy.ndarray: ``uint8`` mask resized to the target size and min-max
        scaled to 0..255. A prediction with no variation yields all zeros.
    """
    net.eval()
    dtype = torch.float32 if hypar["model_digit"] == "full" else torch.float16

    # Inference only: without no_grad() every call would build and keep an
    # autograd graph for the whole network.
    with torch.no_grad():
        inputs = inputs_val.to(device=device, dtype=dtype)
        ds_val = net(inputs)[0]
        pred_val = ds_val[0][0, :, :, :]

        # interpolate() expects plain ints, not tensor elements.
        target_size = (int(shapes_val[0][0]), int(shapes_val[0][1]))
        pred_val = torch.squeeze(
            F.interpolate(
                torch.unsqueeze(pred_val, 0),
                size=target_size,
                mode='bilinear',
                align_corners=False,
            )
        )

        # Min-max normalisation. A constant prediction would divide by zero
        # and produce NaNs, so it maps to an all-zero mask instead.
        ma = torch.max(pred_val)
        mi = torch.min(pred_val)
        span = ma - mi
        if span > 0:
            pred_val = (pred_val - mi) / span
        else:
            pred_val = torch.zeros_like(pred_val)

    if device == 'cuda':
        torch.cuda.empty_cache()
    return (pred_val.detach().cpu().numpy() * 255).astype(np.uint8)
# Without a route registration Flask never dispatches to this handler.
@app.route('/')
def index():
    """Render the ``index.html`` template as the landing page."""
    return render_template('index.html')
# NOTE(review): the URL prefix is inferred from EXAMPLES_FOLDER, mirroring the
# upload/result URLs; confirm against the paths used by index.html.
@app.route(f"/{EXAMPLES_FOLDER}/<filename>")
def serve_example(filename):
    """Serve a sample image, downloading the known ones on first request.

    Args:
        filename: name of the file inside ``EXAMPLES_FOLDER``.

    Returns:
        The response produced by ``send_from_directory`` for that file.
    """
    # Function-scope import: only this handler needs urllib.
    import urllib.request

    # Only these two samples can be downloaded on demand.
    example_urls = {
        'robot.png': "https://raw.githubusercontent.com/xuebinqin/DIS/main/IS-Net/robot.png",
        'ship.png': "https://raw.githubusercontent.com/xuebinqin/DIS/main/IS-Net/ship.png",
    }

    # Download the sample image if it is not present yet.
    example_path = os.path.join(EXAMPLES_FOLDER, filename)
    url = example_urls.get(filename)
    if url is not None and not os.path.exists(example_path):
        # Download to a temporary name and rename on success, so a failed
        # download never leaves an empty file that would be served forever.
        partial_path = example_path + ".part"
        try:
            with urllib.request.urlopen(url, timeout=30) as response, \
                    open(partial_path, "wb") as out:
                shutil.copyfileobj(response, out)
            os.replace(partial_path, example_path)
        except OSError:
            # Best effort, as before: on failure fall through and let
            # send_from_directory report the missing file.
            if os.path.exists(partial_path):
                os.remove(partial_path)

    return send_from_directory(EXAMPLES_FOLDER, filename)
# NOTE(review): no route registration for this handler is visible in this
# file. It has to be bound (with POST allowed) to the URL that index.html
# submits the image to -- confirm the path before adding @app.route.
def process_image():
    """Remove the background of the uploaded image and return result URLs.

    Reads the file from the ``image`` field of the request, stores it in
    ``UPLOAD_FOLDER``, runs the model on it and writes an RGBA cut-out and
    the mask to ``RESULT_FOLDER``.

    Returns:
        A JSON response with the keys ``original``, ``rgba``, ``mask`` and
        ``filename`` on success; ``({"error": ...}, 400)`` when no usable
        file was sent; ``({"error": ...}, 500)`` when processing fails.
    """
    if 'image' not in request.files:
        return jsonify({"error": "No image provided"}), 400

    file = request.files['image']
    if not file.filename:
        return jsonify({"error": "No selected file"}), 400

    # The filename is client-controlled: keep only its last path component so
    # that names such as "../../app.py" cannot escape the upload directory.
    safe_name = os.path.basename(file.filename.replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        return jsonify({"error": "Invalid filename"}), 400

    # Save the file
    upload_path = os.path.join(UPLOAD_FOLDER, safe_name)
    file.save(upload_path)

    try:
        # Initialise the model on every request. Doing it inside the try
        # block reports setup failures as JSON like any other error.
        net, hypar, device = initialize_model()

        # Image processing
        image_tensor, orig_size = load_image(upload_path, hypar)
        mask = predict(net, image_tensor, orig_size, hypar, device)

        # Save the results
        original_filename = os.path.splitext(safe_name)[0]
        result_rgba_path = os.path.join(RESULT_FOLDER, f"{original_filename}_rgba.png")
        result_mask_path = os.path.join(RESULT_FOLDER, f"{original_filename}_mask.png")

        pil_mask = Image.fromarray(mask).convert('L')
        # convert() returns a new image, so the opened file can be closed
        # right away and no extra copy is needed.
        with Image.open(upload_path) as uploaded:
            im_rgba = uploaded.convert("RGB")
        im_rgba.putalpha(pil_mask)
        im_rgba.save(result_rgba_path)
        pil_mask.save(result_mask_path)

        # Return the URLs of the results
        return jsonify({
            "original": f"/{UPLOAD_FOLDER}/{safe_name}",
            "rgba": f"/{RESULT_FOLDER}/{original_filename}_rgba.png",
            "mask": f"/{RESULT_FOLDER}/{original_filename}_mask.png",
            "filename": safe_name,
        })
    except Exception as e:
        # Top-level request boundary: log the traceback, answer with JSON.
        app.logger.exception("Image processing failed")
        return jsonify({"error": str(e)}), 500
# Registered under the same URL prefix that process_image() reports for the
# "original" image; without a route Flask never dispatches to this handler.
@app.route(f"/{UPLOAD_FOLDER}/<filename>")
def serve_upload(filename):
    """Serve ``filename`` from ``UPLOAD_FOLDER``."""
    return send_from_directory(UPLOAD_FOLDER, filename)
# Registered under the same URL prefix that process_image() reports for the
# "rgba" and "mask" images; without a route Flask never dispatches here.
@app.route(f"/{RESULT_FOLDER}/<filename>")
def serve_result(filename):
    """Serve ``filename`` from ``RESULT_FOLDER``."""
    return send_from_directory(RESULT_FOLDER, filename)
if __name__ == '__main__':
    # The server listens on all interfaces, and Flask's debug mode exposes an
    # interactive debugger that can execute arbitrary code. Keep it off unless
    # explicitly requested through the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=7860, debug=debug_enabled)