#!/usr/bin/env python3
"""
Machine learning pose classification script.

Features:
1. Train classifiers on pose landmark inputs
2. Use selected landmark coordinates as features
3. Use folder names as class labels
4. Train and evaluate models

Usage:
    python ml_pose_classifier.py [--data DATA_DIR] [--model MODEL_TYPE] [--test-size RATIO]
"""

import json
import argparse
import numpy as np
import time
from pathlib import Path
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
# from sklearn.pipeline import Pipeline  # not used
from sklearn.neural_network import MLPRegressor
import joblib
import matplotlib.pyplot as plt

# seaborn is optional; used only for confusion matrix plotting
try:
    import seaborn as sns
    SEABORN_AVAILABLE = True
except ImportError:
    SEABORN_AVAILABLE = False

# ONNX related imports
try:
    from skl2onnx import convert_sklearn
    from skl2onnx.common.data_types import FloatTensorType
    # onnx is not required here; we import it lazily where needed
    ONNX_AVAILABLE = True
except ImportError:
    ONNX_AVAILABLE = False

# ONNX Runtime is optional and not required unless ONNX runtime testing is implemented
ONNX_RUNTIME_AVAILABLE = False


class PoseClassifier:
    def __init__(self, model_type='random_forest'):
        """
        Initialize the pose classifier.

        Args:
            model_type: model type ('random_forest', 'svm', 'gradient_boost', 'logistic', 'distilled_rf')
        """
        self.model_type = model_type
        self.model = None
        self.student_model = None  # If distillation is used, save student (MLP) model
        self.scaler = StandardScaler()
        self.label_encoder = LabelEncoder()

        # Define joints we want to use (based on MediaPipe keypoint indices)
        self.target_joints = [
            'nose',            # Head (nose as reference, but will actually be 0,0,0)
            'left_shoulder',   # Left shoulder
            'right_shoulder',  # Right shoulder
            'left_elbow',      # Left elbow
            'right_elbow',     # Right elbow
            'left_wrist',      # Left wrist
            'right_wrist',     # Right wrist
            'left_hip',        # Left hip
            'right_hip',       # Right hip
            'left_knee',       # Left knee
            'right_knee',      # Right knee
            'left_ankle',      # Left ankle
            'right_ankle'      # Right ankle
        ]

        self.feature_columns = []
        for joint in self.target_joints:
            self.feature_columns.extend([f'{joint}_x', f'{joint}_y', f'{joint}_z'])

        print(f"Target joints: {len(self.target_joints)}")
        print(f"Feature dimension: {len(self.feature_columns)}")
        print("Joint list:", ', '.join(self.target_joints))

    def _get_model(self):
        """Create a classifier based on the selected model type."""
        if self.model_type == 'random_forest':
            return RandomForestClassifier(
                n_estimators=100,
                max_depth=15,
                min_samples_split=5,
                min_samples_leaf=2,
                random_state=42,
                n_jobs=-1
            )
        elif self.model_type == 'svm':
            return SVC(
                C=1.0,
                kernel='rbf',
                gamma='scale',
                random_state=42
            )
        elif self.model_type == 'gradient_boost':
            return GradientBoostingClassifier(
                n_estimators=100,
                learning_rate=0.1,
                max_depth=6,
                random_state=42
            )
        elif self.model_type == 'logistic':
            return LogisticRegression(
                C=10.0,                     # Larger C = weaker regularization (C is the inverse regularization strength)
                max_iter=2000,              # Increase maximum iterations
                solver='lbfgs',             # L-BFGS solver, suitable for small datasets
                multi_class='multinomial',  # Multi-class strategy
                random_state=42,
                n_jobs=-1
            )
        elif self.model_type == 'distilled_rf':
            # Teacher uses a random forest (returns an RF for the training process)
            return RandomForestClassifier(
                n_estimators=100,
                max_depth=15,
                min_samples_split=5,
                min_samples_leaf=2,
                random_state=42,
                n_jobs=-1
            )
        else:
            raise ValueError(f"Unsupported model type: {self.model_type}")
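
    # -----------------------------------------------------------------------
    # Illustrative note (folder names and values below are made up, not part of
    # the original pipeline): load_data() expects a directory of label folders,
    # each holding per-sample JSON files, e.g.
    #
    #   PoseData/
    #     label_standing/pose_0001.json
    #     label_sitting/pose_0002.json
    #
    # where each JSON file contains a "landmarks" object keyed by joint name:
    #
    #   {"landmarks": {"left_shoulder": {"x": 0.42, "y": 0.31, "z": -0.05}, ...}}
    #
    # Joints missing from "landmarks" are zero-filled when building features.
    # -----------------------------------------------------------------------
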
    def load_data(self, data_dir):
        """
        Load pose data from JSON files

        Args:
            data_dir: Data directory containing label folders

        Returns:
            tuple: (feature data, labels)
        """
        data_path = Path(data_dir)
        all_features = []
        all_labels = []

        print(f"Loading data from: {data_path}")

        # Iterate over each label directory
        for label_dir in data_path.iterdir():
            if not label_dir.is_dir() or not label_dir.name.startswith('label_'):
                continue

            label = label_dir.name
            json_files = list(label_dir.glob('*.json'))
            print(f"Processing {label}: {len(json_files)} files")

            for json_file in json_files:
                try:
                    with open(json_file, 'r', encoding='utf-8') as f:
                        data = json.load(f)

                    landmarks = data.get('landmarks', {})

                    # Extract coordinates of target joints
                    features = []
                    missing_joints = []
                    for joint in self.target_joints:
                        if joint in landmarks:
                            joint_data = landmarks[joint]
                            features.extend([
                                joint_data.get('x', 0.0),
                                joint_data.get('y', 0.0),
                                joint_data.get('z', 0.0)
                            ])
                        else:
                            # If a joint is missing, fill with zeros
                            features.extend([0.0, 0.0, 0.0])
                            missing_joints.append(joint)

                    if len(features) == len(self.feature_columns):
                        all_features.append(features)
                        all_labels.append(label)
                    else:
                        print(f"Skipping file {json_file}: feature dimension mismatch")

                    if missing_joints:
                        print(f"File {json_file.name} missing joints: {missing_joints}")

                except Exception as e:
                    print(f"Error reading file {json_file}: {e}")
                    continue

        print(f"Loaded {len(all_features)} samples")

        # Count samples per label
        label_counts = {}
        for label in all_labels:
            label_counts[label] = label_counts.get(label, 0) + 1

        print("Label distribution:")
        for label, count in sorted(label_counts.items()):
            print(f"  {label}: {count} samples")

        return np.array(all_features), np.array(all_labels)
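
    # Minimal end-to-end usage sketch (illustrative; assumes a data directory laid
    # out as described above):
    #   clf = PoseClassifier(model_type='random_forest')
    #   X, y = clf.load_data('PoseData')
    #   results = clf.train(X, y, test_size=0.2)
    #   clf.save_model('pose_classifier_random_forest.pkl')
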
    def train(self, X, y, test_size=0.2):
        """
        Train the classifier.

        Args:
            X: feature data
            y: labels
            test_size: ratio for test split

        Returns:
            dict: a dictionary containing training results
        """
        print(f"\nStarting training for model: {self.model_type}...")
        print(f"Data shape: {X.shape}")
        print(f"Number of labels: {len(np.unique(y))}")

        # Encode labels
        y_encoded = self.label_encoder.fit_transform(y)

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y_encoded, test_size=test_size, random_state=42, stratify=y_encoded
        )

        print(f"Train set size: {X_train.shape[0]}")
        print(f"Test set size: {X_test.shape[0]}")

        # Standardize features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # If using the distillation process: train the RF teacher first, then train an
        # MLPRegressor student to fit the teacher's predict_proba output
        if self.model_type == 'distilled_rf':
            print("Using distillation: train RandomForest teacher, then fit an MLPRegressor student to teacher soft labels")

            # Train teacher
            teacher = self._get_model()
            teacher.fit(X_train_scaled, y_train)

            # Get the teacher's probability distribution as soft labels
            y_train_proba = teacher.predict_proba(X_train_scaled)

            # Create and train the student (MLPRegressor) to fit the probability vectors
            student = MLPRegressor(
                hidden_layer_sizes=(128, 64, 32),
                activation='relu',
                solver='adam',
                max_iter=1000,
                learning_rate_init=0.001,
                random_state=42,
                early_stopping=True,
                validation_fraction=0.1
            )

            print("Training student model to fit teacher probability outputs...")
            print(f"Teacher probability output shape: {y_train_proba.shape}")

            # Multi-output regression, target is the probability vector
            student.fit(X_train_scaled, y_train_proba)

            # Save models
            self.model = teacher
            self.student_model = student

            # Use the student to predict on train/test sets
            y_train_pred_proba = student.predict(X_train_scaled)
            y_test_pred_proba = student.predict(X_test_scaled)

            # Apply softmax to ensure probabilities sum to 1
            def softmax(x):
                exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
                return exp_x / np.sum(exp_x, axis=1, keepdims=True)

            y_train_pred_proba = softmax(y_train_pred_proba)
            y_test_pred_proba = softmax(y_test_pred_proba)

            y_train_pred = np.argmax(y_train_pred_proba, axis=1)
            y_test_pred = np.argmax(y_test_pred_proba, axis=1)

            print(f"Student predicted probability shape: {y_test_pred_proba.shape}")
            print(f"Student training accuracy: {accuracy_score(y_train, y_train_pred):.4f}")
        else:
            # Standard flow: train a single model
            self.model = self._get_model()
            self.model.fit(X_train_scaled, y_train)

            y_train_pred = self.model.predict(X_train_scaled)
            y_test_pred = self.model.predict(X_test_scaled)

        # Compute accuracies
        train_accuracy = accuracy_score(y_train, y_train_pred)
        test_accuracy = accuracy_score(y_test, y_test_pred)

        # Cross-validation on the model used for training
        # (if a student_model exists, the teacher is still used for cross-validation)
        cv_scores = cross_val_score(self.model, X_train_scaled, y_train, cv=5)

        print("\nTraining results:")
        print(f"Train accuracy: {train_accuracy:.4f}")
        print(f"Test accuracy: {test_accuracy:.4f}")
        print(f"5-fold CV accuracy: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}")

        # Classification report
        print("\nTest set classification report:")
        target_names = self.label_encoder.classes_
        print(classification_report(y_test, y_test_pred, target_names=target_names))

        # Confusion matrix
        cm = confusion_matrix(y_test, y_test_pred)

        return {
            'train_accuracy': train_accuracy,
            'test_accuracy': test_accuracy,
            'cv_scores': cv_scores,
            'confusion_matrix': cm,
            'target_names': target_names,
            'X_test': X_test_scaled,
            'y_test': y_test,
            'y_test_pred': y_test_pred
        }
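
    # Illustrative check (not invoked anywhere in this script): after training with
    # model_type='distilled_rf', teacher/student agreement on already-scaled features
    # could be inspected like this:
    #   teacher_pred = clf.model.predict(X_scaled)
    #   student_pred = np.argmax(clf.student_model.predict(X_scaled), axis=1)
    #   agreement = (teacher_pred == student_pred).mean()
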
    def save_model(self, filepath):
        """Save the trained model to disk."""
        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'label_encoder': self.label_encoder,
            'model_type': self.model_type,
            'target_joints': self.target_joints,
            'feature_columns': self.feature_columns
        }
        joblib.dump(model_data, filepath)
        print(f"Model saved to: {filepath}")

    def load_model(self, filepath):
        """Load a trained model from disk."""
        model_data = joblib.load(filepath)
        self.model = model_data['model']
        self.scaler = model_data['scaler']
        self.label_encoder = model_data['label_encoder']
        self.model_type = model_data['model_type']
        self.target_joints = model_data['target_joints']
        self.feature_columns = model_data['feature_columns']
        print(f"Model loaded from: {filepath}")

    def predict(self, X):
        """Run prediction on input features."""
        if self.model is None and self.student_model is None:
            raise ValueError("Model not trained or loaded")

        X_scaled = self.scaler.transform(X)

        # Prefer the student_model (if it exists) to generate probability output
        if self.student_model is not None:
            raw = self.student_model.predict(X_scaled)
            # Normalize the regression output with softmax so it behaves like a
            # probability vector (mirrors the normalization used during training)
            exp_raw = np.exp(raw - np.max(raw, axis=1, keepdims=True))
            proba = exp_raw / np.sum(exp_raw, axis=1, keepdims=True)
            preds = np.argmax(proba, axis=1)
            labels = self.label_encoder.inverse_transform(preds)
            return labels, proba

        # Otherwise fall back to the original model
        predictions = self.model.predict(X_scaled)
        probabilities = None
        if hasattr(self.model, 'predict_proba'):
            probabilities = self.model.predict_proba(X_scaled)
        return self.label_encoder.inverse_transform(predictions), probabilities

    def predict_single_json(self, json_path):
        """
        Predict the pose class for a single JSON file.

        Args:
            json_path: path to the JSON file

        Returns:
            dict: prediction details or error information
        """
        if self.model is None:
            raise ValueError("Model not trained or loaded")

        try:
            # Read JSON file
            with open(json_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            landmarks = data.get('landmarks', {})

            # Extract coordinates of target joints
            features = []
            missing_joints = []
            available_joints = []
            for joint in self.target_joints:
                if joint in landmarks:
                    joint_data = landmarks[joint]
                    features.extend([
                        joint_data.get('x', 0.0),
                        joint_data.get('y', 0.0),
                        joint_data.get('z', 0.0)
                    ])
                    available_joints.append(joint)
                else:
                    # If a joint is missing, fill with zeros
                    features.extend([0.0, 0.0, 0.0])
                    missing_joints.append(joint)

            if len(features) != len(self.feature_columns):
                raise ValueError(f"Feature dimension mismatch: expected {len(self.feature_columns)}, got {len(features)}")

            # Convert to a numpy array and predict
            X = np.array([features])
            predictions, probabilities = self.predict(X)

            # Build result dict
            result = {
                'file_path': str(json_path),
                'file_name': Path(json_path).name,
                'predicted_label': predictions[0],
                'confidence_scores': {},
                'available_joints': available_joints,
                'missing_joints': missing_joints,
                'joint_coverage': f"{len(available_joints)}/{len(self.target_joints)}"
            }

            # Add per-class confidence scores
            if probabilities is not None:
                for i, label in enumerate(self.label_encoder.classes_):
                    result['confidence_scores'][label] = float(probabilities[0][i])

                # Highest confidence
                max_prob_idx = np.argmax(probabilities[0])
                result['max_confidence'] = float(probabilities[0][max_prob_idx])

            return result

        except Exception as e:
            return {
                'file_path': str(json_path),
                'file_name': Path(json_path).name,
                'error': str(e),
                'predicted_label': None
            }
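
    # Illustrative shape of the dict returned by predict_single_json() on success
    # (all values below are made up):
    #   {
    #     'file_name': 'pose_0001.json',
    #     'predicted_label': 'label_standing',
    #     'confidence_scores': {'label_standing': 0.91, 'label_sitting': 0.09},
    #     'max_confidence': 0.91,
    #     'joint_coverage': '13/13',
    #     ...
    #   }
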
    def evaluate_test_directory(self, test_dir):
        """
        Evaluate all data in a test directory.

        Args:
            test_dir: path to the test data directory

        Returns:
            dict: dictionary containing detailed evaluation results
        """
        if self.model is None:
            raise ValueError("Model not trained or loaded")

        test_path = Path(test_dir)
        if not test_path.exists():
            raise ValueError(f"Test directory does not exist: {test_dir}")

        # Start timing
        start_time = time.time()
        print(f"Starting evaluation on test dataset: {test_path}")
        print(f"Start time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")

        # Store all prediction results
        all_results = []
        label_stats = {}
        total_prediction_time = 0.0
        prediction_count = 0

        # Iterate over label folders
        for label_dir in test_path.iterdir():
            if not label_dir.is_dir() or not label_dir.name.startswith('label_'):
                continue

            true_label = label_dir.name
            json_files = list(label_dir.glob('*.json'))
            print(f"Evaluating {true_label}: {len(json_files)} files")

            label_stats[true_label] = {
                'total': len(json_files),
                'correct': 0,
                'incorrect': 0,
                'errors': 0,
                'predictions': {},
                'confidence_scores': [],
                'prediction_times': []
            }

            for json_file in json_files:
                # Single prediction timing
                pred_start_time = time.time()
                result = self.predict_single_json(json_file)
                pred_end_time = time.time()

                single_prediction_time = pred_end_time - pred_start_time
                total_prediction_time += single_prediction_time
                prediction_count += 1

                if 'error' in result:
                    label_stats[true_label]['errors'] += 1
                    print(f"  Error: {json_file.name} - {result['error']}")
                    continue

                predicted_label = result['predicted_label']
                is_correct = predicted_label == true_label

                if is_correct:
                    label_stats[true_label]['correct'] += 1
                else:
                    label_stats[true_label]['incorrect'] += 1

                # Count prediction distribution
                if predicted_label not in label_stats[true_label]['predictions']:
                    label_stats[true_label]['predictions'][predicted_label] = 0
                label_stats[true_label]['predictions'][predicted_label] += 1

                # Record confidence and prediction time
                if 'max_confidence' in result:
                    label_stats[true_label]['confidence_scores'].append(result['max_confidence'])
                label_stats[true_label]['prediction_times'].append(single_prediction_time)

                # Save detailed result
                all_results.append({
                    'file_path': str(json_file),
                    'file_name': json_file.name,
                    'true_label': true_label,
                    'predicted_label': predicted_label,
                    'is_correct': is_correct,
                    'confidence': result.get('max_confidence', 0.0),
                    'confidence_scores': result.get('confidence_scores', {}),
                    'joint_coverage': result.get('joint_coverage', '0/13'),
                    'prediction_time': single_prediction_time
                })

        # End timing
        end_time = time.time()
        total_execution_time = end_time - start_time

        # Compute aggregate statistics
        total_samples = sum(stats['total'] for stats in label_stats.values())
        total_correct = sum(stats['correct'] for stats in label_stats.values())
        total_errors = sum(stats['errors'] for stats in label_stats.values())
        total_tested = total_samples - total_errors
        overall_accuracy = total_correct / total_tested if total_tested > 0 else 0.0
        avg_prediction_time = total_prediction_time / prediction_count if prediction_count > 0 else 0.0

        # Build confusion matrix
        confusion_matrix = {}
        for true_label in label_stats.keys():
            confusion_matrix[true_label] = {}
            for predicted_label in label_stats.keys():
                confusion_matrix[true_label][predicted_label] = 0

        for result in all_results:
            if result.get('is_correct') is not None:  # exclude error cases
                true_label = result['true_label']
                predicted_label = result['predicted_label']
                # Guard against predictions for classes not present in the test directory
                confusion_matrix[true_label].setdefault(predicted_label, 0)
                confusion_matrix[true_label][predicted_label] += 1

        return {
            'label_stats': label_stats,
            'overall_accuracy': overall_accuracy,
            'total_samples': total_samples,
            'total_correct': total_correct,
            'total_errors': total_errors,
            'total_tested': total_tested,
            'confusion_matrix': confusion_matrix,
            'detailed_results': all_results,
            'timing_stats': {
                'total_execution_time': total_execution_time,
                'total_prediction_time': total_prediction_time,
                'avg_prediction_time': avg_prediction_time,
                'prediction_count': prediction_count,
                'start_time': start_time,
                'end_time': end_time,
                'overhead_time': total_execution_time - total_prediction_time
            }
        }

    def print_evaluation_report(self, eval_results):
        """
        Print a detailed evaluation report.

        Args:
            eval_results: dictionary returned by evaluate_test_directory
        """
        timing_stats = eval_results.get('timing_stats', {})

        print("\n" + "=" * 80)
        print("Test dataset evaluation report")
        print("=" * 80)

        # Overall statistics
        print(f"Total samples: {eval_results['total_samples']}")
        print(f"Successfully tested: {eval_results['total_tested']}")
        print(f"Errors: {eval_results['total_errors']}")
        print(
            f"Overall accuracy: {eval_results['overall_accuracy']:.4f} "
            f"({eval_results['total_correct']}/{eval_results['total_tested']})"
        )

        # Timing statistics
        if timing_stats:
            total_time = timing_stats['total_execution_time']
            prediction_time = timing_stats['total_prediction_time']
            avg_time = timing_stats['avg_prediction_time']
            overhead_time = timing_stats['overhead_time']
            prediction_count = timing_stats['prediction_count']

            print("\nTiming statistics:")
            print("-" * 50)
            print(f"Total execution time: {total_time:.4f} s")
            print(f"Total prediction time: {prediction_time:.4f} s")
            print(f"Overhead time: {overhead_time:.4f} s")
            print(f"Average prediction time: {avg_time * 1000:.2f} ms")
            print(f"Prediction throughput: {prediction_count / total_time:.2f} preds/s")
            print(
                f"Prediction efficiency: {(prediction_time / total_time) * 100:.1f}% "
                f"(prediction time / total)"
            )

        # Per-label detailed statistics
        print("\nPer-label stats:")
        print("-" * 80)
        print(
            f"{'Label':<10} {'Total':<6} {'Correct':<6} {'Wrong':<6} "
            f"{'Accuracy':<8} {'AvgConf':<10} {'AvgPredTime':<12}"
        )
        print("-" * 80)

        for label, stats in sorted(eval_results['label_stats'].items()):
            accuracy = (
                stats['correct'] / (stats['total'] - stats['errors'])
                if (stats['total'] - stats['errors']) > 0 else 0.0
            )
            avg_confidence = (
                np.mean(stats['confidence_scores'])
                if stats['confidence_scores'] else 0.0
            )
            avg_pred_time = (
                np.mean(stats['prediction_times'])
                if 'prediction_times' in stats and stats['prediction_times'] else 0.0
            )
            print(
                f"{label:<10} {stats['total']:<6} {stats['correct']:<6} {stats['incorrect']:<6} "
                f"{accuracy:.4f} {avg_confidence:.4f} {avg_pred_time * 1000:.2f}ms"
            )

        # Confusion matrix
        print("\nConfusion matrix:")
        print("-" * 60)
        labels = sorted(eval_results['label_stats'].keys())

        # Header row (kept out of the f-string expression to avoid the backslash restriction)
        header = "True\\Pred"
        print(f"{header:<12}", end="")
        for label in labels:
            print(f"{label:<10}", end="")
        print()

        # Data rows
        for true_label in labels:
            print(f"{true_label:<12}", end="")
            for pred_label in labels:
                count = eval_results['confusion_matrix'][true_label][pred_label]
                print(f"{count:<10}", end="")
            print()

        # Per-label prediction distribution
        print("\nPer-label prediction distribution:")
        print("-" * 80)
        for true_label, stats in sorted(eval_results['label_stats'].items()):
            if stats['predictions']:
                print(f"{true_label}:")
                total_predictions = sum(stats['predictions'].values())
                for pred_label, count in sorted(stats['predictions'].items()):
                    percentage = (count / total_predictions) * 100
                    print(f"  -> {pred_label}: {count} ({percentage:.1f}%)")

        # Error analysis
        print("\nError analysis:")
        print("-" * 40)
        incorrect_results = [r for r in eval_results['detailed_results'] if not r['is_correct']]
        if incorrect_results:
            # Sort by confidence and show the top mistaken predictions
            incorrect_results.sort(key=lambda x: x['confidence'], reverse=True)
            print("Highest-confidence incorrect predictions (top 10):")
            for i, result in enumerate(incorrect_results[:10]):
                pred_time = result.get('prediction_time', 0) * 1000  # ms
                print(
                    f"{i + 1:2d}. {result['file_name']}: {result['true_label']} -> {result['predicted_label']} "
                    f"(conf: {result['confidence']:.4f}, time: {pred_time:.2f}ms)"
                )
        else:
            print("No incorrect predictions found.")

        # Performance analysis
        if timing_stats and eval_results['detailed_results']:
            print("\nPerformance analysis:")
            print("-" * 40)
            prediction_times = [
                r.get('prediction_time', 0)
                for r in eval_results['detailed_results']
                if 'prediction_time' in r
            ]
            if prediction_times:
                min_time = min(prediction_times) * 1000
                max_time = max(prediction_times) * 1000
                median_time = np.median(prediction_times) * 1000
                std_time = np.std(prediction_times) * 1000

                print("Prediction time distribution:")
                print(f"  Fastest: {min_time:.2f}ms")
                print(f"  Slowest: {max_time:.2f}ms")
                print(f"  Median: {median_time:.2f}ms")
                print(f"  Stddev: {std_time:.2f}ms")

        print("\n" + "=" * 80)

    def plot_confusion_matrix(self, cm, target_names, save_path=None):
        """Plot the confusion matrix."""
        plt.figure(figsize=(10, 8))

        if SEABORN_AVAILABLE:
            sns.heatmap(
                cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=target_names, yticklabels=target_names,
            )
        else:
            # Fallback using matplotlib only
            im = plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
            plt.colorbar(im)
            tick_marks = np.arange(len(target_names))
            plt.xticks(tick_marks, target_names, rotation=45, ha='right')
            plt.yticks(tick_marks, target_names)

            # Annotate cells
            thresh = cm.max() / 2.0 if cm.size else 0
            for i in range(cm.shape[0]):
                for j in range(cm.shape[1]):
                    plt.text(j, i, format(cm[i, j], 'd'),
                             ha="center", va="center",
                             color="white" if cm[i, j] > thresh else "black")

        plt.title(f"{self.model_type.title()} model confusion matrix")
        plt.xlabel('Predicted')
        plt.ylabel('True')

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Confusion matrix saved to: {save_path}")

        plt.show()
    def export_to_onnx(self, model_type='random_forest', output_path=None):
        """
        Export the trained model to ONNX format (only models supported by Barracuda).

        Note: Barracuda does not support LinearClassifier layers (e.g., LogisticRegression/SVM);
        only tree models are supported.
        """
        if not ONNX_AVAILABLE:
            print("Error: ONNX export is unavailable. Please install the skl2onnx and onnx packages:")
            print("pip install skl2onnx onnx")
            return None

        if not hasattr(self, 'model') or self.model is None:
            print("Error: Model is not trained yet. Please train the model first.")
            return None

        # Check whether the currently trained model type matches the requested export type
        if hasattr(self, 'model_type') and self.model_type != model_type:
            print(f"Warning: Currently trained {self.model_type} model, but requested to export {model_type} model")
            print(f"Will export the currently trained {self.model_type} model")
            model_name = self.model_type
        else:
            model_name = model_type

        # Barracuda only supports tree models, not LinearClassifier
        if model_name in ['logistic', 'svm']:
            print(f"❌ Barracuda/Unity does not support ONNX import for {model_name} models (LinearClassifier layer).")
            print("Please use random_forest or gradient_boost for export.")
            return None

        # If a student_model exists -> export the student_model (MLP), otherwise export self.model
        model_to_export = None
        export_name = None
        if self.student_model is not None:
            model_to_export = self.student_model
            export_name = 'distilled_mlp'
            print("Detected student_model. Exporting student (MLP) to ONNX (suitable for Unity/Barracuda).")
        else:
            model_to_export = self.model
            export_name = model_name

        if model_to_export is None:
            print("Error: No model available for export.")
            return None

        # Generate the output file path
        if output_path is None:
            output_path = f"pose_classifier_{export_name}.onnx"

        print(f"About to export model to: {output_path}, export target: {export_name}")

        try:
            feature_count = len(self.target_joints) * 3
            initial_type = [('float_input', FloatTensorType([None, feature_count]))]

            onnx_model = convert_sklearn(
                model_to_export,
                initial_types=initial_type,
                target_opset=12
            )

            with open(output_path, "wb") as f:
                f.write(onnx_model.SerializeToString())

            print(f"✅ Successfully exported {export_name} model to ONNX format: {output_path}")

            # Save the label mapping and scaler parameters
            label_mapping_path = output_path.replace('.onnx', '_labels.json')
            label_mapping = {
                'label_encoder_classes': self.label_encoder.classes_.tolist(),
                'model_type': export_name,
                'feature_count': feature_count,
                'target_joints': self.target_joints,
                'description': f'Pose classifier - {len(self.target_joints)} landmarks with x,y,z coordinates',
                'scaler_mean': self.scaler.mean_.tolist(),
                'scaler_scale': self.scaler.scale_.tolist()
            }

            with open(label_mapping_path, 'w', encoding='utf-8') as f:
                json.dump(label_mapping, f, ensure_ascii=False, indent=2)

            print(f"✅ Label mapping and scaler parameters saved to: {label_mapping_path}")
            print("⚠️ Note: The exported ONNX expects inputs to be standardized with scaler_mean/scaler_scale.")

            return output_path

        except Exception as e:
            print(f"❌ ONNX export failed: {str(e)}")
            import traceback
            traceback.print_exc()
            return None
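
    # Sketch (not wired into this script) of how the exported ONNX model could be
    # sanity-checked with onnxruntime, assuming onnxruntime is installed and the
    # label-mapping JSON has been loaded. The input name 'float_input' matches the
    # initial_type declared in export_to_onnx(), and raw features must first be
    # standardized with the exported scaler parameters, i.e.
    # x_scaled = (x - scaler_mean) / scaler_scale:
    #   import onnxruntime as ort
    #   sess = ort.InferenceSession("pose_classifier_distilled_mlp.onnx")
    #   x_scaled = ((x - np.array(scaler_mean)) / np.array(scaler_scale)).astype(np.float32)
    #   outputs = sess.run(None, {'float_input': x_scaled.reshape(1, -1)})
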
    def export_to_tflite(self, output_path=None):
        """
        Export the student_model (MLP) to TFLite format.

        Dependencies: skl2onnx, onnx, onnx-tf, tensorflow
        """
        if self.student_model is None:
            print("❌ Only exporting the student_model (MLPRegressor) to TFLite is supported. Please train with --model distilled_rf first.")
            return None

        try:
            import onnx
            from skl2onnx import convert_sklearn
            from skl2onnx.common.data_types import FloatTensorType
            from onnx_tf.backend import prepare
            import tensorflow as tf
        except ImportError:
            print("❌ You need to install skl2onnx, onnx, onnx-tf, tensorflow.")
            print("pip install skl2onnx onnx onnx-tf tensorflow")
            return None

        feature_count = len(self.target_joints) * 3
        initial_type = [('float_input', FloatTensorType([None, feature_count]))]

        # 1. Export to ONNX
        print("Exporting student_model to ONNX...")
        onnx_model = convert_sklearn(
            self.student_model,
            initial_types=initial_type,
            target_opset=12
        )
        onnx_path = "temp_student.onnx"
        with open(onnx_path, "wb") as f:
            f.write(onnx_model.SerializeToString())
        print(f"✅ ONNX export successful: {onnx_path}")

        # 2. ONNX -> TensorFlow SavedModel
        print("Converting ONNX to TensorFlow SavedModel...")
        tf_model = prepare(onnx.load(onnx_path))
        tf_saved_path = "temp_student_tf"
        tf_model.export_graph(tf_saved_path)
        print(f"✅ SavedModel export successful: {tf_saved_path}")

        # 3. SavedModel -> TFLite
        print("Converting SavedModel to TFLite...")
        converter = tf.lite.TFLiteConverter.from_saved_model(tf_saved_path)
        tflite_model = converter.convert()

        if output_path is None:
            output_path = "pose_classifier_distilled_mlp.tflite"
        with open(output_path, "wb") as f:
            f.write(tflite_model)
        print(f"✅ TFLite export successful: {output_path}")

        # Clean up temporary files (optional)
        import os
        import shutil
        os.remove(onnx_path)
        shutil.rmtree(tf_saved_path, ignore_errors=True)

        return output_path
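
# Example CLI invocations (illustrative; file and directory names are placeholders,
# the flags themselves are defined in main() below):
#   python ml_pose_classifier.py --data PoseData --model random_forest
#   python ml_pose_classifier.py --model distilled_rf --export-tflite pose_classifier_distilled_mlp.tflite
#   python ml_pose_classifier.py --load-model pose_classifier_random_forest.pkl --evaluate TestPoseData
#   python ml_pose_classifier.py --load-model pose_classifier_random_forest.pkl --predict sample_pose.json
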
def main():
    parser = argparse.ArgumentParser(description="Pose classification machine learning script")
    parser.add_argument("--data", "-d", default="PoseData", help="Pose data directory (default: PoseData)")
    parser.add_argument(
        "--model", "-m",
        choices=['random_forest', 'svm', 'gradient_boost', 'logistic', 'distilled_rf'],
        default='random_forest',
        help="Model type (default: random_forest)",
    )
    parser.add_argument("--test-size", "-t", type=float, default=0.2, help="Test set ratio (default: 0.2)")
    parser.add_argument("--save-model", "-s", help="Path to save the trained model")
    parser.add_argument("--load-model", "-l", help="Path to load an already trained model")
    parser.add_argument("--predict", "-p", help="Path of a single JSON file to predict")
    parser.add_argument("--evaluate", "-e", help="Path of a test directory to evaluate all JSON files")
    parser.add_argument("--no-plot", action="store_true", help="Do not display the confusion matrix plot")
    parser.add_argument("--train", action="store_true", help="Force training even if --load-model is provided")
    parser.add_argument("--export-onnx", help="Export the model to ONNX format; specify the output file path")
    parser.add_argument(
        "--export-model-type",
        choices=['random_forest', 'logistic', 'distilled_rf'],
        default='random_forest',
        help="Model type to export (default: random_forest)",
    )
    parser.add_argument("--test-onnx", help="Test an ONNX model; specify the ONNX file path")
    parser.add_argument("--onnx-labels", help="ONNX label mapping JSON path (auto-detect if not provided)")
    parser.add_argument("--onnx-test-data", help="ONNX batch test data directory (if not provided, single-sample test)")
    parser.add_argument(
        "--export-tflite",
        help="Export the model to TFLite format; specify the output path (supported for the distilled_rf student model only)",
    )

    args = parser.parse_args()

    print("Pose classification ML tool")
    print("=" * 60)

    # ONNX test mode
    if args.test_onnx:
        print("ONNX model test mode")
        print(f"ONNX model: {args.test_onnx}")
        print("=" * 60)

        # Create a classifier instance for testing
        classifier = PoseClassifier()

        # Note: test_onnx_model is not implemented in this script; this is a placeholder.
        # You can implement it later if needed.
        print("ONNX test requested but the functionality is not implemented in this script.")
        return

    # Evaluation mode
    if args.evaluate:
        if not args.load_model:
            # Try to use the default model file
            default_model = f"pose_classifier_{args.model}.pkl"
            if Path(default_model).exists():
                args.load_model = default_model
            else:
                print(
                    f"Error: Need to specify a model file path (--load-model) or ensure the default model file exists: {default_model}"
                )
                return

        print("Evaluation mode")
        print(f"Test data directory: {args.evaluate}")
        print(f"Model file: {args.load_model}")
        print("=" * 60)

        # Create the classifier and load the model
        classifier = PoseClassifier(model_type=args.model)
        classifier.load_model(args.load_model)

        # Perform the comprehensive evaluation
        try:
            eval_results = classifier.evaluate_test_directory(args.evaluate)
            classifier.print_evaluation_report(eval_results)
        except Exception as e:
            print(f"Error during evaluation: {e}")
        return

    # Prediction-only mode
    if args.predict:
        if not args.load_model:
            # Try to use the default model file
            default_model = f"pose_classifier_{args.model}.pkl"
            if Path(default_model).exists():
                args.load_model = default_model
            else:
                print(
                    f"Error: Need to specify a model file path (--load-model) or ensure the default model file exists: {default_model}"
                )
                return

        print("Prediction mode")
        print(f"JSON file: {args.predict}")
        print(f"Model file: {args.load_model}")
        print("=" * 60)

        # Create the classifier and load the model
        classifier = PoseClassifier(model_type=args.model)
        classifier.load_model(args.load_model)

        # Run the prediction
        result = classifier.predict_single_json(args.predict)

        # Show the prediction result
        print("\nPrediction result:")
        print(f"File: {result['file_name']}")
        if 'error' in result:
            print(f"Error: {result['error']}")
        else:
            print(f"Predicted label: {result['predicted_label']}")
            print(f"Joint coverage: {result['joint_coverage']}")

            if result['confidence_scores']:
                print(f"Max confidence: {result['max_confidence']:.4f}")
                print("\nPer-class confidence:")
                sorted_scores = sorted(result['confidence_scores'].items(), key=lambda x: x[1], reverse=True)
                for label, score in sorted_scores:
                    print(f"  {label}: {score:.4f}")

            if result['missing_joints']:
                print(f"\nMissing joints: {', '.join(result['missing_joints'])}")
        return

    # Training mode
    print("Training mode")
    print(f"Data directory: {args.data}")
    print(f"Model type: {args.model}")
    print(f"Test size: {args.test_size}")
    print("=" * 60)

    # Check the data directory
    if not Path(args.data).exists():
        print(f"Error: data directory does not exist: {args.data}")
        return

    # Create the classifier
    classifier = PoseClassifier(model_type=args.model)

    # If loading an existing model and not forcing training
    if args.load_model and not args.train:
        print(f"Loading existing model: {args.load_model}")
        classifier.load_model(args.load_model)
        print("Model loaded, skipping training step")
    else:
        # Load data
        X, y = classifier.load_data(args.data)

        if len(X) == 0:
            print("Error: no valid data found")
            return

        # Train the model
        results = classifier.train(X, y, test_size=args.test_size)

        # Plot the confusion matrix (if not disabled)
        if not args.no_plot:
            try:
                classifier.plot_confusion_matrix(
                    results['confusion_matrix'],
                    results['target_names'],
                    save_path=f"confusion_matrix_{args.model}.png"
                )
            except Exception as e:
                print(f"Error while plotting confusion matrix: {e}")

        # Save the model (use a default path if none was specified)
        if args.save_model:
            classifier.save_model(args.save_model)
        else:
            # Default save path
            default_path = f"pose_classifier_{args.model}.pkl"
            classifier.save_model(default_path)

        print("\nTraining complete!")
        print(f"Final test accuracy: {results['test_accuracy']:.4f}")

    # Export ONNX if requested
    if args.export_onnx:
        print(f"\nExporting {args.export_model_type} model to ONNX format...")
        onnx_path = classifier.export_to_onnx(model_type=args.export_model_type, output_path=args.export_onnx)
        if onnx_path:
            print(f"✅ ONNX model exported: {onnx_path}")

    # Export TFLite if requested
    if args.export_tflite:
        print("\nExporting student_model to TFLite format...")
        tflite_path = classifier.export_to_tflite(output_path=args.export_tflite)
        if tflite_path:
            print(f"✅ TFLite model exported: {tflite_path}")


if __name__ == "__main__":
    main()