Spaces:
Running
Running
""" | |
DICOM Handler for medical image processing | |
Optimized for memory-constrained environments | |
""" | |
import os | |
import logging | |
import numpy as np | |
from typing import Dict, Any, Optional, Tuple, List | |
from pathlib import Path | |
import torch | |
from PIL import Image | |
import cv2 | |
logger = logging.getLogger(__name__) | |
# Try to import medical libraries with fallbacks | |
try: | |
import pydicom | |
PYDICOM_AVAILABLE = True | |
except ImportError: | |
PYDICOM_AVAILABLE = False | |
logger.warning("pydicom not available - DICOM support limited") | |
try: | |
import SimpleITK as sitk | |
SIMPLEITK_AVAILABLE = True | |
except ImportError: | |
SIMPLEITK_AVAILABLE = False | |
logger.warning("SimpleITK not available - advanced medical image processing limited") | |
class DicomHandler: | |
""" | |
DICOM file handler with memory optimization | |
""" | |
def __init__(self, memory_limit_mb: float = 1000.0): | |
""" | |
Initialize DICOM handler | |
Args: | |
memory_limit_mb: Memory limit for DICOM processing in MB | |
""" | |
self.memory_limit_mb = memory_limit_mb | |
self.memory_limit_bytes = memory_limit_mb * 1024**2 | |
# Default DICOM processing settings | |
self.default_window_center = 40 | |
self.default_window_width = 400 | |
self.default_output_size = (512, 512) | |
logger.info(f"DICOM Handler initialized with {memory_limit_mb}MB limit") | |
logger.info(f"pydicom available: {PYDICOM_AVAILABLE}") | |
logger.info(f"SimpleITK available: {SIMPLEITK_AVAILABLE}") | |
def read_dicom_file(self, file_path: str) -> Optional[Dict[str, Any]]: | |
""" | |
Read DICOM file and extract image data and metadata | |
Args: | |
file_path: Path to DICOM file | |
Returns: | |
Dictionary containing image data and metadata | |
""" | |
if not PYDICOM_AVAILABLE: | |
logger.error("pydicom not available - cannot read DICOM files") | |
return None | |
try: | |
file_path = Path(file_path) | |
if not file_path.exists(): | |
logger.error(f"DICOM file not found: {file_path}") | |
return None | |
# Check file size | |
file_size_mb = file_path.stat().st_size / (1024**2) | |
if file_size_mb > self.memory_limit_mb: | |
logger.warning(f"DICOM file too large: {file_size_mb:.1f}MB > {self.memory_limit_mb}MB") | |
return self._read_large_dicom_file(file_path) | |
# Read DICOM file | |
dicom_data = pydicom.dcmread(str(file_path)) | |
# Extract image data | |
image_array = dicom_data.pixel_array | |
# Extract metadata | |
metadata = self._extract_dicom_metadata(dicom_data) | |
# Process image | |
processed_image = self._process_dicom_image(image_array, metadata) | |
return { | |
'image': processed_image, | |
'metadata': metadata, | |
'original_shape': image_array.shape, | |
'file_path': str(file_path), | |
'file_size_mb': file_size_mb | |
} | |
except Exception as e: | |
logger.error(f"Error reading DICOM file {file_path}: {e}") | |
return None | |
def _read_large_dicom_file(self, file_path: Path) -> Optional[Dict[str, Any]]: | |
"""Read large DICOM file with memory optimization""" | |
try: | |
# Read only metadata first | |
dicom_data = pydicom.dcmread(str(file_path), stop_before_pixels=True) | |
metadata = self._extract_dicom_metadata(dicom_data) | |
# Read image data in chunks if possible | |
if SIMPLEITK_AVAILABLE: | |
return self._read_dicom_with_sitk(file_path, metadata) | |
else: | |
# Fallback: read with reduced resolution | |
dicom_data = pydicom.dcmread(str(file_path)) | |
image_array = dicom_data.pixel_array | |
# Downsample if too large | |
if image_array.nbytes > self.memory_limit_bytes: | |
scale_factor = np.sqrt(self.memory_limit_bytes / image_array.nbytes) | |
new_shape = (int(image_array.shape[0] * scale_factor), | |
int(image_array.shape[1] * scale_factor)) | |
image_array = cv2.resize(image_array, new_shape) | |
logger.info(f"Downsampled DICOM image to {new_shape}") | |
processed_image = self._process_dicom_image(image_array, metadata) | |
return { | |
'image': processed_image, | |
'metadata': metadata, | |
'original_shape': dicom_data.pixel_array.shape, | |
'file_path': str(file_path), | |
'downsampled': True | |
} | |
except Exception as e: | |
logger.error(f"Error reading large DICOM file: {e}") | |
return None | |
def _read_dicom_with_sitk(self, file_path: Path, metadata: Dict[str, Any]) -> Optional[Dict[str, Any]]: | |
"""Read DICOM using SimpleITK for better memory management""" | |
try: | |
# Read with SimpleITK | |
image = sitk.ReadImage(str(file_path)) | |
image_array = sitk.GetArrayFromImage(image) | |
# Process image | |
processed_image = self._process_dicom_image(image_array, metadata) | |
return { | |
'image': processed_image, | |
'metadata': metadata, | |
'original_shape': image_array.shape, | |
'file_path': str(file_path), | |
'reader': 'SimpleITK' | |
} | |
except Exception as e: | |
logger.error(f"Error reading DICOM with SimpleITK: {e}") | |
return None | |
def _extract_dicom_metadata(self, dicom_data) -> Dict[str, Any]: | |
"""Extract relevant metadata from DICOM data""" | |
metadata = {} | |
try: | |
# Patient information | |
metadata['patient_id'] = getattr(dicom_data, 'PatientID', 'Unknown') | |
metadata['patient_age'] = getattr(dicom_data, 'PatientAge', 'Unknown') | |
metadata['patient_sex'] = getattr(dicom_data, 'PatientSex', 'Unknown') | |
# Study information | |
metadata['study_date'] = getattr(dicom_data, 'StudyDate', 'Unknown') | |
metadata['study_description'] = getattr(dicom_data, 'StudyDescription', 'Unknown') | |
metadata['modality'] = getattr(dicom_data, 'Modality', 'Unknown') | |
# Image information | |
metadata['rows'] = getattr(dicom_data, 'Rows', 0) | |
metadata['columns'] = getattr(dicom_data, 'Columns', 0) | |
metadata['pixel_spacing'] = getattr(dicom_data, 'PixelSpacing', [1.0, 1.0]) | |
metadata['slice_thickness'] = getattr(dicom_data, 'SliceThickness', 1.0) | |
# Window/Level information for display | |
metadata['window_center'] = getattr(dicom_data, 'WindowCenter', self.default_window_center) | |
metadata['window_width'] = getattr(dicom_data, 'WindowWidth', self.default_window_width) | |
# Ensure window values are scalars | |
if isinstance(metadata['window_center'], (list, tuple)): | |
metadata['window_center'] = metadata['window_center'][0] | |
if isinstance(metadata['window_width'], (list, tuple)): | |
metadata['window_width'] = metadata['window_width'][0] | |
except Exception as e: | |
logger.warning(f"Error extracting DICOM metadata: {e}") | |
return metadata | |
def _process_dicom_image(self, image_array: np.ndarray, | |
metadata: Dict[str, Any]) -> torch.Tensor: | |
"""Process DICOM image array to tensor""" | |
try: | |
# Handle different image dimensions | |
if len(image_array.shape) == 3: | |
# 3D volume - take middle slice for 2D processing | |
middle_slice = image_array.shape[0] // 2 | |
image_array = image_array[middle_slice] | |
# Apply windowing for better contrast | |
window_center = metadata.get('window_center', self.default_window_center) | |
window_width = metadata.get('window_width', self.default_window_width) | |
image_array = self._apply_windowing(image_array, window_center, window_width) | |
# Normalize to 0-1 range | |
image_array = self._normalize_image(image_array) | |
# Resize to standard size | |
if image_array.shape != self.default_output_size: | |
image_array = cv2.resize(image_array, self.default_output_size) | |
# Convert to tensor | |
image_tensor = torch.from_numpy(image_array).float() | |
# Add channel dimension if needed | |
if len(image_tensor.shape) == 2: | |
image_tensor = image_tensor.unsqueeze(0) # Add channel dimension | |
return image_tensor | |
except Exception as e: | |
logger.error(f"Error processing DICOM image: {e}") | |
# Return dummy tensor on error | |
return torch.zeros(1, *self.default_output_size) | |
def _apply_windowing(self, image_array: np.ndarray, | |
window_center: float, window_width: float) -> np.ndarray: | |
"""Apply windowing to DICOM image for better contrast""" | |
try: | |
window_min = window_center - window_width / 2 | |
window_max = window_center + window_width / 2 | |
# Apply windowing | |
windowed_image = np.clip(image_array, window_min, window_max) | |
return windowed_image | |
except Exception as e: | |
logger.warning(f"Error applying windowing: {e}") | |
return image_array | |
def _normalize_image(self, image_array: np.ndarray) -> np.ndarray: | |
"""Normalize image to 0-1 range""" | |
try: | |
# Handle different data types | |
if image_array.dtype == np.uint8: | |
return image_array.astype(np.float32) / 255.0 | |
elif image_array.dtype == np.uint16: | |
return image_array.astype(np.float32) / 65535.0 | |
else: | |
# For other types, normalize to min-max | |
img_min = image_array.min() | |
img_max = image_array.max() | |
if img_max > img_min: | |
return (image_array - img_min) / (img_max - img_min) | |
else: | |
return np.zeros_like(image_array, dtype=np.float32) | |
except Exception as e: | |
logger.warning(f"Error normalizing image: {e}") | |
return image_array.astype(np.float32) | |
def batch_process_dicom_files(self, file_paths: List[str]) -> List[Dict[str, Any]]: | |
"""Process multiple DICOM files with memory management""" | |
results = [] | |
for i, file_path in enumerate(file_paths): | |
logger.info(f"Processing DICOM file {i+1}/{len(file_paths)}: {file_path}") | |
result = self.read_dicom_file(file_path) | |
if result: | |
results.append(result) | |
# Memory cleanup every 10 files | |
if (i + 1) % 10 == 0: | |
import gc | |
gc.collect() | |
logger.debug(f"Memory cleanup after {i+1} files") | |
return results | |
def convert_dicom_to_standard_format(self, dicom_result: Dict[str, Any], | |
output_format: str = 'png') -> Optional[str]: | |
"""Convert processed DICOM to standard image format""" | |
try: | |
image_tensor = dicom_result['image'] | |
# Convert tensor to numpy | |
if isinstance(image_tensor, torch.Tensor): | |
image_array = image_tensor.squeeze().numpy() | |
else: | |
image_array = image_tensor | |
# Convert to 8-bit | |
image_8bit = (image_array * 255).astype(np.uint8) | |
# Create PIL image | |
pil_image = Image.fromarray(image_8bit, mode='L') # Grayscale | |
# Generate output filename | |
input_path = Path(dicom_result['file_path']) | |
output_path = input_path.with_suffix(f'.{output_format}') | |
# Save image | |
pil_image.save(output_path) | |
logger.info(f"Converted DICOM to {output_format}: {output_path}") | |
return str(output_path) | |
except Exception as e: | |
logger.error(f"Error converting DICOM to {output_format}: {e}") | |
return None | |
def get_dicom_statistics(self, dicom_results: List[Dict[str, Any]]) -> Dict[str, Any]: | |
"""Get statistics from processed DICOM files""" | |
if not dicom_results: | |
return {} | |
try: | |
modalities = [r['metadata'].get('modality', 'Unknown') for r in dicom_results] | |
file_sizes = [r.get('file_size_mb', 0) for r in dicom_results] | |
stats = { | |
'total_files': len(dicom_results), | |
'modalities': list(set(modalities)), | |
'modality_counts': {mod: modalities.count(mod) for mod in set(modalities)}, | |
'total_size_mb': sum(file_sizes), | |
'average_size_mb': np.mean(file_sizes) if file_sizes else 0, | |
'size_range_mb': (min(file_sizes), max(file_sizes)) if file_sizes else (0, 0) | |
} | |
return stats | |
except Exception as e: | |
logger.error(f"Error calculating DICOM statistics: {e}") | |
return {} | |