Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
""" | |
make_cyclegan_dataset.py | |
Create paired datasets (setA, setB) for CycleGAN training from your dataset. | |
What it does: | |
- Walks the dataset root (e.g. ../jpeg_stage1Just0) | |
- Finds scene directories that contain both a `source/` subfolder with >= min_images | |
and an `output/` subfolder with at least one image. | |
- For each scene: selects the best LDR from `source/` (using metrics: clipped, coverage, | |
exposure centering, sharpness, noise), copies that chosen source image into outdir/setA/, | |
copies the scene's output image into outdir/setB/ but renames it to the chosen source filename. | |
- Writes CSV and JSON reports with metric breakdowns. | |
Usage: | |
python make_cyclegan_dataset.py --root ../jpeg_stage1Just0 --outdir ./cyclegan_data | |
Dependencies: | |
pip install opencv-python pillow numpy | |
Author: ChatGPT (opinionated: default weights favor low clipping and good coverage) | |
""" | |
import argparse | |
import os | |
from pathlib import Path | |
import json | |
import csv | |
import shutil | |
from math import fabs | |
import numpy as np | |
import cv2 | |
from PIL import Image, ExifTags | |
IMG_EXTS = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp", ".webp"} | |
# ------------------ Image / metric helpers ------------------ | |
def is_image_file(p: Path): | |
return p.suffix.lower() in IMG_EXTS and p.is_file() | |
def list_images(folder: Path): | |
if not folder.exists(): | |
return [] | |
return sorted([p for p in folder.iterdir() if is_image_file(p)]) | |
def read_image_gray(path: Path, resize_max=None): | |
"""Read color then convert to grayscale float32 [0,1]. Uses cv2.imdecode to handle weird filenames.""" | |
arr = np.fromfile(str(path), dtype=np.uint8) | |
img = cv2.imdecode(arr, cv2.IMREAD_COLOR) | |
if img is None: | |
raise IOError(f"Failed to read image {path}") | |
if resize_max: | |
h, w = img.shape[:2] | |
scale = resize_max / max(h, w) if max(h, w) > resize_max else 1.0 | |
if scale != 1.0: | |
img = cv2.resize(img, (int(w*scale), int(h*scale)), interpolation=cv2.INTER_AREA) | |
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY).astype(np.float32) / 255.0 | |
return gray | |
def clipped_ratio(gray): | |
total = gray.size | |
high = np.count_nonzero(gray >= 0.992) | |
low = np.count_nonzero(gray <= 0.008) | |
return float(high + low) / float(total) | |
def histogram_coverage(gray, bins=256, min_frac=0.001): | |
hist, _ = np.histogram((gray * 255).astype(np.uint8), bins=bins, range=(0,255)) | |
threshold = max(1, int(min_frac * gray.size)) | |
covered = np.count_nonzero(hist >= threshold) | |
return float(covered) / float(bins) | |
def exposure_distance(gray): | |
return float(abs(float(np.mean(gray)) - 0.5)) | |
def sharpness_metric(gray): | |
lap = cv2.Laplacian((gray * 255).astype(np.uint8), cv2.CV_64F) | |
return float(np.var(lap)) | |
def noise_estimate(gray): | |
blur = cv2.GaussianBlur(gray, (3,3), 0) | |
hf = gray - blur | |
return float(np.std(hf)) | |
def minmax_normalize(vals, eps=1e-8): | |
arr = np.array(vals, dtype=np.float64) | |
mn = float(arr.min()) | |
mx = float(arr.max()) | |
if mx - mn < eps: | |
# all equal -> zeros | |
return np.zeros_like(arr) | |
return (arr - mn) / (mx - mn) | |
# ------------------ Selection & scene processing ------------------ | |
def compute_metrics_for_images(image_paths, resize_max): | |
records = [] | |
for p in image_paths: | |
try: | |
g = read_image_gray(p, resize_max=resize_max) | |
except Exception as e: | |
print(f" WARNING: cannot read {p}: {e}") | |
continue | |
rec = { | |
"path": str(p), | |
"name": p.name, | |
"clipped": clipped_ratio(g), | |
"coverage": histogram_coverage(g), | |
"exposure_dist": exposure_distance(g), | |
"sharpness": sharpness_metric(g), | |
"noise": noise_estimate(g) | |
} | |
records.append(rec) | |
return records | |
def score_records(records, weights): | |
if not records: | |
return [] | |
clipped_vals = [r["clipped"] for r in records] | |
cov_vals = [r["coverage"] for r in records] | |
exp_vals = [r["exposure_dist"] for r in records] | |
sharp_vals = [r["sharpness"] for r in records] | |
noise_vals = [r["noise"] for r in records] | |
clipped_n = minmax_normalize(clipped_vals) | |
cov_n = minmax_normalize(cov_vals) | |
exp_n = minmax_normalize(exp_vals) | |
sharp_n = minmax_normalize(sharp_vals) | |
noise_n = minmax_normalize(noise_vals) | |
scored = [] | |
for i, r in enumerate(records): | |
score = 0.0 | |
score += weights["clipped"] * (1.0 - float(clipped_n[i])) # less clipping -> better | |
score += weights["coverage"] * float(cov_n[i]) # more coverage -> better | |
score += weights["exposure"] * (1.0 - float(exp_n[i])) # closer to mid gray -> better | |
score += weights["sharpness"] * float(sharp_n[i]) # sharper -> better | |
score += weights["noise"] * (1.0 - float(noise_n[i])) # less noise -> better | |
rec = dict(r) | |
rec.update({ | |
"clipped_n": float(clipped_n[i]), | |
"coverage_n": float(cov_n[i]), | |
"exposure_n": float(exp_n[i]), | |
"sharpness_n": float(sharp_n[i]), | |
"noise_n": float(noise_n[i]), | |
"score": float(score) | |
}) | |
scored.append(rec) | |
scored_sorted = sorted(scored, key=lambda x: x["score"], reverse=True) | |
return scored_sorted | |
def find_output_image(output_folder: Path): | |
imgs = list_images(output_folder) | |
if not imgs: | |
return None | |
# Prefer file with same name as parent folder (if present), else pick largest file | |
parent_name = output_folder.parent.name | |
for p in imgs: | |
if p.stem == parent_name: | |
return p | |
# otherwise pick largest by file size (likely the main image) | |
imgs_sorted = sorted(imgs, key=lambda x: x.stat().st_size, reverse=True) | |
return imgs_sorted[0] | |
# ------------------ Main procedure ------------------ | |
def make_dataset(root: Path, outdir: Path, min_images: int, | |
resize_max: int, weights: dict, copy_method="copy"): | |
scenes_found = 0 | |
results = [] | |
setA = outdir / "setA" | |
setB = outdir / "setB" | |
os.makedirs(setA, exist_ok=True) | |
os.makedirs(setB, exist_ok=True) | |
# Walk the tree and find directories that contain both source/ and output/ | |
for dirpath, dirnames, filenames in os.walk(root): | |
d = Path(dirpath) | |
src_dir = d / "source" | |
out_dir = d / "output" | |
if not src_dir.exists() or not out_dir.exists(): | |
continue | |
src_imgs = list_images(src_dir) | |
if len(src_imgs) < min_images: | |
# skip small scenes | |
continue | |
scenes_found += 1 | |
print(f"[{scenes_found}] Scene: {d} ({len(src_imgs)} source images)") | |
# compute metrics and choose best | |
records = compute_metrics_for_images(src_imgs, resize_max=resize_max) | |
if not records: | |
print(" No readable source images, skipping.") | |
continue | |
scored = score_records(records, weights) | |
chosen = scored[0] | |
chosen_path = Path(chosen["path"]) | |
chosen_name = chosen_path.name # used for setA filename (and setB target name) | |
# find output image for this scene | |
out_img = find_output_image(out_dir) | |
if out_img is None: | |
print(f" WARNING: no output image found in {out_dir}; skipping copying pair.") | |
out_img_path = None | |
else: | |
out_img_path = out_img | |
# destination paths | |
destA = setA / chosen_name | |
destB = setB / chosen_name | |
# copy or symlink | |
try: | |
if copy_method == "symlink": | |
if destA.exists(): | |
destA.unlink() | |
os.symlink(os.path.abspath(chosen_path), destA) | |
else: | |
shutil.copy2(chosen_path, destA) | |
except Exception as e: | |
print(f" ERROR copying source -> {destA}: {e}") | |
if out_img_path is not None: | |
try: | |
if copy_method == "symlink": | |
if destB.exists(): | |
destB.unlink() | |
os.symlink(os.path.abspath(out_img_path), destB) | |
else: | |
shutil.copy2(out_img_path, destB) | |
except Exception as e: | |
print(f" ERROR copying output -> {destB}: {e}") | |
# record result | |
result = { | |
"scene_dir": str(d), | |
"source_dir": str(src_dir), | |
"output_dir": str(out_dir), | |
"chosen_source_path": str(chosen_path), | |
"chosen_source_name": chosen_name, | |
"chosen_score": chosen["score"], | |
"metrics": { | |
"clipped": chosen["clipped"], | |
"coverage": chosen["coverage"], | |
"exposure_dist": chosen["exposure_dist"], | |
"sharpness": chosen["sharpness"], | |
"noise": chosen["noise"], | |
"clipped_n": chosen["clipped_n"], | |
"coverage_n": chosen["coverage_n"], | |
"exposure_n": chosen["exposure_n"], | |
"sharpness_n": chosen["sharpness_n"], | |
"noise_n": chosen["noise_n"], | |
}, | |
"output_image_used": str(out_img_path) if out_img_path is not None else None, | |
"destA": str(destA), | |
"destB": str(destB) if out_img_path is not None else None | |
} | |
results.append(result) | |
# print top 3 for quick audit | |
print(" Top candidates:") | |
for c in scored[:3]: | |
print(f" {c['score']:.4f} clipped={c['clipped']:.4f} cov={c['coverage']:.4f} expd={c['exposure_dist']:.4f} sharp={c['sharpness']:.1f} noise={c['noise']:.5f} -> {Path(c['path']).name}") | |
# write reports | |
out_csv = outdir / "paired_selection.csv" | |
out_json = outdir / "paired_selection.json" | |
with open(out_json, "w", encoding="utf-8") as jf: | |
json.dump(results, jf, indent=2) | |
with open(out_csv, "w", newline="", encoding="utf-8") as cf: | |
writer = csv.writer(cf) | |
header = ["scene_dir", "source_dir", "output_dir", "chosen_source_name", "chosen_source_path", | |
"chosen_score", "output_image_used", "destA", "destB"] | |
writer.writerow(header) | |
for r in results: | |
writer.writerow([r.get(h, "") for h in header]) | |
print(f"\nDone. Scenes processed: {scenes_found}") | |
print(f"Paired data saved to:\n {setA}\n {setB}") | |
print(f"Reports: {out_csv} , {out_json}") | |
return results | |
# ------------------ CLI ------------------ | |
def parse_weights(s): | |
parts = [float(x.strip()) for x in s.split(",")] | |
if len(parts) != 5: | |
raise argparse.ArgumentTypeError("weights must be 5 comma-separated numbers") | |
ssum = sum(parts) | |
if ssum == 0: | |
raise argparse.ArgumentTypeError("weights sum must be > 0") | |
return [p / ssum for p in parts] | |
def main(): | |
ap = argparse.ArgumentParser(description="Make paired CycleGAN dataset from your LDR/HDR scene layout.") | |
ap.add_argument("--root", "-r", required=True, help="Root of dataset (e.g. ../jpeg_stage1Just0)") | |
ap.add_argument("--outdir", "-o", default="./cyclegan_data", help="Output folder for paired dataset") | |
ap.add_argument("--min_images", type=int, default=2, help="Minimum images in source/ to consider scene") | |
ap.add_argument("--resize_max", type=int, default=1024, help="Resize longest side for metric calc (speeds up)") | |
ap.add_argument("--weights", type=parse_weights, default="0.35,0.25,0.15,0.15,0.10", | |
help="5 weights: clipped,coverage,exposure,sharpness,noise (will be normalized)") | |
ap.add_argument("--copy_method", choices=["copy", "symlink"], default="copy", | |
help="copy files or create symlinks (symlink saves disk space)") | |
args = ap.parse_args() | |
root = Path(args.root).expanduser().resolve() | |
outdir = Path(args.outdir).expanduser().resolve() | |
w = args.weights if isinstance(args.weights, list) else args.weights # parse_weights returns list | |
weights = { | |
"clipped": w[0], | |
"coverage": w[1], | |
"exposure": w[2], | |
"sharpness": w[3], | |
"noise": w[4] | |
} | |
print("Using weights:", weights) | |
outdir.mkdir(parents=True, exist_ok=True) | |
make_dataset(root, outdir, min_images=args.min_images, | |
resize_max=args.resize_max, weights=weights, copy_method=args.copy_method) | |
if __name__ == "__main__": | |
main() | |