|
|
|
|
|
""" |
|
|
Machine learning pose classification script. |
|
|
|
|
|
Features: |
|
|
1. Train classifiers on pose landmark inputs |
|
|
2. Use selected landmark coordinates as features |
|
|
3. Use folder names as class labels |
|
|
4. Evaluate trained models, predict on single JSON files, and export to ONNX/TFLite
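
Expected input JSON (illustrative sketch; the loader reads only the 'landmarks' entries
named in target_joints, each with x/y/z values):

    {
        "landmarks": {
            "nose": {"x": 0.51, "y": 0.22, "z": -0.10},
            "left_shoulder": {"x": 0.45, "y": 0.35, "z": -0.05}
        }
    }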
|
|
|
|
|
Usage: |
|
|
python ml_pose_classifier.py [--data DATA_DIR] [--model MODEL_TYPE] [--test-size RATIO] |
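
Examples (illustrative; paths and file names are placeholders):
    python ml_pose_classifier.py --data PoseData --model distilled_rf
    python ml_pose_classifier.py --predict sample.json --load-model pose_classifier_random_forest.pkl
    python ml_pose_classifier.py --evaluate TestData --load-model pose_classifier_random_forest.pkl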
|
|
""" |
|
|
|
|
|
import json |
|
|
import argparse |
|
|
import numpy as np |
|
|
import time |
|
|
from pathlib import Path |
|
|
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier |
|
|
from sklearn.svm import SVC |
|
|
from sklearn.linear_model import LogisticRegression |
|
|
from sklearn.model_selection import train_test_split, cross_val_score |
|
|
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score |
|
|
from sklearn.preprocessing import StandardScaler, LabelEncoder |
|
|
|
|
|
from sklearn.neural_network import MLPRegressor |
|
|
import joblib |
|
|
import matplotlib.pyplot as plt |
|
|
|
|
|
try: |
|
|
import seaborn as sns |
|
|
SEABORN_AVAILABLE = True |
|
|
except ImportError: |
|
|
SEABORN_AVAILABLE = False |
|
|
|
|
|
|
|
|
try: |
|
|
from skl2onnx import convert_sklearn |
|
|
from skl2onnx.common.data_types import FloatTensorType |
|
|
|
|
|
ONNX_AVAILABLE = True |
|
|
except ImportError: |
|
|
ONNX_AVAILABLE = False |
|
|
|
|
|
|
|
|
try:
    import onnxruntime  # probed only to check availability; not used directly here
    ONNX_RUNTIME_AVAILABLE = True
except ImportError:
    ONNX_RUNTIME_AVAILABLE = False
|
|
|
|
|
|
|
|
class PoseClassifier: |
|
|
def __init__(self, model_type='random_forest'): |
|
|
""" |
|
|
Initialize the pose classifier. |
|
|
|
|
|
Args: |
|
|
model_type: model type ('random_forest', 'svm', 'gradient_boost', 'logistic', 'distilled_rf') |
|
|
""" |
|
|
self.model_type = model_type |
|
|
self.model = None |
|
|
self.student_model = None |
|
|
self.scaler = StandardScaler() |
|
|
self.label_encoder = LabelEncoder() |
|
|
|
|
|
|
|
|
self.target_joints = [ |
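            # 13 pose landmarks; each contributes x, y and z, giving 39 features in total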
|
|
'nose', |
|
|
'left_shoulder', |
|
|
'right_shoulder', |
|
|
'left_elbow', |
|
|
'right_elbow', |
|
|
'left_wrist', |
|
|
'right_wrist', |
|
|
'left_hip', |
|
|
'right_hip', |
|
|
'left_knee', |
|
|
'right_knee', |
|
|
'left_ankle', |
|
|
'right_ankle' |
|
|
] |
|
|
|
|
|
self.feature_columns = [] |
|
|
for joint in self.target_joints: |
|
|
self.feature_columns.extend([f'{joint}_x', f'{joint}_y', f'{joint}_z']) |
|
|
|
|
|
print(f"Target joints: {len(self.target_joints)}") |
|
|
print(f"Feature dimension: {len(self.feature_columns)}") |
|
|
print("Joint list:", ', '.join(self.target_joints)) |
|
|
|
|
|
def _get_model(self): |
|
|
"""Create a classifier based on the selected model type.""" |
|
|
if self.model_type == 'random_forest': |
|
|
return RandomForestClassifier( |
|
|
n_estimators=100, |
|
|
max_depth=15, |
|
|
min_samples_split=5, |
|
|
min_samples_leaf=2, |
|
|
random_state=42, |
|
|
n_jobs=-1 |
|
|
) |
|
|
elif self.model_type == 'svm': |
|
|
return SVC( |
|
|
C=1.0, |
|
|
kernel='rbf', |
|
|
gamma='scale', |
|
|
random_state=42 |
|
|
) |
|
|
elif self.model_type == 'gradient_boost': |
|
|
return GradientBoostingClassifier( |
|
|
n_estimators=100, |
|
|
learning_rate=0.1, |
|
|
max_depth=6, |
|
|
random_state=42 |
|
|
) |
|
|
elif self.model_type == 'logistic': |
|
|
return LogisticRegression( |
|
|
C=10.0, |
|
|
max_iter=2000, |
|
|
solver='lbfgs', |
|
|
multi_class='multinomial', |
|
|
random_state=42, |
|
|
n_jobs=-1 |
|
|
) |
|
|
elif self.model_type == 'distilled_rf': |
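            # Distillation teacher: same RandomForest configuration as the plain 'random_forest' option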
|
|
|
|
|
return RandomForestClassifier( |
|
|
n_estimators=100, |
|
|
max_depth=15, |
|
|
min_samples_split=5, |
|
|
min_samples_leaf=2, |
|
|
random_state=42, |
|
|
n_jobs=-1 |
|
|
) |
|
|
else: |
|
|
raise ValueError(f"Unsupported model type: {self.model_type}") |
|
|
|
|
|
def load_data(self, data_dir): |
|
|
""" |
|
|
        Load pose data from JSON files organized in label_* folders.
|
|
|
|
|
Args: |
|
|
data_dir: Data directory containing label folders |
|
|
|
|
|
Returns: |
|
|
tuple: (feature data, labels) |
|
|
""" |
|
|
data_path = Path(data_dir) |
|
|
all_features = [] |
|
|
all_labels = [] |
|
|
|
|
|
print(f"Loading data from: {data_path}") |
|
|
|
|
|
|
|
|
for label_dir in data_path.iterdir(): |
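            # Each class is expected in a folder named 'label_<name>'; other entries are skipped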
|
|
if not label_dir.is_dir() or not label_dir.name.startswith('label_'): |
|
|
continue |
|
|
|
|
|
label = label_dir.name |
|
|
json_files = list(label_dir.glob('*.json')) |
|
|
|
|
|
print(f"Processing {label}: {len(json_files)} files") |
|
|
|
|
|
for json_file in json_files: |
|
|
try: |
|
|
with open(json_file, 'r', encoding='utf-8') as f: |
|
|
data = json.load(f) |
|
|
|
|
|
landmarks = data.get('landmarks', {}) |
|
|
|
|
|
|
|
|
features = [] |
|
|
missing_joints = [] |
|
|
|
|
|
for joint in self.target_joints: |
|
|
if joint in landmarks: |
|
|
joint_data = landmarks[joint] |
|
|
features.extend([ |
|
|
joint_data.get('x', 0.0), |
|
|
joint_data.get('y', 0.0), |
|
|
joint_data.get('z', 0.0) |
|
|
]) |
|
|
else: |
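                            # Joint missing from this file: pad with zeros to keep a fixed-length feature vector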
|
|
|
|
|
features.extend([0.0, 0.0, 0.0]) |
|
|
missing_joints.append(joint) |
|
|
|
|
|
if len(features) == len(self.feature_columns): |
|
|
all_features.append(features) |
|
|
all_labels.append(label) |
|
|
else: |
|
|
print(f"Skipping file {json_file}: feature dimension mismatch") |
|
|
|
|
|
if missing_joints: |
|
|
print(f"File {json_file.name} missing joints: {missing_joints}") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error reading file {json_file}: {e}") |
|
|
continue |
|
|
|
|
|
print(f"Loaded {len(all_features)} samples") |
|
|
|
|
|
|
|
|
label_counts = {} |
|
|
for label in all_labels: |
|
|
label_counts[label] = label_counts.get(label, 0) + 1 |
|
|
|
|
|
print("Label distribution:") |
|
|
for label, count in sorted(label_counts.items()): |
|
|
print(f" {label}: {count} samples") |
|
|
|
|
|
return np.array(all_features), np.array(all_labels) |
|
|
|
|
|
def train(self, X, y, test_size=0.2): |
|
|
""" |
|
|
Train the classifier. |
|
|
|
|
|
Args: |
|
|
X: feature data |
|
|
y: labels |
|
|
test_size: ratio for test split |
|
|
|
|
|
Returns: |
|
|
dict: a dictionary containing training results |
|
|
""" |
|
|
print(f"\nStarting training for model: {self.model_type}...") |
|
|
print(f"Data shape: {X.shape}") |
|
|
print(f"Number of labels: {len(np.unique(y))}") |
|
|
|
|
|
|
|
|
y_encoded = self.label_encoder.fit_transform(y) |
|
|
|
|
|
|
|
|
X_train, X_test, y_train, y_test = train_test_split( |
|
|
X, y_encoded, test_size=test_size, random_state=42, stratify=y_encoded |
|
|
) |
|
|
|
|
|
print(f"Train set size: {X_train.shape[0]}") |
|
|
print(f"Test set size: {X_test.shape[0]}") |
|
|
|
|
|
|
|
|
X_train_scaled = self.scaler.fit_transform(X_train) |
|
|
X_test_scaled = self.scaler.transform(X_test) |
|
|
|
|
|
|
|
|
if self.model_type == 'distilled_rf': |
|
|
print("Using distillation: train RandomForest teacher, then fit an MLPRegressor student to teacher soft labels") |
|
|
|
|
|
teacher = self._get_model() |
|
|
teacher.fit(X_train_scaled, y_train) |
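            # Soft labels: the teacher's per-class probabilities become the regression targets for the student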
|
|
|
|
|
|
|
|
y_train_proba = teacher.predict_proba(X_train_scaled) |
|
|
|
|
|
|
|
|
student = MLPRegressor(hidden_layer_sizes=(128, 64, 32), |
|
|
activation='relu', |
|
|
solver='adam', |
|
|
max_iter=1000, |
|
|
learning_rate_init=0.001, |
|
|
random_state=42, |
|
|
early_stopping=True, |
|
|
validation_fraction=0.1) |
|
|
|
|
|
print("Training student model to fit teacher probability outputs...") |
|
|
print(f"Teacher probability output shape: {y_train_proba.shape}") |
|
|
|
|
|
|
|
|
student.fit(X_train_scaled, y_train_proba) |
|
|
|
|
|
|
|
|
self.model = teacher |
|
|
self.student_model = student |
|
|
|
|
|
|
|
|
y_train_pred_proba = student.predict(X_train_scaled) |
|
|
y_test_pred_proba = student.predict(X_test_scaled) |
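            # MLPRegressor outputs are unnormalized; apply a softmax so they can be read as class probabilities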
|
|
|
|
|
|
|
|
def softmax(x): |
|
|
exp_x = np.exp(x - np.max(x, axis=1, keepdims=True)) |
|
|
return exp_x / np.sum(exp_x, axis=1, keepdims=True) |
|
|
|
|
|
y_train_pred_proba = softmax(y_train_pred_proba) |
|
|
y_test_pred_proba = softmax(y_test_pred_proba) |
|
|
|
|
|
y_train_pred = np.argmax(y_train_pred_proba, axis=1) |
|
|
y_test_pred = np.argmax(y_test_pred_proba, axis=1) |
|
|
|
|
|
print(f"Student predicted probability shape: {y_test_pred_proba.shape}") |
|
|
print(f"Student training accuracy: {accuracy_score(y_train, y_train_pred):.4f}") |
|
|
|
|
|
else: |
|
|
|
|
|
self.model = self._get_model() |
|
|
self.model.fit(X_train_scaled, y_train) |
|
|
|
|
|
y_train_pred = self.model.predict(X_train_scaled) |
|
|
y_test_pred = self.model.predict(X_test_scaled) |
|
|
|
|
|
|
|
|
train_accuracy = accuracy_score(y_train, y_train_pred) |
|
|
test_accuracy = accuracy_score(y_test, y_test_pred) |
|
|
|
|
|
|
|
|
|
|
|
        # Cross-validate the fitted classifier (for distilled_rf this is the teacher)
        if self.model is not None:
            cv_scores = cross_val_score(self.model, X_train_scaled, y_train, cv=5)
        else:
            cv_scores = np.array([])
|
|
|
|
|
print("\nTraining results:") |
|
|
print(f"Train accuracy: {train_accuracy:.4f}") |
|
|
print(f"Test accuracy: {test_accuracy:.4f}") |
|
|
print(f"5-fold CV accuracy: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}") |
|
|
|
|
|
|
|
|
print("\nTest set classification report:") |
|
|
target_names = self.label_encoder.classes_ |
|
|
print(classification_report(y_test, y_test_pred, target_names=target_names)) |
|
|
|
|
|
|
|
|
cm = confusion_matrix(y_test, y_test_pred) |
|
|
|
|
|
return { |
|
|
'train_accuracy': train_accuracy, |
|
|
'test_accuracy': test_accuracy, |
|
|
'cv_scores': cv_scores, |
|
|
'confusion_matrix': cm, |
|
|
'target_names': target_names, |
|
|
'X_test': X_test_scaled, |
|
|
'y_test': y_test, |
|
|
'y_test_pred': y_test_pred |
|
|
} |
|
|
|
|
|
def save_model(self, filepath): |
|
|
"""Save trained model to disk.""" |
|
|
model_data = { |
|
|
            'model': self.model,
            'student_model': self.student_model,
|
|
'scaler': self.scaler, |
|
|
'label_encoder': self.label_encoder, |
|
|
'model_type': self.model_type, |
|
|
'target_joints': self.target_joints, |
|
|
'feature_columns': self.feature_columns |
|
|
} |
|
|
joblib.dump(model_data, filepath) |
|
|
print(f"Model saved to: {filepath}") |
|
|
|
|
|
def load_model(self, filepath): |
|
|
"""Load trained model from disk.""" |
|
|
model_data = joblib.load(filepath) |
|
|
        self.model = model_data['model']
        self.student_model = model_data.get('student_model')
|
|
self.scaler = model_data['scaler'] |
|
|
self.label_encoder = model_data['label_encoder'] |
|
|
self.model_type = model_data['model_type'] |
|
|
self.target_joints = model_data['target_joints'] |
|
|
self.feature_columns = model_data['feature_columns'] |
|
|
print(f"Model loaded from: {filepath}") |
|
|
|
|
|
def predict(self, X): |
|
|
"""Run prediction on input features.""" |
|
|
if self.model is None and self.student_model is None: |
|
|
raise ValueError("Model not trained or loaded") |
|
|
|
|
|
X_scaled = self.scaler.transform(X) |
|
|
|
|
|
|
|
|
        if self.student_model is not None:
            raw = self.student_model.predict(X_scaled)
            # Normalize the student's regression outputs with a softmax, matching the handling in train()
            exp_scores = np.exp(raw - np.max(raw, axis=1, keepdims=True))
            proba = exp_scores / np.sum(exp_scores, axis=1, keepdims=True)
            preds = np.argmax(proba, axis=1)
            labels = self.label_encoder.inverse_transform(preds)
            return labels, proba
|
|
|
|
|
|
|
|
predictions = self.model.predict(X_scaled) |
|
|
probabilities = None |
|
|
if hasattr(self.model, 'predict_proba'): |
|
|
probabilities = self.model.predict_proba(X_scaled) |
|
|
return self.label_encoder.inverse_transform(predictions), probabilities |
|
|
|
|
|
def predict_single_json(self, json_path): |
|
|
""" |
|
|
Predict pose class for a single JSON file. |
|
|
|
|
|
Args: |
|
|
json_path: path to the JSON file |
|
|
|
|
|
Returns: |
|
|
dict: prediction details or error information |
|
|
""" |
|
|
if self.model is None: |
|
|
raise ValueError("Model not trained or loaded") |
|
|
|
|
|
try: |
|
|
|
|
|
with open(json_path, 'r', encoding='utf-8') as f: |
|
|
data = json.load(f) |
|
|
|
|
|
landmarks = data.get('landmarks', {}) |
|
|
|
|
|
|
|
|
features = [] |
|
|
missing_joints = [] |
|
|
available_joints = [] |
|
|
|
|
|
for joint in self.target_joints: |
|
|
if joint in landmarks: |
|
|
joint_data = landmarks[joint] |
|
|
features.extend([ |
|
|
joint_data.get('x', 0.0), |
|
|
joint_data.get('y', 0.0), |
|
|
joint_data.get('z', 0.0) |
|
|
]) |
|
|
available_joints.append(joint) |
|
|
else: |
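                    # Joint missing: pad with zeros, mirroring the handling in load_data()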
|
|
|
|
|
features.extend([0.0, 0.0, 0.0]) |
|
|
missing_joints.append(joint) |
|
|
|
|
|
if len(features) != len(self.feature_columns): |
|
|
raise ValueError(f"Feature dimension mismatch: expected {len(self.feature_columns)}, got {len(features)}") |
|
|
|
|
|
|
|
|
X = np.array([features]) |
|
|
predictions, probabilities = self.predict(X) |
|
|
|
|
|
|
|
|
result = { |
|
|
'file_path': str(json_path), |
|
|
'file_name': Path(json_path).name, |
|
|
'predicted_label': predictions[0], |
|
|
'confidence_scores': {}, |
|
|
'available_joints': available_joints, |
|
|
'missing_joints': missing_joints, |
|
|
'joint_coverage': f"{len(available_joints)}/{len(self.target_joints)}" |
|
|
} |
|
|
|
|
|
|
|
|
if probabilities is not None: |
|
|
for i, label in enumerate(self.label_encoder.classes_): |
|
|
result['confidence_scores'][label] = float(probabilities[0][i]) |
|
|
|
|
|
|
|
|
max_prob_idx = np.argmax(probabilities[0]) |
|
|
result['max_confidence'] = float(probabilities[0][max_prob_idx]) |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
return { |
|
|
'file_path': str(json_path), |
|
|
'file_name': Path(json_path).name, |
|
|
'error': str(e), |
|
|
'predicted_label': None |
|
|
} |
|
|
|
|
|
def evaluate_test_directory(self, test_dir): |
|
|
""" |
|
|
Evaluate all data in a test directory. |
|
|
|
|
|
Args: |
|
|
test_dir: path to the test data directory |
|
|
|
|
|
Returns: |
|
|
dict: dictionary containing detailed evaluation results |
|
|
""" |
|
|
if self.model is None: |
|
|
raise ValueError("Model not trained or loaded") |
|
|
|
|
|
test_path = Path(test_dir) |
|
|
if not test_path.exists(): |
|
|
raise ValueError(f"Test directory does not exist: {test_dir}") |
|
|
|
|
|
|
|
|
start_time = time.time() |
|
|
print(f"Starting evaluation on test dataset: {test_path}") |
|
|
print(f"Start time: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}") |
|
|
|
|
|
|
|
|
all_results = [] |
|
|
label_stats = {} |
|
|
total_prediction_time = 0.0 |
|
|
prediction_count = 0 |
|
|
|
|
|
|
|
|
for label_dir in test_path.iterdir(): |
|
|
if not label_dir.is_dir() or not label_dir.name.startswith('label_'): |
|
|
continue |
|
|
|
|
|
true_label = label_dir.name |
|
|
json_files = list(label_dir.glob('*.json')) |
|
|
|
|
|
print(f"Evaluating {true_label}: {len(json_files)} files") |
|
|
|
|
|
label_stats[true_label] = { |
|
|
'total': len(json_files), |
|
|
'correct': 0, |
|
|
'incorrect': 0, |
|
|
'errors': 0, |
|
|
'predictions': {}, |
|
|
'confidence_scores': [], |
|
|
'prediction_times': [] |
|
|
} |
|
|
|
|
|
for json_file in json_files: |
|
|
|
|
|
pred_start_time = time.time() |
|
|
result = self.predict_single_json(json_file) |
|
|
pred_end_time = time.time() |
|
|
|
|
|
single_prediction_time = pred_end_time - pred_start_time |
|
|
total_prediction_time += single_prediction_time |
|
|
prediction_count += 1 |
|
|
|
|
|
if 'error' in result: |
|
|
label_stats[true_label]['errors'] += 1 |
|
|
print(f" Error: {json_file.name} - {result['error']}") |
|
|
continue |
|
|
|
|
|
predicted_label = result['predicted_label'] |
|
|
is_correct = predicted_label == true_label |
|
|
|
|
|
if is_correct: |
|
|
label_stats[true_label]['correct'] += 1 |
|
|
else: |
|
|
label_stats[true_label]['incorrect'] += 1 |
|
|
|
|
|
|
|
|
if predicted_label not in label_stats[true_label]['predictions']: |
|
|
label_stats[true_label]['predictions'][predicted_label] = 0 |
|
|
label_stats[true_label]['predictions'][predicted_label] += 1 |
|
|
|
|
|
|
|
|
if 'max_confidence' in result: |
|
|
label_stats[true_label]['confidence_scores'].append(result['max_confidence']) |
|
|
label_stats[true_label]['prediction_times'].append(single_prediction_time) |
|
|
|
|
|
|
|
|
all_results.append({ |
|
|
'file_path': str(json_file), |
|
|
'file_name': json_file.name, |
|
|
'true_label': true_label, |
|
|
'predicted_label': predicted_label, |
|
|
'is_correct': is_correct, |
|
|
'confidence': result.get('max_confidence', 0.0), |
|
|
'confidence_scores': result.get('confidence_scores', {}), |
|
|
'joint_coverage': result.get('joint_coverage', '0/13'), |
|
|
'prediction_time': single_prediction_time |
|
|
}) |
|
|
|
|
|
|
|
|
end_time = time.time() |
|
|
total_execution_time = end_time - start_time |
|
|
|
|
|
|
|
|
total_samples = sum(stats['total'] for stats in label_stats.values()) |
|
|
total_correct = sum(stats['correct'] for stats in label_stats.values()) |
|
|
total_errors = sum(stats['errors'] for stats in label_stats.values()) |
|
|
total_tested = total_samples - total_errors |
|
|
|
|
|
overall_accuracy = total_correct / total_tested if total_tested > 0 else 0.0 |
|
|
avg_prediction_time = total_prediction_time / prediction_count if prediction_count > 0 else 0.0 |
|
|
|
|
|
|
|
|
confusion_matrix = {} |
|
|
for true_label in label_stats.keys(): |
|
|
confusion_matrix[true_label] = {} |
|
|
for predicted_label in label_stats.keys(): |
|
|
confusion_matrix[true_label][predicted_label] = 0 |
|
|
|
|
|
for result in all_results: |
|
|
if result.get('is_correct') is not None: |
|
|
true_label = result['true_label'] |
|
|
predicted_label = result['predicted_label'] |
|
|
confusion_matrix[true_label][predicted_label] += 1 |
|
|
|
|
|
return { |
|
|
'label_stats': label_stats, |
|
|
'overall_accuracy': overall_accuracy, |
|
|
'total_samples': total_samples, |
|
|
'total_correct': total_correct, |
|
|
'total_errors': total_errors, |
|
|
'total_tested': total_tested, |
|
|
'confusion_matrix': confusion_matrix, |
|
|
'detailed_results': all_results, |
|
|
'timing_stats': { |
|
|
'total_execution_time': total_execution_time, |
|
|
'total_prediction_time': total_prediction_time, |
|
|
'avg_prediction_time': avg_prediction_time, |
|
|
'prediction_count': prediction_count, |
|
|
'start_time': start_time, |
|
|
'end_time': end_time, |
|
|
'overhead_time': total_execution_time - total_prediction_time |
|
|
} |
|
|
} |
|
|
|
|
|
def print_evaluation_report(self, eval_results): |
|
|
""" |
|
|
Print a detailed evaluation report. |
|
|
|
|
|
Args: |
|
|
eval_results: dictionary returned by evaluate_test_directory |
|
|
""" |
|
|
timing_stats = eval_results.get('timing_stats', {}) |
|
|
|
|
|
print("\n" + "=" * 80) |
|
|
print("Test dataset evaluation report") |
|
|
print("=" * 80) |
|
|
|
|
|
|
|
|
print(f"Total samples: {eval_results['total_samples']}") |
|
|
print(f"Successfully tested: {eval_results['total_tested']}") |
|
|
print(f"Errors: {eval_results['total_errors']}") |
|
|
print( |
|
|
f"Overall accuracy: {eval_results['overall_accuracy']:.4f} " |
|
|
f"({eval_results['total_correct']}/{eval_results['total_tested']})" |
|
|
) |
|
|
|
|
|
|
|
|
if timing_stats: |
|
|
total_time = timing_stats['total_execution_time'] |
|
|
prediction_time = timing_stats['total_prediction_time'] |
|
|
avg_time = timing_stats['avg_prediction_time'] |
|
|
overhead_time = timing_stats['overhead_time'] |
|
|
prediction_count = timing_stats['prediction_count'] |
|
|
|
|
|
print("\nTiming statistics:") |
|
|
print("-" * 50) |
|
|
print(f"Total execution time: {total_time:.4f} s") |
|
|
print(f"Total prediction time: {prediction_time:.4f} s") |
|
|
print(f"Overhead time: {overhead_time:.4f} s") |
|
|
print(f"Average prediction time: {avg_time * 1000:.2f} ms") |
|
|
print(f"Prediction throughput: {prediction_count / total_time:.2f} preds/s") |
|
|
print( |
|
|
f"Prediction efficiency: {(prediction_time / total_time) * 100:.1f}% " |
|
|
f"(prediction time / total)" |
|
|
) |
|
|
|
|
|
|
|
|
print("\nPer-label stats:") |
|
|
print("-" * 80) |
|
|
print( |
|
|
f"{'Label':<10} {'Total':<6} {'Correct':<6} {'Wrong':<6} " |
|
|
f"{'Accuracy':<8} {'AvgConf':<10} {'AvgPredTime':<12}" |
|
|
) |
|
|
print("-" * 80) |
|
|
|
|
|
for label, stats in sorted(eval_results['label_stats'].items()): |
|
|
accuracy = ( |
|
|
stats['correct'] / (stats['total'] - stats['errors']) |
|
|
if (stats['total'] - stats['errors']) > 0 |
|
|
else 0.0 |
|
|
) |
|
|
avg_confidence = ( |
|
|
np.mean(stats['confidence_scores']) if stats['confidence_scores'] else 0.0 |
|
|
) |
|
|
avg_pred_time = ( |
|
|
np.mean(stats['prediction_times']) |
|
|
if 'prediction_times' in stats and stats['prediction_times'] |
|
|
else 0.0 |
|
|
) |
|
|
|
|
|
print( |
|
|
f"{label:<10} {stats['total']:<6} {stats['correct']:<6} {stats['incorrect']:<6} " |
|
|
f"{accuracy:.4f} {avg_confidence:.4f} {avg_pred_time * 1000:.2f}ms" |
|
|
) |
|
|
|
|
|
|
|
|
print("\nConfusion matrix:") |
|
|
print("-" * 60) |
|
|
labels = sorted(eval_results['label_stats'].keys()) |
|
|
|
|
|
|
|
|
print(f"{'True\\Pred':<12}", end="") |
|
|
for label in labels: |
|
|
print(f"{label:<10}", end="") |
|
|
print() |
|
|
|
|
|
|
|
|
for true_label in labels: |
|
|
print(f"{true_label:<12}", end="") |
|
|
for pred_label in labels: |
|
|
count = eval_results['confusion_matrix'][true_label][pred_label] |
|
|
print(f"{count:<10}", end="") |
|
|
print() |
|
|
|
|
|
|
|
|
print("\nPer-label prediction distribution:") |
|
|
print("-" * 80) |
|
|
for true_label, stats in sorted(eval_results['label_stats'].items()): |
|
|
if stats['predictions']: |
|
|
print(f"{true_label}:") |
|
|
total_predictions = sum(stats['predictions'].values()) |
|
|
for pred_label, count in sorted(stats['predictions'].items()): |
|
|
percentage = (count / total_predictions) * 100 |
|
|
print(f" -> {pred_label}: {count} ({percentage:.1f}%)") |
|
|
|
|
|
|
|
|
print("\nError analysis:") |
|
|
print("-" * 40) |
|
|
incorrect_results = [r for r in eval_results['detailed_results'] if not r['is_correct']] |
|
|
|
|
|
if incorrect_results: |
|
|
|
|
|
incorrect_results.sort(key=lambda x: x['confidence'], reverse=True) |
|
|
print("Highest-confidence incorrect predictions (top 10):") |
|
|
for i, result in enumerate(incorrect_results[:10]): |
|
|
pred_time = result.get('prediction_time', 0) * 1000 |
|
|
print( |
|
|
f"{i + 1:2d}. {result['file_name']}: {result['true_label']} -> {result['predicted_label']} " |
|
|
f"(conf: {result['confidence']:.4f}, time: {pred_time:.2f}ms)" |
|
|
) |
|
|
else: |
|
|
print("No incorrect predictions found.") |
|
|
|
|
|
|
|
|
if timing_stats and eval_results['detailed_results']: |
|
|
print("\nPerformance analysis:") |
|
|
print("-" * 40) |
|
|
prediction_times = [ |
|
|
r.get('prediction_time', 0) for r in eval_results['detailed_results'] if 'prediction_time' in r |
|
|
] |
|
|
if prediction_times: |
|
|
min_time = min(prediction_times) * 1000 |
|
|
max_time = max(prediction_times) * 1000 |
|
|
median_time = np.median(prediction_times) * 1000 |
|
|
std_time = np.std(prediction_times) * 1000 |
|
|
|
|
|
print("Prediction time distribution:") |
|
|
print(f" Fastest: {min_time:.2f}ms") |
|
|
print(f" Slowest: {max_time:.2f}ms") |
|
|
print(f" Median: {median_time:.2f}ms") |
|
|
print(f" Stddev: {std_time:.2f}ms") |
|
|
|
|
|
print("\n" + "=" * 80) |
|
|
|
|
|
def plot_confusion_matrix(self, cm, target_names, save_path=None): |
|
|
"""Plot confusion matrix.""" |
|
|
plt.figure(figsize=(10, 8)) |
|
|
if SEABORN_AVAILABLE: |
|
|
sns.heatmap( |
|
|
cm, |
|
|
annot=True, |
|
|
fmt='d', |
|
|
cmap='Blues', |
|
|
xticklabels=target_names, |
|
|
yticklabels=target_names, |
|
|
) |
|
|
else: |
|
|
|
|
|
im = plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues) |
|
|
plt.colorbar(im) |
|
|
tick_marks = np.arange(len(target_names)) |
|
|
plt.xticks(tick_marks, target_names, rotation=45, ha='right') |
|
|
plt.yticks(tick_marks, target_names) |
|
|
|
|
|
thresh = cm.max() / 2.0 if cm.size else 0 |
|
|
for i in range(cm.shape[0]): |
|
|
for j in range(cm.shape[1]): |
|
|
plt.text(j, i, format(cm[i, j], 'd'), |
|
|
ha="center", va="center", |
|
|
color="white" if cm[i, j] > thresh else "black") |
|
|
|
|
|
plt.title(f"{self.model_type.title()} model confusion matrix") |
|
|
plt.xlabel('Predicted') |
|
|
plt.ylabel('True') |
|
|
|
|
|
if save_path: |
|
|
plt.savefig(save_path, dpi=300, bbox_inches='tight') |
|
|
print(f"Confusion matrix saved to: {save_path}") |
|
|
|
|
|
plt.show() |
|
|
|
|
|
def export_to_onnx(self, model_type='random_forest', output_path=None): |
|
|
""" |
|
|
Export the trained model to ONNX format (only models supported by Barracuda). |
|
|
Note: Barracuda does not support LinearClassifier layers (e.g., LogisticRegression/SVM) — only tree models are supported. |
|
|
""" |
|
|
if not ONNX_AVAILABLE: |
|
|
print("Error: ONNX export is unavailable. Please install skl2onnx and onnx packages:") |
|
|
print("pip install skl2onnx onnx") |
|
|
return None |
|
|
|
|
|
if not hasattr(self, 'model') or self.model is None: |
|
|
print("Error: Model is not trained yet. Please train the model first.") |
|
|
return None |
|
|
|
|
|
|
|
|
if hasattr(self, 'model_type') and self.model_type != model_type: |
|
|
print(f"Warning: Currently trained {self.model_type} model, but requested to export {model_type} model") |
|
|
print(f"Will export currently trained {self.model_type} model") |
|
|
model_name = self.model_type |
|
|
else: |
|
|
model_name = model_type |
|
|
|
|
|
|
|
|
if model_name in ['logistic', 'svm']: |
|
|
print(f"❌ Barracuda/Unity does not support ONNX import for {model_name} models (LinearClassifier layer).") |
|
|
print("Please use random_forest or gradient_boost for export.") |
|
|
return None |
|
|
|
|
|
|
|
|
model_to_export = None |
|
|
export_name = None |
|
|
|
|
|
if self.student_model is not None: |
|
|
model_to_export = self.student_model |
|
|
export_name = 'distilled_mlp' |
|
|
print("Detected student_model. Exporting student (MLP) to ONNX (suitable for Unity/Barracuda).") |
|
|
else: |
|
|
model_to_export = self.model |
|
|
export_name = model_name |
|
|
|
|
|
if model_to_export is None: |
|
|
print("Error: No model available for export.") |
|
|
return None |
|
|
|
|
|
|
|
|
if output_path is None: |
|
|
output_path = f"pose_classifier_{export_name}.onnx" |
|
|
|
|
|
print(f"About to export model to: {output_path}, export target: {export_name}") |
|
|
|
|
|
try: |
|
|
feature_count = len(self.target_joints) * 3 |
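            # 13 joints x 3 coordinates = 39 inputs; the ONNX graph expects a [batch, 39] float tensor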
|
|
initial_type = [('float_input', FloatTensorType([None, feature_count]))] |
|
|
|
|
|
onnx_model = convert_sklearn( |
|
|
model_to_export, |
|
|
initial_types=initial_type, |
|
|
target_opset=12 |
|
|
) |
|
|
|
|
|
with open(output_path, "wb") as f: |
|
|
f.write(onnx_model.SerializeToString()) |
|
|
|
|
|
print(f"✅ Successfully exported {export_name} model to ONNX format: {output_path}") |
|
|
|
|
|
|
|
|
label_mapping_path = output_path.replace('.onnx', '_labels.json') |
|
|
label_mapping = { |
|
|
'label_encoder_classes': self.label_encoder.classes_.tolist(), |
|
|
'model_type': export_name, |
|
|
'feature_count': feature_count, |
|
|
'target_joints': self.target_joints, |
|
|
'description': f'Pose classifier - {len(self.target_joints)} landmarks with x,y,z coordinates', |
|
|
'scaler_mean': self.scaler.mean_.tolist(), |
|
|
'scaler_scale': self.scaler.scale_.tolist() |
|
|
} |
|
|
|
|
|
with open(label_mapping_path, 'w', encoding='utf-8') as f: |
|
|
json.dump(label_mapping, f, ensure_ascii=False, indent=2) |
|
|
|
|
|
print(f"✅ Label mapping and scaler parameters saved to: {label_mapping_path}") |
|
|
|
|
|
print("⚠️ Note: The exported ONNX expects inputs to be standardized with scaler_mean/scaler_scale.") |
|
|
|
|
|
return output_path |
|
|
|
|
|
except Exception as e: |
|
|
print(f"❌ ONNX export failed: {str(e)}") |
|
|
import traceback |
|
|
traceback.print_exc() |
|
|
return None |
|
|
|
|
|
def export_to_tflite(self, output_path=None): |
|
|
""" |
|
|
Export student_model (MLP) to TFLite format. |
|
|
Dependencies: skl2onnx, onnx, onnx-tf, tensorflow |
|
|
""" |
|
|
if self.student_model is None: |
|
|
print("❌ Only exporting student_model (MLPRegressor) to TFLite is supported. Please train with --model distilled_rf first.") |
|
|
return None |
|
|
|
|
|
try: |
|
|
import onnx |
|
|
from skl2onnx import convert_sklearn |
|
|
from skl2onnx.common.data_types import FloatTensorType |
|
|
from onnx_tf.backend import prepare |
|
|
import tensorflow as tf |
|
|
except ImportError: |
|
|
print("❌ You need to install skl2onnx, onnx, onnx-tf, tensorflow.") |
|
|
print("pip install skl2onnx onnx onnx-tf tensorflow") |
|
|
return None |
|
|
|
|
|
feature_count = len(self.target_joints) * 3 |
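        # Conversion chain: scikit-learn -> ONNX (skl2onnx) -> TensorFlow SavedModel (onnx-tf) -> TFLite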
|
|
initial_type = [('float_input', FloatTensorType([None, feature_count]))] |
|
|
|
|
|
|
|
|
print("Exporting student_model to ONNX...") |
|
|
onnx_model = convert_sklearn( |
|
|
self.student_model, |
|
|
initial_types=initial_type, |
|
|
target_opset=12 |
|
|
) |
|
|
onnx_path = "temp_student.onnx" |
|
|
with open(onnx_path, "wb") as f: |
|
|
f.write(onnx_model.SerializeToString()) |
|
|
print(f"✅ ONNX export successful: {onnx_path}") |
|
|
|
|
|
|
|
|
print("Converting ONNX to TensorFlow SavedModel...") |
|
|
tf_model = prepare(onnx.load(onnx_path)) |
|
|
tf_saved_path = "temp_student_tf" |
|
|
tf_model.export_graph(tf_saved_path) |
|
|
print(f"✅ SavedModel export successful: {tf_saved_path}") |
|
|
|
|
|
|
|
|
print("Converting SavedModel to TFLite...") |
|
|
converter = tf.lite.TFLiteConverter.from_saved_model(tf_saved_path) |
|
|
tflite_model = converter.convert() |
|
|
if output_path is None: |
|
|
output_path = "pose_classifier_distilled_mlp.tflite" |
|
|
with open(output_path, "wb") as f: |
|
|
f.write(tflite_model) |
|
|
print(f"✅ TFLite export successful: {output_path}") |
|
|
|
|
|
|
|
|
import os |
|
|
os.remove(onnx_path) |
|
|
import shutil |
|
|
shutil.rmtree(tf_saved_path, ignore_errors=True) |
|
|
|
|
|
return output_path |
|
|
|
|
|
def main(): |
|
|
parser = argparse.ArgumentParser(description="Pose classification machine learning script") |
|
|
parser.add_argument("--data", "-d", default="PoseData", help="Pose data directory (default: PoseData)") |
|
|
parser.add_argument( |
|
|
"--model", |
|
|
"-m", |
|
|
choices=['random_forest', 'svm', 'gradient_boost', 'logistic', 'distilled_rf'], |
|
|
default='random_forest', |
|
|
help="Model type (default: random_forest)", |
|
|
) |
|
|
parser.add_argument("--test-size", "-t", type=float, default=0.2, help="Test set ratio (default: 0.2)") |
|
|
parser.add_argument("--save-model", "-s", help="Path to save the trained model") |
|
|
parser.add_argument("--load-model", "-l", help="Path to load an already trained model") |
|
|
parser.add_argument("--predict", "-p", help="Path of a single JSON file to predict") |
|
|
parser.add_argument("--evaluate", "-e", help="Path of a test directory to evaluate all JSON files") |
|
|
parser.add_argument("--no-plot", action="store_true", help="Do not display confusion matrix plot") |
|
|
parser.add_argument("--train", action="store_true", help="Force training even if --load-model is provided") |
|
|
parser.add_argument("--export-onnx", help="Export model to ONNX format; specify output file path") |
|
|
parser.add_argument( |
|
|
"--export-model-type", |
|
|
choices=['random_forest', 'logistic', 'distilled_rf'], |
|
|
default='random_forest', |
|
|
help="Model type to export (default: random_forest)", |
|
|
) |
|
|
parser.add_argument("--test-onnx", help="Test an ONNX model; specify ONNX file path") |
|
|
parser.add_argument("--onnx-labels", help="ONNX label mapping JSON path (auto-detect if not provided)") |
|
|
parser.add_argument("--onnx-test-data", help="ONNX batch test data directory (if not provided, single-sample test)") |
|
|
parser.add_argument( |
|
|
"--export-tflite", |
|
|
help="Export model to TFLite format; specify output path (supported for distilled_rf student model only)", |
|
|
) |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
print("Pose classification ML tool") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
if args.test_onnx: |
|
|
print("ONNX model test mode") |
|
|
print(f"ONNX model: {args.test_onnx}") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
classifier = PoseClassifier() |
|
|
|
|
|
|
|
|
print("ONNX test requested but functionality is not implemented in this script.") |
|
|
return |
|
|
|
|
|
|
|
|
if args.evaluate: |
|
|
if not args.load_model: |
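            # No model path given: fall back to the default file produced when training this model type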
|
|
|
|
|
default_model = f"pose_classifier_{args.model}.pkl" |
|
|
if Path(default_model).exists(): |
|
|
args.load_model = default_model |
|
|
else: |
|
|
print( |
|
|
f"Error: Need to specify model file path (--load-model) or ensure default model file exists: {default_model}" |
|
|
) |
|
|
return |
|
|
|
|
|
print("Evaluation mode") |
|
|
print(f"Test data directory: {args.evaluate}") |
|
|
print(f"Model file: {args.load_model}") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
classifier = PoseClassifier(model_type=args.model) |
|
|
classifier.load_model(args.load_model) |
|
|
|
|
|
|
|
|
try: |
|
|
eval_results = classifier.evaluate_test_directory(args.evaluate) |
|
|
classifier.print_evaluation_report(eval_results) |
|
|
except Exception as e: |
|
|
print(f"Error during evaluation: {e}") |
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
if args.predict: |
|
|
if not args.load_model: |
|
|
|
|
|
default_model = f"pose_classifier_{args.model}.pkl" |
|
|
if Path(default_model).exists(): |
|
|
args.load_model = default_model |
|
|
else: |
|
|
print( |
|
|
f"Error: Need to specify model file path (--load-model) or ensure default model file exists: {default_model}" |
|
|
) |
|
|
return |
|
|
|
|
|
print("Prediction mode") |
|
|
print(f"JSON file: {args.predict}") |
|
|
print(f"Model file: {args.load_model}") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
classifier = PoseClassifier(model_type=args.model) |
|
|
classifier.load_model(args.load_model) |
|
|
|
|
|
|
|
|
result = classifier.predict_single_json(args.predict) |
|
|
|
|
|
|
|
|
print("\nPrediction result:") |
|
|
print(f"File: {result['file_name']}") |
|
|
|
|
|
if 'error' in result: |
|
|
print(f"Error: {result['error']}") |
|
|
else: |
|
|
print(f"Predicted label: {result['predicted_label']}") |
|
|
print(f"Joint coverage: {result['joint_coverage']}") |
|
|
|
|
|
if result['confidence_scores']: |
|
|
print(f"Max confidence: {result['max_confidence']:.4f}") |
|
|
print("\nPer-class confidence:") |
|
|
sorted_scores = sorted(result['confidence_scores'].items(), key=lambda x: x[1], reverse=True) |
|
|
for label, score in sorted_scores: |
|
|
print(f" {label}: {score:.4f}") |
|
|
|
|
|
if result['missing_joints']: |
|
|
print(f"\nMissing joints: {', '.join(result['missing_joints'])}") |
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
print("Training mode") |
|
|
print(f"Data directory: {args.data}") |
|
|
print(f"Model type: {args.model}") |
|
|
print(f"Test size: {args.test_size}") |
|
|
print("=" * 60) |
|
|
|
|
|
|
|
|
if not Path(args.data).exists(): |
|
|
print(f"Error: data directory does not exist: {args.data}") |
|
|
return |
|
|
|
|
|
|
|
|
classifier = PoseClassifier(model_type=args.model) |
|
|
|
|
|
|
|
|
if args.load_model and not args.train: |
|
|
print(f"Loading existing model: {args.load_model}") |
|
|
classifier.load_model(args.load_model) |
|
|
print("Model loaded, skipping training step") |
|
|
else: |
|
|
|
|
|
X, y = classifier.load_data(args.data) |
|
|
if len(X) == 0: |
|
|
print("Error: no valid data found") |
|
|
return |
|
|
|
|
|
results = classifier.train(X, y, test_size=args.test_size) |
|
|
|
|
|
if not args.no_plot: |
|
|
try: |
|
|
classifier.plot_confusion_matrix( |
|
|
results['confusion_matrix'], results['target_names'], save_path=f"confusion_matrix_{args.model}.png" |
|
|
) |
|
|
except Exception as e: |
|
|
print(f"Error while plotting confusion matrix: {e}") |
|
|
|
|
|
if args.save_model: |
|
|
classifier.save_model(args.save_model) |
|
|
else: |
|
|
|
|
|
default_path = f"pose_classifier_{args.model}.pkl" |
|
|
classifier.save_model(default_path) |
|
|
print("\nTraining complete!") |
|
|
print(f"Final test accuracy: {results['test_accuracy']:.4f}") |
|
|
|
|
|
|
|
|
if args.export_onnx: |
|
|
print(f"\nExporting {args.export_model_type} model to ONNX format...") |
|
|
onnx_path = classifier.export_to_onnx(model_type=args.export_model_type, output_path=args.export_onnx) |
|
|
if onnx_path: |
|
|
print(f"✅ ONNX model exported: {onnx_path}") |
|
|
|
|
|
|
|
|
if args.export_tflite: |
|
|
print("\nExporting student_model to TFLite format...") |
|
|
tflite_path = classifier.export_to_tflite(output_path=args.export_tflite) |
|
|
if tflite_path: |
|
|
print(f"✅ TFLite model exported: {tflite_path}") |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|