"""
Benchmark script for BackgroundFX Pro.

Tests processing performance across different configurations and hardware.

Usage:
    python benchmark.py --tests all
    python benchmark.py --tests image model --iterations 10 --output results.json
"""

import argparse
import json
import statistics
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
import psutil
import torch

# Make the project root importable when this script is run directly.
sys.path.append(str(Path(__file__).parent.parent))

from api import ProcessingPipeline, PipelineConfig
from models import ModelRegistry, ModelLoader


class Benchmarker:
    """Performance benchmarking tool."""

    def __init__(self, output_file: Optional[str] = None):
        """Initialize the benchmarker and collect system information."""
        self.results = {
            'timestamp': datetime.now().isoformat(),
            'system_info': self._get_system_info(),
            'benchmarks': []
        }
        self.output_file = output_file or f"benchmark_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

    def _get_system_info(self) -> Dict[str, Any]:
        """Collect system information."""
        freq = psutil.cpu_freq()  # May be None on platforms without frequency info.
        memory = psutil.virtual_memory()
        info = {
            'cpu': {
                'count': psutil.cpu_count(),
                'frequency': freq.current if freq else 0,
                'model': self._get_cpu_model()
            },
            'memory': {
                'total_gb': memory.total / (1024**3),
                'available_gb': memory.available / (1024**3)
            },
            'gpu': self._get_gpu_info(),
            'python_version': sys.version,
            'torch_version': torch.__version__,
            'cuda_available': torch.cuda.is_available()
        }
        return info

    def _get_cpu_model(self) -> str:
        """Get the CPU model name."""
        try:
            import platform
            # platform.processor() can return an empty string on some systems.
            return platform.processor() or "Unknown"
        except Exception:
            return "Unknown"

    def _get_gpu_info(self) -> Dict[str, Any]:
        """Get GPU information (first CUDA device only)."""
        if torch.cuda.is_available():
            props = torch.cuda.get_device_properties(0)
            return {
                'available': True,
                'name': torch.cuda.get_device_name(0),
                'memory_gb': props.total_memory / (1024**3),
                'compute_capability': torch.cuda.get_device_capability(0)
            }
        return {'available': False}

    def benchmark_image_processing(self,
                                   sizes: Optional[List[Tuple[int, int]]] = None,
                                   qualities: Optional[List[str]] = None,
                                   num_iterations: int = 5) -> Dict[str, Any]:
        """Benchmark image processing performance."""
        print("\n=== Image Processing Benchmark ===")

        sizes = sizes or [(512, 512), (1024, 1024), (1920, 1080)]
        qualities = qualities or ['low', 'medium', 'high']

        results = {
            'test': 'image_processing',
            'iterations': num_iterations,
            'results': []
        }

        for size in sizes:
            for quality in qualities:
                print(f"Testing {size[0]}x{size[1]} @ {quality} quality...")

                # Sizes are (width, height); numpy arrays are (height, width, channels).
                # randint's upper bound is exclusive, so use 256 for the full 8-bit range.
                image = np.random.randint(0, 256, (size[1], size[0], 3), dtype=np.uint8)

                config = PipelineConfig(
                    quality_preset=quality,
                    use_gpu=torch.cuda.is_available(),
                    enable_cache=False
                )

                try:
                    pipeline = ProcessingPipeline(config)

                    # Warm-up run so model loading and lazy initialization
                    # are not counted in the timed iterations.
                    pipeline.process_image(image, None)

                    times = []
                    memory_usage = []

                    for _ in range(num_iterations):
                        start_mem = psutil.Process().memory_info().rss / (1024**2)
                        start_time = time.perf_counter()

                        pipeline.process_image(image, None)

                        elapsed = time.perf_counter() - start_time
                        end_mem = psutil.Process().memory_info().rss / (1024**2)

                        times.append(elapsed)
                        memory_usage.append(end_mem - start_mem)

                    result_data = {
                        'size': f"{size[0]}x{size[1]}",
                        'quality': quality,
                        'avg_time': statistics.mean(times),
                        'std_time': statistics.stdev(times) if len(times) > 1 else 0,
                        'min_time': min(times),
                        'max_time': max(times),
                        'fps': 1.0 / statistics.mean(times),
                        'avg_memory_mb': statistics.mean(memory_usage)
                    }

                    results['results'].append(result_data)
                    print(f"  Average: {result_data['avg_time']:.3f}s ({result_data['fps']:.1f} FPS)")

                except Exception as e:
                    print(f"  Failed: {e}")
                    results['results'].append({
                        'size': f"{size[0]}x{size[1]}",
                        'quality': quality,
                        'error': str(e)
                    })

        self.results['benchmarks'].append(results)
        return results

    def benchmark_model_loading(self) -> Dict[str, Any]:
        """Benchmark model loading times."""
        print("\n=== Model Loading Benchmark ===")

        results = {
            'test': 'model_loading',
            'results': []
        }

        registry = ModelRegistry()
        loader = ModelLoader(registry, device='cuda' if torch.cuda.is_available() else 'cpu')

        models_to_test = ['rmbg-1.4', 'u2netp', 'modnet']

        for model_id in models_to_test:
            print(f"Loading {model_id}...")

            # Unload everything first so each model is timed from a cold start.
            loader.unload_all()

            start_time = time.perf_counter()
            start_mem = psutil.Process().memory_info().rss / (1024**2)

            try:
                loaded = loader.load_model(model_id)

                elapsed = time.perf_counter() - start_time
                end_mem = psutil.Process().memory_info().rss / (1024**2)

                if loaded:
                    result_data = {
                        'model': model_id,
                        'load_time': elapsed,
                        'memory_usage_mb': end_mem - start_mem,
                        'device': str(loaded.device)  # str() keeps the value JSON-serializable.
                    }
                    print(f"  Loaded in {elapsed:.2f}s, Memory: {end_mem - start_mem:.1f}MB")
                else:
                    result_data = {
                        'model': model_id,
                        'error': 'Failed to load'
                    }
                    print("  Failed to load")

            except Exception as e:
                result_data = {
                    'model': model_id,
                    'error': str(e)
                }
                print(f"  Error: {e}")

            results['results'].append(result_data)

        self.results['benchmarks'].append(results)
        return results

    def benchmark_video_processing(self,
                                   duration: int = 5,
                                   fps: int = 30,
                                   size: Tuple[int, int] = (1280, 720)) -> Dict[str, Any]:
        """Benchmark video processing performance."""
        print("\n=== Video Processing Benchmark ===")

        results = {
            'test': 'video_processing',
            'video_specs': {
                'duration': duration,
                'fps': fps,
                'size': f"{size[0]}x{size[1]}",
                'total_frames': duration * fps
            },
            'results': []
        }

        # Generate a synthetic test video with a rectangle sweeping across the frame.
        import shutil
        import tempfile
        temp_dir = Path(tempfile.mkdtemp())
        video_path = temp_dir / "test_video.mp4"
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(str(video_path), fourcc, fps, size)

        print(f"Creating test video: {duration}s @ {fps}fps, {size[0]}x{size[1]}")
        for i in range(duration * fps):
            # OpenCV frames are (height, width, channels); size is (width, height).
            frame = np.random.randint(0, 256, (*size[::-1], 3), dtype=np.uint8)
            x = int((i / (duration * fps)) * size[0])
            cv2.rectangle(frame, (x, 100), (x + 100, 200), (0, 255, 0), -1)
            out.write(frame)
        out.release()

        from api import VideoProcessorAPI

        for quality in ['low', 'medium', 'high']:
            print(f"Processing at {quality} quality...")

            # NOTE: VideoProcessorAPI is constructed with defaults here; if it
            # accepts a quality setting, pass `quality` through so the presets
            # actually differ between runs.
            processor = VideoProcessorAPI()

            start_time = time.perf_counter()
            start_mem = psutil.Process().memory_info().rss / (1024**2)

            try:
                output_path = video_path.parent / f"output_{quality}.mp4"
                stats = processor.process_video(
                    str(video_path),
                    str(output_path),
                    background=None
                )

                elapsed = time.perf_counter() - start_time
                end_mem = psutil.Process().memory_info().rss / (1024**2)

                result_data = {
                    'quality': quality,
                    'total_time': elapsed,
                    'frames_processed': stats.frames_processed,
                    'processing_fps': stats.processing_fps,
                    'time_per_frame': elapsed / stats.frames_processed if stats.frames_processed > 0 else 0,
                    'memory_usage_mb': end_mem - start_mem
                }

                print(f"  Processed in {elapsed:.2f}s @ {stats.processing_fps:.1f} FPS")

            except Exception as e:
                result_data = {
                    'quality': quality,
                    'error': str(e)
                }
                print(f"  Failed: {e}")

            results['results'].append(result_data)

        # Remove the temporary directory along with the test and output videos.
        shutil.rmtree(temp_dir, ignore_errors=True)

        self.results['benchmarks'].append(results)
        return results

    def benchmark_batch_processing(self,
                                   batch_sizes: Optional[List[int]] = None,
                                   num_workers_list: Optional[List[int]] = None) -> Dict[str, Any]:
        """Benchmark batch processing performance."""
        print("\n=== Batch Processing Benchmark ===")

        batch_sizes = batch_sizes or [1, 5, 10, 20]
        num_workers_list = num_workers_list or [1, 2, 4, 8]

        results = {
            'test': 'batch_processing',
            'results': []
        }

        # Pre-generate enough test images for the largest batch.
        test_images = []
        for _ in range(max(batch_sizes)):
            img = np.random.randint(0, 256, (512, 512, 3), dtype=np.uint8)
            test_images.append(img)

        for batch_size in batch_sizes:
            for num_workers in num_workers_list:
                print(f"Testing batch_size={batch_size}, workers={num_workers}...")

                config = PipelineConfig(
                    batch_size=batch_size,
                    num_workers=num_workers,
                    use_gpu=torch.cuda.is_available(),
                    enable_cache=False
                )

                try:
                    pipeline = ProcessingPipeline(config)

                    start_time = time.perf_counter()
                    results_batch = pipeline.process_batch(test_images[:batch_size])
                    elapsed = time.perf_counter() - start_time

                    successful = sum(1 for r in results_batch if r.success)

                    result_data = {
                        'batch_size': batch_size,
                        'num_workers': num_workers,
                        'total_time': elapsed,
                        'time_per_image': elapsed / batch_size,
                        'throughput': batch_size / elapsed,
                        'successful': successful
                    }

                    print(f"  {elapsed:.2f}s total, {result_data['throughput']:.1f} images/sec")

                except Exception as e:
                    result_data = {
                        'batch_size': batch_size,
                        'num_workers': num_workers,
                        'error': str(e)
                    }
                    print(f"  Failed: {e}")

                results['results'].append(result_data)

        self.results['benchmarks'].append(results)
        return results

    def save_results(self):
        """Save benchmark results to a JSON file."""
        with open(self.output_file, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\nResults saved to: {self.output_file}")
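
    # Illustrative shape of the saved JSON (keys mirror self.results as built
    # above; values elided rather than fabricated):
    # {
    #   "timestamp": "...",
    #   "system_info": {"cpu": {...}, "memory": {...}, "gpu": {...}, ...},
    #   "benchmarks": [{"test": "image_processing", "iterations": 5, "results": [...]}]
    # }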

    def print_summary(self):
        """Print a benchmark summary."""
        print("\n" + "=" * 50)
        print("BENCHMARK SUMMARY")
        print("=" * 50)

        for benchmark in self.results['benchmarks']:
            print(f"\n{benchmark['test'].upper()}:")

            for result in benchmark.get('results', []):
                if 'error' in result:
                    continue
                if benchmark['test'] == 'image_processing':
                    print(f"  {result['size']} @ {result['quality']}: {result['fps']:.1f} FPS")
                elif benchmark['test'] == 'model_loading':
                    print(f"  {result['model']}: {result['load_time']:.2f}s")
                elif benchmark['test'] == 'video_processing':
                    print(f"  {result['quality']}: {result['processing_fps']:.1f} FPS")
                elif benchmark['test'] == 'batch_processing':
                    print(f"  Batch {result['batch_size']} x {result['num_workers']} workers: {result['throughput']:.1f} img/s")
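

# Minimal programmatic usage sketch (assumes the api and models packages
# resolve; every method shown is defined on Benchmarker above):
#
#     bench = Benchmarker(output_file="quick_bench.json")
#     bench.benchmark_image_processing(sizes=[(512, 512)], qualities=["low"], num_iterations=3)
#     bench.save_results()
#     bench.print_summary()
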
def main():
    """Run the requested benchmarks from the command line."""
    parser = argparse.ArgumentParser(description='BackgroundFX Pro Performance Benchmark')
    parser.add_argument('--tests', nargs='+',
                        choices=['image', 'model', 'video', 'batch', 'all'],
                        default=['all'],
                        help='Tests to run')
    parser.add_argument('--output', '-o', help='Output file for results')
    parser.add_argument('--iterations', '-i', type=int, default=5,
                        help='Number of iterations for each test')

    args = parser.parse_args()

    benchmarker = Benchmarker(args.output)

    tests_to_run = args.tests
    if 'all' in tests_to_run:
        tests_to_run = ['image', 'model', 'video', 'batch']

    print("BackgroundFX Pro Performance Benchmark")
    print("=" * 50)
    print("System Information:")
    system_info = benchmarker.results['system_info']
    print(f"  CPU: {system_info['cpu']['model']}")
    print(f"  Memory: {system_info['memory']['total_gb']:.1f}GB")
    if system_info['cuda_available']:
        print(f"  GPU: {system_info['gpu']['name']}")
    else:
        print("  GPU: Not available")

    if 'image' in tests_to_run:
        benchmarker.benchmark_image_processing(num_iterations=args.iterations)

    if 'model' in tests_to_run:
        benchmarker.benchmark_model_loading()

    if 'video' in tests_to_run:
        benchmarker.benchmark_video_processing()

    if 'batch' in tests_to_run:
        benchmarker.benchmark_batch_processing()

    benchmarker.save_results()
    benchmarker.print_summary()


if __name__ == "__main__":
    main()