File size: 42,113 Bytes
a091733 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 |
from huggingface_hub import snapshot_download
import sys
import os
import torch
import numpy as np
import pandas as pd
import json
import re
import pydicom
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from pathlib import Path
import threading
import multiprocessing as mp
# Download and setup model
model_path = snapshot_download(repo_id="Lab-Rasool/sybil")
sys.path.append(model_path)
from modeling_sybil_hf import SybilHFWrapper
from configuration_sybil import SybilConfig
def load_model(device_id=0):
"""
Load and initialize the Sybil model once.
Args:
device_id: GPU device ID to load model on
Returns:
Initialized SybilHFWrapper model
"""
print(f"Loading Sybil model on GPU {device_id}...")
config = SybilConfig()
model = SybilHFWrapper(config)
# Move model to specific GPU
device = torch.device(f'cuda:{device_id}')
# CRITICAL: Set the model's internal device attribute
# This ensures preprocessing moves data to the correct GPU
model.device = device
# Move all ensemble models to the correct GPU
for m in model.models:
m.to(device)
m.eval()
print(f"Model loaded successfully on GPU {device_id}!")
print(f" Model internal device: {model.device}")
return model, device
def is_localizer_scan(dicom_folder):
"""
Check if a DICOM folder contains a localizer/scout scan.
Based on preprocessing.py logic.
Returns:
Tuple of (is_localizer, reason)
"""
folder_path = Path(dicom_folder)
folder_name = folder_path.name.lower()
localizer_keywords = ['localizer', 'scout', 'topogram', 'surview', 'scanogram']
# Check folder name
if any(keyword in folder_name for keyword in localizer_keywords):
return True, f"Folder name contains localizer keyword: {folder_name}"
try:
dcm_files = list(folder_path.glob("*.dcm"))
if not dcm_files:
return False, "No DICOM files found"
# Check first few DICOM files for localizer metadata
sample_files = dcm_files[:min(3, len(dcm_files))]
for dcm_file in sample_files:
try:
dcm = pydicom.dcmread(str(dcm_file), stop_before_pixels=True)
# Check ImageType field
if hasattr(dcm, 'ImageType'):
image_type_str = ' '.join(str(val).lower() for val in dcm.ImageType)
if any(keyword in image_type_str for keyword in localizer_keywords):
return True, f"ImageType indicates localizer: {dcm.ImageType}"
# Check SeriesDescription field
if hasattr(dcm, 'SeriesDescription'):
if any(keyword in dcm.SeriesDescription.lower() for keyword in localizer_keywords):
return True, f"SeriesDescription indicates localizer: {dcm.SeriesDescription}"
except Exception as e:
continue
except Exception as e:
pass
return False, "Not a localizer scan"
def extract_timepoint_from_path(scan_dir):
"""
Extract timepoint from scan directory path based on year.
1999 -> T0, 2000 -> T1, 2001 -> T2, etc.
Looks for year patterns in folder names in date format MM-DD-YYYY.
Args:
scan_dir: Directory path string
Returns:
Timepoint string (e.g., 'T0', 'T1', 'T2') or None if not found
"""
# Split path into components
path_parts = scan_dir.split('/')
# Look for date patterns like "01-02-2000-NLST-LSS"
# Pattern: Date format MM-DD-YYYY at the start of a folder name
date_pattern = r'^\d{2}-\d{2}-(19\d{2}|20\d{2})'
base_year = 1999
for part in path_parts:
# Check for date pattern (e.g., "01-02-2000-NLST-LSS-50335")
match = re.match(date_pattern, part)
if match:
year = int(match.group(1))
if 1999 <= year <= 2010: # Reasonable range for NLST
timepoint_num = year - base_year
print(f" DEBUG: Found year {year} in '{part}' -> T{timepoint_num}")
return f'T{timepoint_num}'
return None
def extract_embedding_single_model(model_idx, ensemble_model, pixel_values, device):
"""
Extract embedding from a single ensemble model.
Args:
model_idx: Index of the model in the ensemble
ensemble_model: Single model from the ensemble
pixel_values: Preprocessed pixel values tensor (already on correct device)
device: Device to run on (e.g., cuda:0, cuda:1)
Returns:
numpy array of embeddings from this model
"""
embeddings_buffer = []
def create_hook(buffer):
def hook(module, input, output):
# Capture the output of ReLU layer (before dropout)
buffer.append(output.detach().cpu())
return hook
# Register hook on the ReLU layer (this is AFTER pooling, BEFORE dropout/classification)
hook_handle = ensemble_model.relu.register_forward_hook(create_hook(embeddings_buffer))
# Run forward pass on THIS model only with keyword argument
with torch.no_grad():
_ = ensemble_model(pixel_values=pixel_values)
# Remove hook
hook_handle.remove()
# Get the embeddings (should be shape [1, 512])
if embeddings_buffer:
embedding = embeddings_buffer[0].numpy().squeeze()
print(f"Model {model_idx + 1}: Embedding shape = {embedding.shape}")
return embedding
return None
def extract_embeddings(model, dicom_paths, device, use_parallel=True):
"""
Extract embeddings from the layer after ReLU, before Dropout.
Processes ensemble models in parallel for speed.
Args:
model: Pre-loaded SybilHFWrapper model
dicom_paths: List of DICOM file paths
device: Device to run on (e.g., cuda:0, cuda:1)
use_parallel: If True, process ensemble models in parallel
Returns:
numpy array of shape (512,) - averaged embeddings across ensemble
"""
# Preprocess ONCE (not 5 times!)
# The model's preprocessing handles moving data to the correct device
with torch.no_grad():
# Get the preprocessed input by calling the wrapper's preprocess_dicom method
# This returns the tensor that would be fed to each ensemble model
pixel_values = model.preprocess_dicom(dicom_paths)
if use_parallel:
# Process all ensemble models in parallel using ThreadPoolExecutor
all_embeddings = []
with ThreadPoolExecutor(max_workers=len(model.models)) as executor:
# Submit all models for parallel processing with the SAME preprocessed input
futures = [
executor.submit(extract_embedding_single_model, model_idx, ensemble_model, pixel_values, device)
for model_idx, ensemble_model in enumerate(model.models)
]
# Collect results as they complete
for future in futures:
embedding = future.result()
if embedding is not None:
all_embeddings.append(embedding)
else:
# Sequential processing (original implementation)
all_embeddings = []
for model_idx, ensemble_model in enumerate(model.models):
embedding = extract_embedding_single_model(model_idx, ensemble_model, pixel_values, device)
if embedding is not None:
all_embeddings.append(embedding)
# Average embeddings across ensemble
averaged_embedding = np.mean(all_embeddings, axis=0)
return averaged_embedding
def check_directory_for_dicoms(dirpath):
"""
Check a single directory for valid DICOM files.
Returns (dirpath, num_files, subject_id, filter_reason) or None if invalid.
"""
try:
# Quick check: does this directory have .dcm files?
dcm_files = [f for f in os.listdir(dirpath)
if f.endswith('.dcm') and os.path.isfile(os.path.join(dirpath, f))]
if not dcm_files:
return None
num_files = len(dcm_files)
# Filter out scans with 1-2 DICOM files (likely localizers)
if num_files <= 2:
return (dirpath, num_files, None, 'too_few_slices')
# Check if it's a localizer scan
is_loc, _ = is_localizer_scan(dirpath)
if is_loc:
return (dirpath, num_files, None, 'localizer')
# Extract subject ID (PID) from path
# Path structure: /NLST/<PID>/<date-info>/<scan-info>
# Example: /NLST/106639/01-02-1999-NLST-LSS-45699/1.000000-0OPLGEHSQXAnullna...
path_parts = dirpath.rstrip('/').split('/')
# Find the PID: it's the part after 'NLST' directory
try:
nlst_idx = path_parts.index('NLST')
subject_id = path_parts[nlst_idx + 1] # PID is right after 'NLST'
except (ValueError, IndexError):
# Fallback to old logic if path structure is different
subject_id = path_parts[-3] if len(path_parts) >= 3 else path_parts[-1]
return (dirpath, num_files, subject_id, 'valid')
except Exception as e:
return None
def save_directory_cache(dicom_dirs, cache_file):
"""
Save the list of DICOM directories to a cache file.
Args:
dicom_dirs: List of directory paths
cache_file: Path to cache file
"""
print(f"\n๐พ Saving directory cache to {cache_file}...")
cache_data = {
"timestamp": datetime.now().isoformat(),
"num_directories": len(dicom_dirs),
"directories": dicom_dirs
}
with open(cache_file, 'w') as f:
json.dump(cache_data, f, indent=2)
print(f"โ Cache saved with {len(dicom_dirs)} directories\n")
def load_directory_cache(cache_file):
"""
Load the list of DICOM directories from a cache file.
Args:
cache_file: Path to cache file
Returns:
List of directory paths, or None if cache doesn't exist or is invalid
"""
if not os.path.exists(cache_file):
return None
try:
with open(cache_file, 'r') as f:
cache_data = json.load(f)
dicom_dirs = cache_data.get("directories", [])
timestamp = cache_data.get("timestamp", "unknown")
print(f"\nโ Loaded directory cache from {cache_file}")
print(f" Cache created: {timestamp}")
print(f" Directories: {len(dicom_dirs)}\n")
return dicom_dirs
except Exception as e:
print(f"โ ๏ธ Failed to load cache: {e}")
return None
def find_dicom_directories(root_dir, max_subjects=None, num_workers=12, cache_file=None, filter_pids=None):
"""
Walk through directory tree and find all directories containing DICOM files.
Uses parallel processing for much faster scanning of large directory trees.
Only returns leaf directories (directories with .dcm files, not their parents).
Filters out localizer scans with 1-2 DICOM files.
Args:
root_dir: Root directory to search
max_subjects: Optional maximum number of unique subjects to process (None = all)
num_workers: Number of parallel workers for directory scanning (default: 12)
cache_file: Optional path to cache file for saving/loading directory list
filter_pids: Optional set of PIDs to filter (only include these subjects)
Returns:
List of directory paths containing .dcm files
"""
# Try to load from cache first
if cache_file:
cached_dirs = load_directory_cache(cache_file)
if cached_dirs is not None:
print("โ Using cached directory list (skipping scan)")
# Apply PID filter if specified
if filter_pids:
print(f" Filtering to {len(filter_pids)} PIDs from CSV...")
filtered_dirs = []
for d in cached_dirs:
# Extract PID from path: /NLST/<PID>/<date>/<scan>
path_parts = d.rstrip('/').split('/')
try:
nlst_idx = path_parts.index('NLST')
subject_id = path_parts[nlst_idx + 1]
except (ValueError, IndexError):
subject_id = path_parts[-3] if len(path_parts) >= 3 else path_parts[-1]
if subject_id in filter_pids:
filtered_dirs.append(d)
print(f" โ Found {len(filtered_dirs)} scans matching PIDs")
return filtered_dirs
# Still apply max_subjects limit if specified
if max_subjects:
subjects_seen = set()
filtered_dirs = []
for d in cached_dirs:
# Extract PID from path: /NLST/<PID>/<date>/<scan>
path_parts = d.rstrip('/').split('/')
try:
nlst_idx = path_parts.index('NLST')
subject_id = path_parts[nlst_idx + 1]
except (ValueError, IndexError):
subject_id = path_parts[-3] if len(path_parts) >= 3 else path_parts[-1]
# Check if we should include this scan
# Add scan if: (1) already collecting this subject, OR (2) under subject limit
if subject_id in subjects_seen:
# Already collecting this subject - add this scan
filtered_dirs.append(d)
elif len(subjects_seen) < max_subjects:
# New subject and under limit - start collecting this subject
subjects_seen.add(subject_id)
filtered_dirs.append(d)
# Stop once we have enough subjects
if len(subjects_seen) >= max_subjects:
# Count remaining scans from these subjects
remaining_count = 0
for remaining_d in cached_dirs[cached_dirs.index(d)+1:]:
remaining_parts = remaining_d.rstrip('/').split('/')
try:
remaining_nlst_idx = remaining_parts.index('NLST')
remaining_subject_id = remaining_parts[remaining_nlst_idx + 1]
except (ValueError, IndexError):
remaining_subject_id = remaining_parts[-3] if len(remaining_parts) >= 3 else remaining_parts[-1]
if remaining_subject_id in subjects_seen:
filtered_dirs.append(remaining_d)
break
print(f" โ Limited to {len(subjects_seen)} subjects ({len(filtered_dirs)} total scans)")
return filtered_dirs
return cached_dirs
print(f"Starting parallel directory scan with {num_workers} workers...")
if filter_pids:
print(f"โก FAST MODE: Only scanning {len(filter_pids)} PIDs (skipping others)")
else:
print("Scanning ALL subjects (this may take a while)")
# Phase 1: Fast parallel scan to find all directories with DICOM files
# BUT: Skip subject directories not in filter_pids for MASSIVE speedup
print("\nPhase 1: Scanning filesystem for DICOM directories...")
start_time = datetime.now()
# Collect all directories first (fast) - WITH EARLY FILTERING
all_dirs = []
for dirpath, dirnames, filenames in os.walk(root_dir):
# EARLY FILTER: If we have filter_pids, only descend into matching PID directories
if filter_pids:
path_parts = dirpath.rstrip('/').split('/')
try:
nlst_idx = path_parts.index('NLST')
# If this is a subject directory (one level below NLST)
if len(path_parts) == nlst_idx + 2:
subject_id = path_parts[nlst_idx + 1]
# Skip this subject if not in filter list
if subject_id not in filter_pids:
dirnames.clear() # Don't descend into this subject's subdirs
continue
except (ValueError, IndexError):
pass
# Quick check: if directory has .dcm files, add to list
if any(f.endswith('.dcm') for f in filenames):
all_dirs.append(dirpath)
print(f"Found {len(all_dirs)} potential DICOM directories in {(datetime.now() - start_time).total_seconds():.1f}s")
# Phase 2: Parallel validation and filtering
print(f"\nPhase 2: Validating directories in parallel ({num_workers} workers)...")
from concurrent.futures import ProcessPoolExecutor, as_completed
dicom_dirs = []
subjects_found = set()
filtered_stats = {'localizers': 0, 'too_few_slices': 0}
with ProcessPoolExecutor(max_workers=num_workers) as executor:
# Submit all directories for checking
future_to_dir = {executor.submit(check_directory_for_dicoms, d): d for d in all_dirs}
# Process results as they complete
for i, future in enumerate(as_completed(future_to_dir), 1):
# Print progress every 1000 dirs (more frequent for visibility)
if i % 1000 == 0:
elapsed = (datetime.now() - start_time).total_seconds()
rate = i / elapsed if elapsed > 0 else 0
remaining = (len(all_dirs) - i) / rate if rate > 0 else 0
print(f" [{i}/{len(all_dirs)}] Found: {len(dicom_dirs)} scans from {len(subjects_found)} PIDs | "
f"Filtered: {filtered_stats['localizers'] + filtered_stats['too_few_slices']} | "
f"ETA: {remaining/60:.1f} min")
try:
result = future.result()
if result is None:
continue
dirpath, num_files, subject_id, status = result
if status == 'too_few_slices':
filtered_stats['too_few_slices'] += 1
elif status == 'localizer':
filtered_stats['localizers'] += 1
elif status == 'valid':
# Check PID filter
if filter_pids is not None and subject_id not in filter_pids:
continue
# Check subject limit
if max_subjects is not None and subject_id not in subjects_found and len(subjects_found) >= max_subjects:
continue
subjects_found.add(subject_id)
dicom_dirs.append(dirpath)
# Print when we find a new PID match (helpful for filtered runs)
if filter_pids and len(dicom_dirs) % 100 == 1:
print(f" โ Found {len(dicom_dirs)} scans so far ({len(subjects_found)} unique PIDs)")
# Stop if we've hit subject limit
if max_subjects is not None and len(subjects_found) >= max_subjects:
print(f"\nโ Reached limit of {max_subjects} subjects. Stopping search.")
# Cancel remaining futures
for f in future_to_dir:
f.cancel()
break
except Exception as e:
continue
scan_time = (datetime.now() - start_time).total_seconds()
print(f"\n{'='*80}")
print(f"Directory Scan Complete in {scan_time:.1f}s ({scan_time/60:.1f} minutes)")
print(f"{'='*80}")
print(f"Filtering Summary:")
print(f" โ
Valid scans found: {len(dicom_dirs)}")
print(f" ๐ซ Localizers filtered: {filtered_stats['localizers']}")
print(f" โญ๏ธ Too few slices (โค2) filtered: {filtered_stats['too_few_slices']}")
print(f" ๐ Unique subjects: {len(subjects_found)}")
print(f" โก Speed: {len(all_dirs)/scan_time:.0f} dirs/second")
print(f"{'='*80}\n")
# Save to cache if specified
if cache_file:
save_directory_cache(dicom_dirs, cache_file)
return dicom_dirs
def prepare_scan_metadata(scan_dir):
"""
Prepare metadata for a scan without processing.
Args:
scan_dir: Directory containing DICOM files for one scan
Returns:
tuple: (dicom_file_paths, num_files, subject_id, scan_id)
"""
# Count DICOM files (ensure they are actual files, not directories)
dicom_files = [f for f in os.listdir(scan_dir)
if f.endswith('.dcm') and os.path.isfile(os.path.join(scan_dir, f))]
num_dicom_files = len(dicom_files)
if num_dicom_files == 0:
raise ValueError("No valid DICOM files found")
# Create list of full paths to DICOM files
dicom_file_paths = [os.path.join(scan_dir, f) for f in dicom_files]
# Parse directory path to extract identifiers
# Path structure: /NLST/<PID>/<date-info>/<scan-info>
path_parts = scan_dir.rstrip('/').split('/')
scan_id = path_parts[-1] if path_parts[-1] else path_parts[-2]
# Extract PID from path
try:
nlst_idx = path_parts.index('NLST')
subject_id = path_parts[nlst_idx + 1] # PID is right after 'NLST'
except (ValueError, IndexError):
# Fallback to old logic
subject_id = path_parts[-3] if len(path_parts) >= 3 else path_parts[-1]
return dicom_file_paths, num_dicom_files, subject_id, scan_id
def save_checkpoint(all_embeddings, all_metadata, failed, output_dir, checkpoint_num):
"""
Save a checkpoint of embeddings and metadata.
Args:
all_embeddings: List of embedding arrays
all_metadata: List of metadata dictionaries
failed: List of failed scans
output_dir: Output directory
checkpoint_num: Checkpoint number
"""
print(f"\n๐พ Saving checkpoint {checkpoint_num}...")
# Convert embeddings to array
embeddings_array = np.array(all_embeddings)
embedding_dim = int(embeddings_array.shape[1]) if len(embeddings_array.shape) > 1 else int(embeddings_array.shape[0])
# Create DataFrame
df_data = {
'case_number': [m['case_number'] for m in all_metadata],
'subject_id': [m['subject_id'] for m in all_metadata],
'scan_id': [m['scan_id'] for m in all_metadata],
'timepoint': [m.get('timepoint') for m in all_metadata],
'dicom_directory': [m['dicom_directory'] for m in all_metadata],
'num_dicom_files': [m['num_dicom_files'] for m in all_metadata],
'embedding_index': [m['embedding_index'] for m in all_metadata],
'embedding': list(embeddings_array)
}
df = pd.DataFrame(df_data)
# Save checkpoint parquet
checkpoint_path = os.path.join(output_dir, f"checkpoint_{checkpoint_num}_embeddings.parquet")
df.to_parquet(checkpoint_path, index=False, compression='snappy')
print(f" โ Saved embeddings checkpoint: {checkpoint_path}")
# Save checkpoint metadata
checkpoint_metadata = {
"checkpoint_num": checkpoint_num,
"timestamp": datetime.now().isoformat(),
"total_scans": len(all_embeddings),
"failed_scans": len(failed),
"embedding_shape": list(embeddings_array.shape),
"scans": all_metadata,
"failed_scans": failed
}
metadata_path = os.path.join(output_dir, f"checkpoint_{checkpoint_num}_metadata.json")
with open(metadata_path, 'w') as f:
json.dump(checkpoint_metadata, f, indent=2)
print(f" โ Saved metadata checkpoint: {metadata_path}")
print(f"๐พ Checkpoint {checkpoint_num} complete!\n")
def process_scan(model, device, scan_dir):
"""
Process a single scan directory and extract embeddings.
Args:
model: Pre-loaded SybilHFWrapper model
device: Device to run on (e.g., cuda:0, cuda:1)
scan_dir: Directory containing DICOM files for one scan
Returns:
tuple: (embeddings, scan_metadata)
"""
dicom_file_paths, num_dicom_files, subject_id, scan_id = prepare_scan_metadata(scan_dir)
print(f"\nProcessing: {scan_dir}")
print(f"DICOM files: {num_dicom_files}")
# Extract embeddings
embeddings = extract_embeddings(model, dicom_file_paths, device)
print(f"Embedding shape: {embeddings.shape}")
# Extract timepoint from path (e.g., 1999 -> T0, 2000 -> T1)
timepoint = extract_timepoint_from_path(scan_dir)
if timepoint:
print(f"Timepoint: {timepoint}")
else:
print(f"Timepoint: Not detected")
# Create metadata for this scan
scan_metadata = {
"case_number": subject_id, # Case number (e.g., 205749)
"subject_id": subject_id,
"scan_id": scan_id,
"timepoint": timepoint, # T0, T1, T2, etc. or None
"dicom_directory": scan_dir,
"num_dicom_files": num_dicom_files,
"embedding_index": None, # Will be set later
"statistics": {
"mean": float(np.mean(embeddings)),
"std": float(np.std(embeddings)),
"min": float(np.min(embeddings)),
"max": float(np.max(embeddings))
}
}
return embeddings, scan_metadata
# Main execution
if __name__ == "__main__":
import argparse
# Parse command line arguments
parser = argparse.ArgumentParser(description='Extract Sybil embeddings from DICOM scans')
# Input/Output
parser.add_argument('--root-dir', type=str, required=True,
help='Root directory containing DICOM files (e.g., /path/to/NLST)')
parser.add_argument('--pid-csv', type=str, default=None,
help='CSV file with "pid" column to filter subjects (e.g., subsets/hybridModels-train.csv)')
parser.add_argument('--output-dir', type=str, default='embeddings_output',
help='Output directory for embeddings (default: embeddings_output)')
parser.add_argument('--max-subjects', type=int, default=None,
help='Maximum number of subjects to process (for testing)')
# Performance tuning
parser.add_argument('--num-gpus', type=int, default=1,
help='Number of GPUs to use (default: 1)')
parser.add_argument('--num-parallel', type=int, default=1,
help='Number of parallel scans to process simultaneously (default: 1, recommended: 1-4 depending on GPU memory)')
parser.add_argument('--num-workers', type=int, default=4,
help='Number of parallel workers for directory scanning (default: 4, recommended: 4-12 depending on storage speed)')
parser.add_argument('--checkpoint-interval', type=int, default=1000,
help='Save checkpoint every N scans (default: 1000)')
args = parser.parse_args()
# ==========================================
# CONFIGURATION
# ==========================================
root_dir = args.root_dir
output_dir = args.output_dir
max_subjects = args.max_subjects
num_gpus = args.num_gpus
num_parallel_scans = args.num_parallel
num_scan_workers = args.num_workers
checkpoint_interval = args.checkpoint_interval
# Always use the main cache file from the full run
main_cache = "embeddings_output_full/directory_cache.json"
if os.path.exists(main_cache):
cache_file = main_cache
print(f"โ Found main directory cache: {main_cache}")
else:
cache_file = os.path.join(output_dir, "directory_cache.json")
# Verify root directory exists
if not os.path.exists(root_dir):
raise ValueError(f"Root directory does not exist: {root_dir}")
# Load PIDs from CSV if provided
filter_pids = None
if args.pid_csv:
print(f"Loading subject PIDs from: {args.pid_csv}")
import pandas as pd
csv_data = pd.read_csv(args.pid_csv)
filter_pids = set(str(pid) for pid in csv_data['pid'].unique())
print(f" Found {len(filter_pids)} unique PIDs to extract")
print(f" Examples: {list(filter_pids)[:5]}")
# Create output directory
os.makedirs(output_dir, exist_ok=True)
# Print configuration
print(f"\n{'='*80}")
print(f"CONFIGURATION")
print(f"{'='*80}")
print(f"Root directory: {root_dir}")
print(f"Output directory: {output_dir}")
print(f"Number of GPUs: {num_gpus}")
print(f"Parallel scans: {num_parallel_scans} (recommended: 1-4 depending on GPU memory)")
print(f"Directory scan workers: {num_scan_workers} (recommended: 4-12 depending on storage)")
print(f"Checkpoint interval: {checkpoint_interval} scans")
if filter_pids:
print(f"Filtering to: {len(filter_pids)} PIDs from CSV")
if max_subjects:
print(f"Max subjects: {max_subjects}")
print(f"{'='*80}\n")
# Warning about memory requirements
if num_parallel_scans > 1:
estimated_vram = (num_parallel_scans // num_gpus) * 10
print(f"โ ๏ธ MEMORY WARNING:")
print(f" Parallel processing requires ~{estimated_vram}GB VRAM per GPU")
print(f" If you encounter OOM errors, reduce --num-parallel to 1-2")
print(f" Current: {num_parallel_scans} scans across {num_gpus} GPU(s)\n")
# Find all directories containing DICOM files (FAST with parallel processing!)
# Will use cached directory list if available, otherwise scan and save cache
dicom_dirs = find_dicom_directories(root_dir, max_subjects=max_subjects,
num_workers=num_scan_workers, cache_file=cache_file,
filter_pids=filter_pids)
if len(dicom_dirs) == 0:
raise ValueError(f"No directories with DICOM files found in {root_dir}")
print(f"\n{'='*80}")
print(f"Found {len(dicom_dirs)} directories containing DICOM files")
print(f"{'='*80}\n")
# Detect and load models on multiple GPUs
print(f"๐ฎ Detected {num_gpus} GPU(s)")
print(f"๐ Will process {num_parallel_scans} scans in parallel ({num_parallel_scans // num_gpus} per GPU)")
print(f"๐พ Checkpoints will be saved every {checkpoint_interval} scans\n")
# Load models on each GPU
models_and_devices = []
for gpu_id in range(num_gpus):
model, device = load_model(gpu_id)
models_and_devices.append((model, device, gpu_id))
# Process each scan directory and collect all embeddings
all_embeddings = []
all_metadata = []
failed = []
checkpoint_counter = 0
if num_parallel_scans > 1:
# Parallel processing of multiple scans across multiple GPUs
print(f"Processing {num_parallel_scans} scans in parallel across {num_gpus} GPU(s)...")
print(f"Note: This requires ~{(num_parallel_scans // num_gpus) * 10}GB VRAM per GPU.\n")
from functools import partial
from concurrent.futures import as_completed
# Process scans in batches for checkpoint saving
batch_size = checkpoint_interval
num_batches = (len(dicom_dirs) + batch_size - 1) // batch_size
for batch_idx in range(num_batches):
start_idx = batch_idx * batch_size
end_idx = min(start_idx + batch_size, len(dicom_dirs))
batch_dirs = dicom_dirs[start_idx:end_idx]
print(f"\n{'='*80}")
print(f"Processing batch {batch_idx + 1}/{num_batches} (scans {start_idx + 1} to {end_idx})")
print(f"{'='*80}\n")
# Use ThreadPoolExecutor for parallel scan processing
# IMPORTANT: max_workers limits concurrent execution to prevent OOM
with ThreadPoolExecutor(max_workers=num_parallel_scans) as executor:
# Submit scans in controlled batches to avoid memory issues
# We submit only max_workers scans at once, then submit more as they complete
future_to_info = {}
scan_queue = list(enumerate(batch_dirs))
scans_submitted = 0
# Submit initial batch (up to max_workers scans)
while scan_queue and scans_submitted < num_parallel_scans:
i, scan_dir = scan_queue.pop(0)
# Select GPU in round-robin fashion
gpu_idx = i % num_gpus
model, device, gpu_id = models_and_devices[gpu_idx]
# Create partial function with model and device
process_func = partial(process_scan, model, device)
future = executor.submit(process_func, scan_dir)
future_to_info[future] = (start_idx + i + 1, scan_dir, gpu_id)
scans_submitted += 1
# Process results as they complete and submit new scans
while future_to_info:
# Wait for next completion
done_futures = []
for future in list(future_to_info.keys()):
if future.done():
done_futures.append(future)
if not done_futures:
import time
time.sleep(0.1)
continue
# Process completed futures
for future in done_futures:
scan_num, scan_dir, gpu_id = future_to_info.pop(future)
try:
print(f"[{scan_num}/{len(dicom_dirs)}] Processing on GPU {gpu_id}...")
embeddings, scan_metadata = future.result()
# Set the index for this scan
scan_metadata["embedding_index"] = len(all_embeddings)
# Collect embeddings and metadata
all_embeddings.append(embeddings)
all_metadata.append(scan_metadata)
except Exception as e:
print(f"ERROR processing {scan_dir}: {e}")
failed.append({"scan_dir": scan_dir, "error": str(e)})
# Submit next scan from queue
if scan_queue:
i, next_scan_dir = scan_queue.pop(0)
gpu_idx = i % num_gpus
model, device, gpu_id = models_and_devices[gpu_idx]
process_func = partial(process_scan, model, device)
new_future = executor.submit(process_func, next_scan_dir)
future_to_info[new_future] = (start_idx + i + 1, next_scan_dir, gpu_id)
# Save checkpoint after each batch
checkpoint_counter += 1
save_checkpoint(all_embeddings, all_metadata, failed, output_dir, checkpoint_counter)
print(f"Progress: {len(all_embeddings)}/{len(dicom_dirs)} scans completed "
f"({len(all_embeddings)/len(dicom_dirs)*100:.1f}%)\n")
else:
# Sequential processing (original behavior)
model, device, gpu_id = models_and_devices[0] # Use first GPU
for i, scan_dir in enumerate(dicom_dirs, 1):
try:
print(f"\n[{i}/{len(dicom_dirs)}] Processing scan...")
# Process scan and get results
embeddings, scan_metadata = process_scan(model, device, scan_dir)
# Set the index for this scan
scan_metadata["embedding_index"] = len(all_embeddings)
# Collect embeddings and metadata
all_embeddings.append(embeddings)
all_metadata.append(scan_metadata)
# Save checkpoint every checkpoint_interval scans
if i % checkpoint_interval == 0:
checkpoint_counter += 1
save_checkpoint(all_embeddings, all_metadata, failed, output_dir, checkpoint_counter)
except Exception as e:
print(f"ERROR processing {scan_dir}: {e}")
failed.append({"scan_dir": scan_dir, "error": str(e)})
# Convert embeddings list to numpy array
# Shape will be (num_scans, embedding_dim)
embeddings_array = np.array(all_embeddings)
embedding_dim = int(embeddings_array.shape[1]) if len(embeddings_array.shape) > 1 else int(embeddings_array.shape[0])
# Create DataFrame with embeddings and metadata for Parquet
# Store embeddings as a single array column
df_data = {
'case_number': [m['case_number'] for m in all_metadata],
'subject_id': [m['subject_id'] for m in all_metadata],
'scan_id': [m['scan_id'] for m in all_metadata],
'timepoint': [m.get('timepoint') for m in all_metadata], # T0, T1, T2, etc.
'dicom_directory': [m['dicom_directory'] for m in all_metadata],
'num_dicom_files': [m['num_dicom_files'] for m in all_metadata],
'embedding_index': [m['embedding_index'] for m in all_metadata],
'embedding': list(embeddings_array) # Store as list of arrays
}
# Create DataFrame
df = pd.DataFrame(df_data)
# Save final complete file as Parquet
embeddings_filename = "all_embeddings.parquet"
embeddings_path = os.path.join(output_dir, embeddings_filename)
df.to_parquet(embeddings_path, index=False, compression='snappy')
print(f"\nโ
Saved FINAL embeddings to Parquet: {embeddings_path}")
# Create comprehensive metadata JSON
dataset_metadata = {
"dataset_info": {
"root_directory": root_dir,
"total_scans": len(all_embeddings),
"failed_scans": len(failed),
"embedding_shape": list(embeddings_array.shape),
"embedding_dim": embedding_dim,
"extraction_timestamp": datetime.now().isoformat(),
"file_format": "parquet"
},
"model_info": {
"model": "Lab-Rasool/sybil",
"layer": "after_relu_before_dropout",
"ensemble_averaged": True,
"num_ensemble_models": 5
},
"embeddings_file": embeddings_filename,
"parquet_schema": {
"metadata_columns": ["case_number", "subject_id", "scan_id", "timepoint", "dicom_directory", "num_dicom_files", "embedding_index"],
"embedding_column": "embedding",
"embedding_shape": f"({embedding_dim},)",
"total_columns": 8,
"timepoint_info": "T0=1999, T1=2000, T2=2001, etc. Extracted from year in path. Can be None if not detected."
},
"filtering_info": {
"localizer_detection": "Scans identified as localizers (by folder name or DICOM metadata) are filtered out",
"min_slices": "Scans with โค2 DICOM files are filtered out (likely localizers)",
"accepted_scans": len(all_embeddings)
},
"scans": all_metadata,
"failed_scans": failed
}
metadata_filename = "dataset_metadata.json"
metadata_path = os.path.join(output_dir, metadata_filename)
with open(metadata_path, 'w') as f:
json.dump(dataset_metadata, f, indent=2)
print(f"โ
Saved FINAL metadata: {metadata_path}")
# Summary
print(f"\n{'='*80}")
print(f"PROCESSING COMPLETE")
print(f"{'='*80}")
print(f"Successfully processed: {len(all_embeddings)}/{len(dicom_dirs)} scans")
print(f"Failed: {len(failed)}/{len(dicom_dirs)} scans")
print(f"\nEmbeddings array shape: {embeddings_array.shape}")
print(f"Saved embeddings to: {embeddings_path}")
print(f"Saved metadata to: {metadata_path}")
# Timepoint summary
timepoint_counts = {}
for m in all_metadata:
tp = m.get('timepoint', 'Unknown')
timepoint_counts[tp] = timepoint_counts.get(tp, 0) + 1
if timepoint_counts:
print(f"\n๐
Timepoint Distribution:")
for tp in sorted(timepoint_counts.keys(), key=lambda x: (x is None, x)):
count = timepoint_counts[tp]
if tp is None:
print(f" Unknown/Not detected: {count} scans")
else:
print(f" {tp}: {count} scans")
if failed:
print(f"\nFailed scans: {len(failed)}")
for fail_info in failed[:5]: # Show first 5 failures
print(f" - {fail_info['scan_dir']}")
print(f" Error: {fail_info['error']}")
if len(failed) > 5:
print(f" ... and {len(failed) - 5} more failures")
print(f"\n{'='*80}")
print(f"For downstream training, load embeddings with:")
print(f" import pandas as pd")
print(f" import numpy as np")
print(f" df = pd.read_parquet('{embeddings_path}')")
print(f" # Total rows: {len(df)}, Total columns: {len(df.columns)}")
print(f" # Extract embeddings array: embeddings = np.stack(df['embedding'].values)")
print(f" # Shape: {embeddings_array.shape}")
print(f" # Access individual: df.loc[0, 'embedding'] -> array of shape ({embedding_dim},)")
print(f"{'='*80}")
|