|
|
|
""" |
|
make_cyclegan_dataset.py |
|
|
|
Create paired datasets (setA, setB) for CycleGAN training from your dataset. |
|
|
|
What it does: |
|
- Walks the dataset root (e.g. ../jpeg_stage1Just0) |
|
- Finds scene directories that contain both a `source/` subfolder with >= min_images |
|
and an `output/` subfolder with at least one image. |
|
- For each scene: selects the best LDR from `source/` (using metrics: clipped, coverage, |
|
exposure centering, sharpness, noise), copies that chosen source image into outdir/setA/, |
|
copies the scene's output image into outdir/setB/ but renames it to the chosen source filename. |
|
- Writes CSV and JSON reports with metric breakdowns. |
|
|
|
Usage: |
|
python make_cyclegan_dataset.py --root ../jpeg_stage1Just0 --outdir ./cyclegan_data |
|
|
|
Dependencies: |
|
pip install opencv-python pillow numpy |
|
|
|
Author: ChatGPT (opinionated: default weights favor low clipping and good coverage) |
|
""" |
|
|
|
import argparse |
|
import os |
|
from pathlib import Path |
|
import json |
|
import csv |
|
import shutil |
|
from math import fabs |
|
|
|
import numpy as np |
|
import cv2 |
|
from PIL import Image, ExifTags |
|
|
|
IMG_EXTS = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp", ".webp"} |
|
|
|
|
|
|
|
def is_image_file(p: Path): |
|
return p.suffix.lower() in IMG_EXTS and p.is_file() |
|
|
|
def list_images(folder: Path): |
|
if not folder.exists(): |
|
return [] |
|
return sorted([p for p in folder.iterdir() if is_image_file(p)]) |
|
|
|
def read_image_gray(path: Path, resize_max=None): |
|
"""Read color then convert to grayscale float32 [0,1]. Uses cv2.imdecode to handle weird filenames.""" |
|
arr = np.fromfile(str(path), dtype=np.uint8) |
|
img = cv2.imdecode(arr, cv2.IMREAD_COLOR) |
|
if img is None: |
|
raise IOError(f"Failed to read image {path}") |
|
if resize_max: |
|
h, w = img.shape[:2] |
|
scale = resize_max / max(h, w) if max(h, w) > resize_max else 1.0 |
|
if scale != 1.0: |
|
img = cv2.resize(img, (int(w*scale), int(h*scale)), interpolation=cv2.INTER_AREA) |
|
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0 |
|
return gray |
|
|
|
def clipped_ratio(gray): |
|
total = gray.size |
|
high = np.count_nonzero(gray >= 0.992) |
|
low = np.count_nonzero(gray <= 0.008) |
|
return float(high + low) / float(total) |
|
|
|
def histogram_coverage(gray, bins=256, min_frac=0.001): |
|
hist, _ = np.histogram((gray * 255).astype(np.uint8), bins=bins, range=(0,255)) |
|
threshold = max(1, int(min_frac * gray.size)) |
|
covered = np.count_nonzero(hist >= threshold) |
|
return float(covered) / float(bins) |
|
|
|
def exposure_distance(gray): |
|
return float(abs(float(np.mean(gray)) - 0.5)) |
|
|
|
def sharpness_metric(gray): |
|
lap = cv2.Laplacian((gray * 255).astype(np.uint8), cv2.CV_64F) |
|
return float(np.var(lap)) |
|
|
|
def noise_estimate(gray): |
|
blur = cv2.GaussianBlur(gray, (3,3), 0) |
|
hf = gray - blur |
|
return float(np.std(hf)) |
|
|
|
def minmax_normalize(vals, eps=1e-8): |
|
arr = np.array(vals, dtype=np.float64) |
|
mn = float(arr.min()) |
|
mx = float(arr.max()) |
|
if mx - mn < eps: |
|
|
|
return np.zeros_like(arr) |
|
return (arr - mn) / (mx - mn) |
|
|
|
|
|
|
|
def compute_metrics_for_images(image_paths, resize_max): |
|
records = [] |
|
for p in image_paths: |
|
try: |
|
g = read_image_gray(p, resize_max=resize_max) |
|
except Exception as e: |
|
print(f" WARNING: cannot read {p}: {e}") |
|
continue |
|
rec = { |
|
"path": str(p), |
|
"name": p.name, |
|
"clipped": clipped_ratio(g), |
|
"coverage": histogram_coverage(g), |
|
"exposure_dist": exposure_distance(g), |
|
"sharpness": sharpness_metric(g), |
|
"noise": noise_estimate(g) |
|
} |
|
records.append(rec) |
|
return records |
|
|
|
def score_records(records, weights): |
|
if not records: |
|
return [] |
|
clipped_vals = [r["clipped"] for r in records] |
|
cov_vals = [r["coverage"] for r in records] |
|
exp_vals = [r["exposure_dist"] for r in records] |
|
sharp_vals = [r["sharpness"] for r in records] |
|
noise_vals = [r["noise"] for r in records] |
|
|
|
clipped_n = minmax_normalize(clipped_vals) |
|
cov_n = minmax_normalize(cov_vals) |
|
exp_n = minmax_normalize(exp_vals) |
|
sharp_n = minmax_normalize(sharp_vals) |
|
noise_n = minmax_normalize(noise_vals) |
|
|
|
scored = [] |
|
for i, r in enumerate(records): |
|
score = 0.0 |
|
score += weights["clipped"] * (1.0 - float(clipped_n[i])) |
|
score += weights["coverage"] * float(cov_n[i]) |
|
score += weights["exposure"] * (1.0 - float(exp_n[i])) |
|
score += weights["sharpness"] * float(sharp_n[i]) |
|
score += weights["noise"] * (1.0 - float(noise_n[i])) |
|
|
|
rec = dict(r) |
|
rec.update({ |
|
"clipped_n": float(clipped_n[i]), |
|
"coverage_n": float(cov_n[i]), |
|
"exposure_n": float(exp_n[i]), |
|
"sharpness_n": float(sharp_n[i]), |
|
"noise_n": float(noise_n[i]), |
|
"score": float(score) |
|
}) |
|
scored.append(rec) |
|
scored_sorted = sorted(scored, key=lambda x: x["score"], reverse=True) |
|
return scored_sorted |
|
|
|
def find_output_image(output_folder: Path): |
|
imgs = list_images(output_folder) |
|
if not imgs: |
|
return None |
|
|
|
parent_name = output_folder.parent.name |
|
for p in imgs: |
|
if p.stem == parent_name: |
|
return p |
|
|
|
imgs_sorted = sorted(imgs, key=lambda x: x.stat().st_size, reverse=True) |
|
return imgs_sorted[0] |
|
|
|
|
|
|
|
def make_dataset(root: Path, outdir: Path, min_images: int, |
|
resize_max: int, weights: dict, copy_method="copy"): |
|
scenes_found = 0 |
|
results = [] |
|
setA = outdir / "setA" |
|
setB = outdir / "setB" |
|
os.makedirs(setA, exist_ok=True) |
|
os.makedirs(setB, exist_ok=True) |
|
|
|
|
|
for dirpath, dirnames, filenames in os.walk(root): |
|
d = Path(dirpath) |
|
src_dir = d / "source" |
|
out_dir = d / "output" |
|
if not src_dir.exists() or not out_dir.exists(): |
|
continue |
|
src_imgs = list_images(src_dir) |
|
if len(src_imgs) < min_images: |
|
|
|
continue |
|
|
|
scenes_found += 1 |
|
print(f"[{scenes_found}] Scene: {d} ({len(src_imgs)} source images)") |
|
|
|
|
|
records = compute_metrics_for_images(src_imgs, resize_max=resize_max) |
|
if not records: |
|
print(" No readable source images, skipping.") |
|
continue |
|
scored = score_records(records, weights) |
|
chosen = scored[0] |
|
chosen_path = Path(chosen["path"]) |
|
chosen_name = chosen_path.name |
|
|
|
|
|
out_img = find_output_image(out_dir) |
|
if out_img is None: |
|
print(f" WARNING: no output image found in {out_dir}; skipping copying pair.") |
|
out_img_path = None |
|
else: |
|
out_img_path = out_img |
|
|
|
|
|
destA = setA / chosen_name |
|
destB = setB / chosen_name |
|
|
|
|
|
try: |
|
if copy_method == "symlink": |
|
if destA.exists(): |
|
destA.unlink() |
|
os.symlink(os.path.abspath(chosen_path), destA) |
|
else: |
|
shutil.copy2(chosen_path, destA) |
|
except Exception as e: |
|
print(f" ERROR copying source -> {destA}: {e}") |
|
|
|
if out_img_path is not None: |
|
try: |
|
if copy_method == "symlink": |
|
if destB.exists(): |
|
destB.unlink() |
|
os.symlink(os.path.abspath(out_img_path), destB) |
|
else: |
|
shutil.copy2(out_img_path, destB) |
|
except Exception as e: |
|
print(f" ERROR copying output -> {destB}: {e}") |
|
|
|
|
|
result = { |
|
"scene_dir": str(d), |
|
"source_dir": str(src_dir), |
|
"output_dir": str(out_dir), |
|
"chosen_source_path": str(chosen_path), |
|
"chosen_source_name": chosen_name, |
|
"chosen_score": chosen["score"], |
|
"metrics": { |
|
"clipped": chosen["clipped"], |
|
"coverage": chosen["coverage"], |
|
"exposure_dist": chosen["exposure_dist"], |
|
"sharpness": chosen["sharpness"], |
|
"noise": chosen["noise"], |
|
"clipped_n": chosen["clipped_n"], |
|
"coverage_n": chosen["coverage_n"], |
|
"exposure_n": chosen["exposure_n"], |
|
"sharpness_n": chosen["sharpness_n"], |
|
"noise_n": chosen["noise_n"], |
|
}, |
|
"output_image_used": str(out_img_path) if out_img_path is not None else None, |
|
"destA": str(destA), |
|
"destB": str(destB) if out_img_path is not None else None |
|
} |
|
results.append(result) |
|
|
|
|
|
print(" Top candidates:") |
|
for c in scored[:3]: |
|
print(f" {c['score']:.4f} clipped={c['clipped']:.4f} cov={c['coverage']:.4f} expd={c['exposure_dist']:.4f} sharp={c['sharpness']:.1f} noise={c['noise']:.5f} -> {Path(c['path']).name}") |
|
|
|
|
|
out_csv = outdir / "paired_selection.csv" |
|
out_json = outdir / "paired_selection.json" |
|
with open(out_json, "w", encoding="utf-8") as jf: |
|
json.dump(results, jf, indent=2) |
|
with open(out_csv, "w", newline="", encoding="utf-8") as cf: |
|
writer = csv.writer(cf) |
|
header = ["scene_dir", "source_dir", "output_dir", "chosen_source_name", "chosen_source_path", |
|
"chosen_score", "output_image_used", "destA", "destB"] |
|
writer.writerow(header) |
|
for r in results: |
|
writer.writerow([r.get(h, "") for h in header]) |
|
|
|
print(f"\nDone. Scenes processed: {scenes_found}") |
|
print(f"Paired data saved to:\n {setA}\n {setB}") |
|
print(f"Reports: {out_csv} , {out_json}") |
|
return results |
|
|
|
|
|
|
|
def parse_weights(s): |
|
parts = [float(x.strip()) for x in s.split(",")] |
|
if len(parts) != 5: |
|
raise argparse.ArgumentTypeError("weights must be 5 comma-separated numbers") |
|
ssum = sum(parts) |
|
if ssum == 0: |
|
raise argparse.ArgumentTypeError("weights sum must be > 0") |
|
return [p / ssum for p in parts] |
|
|
|
def main(): |
|
ap = argparse.ArgumentParser(description="Make paired CycleGAN dataset from your LDR/HDR scene layout.") |
|
ap.add_argument("--root", "-r", required=True, help="Root of dataset (e.g. ../jpeg_stage1Just0)") |
|
ap.add_argument("--outdir", "-o", default="./cyclegan_data", help="Output folder for paired dataset") |
|
ap.add_argument("--min_images", type=int, default=2, help="Minimum images in source/ to consider scene") |
|
ap.add_argument("--resize_max", type=int, default=1024, help="Resize longest side for metric calc (speeds up)") |
|
ap.add_argument("--weights", type=parse_weights, default="0.35,0.25,0.15,0.15,0.10", |
|
help="5 weights: clipped,coverage,exposure,sharpness,noise (will be normalized)") |
|
ap.add_argument("--copy_method", choices=["copy", "symlink"], default="copy", |
|
help="copy files or create symlinks (symlink saves disk space)") |
|
args = ap.parse_args() |
|
|
|
root = Path(args.root).expanduser().resolve() |
|
outdir = Path(args.outdir).expanduser().resolve() |
|
w = args.weights if isinstance(args.weights, list) else args.weights |
|
weights = { |
|
"clipped": w[0], |
|
"coverage": w[1], |
|
"exposure": w[2], |
|
"sharpness": w[3], |
|
"noise": w[4] |
|
} |
|
print("Using weights:", weights) |
|
outdir.mkdir(parents=True, exist_ok=True) |
|
|
|
make_dataset(root, outdir, min_images=args.min_images, |
|
resize_max=args.resize_max, weights=weights, copy_method=args.copy_method) |
|
|
|
if __name__ == "__main__": |
|
main() |
|
|