jebin2's picture
inf all
fbf6388
import os
import shutil
import random
from pathlib import Path
from dotenv import load_dotenv
from tqdm import tqdm
from .config import Config
# Pull settings from a .env file (via python-dotenv) into the environment.
load_dotenv()

# SOURCE_PATH holds one or more comma-separated directories to collect images from.
SOURCE_PATHS = os.getenv('SOURCE_PATH')
if not SOURCE_PATHS:
    raise ValueError("SOURCE_PATH not set")

# Split by comma and strip whitespace. Blank entries (e.g. from a trailing or
# doubled comma) are dropped: Path('') is the current directory, so keeping
# them would silently harvest images from wherever the script was launched.
source_paths = [Path(p.strip()) for p in SOURCE_PATHS.split(',') if p.strip()]
if not source_paths:
    raise ValueError("SOURCE_PATH contains no usable paths")

# Working directories, both rooted at Config.current_path.
images_dir = Path(f'{Config.current_path}/images')
dataset_dir = Path(f'{Config.current_path}/dataset')

# Lower-case suffixes (with leading dot) recognised as images and as labels.
image_exts = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff', '.webp'}
label_exts = {'.txt'}
# Copy images from all source paths into the flat images_dir folder, with a
# tqdm progress bar per source path.
# The destination is created up front: shutil.copy2 does not create missing
# parent directories, so the first copy would fail on a fresh setup otherwise.
images_dir.mkdir(parents=True, exist_ok=True)

# Flattened destination names written so far in this run (collision reporting).
copied_names = set()

for source_path in source_paths:
    if not source_path.exists():
        print(f"Warning: source path {source_path} does not exist, skipping.")
        continue

    # Walk the tree once and collect every (source file, flattened name) pair,
    # so the progress bar total is known without traversing the tree twice.
    pending = []
    for root, dirs, files in os.walk(source_path):
        root_path = Path(root)
        # Encode the original location in the file name: files directly in the
        # source path get the prefix 'root'; nested files get their relative
        # directory parts joined with underscores.
        if root_path == source_path:
            prefix = 'root'
        else:
            prefix = '_'.join(root_path.relative_to(source_path).parts)
        pending.extend(
            (root_path / file, f"{prefix}_{file}")
            for file in files
            if Path(file).suffix.lower() in image_exts
        )

    with tqdm(total=len(pending), desc=f"Copying images from {source_path}", unit="img") as pbar:
        for src_file, dst_name in pending:
            if dst_name in copied_names:
                # Flattening can map two different files to one name. The later
                # copy still wins (unchanged behaviour), but it is now reported
                # instead of overwriting silently. tqdm.write keeps the bar intact.
                tqdm.write(f"Warning: {src_file} overwrites an earlier copy of {dst_name}")
            copied_names.add(dst_name)
            shutil.copy2(src_file, images_dir / dst_name)
            pbar.update(1)
# Start from a clean slate: remove any dataset tree left by a previous run.
if dataset_dir.exists():
    shutil.rmtree(dataset_dir)

# Recreate the empty layout: dataset/{images,labels}/{train,val,test}
for kind in ('images', 'labels'):
    for split in ('train', 'val', 'test'):
        target_dir = dataset_dir / kind / split
        target_dir.mkdir(parents=True, exist_ok=True)
# List all images in images_dir. iterdir() yields entries in arbitrary,
# filesystem-dependent order, so the list is sorted before shuffling; without
# that, the fixed seed below would not produce the same split on every run.
# Non-file entries are skipped because they cannot be copied as images later.
all_images = sorted(
    f for f in images_dir.iterdir()
    if f.is_file() and f.suffix.lower() in image_exts
)

# Shuffle and split (80% train, 10% val, 10% test)
random.seed(42)
random.shuffle(all_images)

n = len(all_images)
train_end = int(0.8 * n)
val_end = train_end + int(0.1 * n)

# 'test' takes everything after val_end, so truncation in the int() calls
# above never drops an image (the remainder lands in the test split).
splits = {
    'train': all_images[:train_end],
    'val': all_images[train_end:val_end],
    'test': all_images[val_end:],
}
# Folder searched for a label file matching each image's stem.
label_src_dir = Path(f'{Config.current_path}/image_labels')

# Populate dataset/images/<split> and dataset/labels/<split>, one split at a time.
for split, files in splits.items():
    print(f"Processing split '{split}' with {len(files)} images...")
    split_images_dir = dataset_dir / 'images' / split
    split_labels_dir = dataset_dir / 'labels' / split

    for img_path in tqdm(files, desc=f"Copying {split}", unit="img"):
        # The image itself is always copied.
        shutil.copy2(img_path, split_images_dir / img_path.name)

        # Copy the first label file that exists for this stem, if any;
        # images without a label are left unlabeled.
        candidates = (label_src_dir / f"{img_path.stem}{ext}" for ext in label_exts)
        label_file = next((c for c in candidates if c.exists()), None)
        if label_file is not None:
            shutil.copy2(label_file, split_labels_dir / label_file.name)

print("Done!")