#!/usr/bin/env python3
"""
Machine learning pose classification script.
Features:
1. Train classifiers on pose landmark inputs
2. Use selected landmark coordinates as features
3. Use folder names as class labels
4. Train and evaluate models
Usage:
python ml_pose_classifier.py [--data DATA_DIR] [--model MODEL_TYPE] [--test-size RATIO]
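Examples (paths and file names are illustrative):
    python ml_pose_classifier.py --data PoseData --model random_forest
    python ml_pose_classifier.py --load-model pose_classifier_random_forest.pkl --predict sample.json
    python ml_pose_classifier.py --load-model pose_classifier_random_forest.pkl --evaluate TestData
    python ml_pose_classifier.py --model distilled_rf --export-onnx pose_classifier.onnx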
"""
import json
import argparse
import numpy as np
import time
from pathlib import Path
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
# from sklearn.pipeline import Pipeline # not used
from sklearn.neural_network import MLPRegressor
import joblib
import matplotlib.pyplot as plt
# seaborn is optional; used only for confusion matrix plotting
try:
import seaborn as sns
SEABORN_AVAILABLE = True
except ImportError:
SEABORN_AVAILABLE = False
# ONNX related imports
try:
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
# onnx is not required here; we import it lazily where needed
ONNX_AVAILABLE = True
except ImportError:
ONNX_AVAILABLE = False
# ONNX Runtime import
try:
    import onnxruntime  # noqa: F401 (optional; only used if ONNX runtime testing is added)
    ONNX_RUNTIME_AVAILABLE = True
except ImportError:
    ONNX_RUNTIME_AVAILABLE = False
class PoseClassifier:
def __init__(self, model_type='random_forest'):
"""
Initialize the pose classifier.
Args:
model_type: model type ('random_forest', 'svm', 'gradient_boost', 'logistic', 'distilled_rf')
"""
self.model_type = model_type
self.model = None
self.student_model = None # If distillation is used, save student (MLP) model
self.scaler = StandardScaler()
self.label_encoder = LabelEncoder()
# Define joints we want to use (based on MediaPipe keypoint indices)
self.target_joints = [
            'nose', # Head (kept as a reference point; in nose-centered data its coordinates are effectively 0,0,0)
'left_shoulder', # Left shoulder
'right_shoulder', # Right shoulder
'left_elbow', # Left elbow
'right_elbow', # Right elbow
'left_wrist', # Left wrist
'right_wrist', # Right wrist
'left_hip', # Left hip
'right_hip', # Right hip
'left_knee', # Left knee
'right_knee', # Right knee
'left_ankle', # Left ankle
'right_ankle' # Right ankle
]
self.feature_columns = []
for joint in self.target_joints:
self.feature_columns.extend([f'{joint}_x', f'{joint}_y', f'{joint}_z'])
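        # Resulting feature order: [nose_x, nose_y, nose_z, left_shoulder_x, ...]
        # (13 joints x 3 coordinates = 39 features).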
print(f"Target joints: {len(self.target_joints)}")
print(f"Feature dimension: {len(self.feature_columns)}")
print("Joint list:", ', '.join(self.target_joints))
def _get_model(self):
"""Create a classifier based on the selected model type."""
if self.model_type == 'random_forest':
return RandomForestClassifier(
n_estimators=100,
max_depth=15,
min_samples_split=5,
min_samples_leaf=2,
random_state=42,
n_jobs=-1
)
elif self.model_type == 'svm':
return SVC(
C=1.0,
kernel='rbf',
gamma='scale',
random_state=42
)
elif self.model_type == 'gradient_boost':
return GradientBoostingClassifier(
n_estimators=100,
learning_rate=0.1,
max_depth=6,
random_state=42
)
elif self.model_type == 'logistic':
return LogisticRegression(
                C=10.0, # Larger C weakens regularization (C is the inverse regularization strength), allowing a more flexible fit
max_iter=2000, # Increase maximum iterations
solver='lbfgs', # Use L-BFGS solver, suitable for small datasets
multi_class='multinomial', # Multi-class strategy
random_state=42,
n_jobs=-1
)
elif self.model_type == 'distilled_rf':
            # Distillation teacher: a random forest (the MLP student is created in train())
return RandomForestClassifier(
n_estimators=100,
max_depth=15,
min_samples_split=5,
min_samples_leaf=2,
random_state=42,
n_jobs=-1
)
else:
raise ValueError(f"Unsupported model type: {self.model_type}")
def load_data(self, data_dir):
"""
Load pose data from JSON files
Args:
data_dir: Data directory containing label folders
Returns:
tuple: (feature data, labels)
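        Expected layout (folder names become labels and must start with 'label_';
        the names below are illustrative):
            data_dir/
                label_pose_a/
                    sample_001.json
                label_pose_b/
                    ...
        Each JSON file should contain a "landmarks" object mapping joint names to
        {"x": ..., "y": ..., "z": ...}; missing joints are zero-filled.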
"""
data_path = Path(data_dir)
all_features = []
all_labels = []
print(f"Loading data from: {data_path}")
# Iterate over each label directory
for label_dir in data_path.iterdir():
if not label_dir.is_dir() or not label_dir.name.startswith('label_'):
continue
label = label_dir.name
json_files = list(label_dir.glob('*.json'))
print(f"Processing {label}: {len(json_files)} files")
for json_file in json_files:
try:
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
landmarks = data.get('landmarks', {})
# Extract coordinates of target joints
features = []
missing_joints = []
for joint in self.target_joints:
if joint in landmarks:
joint_data = landmarks[joint]
features.extend([
joint_data.get('x', 0.0),
joint_data.get('y', 0.0),
joint_data.get('z', 0.0)
])
else:
# If a joint is missing, fill with zeros
features.extend([0.0, 0.0, 0.0])
missing_joints.append(joint)
if len(features) == len(self.feature_columns):
all_features.append(features)
all_labels.append(label)
else:
print(f"Skipping file {json_file}: feature dimension mismatch")
if missing_joints:
print(f"File {json_file.name} missing joints: {missing_joints}")
except Exception as e:
print(f"Error reading file {json_file}: {e}")
continue
print(f"Loaded {len(all_features)} samples")
# count samples per label
label_counts = {}
for label in all_labels:
label_counts[label] = label_counts.get(label, 0) + 1
print("Label distribution:")
for label, count in sorted(label_counts.items()):
print(f" {label}: {count} samples")
return np.array(all_features), np.array(all_labels)
def train(self, X, y, test_size=0.2):
"""
Train the classifier.
Args:
X: feature data
y: labels
test_size: ratio for test split
Returns:
dict: a dictionary containing training results
"""
print(f"\nStarting training for model: {self.model_type}...")
print(f"Data shape: {X.shape}")
print(f"Number of labels: {len(np.unique(y))}")
# Encode labels
y_encoded = self.label_encoder.fit_transform(y)
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y_encoded, test_size=test_size, random_state=42, stratify=y_encoded
)
print(f"Train set size: {X_train.shape[0]}")
print(f"Test set size: {X_test.shape[0]}")
# standardize features
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
# If using distillation process: train RF teacher first, then train MLPRegressor student to fit teacher's predict_proba
if self.model_type == 'distilled_rf':
print("Using distillation: train RandomForest teacher, then fit an MLPRegressor student to teacher soft labels")
# Train teacher
teacher = self._get_model()
teacher.fit(X_train_scaled, y_train)
# Get teacher's probability distribution as soft labels
y_train_proba = teacher.predict_proba(X_train_scaled)
# Create and train student (MLPRegressor) to fit probability vectors
student = MLPRegressor(hidden_layer_sizes=(128, 64, 32),
activation='relu',
solver='adam',
max_iter=1000,
learning_rate_init=0.001,
random_state=42,
early_stopping=True,
validation_fraction=0.1)
print("Training student model to fit teacher probability outputs...")
print(f"Teacher probability output shape: {y_train_proba.shape}")
# Multi-output regression, target is probability vector
student.fit(X_train_scaled, y_train_proba)
# Save models
self.model = teacher
self.student_model = student
# Use student to predict on train/test sets
y_train_pred_proba = student.predict(X_train_scaled)
y_test_pred_proba = student.predict(X_test_scaled)
# Apply softmax to ensure probabilities sum to 1
def softmax(x):
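                # Subtracting the row-wise max before exponentiating avoids overflow
                # without changing the result (softmax is shift-invariant).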
exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
return exp_x / np.sum(exp_x, axis=1, keepdims=True)
y_train_pred_proba = softmax(y_train_pred_proba)
y_test_pred_proba = softmax(y_test_pred_proba)
y_train_pred = np.argmax(y_train_pred_proba, axis=1)
y_test_pred = np.argmax(y_test_pred_proba, axis=1)
print(f"Student predicted probability shape: {y_test_pred_proba.shape}")
print(f"Student training accuracy: {accuracy_score(y_train, y_train_pred):.4f}")
else:
# Standard flow: train a single model
self.model = self._get_model()
self.model.fit(X_train_scaled, y_train)
y_train_pred = self.model.predict(X_train_scaled)
y_test_pred = self.model.predict(X_test_scaled)
# compute accuracies
train_accuracy = accuracy_score(y_train, y_train_pred)
test_accuracy = accuracy_score(y_test, y_test_pred)
        # Cross-validate the fitted model on the training split
        # (for distilled_rf this evaluates the RF teacher, not the MLP student)
        cv_scores = cross_val_score(self.model, X_train_scaled, y_train, cv=5)
print("\nTraining results:")
print(f"Train accuracy: {train_accuracy:.4f}")
print(f"Test accuracy: {test_accuracy:.4f}")
print(f"5-fold CV accuracy: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}")
# classification report
print("\nTest set classification report:")
target_names = self.label_encoder.classes_
print(classification_report(y_test, y_test_pred, target_names=target_names))
# confusion matrix
cm = confusion_matrix(y_test, y_test_pred)
return {
'train_accuracy': train_accuracy,
'test_accuracy': test_accuracy,
'cv_scores': cv_scores,
'confusion_matrix': cm,
'target_names': target_names,
'X_test': X_test_scaled,
'y_test': y_test,
'y_test_pred': y_test_pred
}
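    # Typical end-to-end usage of this class (illustrative):
    #   clf = PoseClassifier(model_type='random_forest')
    #   X, y = clf.load_data('PoseData')
    #   results = clf.train(X, y, test_size=0.2)
    #   clf.save_model('pose_classifier_random_forest.pkl')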
def save_model(self, filepath):
"""Save trained model to disk."""
        model_data = {
            'model': self.model,
            'student_model': self.student_model,  # None unless a distilled student was trained
            'scaler': self.scaler,
            'label_encoder': self.label_encoder,
            'model_type': self.model_type,
            'target_joints': self.target_joints,
            'feature_columns': self.feature_columns
        }
joblib.dump(model_data, filepath)
print(f"Model saved to: {filepath}")
def load_model(self, filepath):
"""Load trained model from disk."""
model_data = joblib.load(filepath)
        self.model = model_data['model']
        # .get() keeps compatibility with files saved before student_model was persisted
        self.student_model = model_data.get('student_model')
self.scaler = model_data['scaler']
self.label_encoder = model_data['label_encoder']
self.model_type = model_data['model_type']
self.target_joints = model_data['target_joints']
self.feature_columns = model_data['feature_columns']
print(f"Model loaded from: {filepath}")
    def predict(self, X):
        """Run prediction on input features; returns (decoded labels, probabilities or None)."""
        if self.model is None and self.student_model is None:
            raise ValueError("Model not trained or loaded")
        X_scaled = self.scaler.transform(X)
        # Prefer the student model (if present). Its raw regression outputs are passed
        # through a softmax so downstream code can treat them as class probabilities,
        # matching the post-processing used during training.
        if self.student_model is not None:
            raw = self.student_model.predict(X_scaled)
            exp_raw = np.exp(raw - np.max(raw, axis=1, keepdims=True))
            proba = exp_raw / np.sum(exp_raw, axis=1, keepdims=True)
            preds = np.argmax(proba, axis=1)
            labels = self.label_encoder.inverse_transform(preds)
            return labels, proba
# Otherwise fall back to original model
predictions = self.model.predict(X_scaled)
probabilities = None
if hasattr(self.model, 'predict_proba'):
probabilities = self.model.predict_proba(X_scaled)
return self.label_encoder.inverse_transform(predictions), probabilities
def predict_single_json(self, json_path):
"""
Predict pose class for a single JSON file.
Args:
json_path: path to the JSON file
Returns:
dict: prediction details or error information
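        On success the dict looks like (values illustrative):
            {'file_name': 'sample.json', 'predicted_label': 'label_pose_a',
             'max_confidence': 0.93, 'confidence_scores': {...},
             'joint_coverage': '13/13', 'missing_joints': [], ...}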
"""
        if self.model is None and self.student_model is None:
            raise ValueError("Model not trained or loaded")
try:
# Read JSON file
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
landmarks = data.get('landmarks', {})
# Extract coordinates of target joints
features = []
missing_joints = []
available_joints = []
for joint in self.target_joints:
if joint in landmarks:
joint_data = landmarks[joint]
features.extend([
joint_data.get('x', 0.0),
joint_data.get('y', 0.0),
joint_data.get('z', 0.0)
])
available_joints.append(joint)
else:
# If a joint is missing, fill with zeros
features.extend([0.0, 0.0, 0.0])
missing_joints.append(joint)
if len(features) != len(self.feature_columns):
raise ValueError(f"Feature dimension mismatch: expected {len(self.feature_columns)}, got {len(features)}")
# Convert to numpy array and predict
X = np.array([features])
predictions, probabilities = self.predict(X)
# build result dict
result = {
'file_path': str(json_path),
'file_name': Path(json_path).name,
'predicted_label': predictions[0],
'confidence_scores': {},
'available_joints': available_joints,
'missing_joints': missing_joints,
'joint_coverage': f"{len(available_joints)}/{len(self.target_joints)}"
}
# add per-class confidence scores
if probabilities is not None:
for i, label in enumerate(self.label_encoder.classes_):
result['confidence_scores'][label] = float(probabilities[0][i])
# highest confidence
max_prob_idx = np.argmax(probabilities[0])
result['max_confidence'] = float(probabilities[0][max_prob_idx])
return result
except Exception as e:
return {
'file_path': str(json_path),
'file_name': Path(json_path).name,
'error': str(e),
'predicted_label': None
}
def evaluate_test_directory(self, test_dir):
"""
Evaluate all data in a test directory.
Args:
test_dir: path to the test data directory
Returns:
dict: dictionary containing detailed evaluation results
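        Top-level keys: 'label_stats', 'overall_accuracy', 'total_samples',
        'total_correct', 'total_errors', 'total_tested', 'confusion_matrix',
        'detailed_results', 'timing_stats'.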
"""
if self.model is None:
raise ValueError("Model not trained or loaded")
test_path = Path(test_dir)
if not test_path.exists():
raise ValueError(f"Test directory does not exist: {test_dir}")
# start timing
start_time = time.time()
print(f"Starting evaluation on test dataset: {test_path}")
print(f"Start time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")
# store all prediction results
all_results = []
label_stats = {}
total_prediction_time = 0.0
prediction_count = 0
# iterate over label folders
for label_dir in test_path.iterdir():
if not label_dir.is_dir() or not label_dir.name.startswith('label_'):
continue
true_label = label_dir.name
json_files = list(label_dir.glob('*.json'))
print(f"Evaluating {true_label}: {len(json_files)} files")
label_stats[true_label] = {
'total': len(json_files),
'correct': 0,
'incorrect': 0,
'errors': 0,
'predictions': {},
'confidence_scores': [],
'prediction_times': []
}
for json_file in json_files:
# Single prediction timing
pred_start_time = time.time()
result = self.predict_single_json(json_file)
pred_end_time = time.time()
single_prediction_time = pred_end_time - pred_start_time
total_prediction_time += single_prediction_time
prediction_count += 1
if 'error' in result:
label_stats[true_label]['errors'] += 1
print(f" Error: {json_file.name} - {result['error']}")
continue
predicted_label = result['predicted_label']
is_correct = predicted_label == true_label
if is_correct:
label_stats[true_label]['correct'] += 1
else:
label_stats[true_label]['incorrect'] += 1
# Count prediction distribution
if predicted_label not in label_stats[true_label]['predictions']:
label_stats[true_label]['predictions'][predicted_label] = 0
label_stats[true_label]['predictions'][predicted_label] += 1
# Record confidence and prediction time
if 'max_confidence' in result:
label_stats[true_label]['confidence_scores'].append(result['max_confidence'])
label_stats[true_label]['prediction_times'].append(single_prediction_time)
# Save detailed result
all_results.append({
'file_path': str(json_file),
'file_name': json_file.name,
'true_label': true_label,
'predicted_label': predicted_label,
'is_correct': is_correct,
'confidence': result.get('max_confidence', 0.0),
'confidence_scores': result.get('confidence_scores', {}),
'joint_coverage': result.get('joint_coverage', '0/13'),
'prediction_time': single_prediction_time
})
# end timing
end_time = time.time()
total_execution_time = end_time - start_time
# compute aggregate statistics
total_samples = sum(stats['total'] for stats in label_stats.values())
total_correct = sum(stats['correct'] for stats in label_stats.values())
total_errors = sum(stats['errors'] for stats in label_stats.values())
total_tested = total_samples - total_errors
overall_accuracy = total_correct / total_tested if total_tested > 0 else 0.0
avg_prediction_time = total_prediction_time / prediction_count if prediction_count > 0 else 0.0
# build confusion matrix
confusion_matrix = {}
for true_label in label_stats.keys():
confusion_matrix[true_label] = {}
for predicted_label in label_stats.keys():
confusion_matrix[true_label][predicted_label] = 0
for result in all_results:
        if result.get('is_correct') is not None:  # defensive check; error cases are never appended to all_results
true_label = result['true_label']
predicted_label = result['predicted_label']
confusion_matrix[true_label][predicted_label] += 1
return {
'label_stats': label_stats,
'overall_accuracy': overall_accuracy,
'total_samples': total_samples,
'total_correct': total_correct,
'total_errors': total_errors,
'total_tested': total_tested,
'confusion_matrix': confusion_matrix,
'detailed_results': all_results,
'timing_stats': {
'total_execution_time': total_execution_time,
'total_prediction_time': total_prediction_time,
'avg_prediction_time': avg_prediction_time,
'prediction_count': prediction_count,
'start_time': start_time,
'end_time': end_time,
'overhead_time': total_execution_time - total_prediction_time
}
}
def print_evaluation_report(self, eval_results):
"""
Print a detailed evaluation report.
Args:
eval_results: dictionary returned by evaluate_test_directory
"""
timing_stats = eval_results.get('timing_stats', {})
print("\n" + "=" * 80)
print("Test dataset evaluation report")
print("=" * 80)
# Overall statistics
print(f"Total samples: {eval_results['total_samples']}")
print(f"Successfully tested: {eval_results['total_tested']}")
print(f"Errors: {eval_results['total_errors']}")
print(
f"Overall accuracy: {eval_results['overall_accuracy']:.4f} "
f"({eval_results['total_correct']}/{eval_results['total_tested']})"
)
# Timing statistics
if timing_stats:
total_time = timing_stats['total_execution_time']
prediction_time = timing_stats['total_prediction_time']
avg_time = timing_stats['avg_prediction_time']
overhead_time = timing_stats['overhead_time']
prediction_count = timing_stats['prediction_count']
print("\nTiming statistics:")
print("-" * 50)
print(f"Total execution time: {total_time:.4f} s")
print(f"Total prediction time: {prediction_time:.4f} s")
print(f"Overhead time: {overhead_time:.4f} s")
print(f"Average prediction time: {avg_time * 1000:.2f} ms")
print(f"Prediction throughput: {prediction_count / total_time:.2f} preds/s")
print(
f"Prediction efficiency: {(prediction_time / total_time) * 100:.1f}% "
f"(prediction time / total)"
)
# Per-label detailed statistics
print("\nPer-label stats:")
print("-" * 80)
print(
f"{'Label':<10} {'Total':<6} {'Correct':<6} {'Wrong':<6} "
f"{'Accuracy':<8} {'AvgConf':<10} {'AvgPredTime':<12}"
)
print("-" * 80)
for label, stats in sorted(eval_results['label_stats'].items()):
accuracy = (
stats['correct'] / (stats['total'] - stats['errors'])
if (stats['total'] - stats['errors']) > 0
else 0.0
)
avg_confidence = (
np.mean(stats['confidence_scores']) if stats['confidence_scores'] else 0.0
)
avg_pred_time = (
np.mean(stats['prediction_times'])
if 'prediction_times' in stats and stats['prediction_times']
else 0.0
)
print(
f"{label:<10} {stats['total']:<6} {stats['correct']:<6} {stats['incorrect']:<6} "
f"{accuracy:.4f} {avg_confidence:.4f} {avg_pred_time * 1000:.2f}ms"
)
# Confusion matrix
print("\nConfusion matrix:")
print("-" * 60)
labels = sorted(eval_results['label_stats'].keys())
        # Header row ('True\Pred' is kept out of the f-string: backslashes inside
        # f-string expressions are a SyntaxError before Python 3.12)
        header = 'True\\Pred'
        print(f"{header:<12}", end="")
for label in labels:
print(f"{label:<10}", end="")
print()
# Data rows
for true_label in labels:
print(f"{true_label:<12}", end="")
for pred_label in labels:
count = eval_results['confusion_matrix'][true_label][pred_label]
print(f"{count:<10}", end="")
print()
# Per-label prediction distribution
print("\nPer-label prediction distribution:")
print("-" * 80)
for true_label, stats in sorted(eval_results['label_stats'].items()):
if stats['predictions']:
print(f"{true_label}:")
total_predictions = sum(stats['predictions'].values())
for pred_label, count in sorted(stats['predictions'].items()):
percentage = (count / total_predictions) * 100
print(f" -> {pred_label}: {count} ({percentage:.1f}%)")
# Error analysis
print("\nError analysis:")
print("-" * 40)
incorrect_results = [r for r in eval_results['detailed_results'] if not r['is_correct']]
if incorrect_results:
# Sort by confidence and show top mistaken predictions
incorrect_results.sort(key=lambda x: x['confidence'], reverse=True)
print("Highest-confidence incorrect predictions (top 10):")
for i, result in enumerate(incorrect_results[:10]):
pred_time = result.get('prediction_time', 0) * 1000 # ms
print(
f"{i + 1:2d}. {result['file_name']}: {result['true_label']} -> {result['predicted_label']} "
f"(conf: {result['confidence']:.4f}, time: {pred_time:.2f}ms)"
)
else:
print("No incorrect predictions found.")
# Performance analysis
if timing_stats and eval_results['detailed_results']:
print("\nPerformance analysis:")
print("-" * 40)
prediction_times = [
r.get('prediction_time', 0) for r in eval_results['detailed_results'] if 'prediction_time' in r
]
if prediction_times:
min_time = min(prediction_times) * 1000
max_time = max(prediction_times) * 1000
median_time = np.median(prediction_times) * 1000
std_time = np.std(prediction_times) * 1000
print("Prediction time distribution:")
print(f" Fastest: {min_time:.2f}ms")
print(f" Slowest: {max_time:.2f}ms")
print(f" Median: {median_time:.2f}ms")
print(f" Stddev: {std_time:.2f}ms")
print("\n" + "=" * 80)
def plot_confusion_matrix(self, cm, target_names, save_path=None):
"""Plot confusion matrix."""
plt.figure(figsize=(10, 8))
if SEABORN_AVAILABLE:
sns.heatmap(
cm,
annot=True,
fmt='d',
cmap='Blues',
xticklabels=target_names,
yticklabels=target_names,
)
else:
# Fallback using matplotlib only
im = plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.colorbar(im)
tick_marks = np.arange(len(target_names))
plt.xticks(tick_marks, target_names, rotation=45, ha='right')
plt.yticks(tick_marks, target_names)
# Annotate cells
thresh = cm.max() / 2.0 if cm.size else 0
for i in range(cm.shape[0]):
for j in range(cm.shape[1]):
plt.text(j, i, format(cm[i, j], 'd'),
ha="center", va="center",
color="white" if cm[i, j] > thresh else "black")
plt.title(f"{self.model_type.title()} model confusion matrix")
plt.xlabel('Predicted')
plt.ylabel('True')
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"Confusion matrix saved to: {save_path}")
plt.show()
def export_to_onnx(self, model_type='random_forest', output_path=None):
"""
Export the trained model to ONNX format (only models supported by Barracuda).
Note: Barracuda does not support LinearClassifier layers (e.g., LogisticRegression/SVM) — only tree models are supported.
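        The exported graph expects already-standardized input; a consumer must apply
        (x - scaler_mean) / scaler_scale using the values written to the companion
        _labels.json (see the note printed after export).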
"""
if not ONNX_AVAILABLE:
print("Error: ONNX export is unavailable. Please install skl2onnx and onnx packages:")
print("pip install skl2onnx onnx")
return None
if not hasattr(self, 'model') or self.model is None:
print("Error: Model is not trained yet. Please train the model first.")
return None
# Check if current model type matches requested export type
if hasattr(self, 'model_type') and self.model_type != model_type:
print(f"Warning: Currently trained {self.model_type} model, but requested to export {model_type} model")
print(f"Will export currently trained {self.model_type} model")
model_name = self.model_type
else:
model_name = model_type
# Barracuda only supports tree models, not LinearClassifier
if model_name in ['logistic', 'svm']:
print(f"❌ Barracuda/Unity does not support ONNX import for {model_name} models (LinearClassifier layer).")
print("Please use random_forest or gradient_boost for export.")
return None
# If student_model exists -> export student_model (MLP), otherwise export self.model
model_to_export = None
export_name = None
if self.student_model is not None:
model_to_export = self.student_model
export_name = 'distilled_mlp'
print("Detected student_model. Exporting student (MLP) to ONNX (suitable for Unity/Barracuda).")
else:
model_to_export = self.model
export_name = model_name
if model_to_export is None:
print("Error: No model available for export.")
return None
# Generate output file path
if output_path is None:
output_path = f"pose_classifier_{export_name}.onnx"
print(f"About to export model to: {output_path}, export target: {export_name}")
try:
feature_count = len(self.target_joints) * 3
initial_type = [('float_input', FloatTensorType([None, feature_count]))]
onnx_model = convert_sklearn(
model_to_export,
initial_types=initial_type,
target_opset=12
)
with open(output_path, "wb") as f:
f.write(onnx_model.SerializeToString())
print(f"✅ Successfully exported {export_name} model to ONNX format: {output_path}")
# Save label mapping and Scaler parameters
label_mapping_path = output_path.replace('.onnx', '_labels.json')
label_mapping = {
'label_encoder_classes': self.label_encoder.classes_.tolist(),
'model_type': export_name,
'feature_count': feature_count,
'target_joints': self.target_joints,
'description': f'Pose classifier - {len(self.target_joints)} landmarks with x,y,z coordinates',
'scaler_mean': self.scaler.mean_.tolist(),
'scaler_scale': self.scaler.scale_.tolist()
}
with open(label_mapping_path, 'w', encoding='utf-8') as f:
json.dump(label_mapping, f, ensure_ascii=False, indent=2)
print(f"✅ Label mapping and scaler parameters saved to: {label_mapping_path}")
print("⚠️ Note: The exported ONNX expects inputs to be standardized with scaler_mean/scaler_scale.")
return output_path
except Exception as e:
print(f"❌ ONNX export failed: {str(e)}")
import traceback
traceback.print_exc()
return None
def export_to_tflite(self, output_path=None):
"""
Export student_model (MLP) to TFLite format.
Dependencies: skl2onnx, onnx, onnx-tf, tensorflow
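        Conversion chain: sklearn MLP -> ONNX (skl2onnx) -> TensorFlow SavedModel
        (onnx-tf) -> TFLite (tf.lite.TFLiteConverter).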
"""
if self.student_model is None:
print("❌ Only exporting student_model (MLPRegressor) to TFLite is supported. Please train with --model distilled_rf first.")
return None
try:
import onnx
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from onnx_tf.backend import prepare
import tensorflow as tf
except ImportError:
print("❌ You need to install skl2onnx, onnx, onnx-tf, tensorflow.")
print("pip install skl2onnx onnx onnx-tf tensorflow")
return None
feature_count = len(self.target_joints) * 3
initial_type = [('float_input', FloatTensorType([None, feature_count]))]
# 1. Export to ONNX
print("Exporting student_model to ONNX...")
onnx_model = convert_sklearn(
self.student_model,
initial_types=initial_type,
target_opset=12
)
onnx_path = "temp_student.onnx"
with open(onnx_path, "wb") as f:
f.write(onnx_model.SerializeToString())
print(f"✅ ONNX export successful: {onnx_path}")
# 2. ONNX -> TensorFlow SavedModel
print("Converting ONNX to TensorFlow SavedModel...")
tf_model = prepare(onnx.load(onnx_path))
tf_saved_path = "temp_student_tf"
tf_model.export_graph(tf_saved_path)
print(f"✅ SavedModel export successful: {tf_saved_path}")
# 3. SavedModel -> TFLite
print("Converting SavedModel to TFLite...")
converter = tf.lite.TFLiteConverter.from_saved_model(tf_saved_path)
tflite_model = converter.convert()
if output_path is None:
output_path = "pose_classifier_distilled_mlp.tflite"
with open(output_path, "wb") as f:
f.write(tflite_model)
print(f"✅ TFLite export successful: {output_path}")
# Cleanup temporary files (optional)
import os
os.remove(onnx_path)
import shutil
shutil.rmtree(tf_saved_path, ignore_errors=True)
return output_path
def main():
parser = argparse.ArgumentParser(description="Pose classification machine learning script")
parser.add_argument("--data", "-d", default="PoseData", help="Pose data directory (default: PoseData)")
parser.add_argument(
"--model",
"-m",
choices=['random_forest', 'svm', 'gradient_boost', 'logistic', 'distilled_rf'],
default='random_forest',
help="Model type (default: random_forest)",
)
parser.add_argument("--test-size", "-t", type=float, default=0.2, help="Test set ratio (default: 0.2)")
parser.add_argument("--save-model", "-s", help="Path to save the trained model")
parser.add_argument("--load-model", "-l", help="Path to load an already trained model")
parser.add_argument("--predict", "-p", help="Path of a single JSON file to predict")
parser.add_argument("--evaluate", "-e", help="Path of a test directory to evaluate all JSON files")
parser.add_argument("--no-plot", action="store_true", help="Do not display confusion matrix plot")
parser.add_argument("--train", action="store_true", help="Force training even if --load-model is provided")
parser.add_argument("--export-onnx", help="Export model to ONNX format; specify output file path")
parser.add_argument(
"--export-model-type",
choices=['random_forest', 'logistic', 'distilled_rf'],
default='random_forest',
help="Model type to export (default: random_forest)",
)
parser.add_argument("--test-onnx", help="Test an ONNX model; specify ONNX file path")
parser.add_argument("--onnx-labels", help="ONNX label mapping JSON path (auto-detect if not provided)")
parser.add_argument("--onnx-test-data", help="ONNX batch test data directory (if not provided, single-sample test)")
parser.add_argument(
"--export-tflite",
help="Export model to TFLite format; specify output path (supported for distilled_rf student model only)",
)
args = parser.parse_args()
print("Pose classification ML tool")
print("=" * 60)
# If ONNX test mode
if args.test_onnx:
print("ONNX model test mode")
print(f"ONNX model: {args.test_onnx}")
print("=" * 60)
        # test_onnx_model is not implemented here; the CLI flags (--test-onnx,
        # --onnx-labels, --onnx-test-data) are reserved for a future implementation.
        print("ONNX test requested but functionality is not implemented in this script.")
return
# If evaluation mode
if args.evaluate:
if not args.load_model:
# Try to use default model file
default_model = f"pose_classifier_{args.model}.pkl"
if Path(default_model).exists():
args.load_model = default_model
else:
print(
f"Error: Need to specify model file path (--load-model) or ensure default model file exists: {default_model}"
)
return
print("Evaluation mode")
print(f"Test data directory: {args.evaluate}")
print(f"Model file: {args.load_model}")
print("=" * 60)
# Create classifier and load model
classifier = PoseClassifier(model_type=args.model)
classifier.load_model(args.load_model)
# Perform comprehensive evaluation
try:
eval_results = classifier.evaluate_test_directory(args.evaluate)
classifier.print_evaluation_report(eval_results)
except Exception as e:
print(f"Error during evaluation: {e}")
return
# Prediction-only mode
if args.predict:
if not args.load_model:
# Try to use default model file
default_model = f"pose_classifier_{args.model}.pkl"
if Path(default_model).exists():
args.load_model = default_model
else:
print(
f"Error: Need to specify model file path (--load-model) or ensure default model file exists: {default_model}"
)
return
print("Prediction mode")
print(f"JSON file: {args.predict}")
print(f"Model file: {args.load_model}")
print("=" * 60)
# Create classifier and load model
classifier = PoseClassifier(model_type=args.model)
classifier.load_model(args.load_model)
# Run prediction
result = classifier.predict_single_json(args.predict)
# Show prediction result
print("\nPrediction result:")
print(f"File: {result['file_name']}")
if 'error' in result:
print(f"Error: {result['error']}")
else:
print(f"Predicted label: {result['predicted_label']}")
print(f"Joint coverage: {result['joint_coverage']}")
if result['confidence_scores']:
print(f"Max confidence: {result['max_confidence']:.4f}")
print("\nPer-class confidence:")
sorted_scores = sorted(result['confidence_scores'].items(), key=lambda x: x[1], reverse=True)
for label, score in sorted_scores:
print(f" {label}: {score:.4f}")
if result['missing_joints']:
print(f"\nMissing joints: {', '.join(result['missing_joints'])}")
return
# Training mode
print("Training mode")
print(f"Data directory: {args.data}")
print(f"Model type: {args.model}")
print(f"Test size: {args.test_size}")
print("=" * 60)
# Check data directory
if not Path(args.data).exists():
print(f"Error: data directory does not exist: {args.data}")
return
# Create classifier
classifier = PoseClassifier(model_type=args.model)
# If loading an existing model and not forcing training
if args.load_model and not args.train:
print(f"Loading existing model: {args.load_model}")
classifier.load_model(args.load_model)
print("Model loaded, skipping training step")
else:
# Load data
X, y = classifier.load_data(args.data)
if len(X) == 0:
print("Error: no valid data found")
return
# Train model
results = classifier.train(X, y, test_size=args.test_size)
# Plot confusion matrix (if not disabled)
if not args.no_plot:
try:
classifier.plot_confusion_matrix(
results['confusion_matrix'], results['target_names'], save_path=f"confusion_matrix_{args.model}.png"
)
except Exception as e:
print(f"Error while plotting confusion matrix: {e}")
# Save model (if specified)
if args.save_model:
classifier.save_model(args.save_model)
else:
# Default save path
default_path = f"pose_classifier_{args.model}.pkl"
classifier.save_model(default_path)
print("\nTraining complete!")
print(f"Final test accuracy: {results['test_accuracy']:.4f}")
# Export ONNX if requested
if args.export_onnx:
print(f"\nExporting {args.export_model_type} model to ONNX format...")
onnx_path = classifier.export_to_onnx(model_type=args.export_model_type, output_path=args.export_onnx)
if onnx_path:
print(f"✅ ONNX model exported: {onnx_path}")
# Export TFLite if requested
if args.export_tflite:
print("\nExporting student_model to TFLite format...")
tflite_path = classifier.export_to_tflite(output_path=args.export_tflite)
if tflite_path:
print(f"✅ TFLite model exported: {tflite_path}")
if __name__ == "__main__":
main()