Spaces:
Build error
Build error
import os | |
import shutil | |
import random | |
from pathlib import Path | |
from dotenv import load_dotenv | |
from tqdm import tqdm | |
from .config import Config | |
# Load variables from a local .env file (python-dotenv) before reading the environment.
load_dotenv()

# SOURCE_PATH holds one or more comma-separated directories to collect images from.
SOURCE_PATHS = os.getenv('SOURCE_PATH')
if not SOURCE_PATHS:
    raise ValueError("SOURCE_PATH not set")

# Split by comma and strip whitespace. Blank entries (from a trailing or doubled
# comma) are dropped: Path('') is the current directory, which exists and would
# otherwise be walked as if it were a configured source.
source_paths = [Path(p.strip()) for p in SOURCE_PATHS.split(',') if p.strip()]
if not source_paths:
    raise ValueError("SOURCE_PATH contains no paths")

# Staging folder for the flattened image copies, and the root of the split dataset.
images_dir = Path(f'{Config.current_path}/images')
dataset_dir = Path(f'{Config.current_path}/dataset')

# File suffixes (compared in lower case) recognised as images and as label files.
image_exts = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff', '.webp'}
label_exts = {'.txt'}
# Copy images from all source paths into images_dir, with a tqdm progress bar.
# shutil.copy2 does not create missing directories, so make sure the staging
# folder exists before the first copy.
images_dir.mkdir(parents=True, exist_ok=True)

for source_path in source_paths:
    if not source_path.exists():
        print(f"Warning: source path {source_path} does not exist, skipping.")
        continue

    # Walk the tree once and collect (source, destination) pairs, so the
    # progress-bar total is known without a second traversal.
    pending = []
    for root, _dirs, files in os.walk(source_path):
        root_path = Path(root)
        # Flatten the directory structure into the file name: files directly in
        # the source root get the prefix 'root', nested files get their relative
        # directory parts joined with underscores.
        if root_path == source_path:
            prefix = 'root'
        else:
            rel_path = root_path.relative_to(source_path)
            prefix = '_'.join(rel_path.parts)
        for file in files:
            if Path(file).suffix.lower() in image_exts:
                pending.append((root_path / file, images_dir / f"{prefix}_{file}"))

    for src_file, dst_file in tqdm(pending, desc=f"Copying images from {source_path}", unit="img"):
        shutil.copy2(src_file, dst_file)
# Start from a clean slate: remove any dataset left over from an earlier run.
if dataset_dir.exists():
    shutil.rmtree(dataset_dir)

# Lay out dataset/<images|labels>/<train|val|test>.
for kind in ('images', 'labels'):
    for split in ('train', 'val', 'test'):
        target = dataset_dir / kind / split
        target.mkdir(parents=True, exist_ok=True)
# List all images in images_dir. Path.iterdir() yields entries in arbitrary,
# filesystem-dependent order, so sort by name first; without a stable starting
# order the fixed seed below would not reproduce the same split between runs.
# Non-file entries are skipped because they cannot be copied with shutil.copy2.
all_images = sorted(
    (f for f in images_dir.iterdir() if f.is_file() and f.suffix.lower() in image_exts),
    key=lambda f: f.name,
)

# Shuffle and split (80% train, 10% val, 10% test)
random.seed(42)
random.shuffle(all_images)

n = len(all_images)
train_end = int(0.8 * n)
val_end = train_end + int(0.1 * n)

# int() truncates, so any remainder from rounding ends up in the test split.
splits = {
    'train': all_images[:train_end],
    'val': all_images[train_end:val_end],
    'test': all_images[val_end:],
}
# Folder searched for label files whose stem matches an image's stem.
label_src_dir = Path(f'{Config.current_path}/image_labels')

# Copy every image, plus its label when one exists, into its split folder.
for split, files in splits.items():
    print(f"Processing split '{split}' with {len(files)} images...")
    image_out = dataset_dir / 'images' / split
    label_out = dataset_dir / 'labels' / split

    for img_path in tqdm(files, desc=f"Copying {split}", unit="img"):
        shutil.copy2(img_path, image_out / img_path.name)

        # Take the first label candidate that exists on disk; images without
        # a label are copied on their own.
        candidates = (label_src_dir / f"{img_path.stem}{ext}" for ext in label_exts)
        label_file = next((c for c in candidates if c.exists()), None)
        if label_file is not None:
            shutil.copy2(label_file, label_out / label_file.name)

print("Done!")