#!/usr/bin/env python3 """ Training script for Vietnamese sentiment classification. Trains TF-IDF + ML models on VLSP2016 sentiment dataset. This script trains various machine learning models for Vietnamese sentiment analysis. """ import argparse import json import logging import os import time from datetime import datetime import numpy as np from datasets import load_dataset from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer from sklearn.linear_model import LogisticRegression from sklearn.metrics import accuracy_score, classification_report, confusion_matrix from sklearn.model_selection import train_test_split from sklearn.pipeline import Pipeline from sklearn.svm import SVC from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, AdaBoostClassifier from sklearn.naive_bayes import MultinomialNB from sklearn.neural_network import MLPClassifier from sklearn.tree import DecisionTreeClassifier import joblib def setup_logging(run_name): """Setup logging to save all information to runs folder""" runs_dir = "runs" os.makedirs(runs_dir, exist_ok=True) run_dir = os.path.join(runs_dir, run_name) os.makedirs(run_dir, exist_ok=True) log_file = os.path.join(run_dir, "training.log") logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler(log_file), logging.StreamHandler()], ) return run_dir def load_uts2017_data(split_ratio=0.2, random_state=42, n_samples=None): """Load and prepare UTS2017_Bank aspect sentiment dataset Args: split_ratio: Ratio for train/test split random_state: Random seed for reproducibility n_samples: Optional limit on number of samples Returns: Tuple of (X_train, y_train), (X_test, y_test) """ print("Loading UTS2017_Bank aspect sentiment dataset from Hugging Face...") # Load the aspect sentiment subset dataset = load_dataset("undertheseanlp/UTS2017_Bank", "aspect_sentiment") # Get the train split (the dataset only has a train split) train_data = dataset["train"] # Extract text and aspects (which contains both aspect and sentiment) texts = [] labels = [] for item in train_data: text = item["text"] aspect_data = item["aspects"] # Handle multiple aspects per text (take the first one for now) if aspect_data and len(aspect_data) > 0: aspect = aspect_data[0]["aspect"] sentiment = aspect_data[0]["sentiment"] texts.append(text) labels.append(f"{aspect}#{sentiment}") # Convert to lists for consistency texts = list(texts) labels = list(labels) # Apply sample limit if specified if n_samples and n_samples < len(texts): # Shuffle before sampling to get balanced classes indices = np.arange(len(texts)) np.random.seed(random_state) np.random.shuffle(indices) indices = indices[:n_samples] texts = [texts[i] for i in indices] labels = [labels[i] for i in indices] # Convert to numpy arrays for consistency X = np.array(texts) y = np.array(labels) # Split into train and test sets # Use stratify only if we have enough samples per class (at least 2) min_samples_per_class = 2 unique_classes, class_counts = np.unique(y, return_counts=True) can_stratify = all(count >= min_samples_per_class for count in class_counts) if can_stratify: X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=split_ratio, random_state=random_state, stratify=y ) else: print( f"Warning: Some classes have fewer than {min_samples_per_class} samples. Disabling stratification." ) X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=split_ratio, random_state=random_state ) print(f"Dataset loaded: {len(X_train)} train samples, {len(X_test)} test samples") print(f"Number of unique labels: {len(set(y))}") return (X_train, y_train), (X_test, y_test) def load_vlsp2016_data(use_predefined_split=True, split_ratio=0.2, random_state=42, n_samples=None): """Load and prepare VLSP2016 sentiment dataset Args: use_predefined_split: If True, use the predefined train/test split from the dataset split_ratio: Ratio for train/test split (only used if use_predefined_split is False) random_state: Random seed for reproducibility n_samples: Optional limit on number of samples Returns: Tuple of (X_train, y_train), (X_test, y_test) """ print("Loading VLSP2016 sentiment dataset from Hugging Face...") # Load the dataset dataset = load_dataset("ura-hcmut/vlsp2016") if use_predefined_split: # Use the predefined train/test split train_data = dataset["train"] test_data = dataset["test"] # Extract texts and labels X_train = [item["Data"] for item in train_data] y_train = [item["Class"] for item in train_data] X_test = [item["Data"] for item in test_data] y_test = [item["Class"] for item in test_data] # Apply sample limit if specified if n_samples: if n_samples < len(X_train): # Shuffle before sampling to get balanced classes indices = np.arange(len(X_train)) np.random.seed(random_state) np.random.shuffle(indices) indices = indices[:n_samples] X_train = [X_train[i] for i in indices] y_train = [y_train[i] for i in indices] if n_samples < len(X_test): # Proportionally reduce test set with shuffling test_samples = int(n_samples * 0.2) # Keep similar ratio indices = np.arange(len(X_test)) np.random.seed(random_state) np.random.shuffle(indices) indices = indices[:test_samples] X_test = [X_test[i] for i in indices] y_test = [y_test[i] for i in indices] # Convert to numpy arrays X_train = np.array(X_train) y_train = np.array(y_train) X_test = np.array(X_test) y_test = np.array(y_test) else: # Combine train and test, then create custom split all_data = list(dataset["train"]) + list(dataset["test"]) # Extract texts and labels texts = [item["Data"] for item in all_data] labels = [item["Class"] for item in all_data] # Apply sample limit if specified if n_samples and n_samples < len(texts): texts = texts[:n_samples] labels = labels[:n_samples] # Convert to numpy arrays X = np.array(texts) y = np.array(labels) # Split into train and test sets # Use stratify only if we have enough samples per class (at least 2) min_samples_per_class = 2 unique_classes, class_counts = np.unique(y, return_counts=True) can_stratify = all(count >= min_samples_per_class for count in class_counts) if can_stratify: X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=split_ratio, random_state=random_state, stratify=y ) else: print( f"Warning: Some classes have fewer than {min_samples_per_class} samples. Disabling stratification." ) X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=split_ratio, random_state=random_state ) print(f"Dataset loaded: {len(X_train)} train samples, {len(X_test)} test samples") print(f"Number of unique labels: {len(set(y_train))}") print(f"Labels: {sorted(set(y_train))}") return (X_train, y_train), (X_test, y_test) def get_available_models(): """Get available classifier options""" return { # Traditional algorithms "logistic": LogisticRegression(max_iter=1000, random_state=42), "svc_linear": SVC(kernel="linear", random_state=42, probability=True), "svc_rbf": SVC(kernel="rbf", random_state=42, probability=True, gamma='scale'), "naive_bayes": MultinomialNB(), # Tree-based algorithms "decision_tree": DecisionTreeClassifier(random_state=42, max_depth=10), "random_forest": RandomForestClassifier(n_estimators=100, random_state=42, max_depth=10, n_jobs=-1), # Boosting algorithms "gradient_boost": GradientBoostingClassifier(n_estimators=100, random_state=42, max_depth=5), "ada_boost": AdaBoostClassifier(n_estimators=100, random_state=42), # Neural network "mlp": MLPClassifier(hidden_layer_sizes=(100, 50), max_iter=500, random_state=42, early_stopping=True), } def load_data(dataset_name="vlsp2016", split_ratio=0.2, random_state=42, n_samples=None): """Load data from the specified dataset Args: dataset_name: Name of the dataset to load ('vlsp2016' or 'uts2017') split_ratio: Ratio for train/test split random_state: Random seed for reproducibility n_samples: Optional limit on number of samples Returns: Tuple of (X_train, y_train), (X_test, y_test), dataset_display_name """ if dataset_name.lower() == "vlsp2016": (X_train, y_train), (X_test, y_test) = load_vlsp2016_data( use_predefined_split=True, split_ratio=split_ratio, random_state=random_state, n_samples=n_samples ) display_name = "VLSP2016_Sentiment" elif dataset_name.lower() == "uts2017": (X_train, y_train), (X_test, y_test) = load_uts2017_data( split_ratio=split_ratio, random_state=random_state, n_samples=n_samples ) display_name = "UTS2017_Bank_AspectSentiment" else: raise ValueError(f"Unknown dataset: {dataset_name}. Choose 'vlsp2016' or 'uts2017'") return (X_train, y_train), (X_test, y_test), display_name def train_model( dataset="vlsp2016", model_name="logistic", max_features=20000, ngram_range=(1, 2), split_ratio=0.2, n_samples=None, export_model=False, ): """Train a single model with specified parameters Args: dataset: Name of the dataset to use ('vlsp2016' or 'uts2017') model_name: Name of the model to train ('logistic' or 'svc') max_features: Maximum number of features for TF-IDF vectorizer ngram_range: N-gram range for feature extraction split_ratio: Train/test split ratio n_samples: Optional limit on number of samples export_model: Whether to export the model for distribution Returns: Dictionary containing training results """ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") run_dir = setup_logging(timestamp) logging.info(f"Starting training run: {timestamp}") logging.info(f"Dataset: {dataset}") logging.info(f"Model: {model_name}") logging.info(f"Max features: {max_features}") logging.info(f"N-gram range: {ngram_range}") if n_samples: logging.info(f"Sample limit: {n_samples}") # Create output folder for models output_folder = os.path.join(run_dir, "models") os.makedirs(output_folder, exist_ok=True) # Load data logging.info(f"Loading {dataset} dataset...") (X_train, y_train), (X_test, y_test), dataset_name = load_data( dataset_name=dataset, split_ratio=split_ratio, random_state=42, n_samples=n_samples ) # Get unique labels for reporting unique_labels = sorted(set(y_train)) label_counts_train = {label: np.sum(y_train == label) for label in unique_labels} label_counts_test = {label: np.sum(y_test == label) for label in unique_labels} logging.info(f"Train samples: {len(X_train)}") logging.info(f"Test samples: {len(X_test)}") logging.info(f"Unique labels: {len(unique_labels)}") logging.info(f"Label distribution (train): {label_counts_train}") logging.info(f"Label distribution (test): {label_counts_test}") # Get model available_models = get_available_models() if model_name not in available_models: raise ValueError( f"Model '{model_name}' not available. Choose from: {list(available_models.keys())}" ) classifier = available_models[model_name] clf_name = classifier.__class__.__name__ logging.info(f"Selected classifier: {clf_name}") # Configuration name config_name = f"{dataset_name}_{clf_name}_feat{max_features // 1000}k_ngram{ngram_range[0]}-{ngram_range[1]}" logging.info("=" * 60) logging.info(f"Training: {config_name}") logging.info("=" * 60) # Create TF-IDF pipeline logging.info( f"Creating pipeline with max_features={max_features}, ngram_range={ngram_range}" ) text_clf = Pipeline( [ ( "vect", CountVectorizer(max_features=max_features, ngram_range=ngram_range), ), ("tfidf", TfidfTransformer(use_idf=True)), ("clf", classifier), ] ) # Train the model logging.info("Training model...") start_time = time.time() text_clf.fit(X_train, y_train) train_time = time.time() - start_time logging.info(f"Training completed in {train_time:.2f} seconds") # Evaluate on training set logging.info("Evaluating on training set...") train_predictions = text_clf.predict(X_train) train_accuracy = accuracy_score(y_train, train_predictions) logging.info(f"Training accuracy: {train_accuracy:.4f}") # Evaluate on test set logging.info("Evaluating on test set...") start_time = time.time() test_predictions = text_clf.predict(X_test) test_accuracy = accuracy_score(y_test, test_predictions) prediction_time = time.time() - start_time logging.info(f"Test accuracy: {test_accuracy:.4f}") logging.info(f"Prediction time: {prediction_time:.2f} seconds") # Classification report logging.info("Classification Report:") report = classification_report(y_test, test_predictions, zero_division=0) logging.info(report) print("\nClassification Report:") print(report) # Save classification report as dict report_dict = classification_report( y_test, test_predictions, zero_division=0, output_dict=True ) # Confusion matrix cm = confusion_matrix(y_test, test_predictions, labels=unique_labels) logging.info(f"Confusion Matrix shape: {cm.shape}") # Save the model model_path = os.path.join(output_folder, "model.joblib") joblib.dump(text_clf, model_path) logging.info(f"Model saved to {model_path}") print(f"Model saved to {model_path}") # Save model with config name config_model_path = os.path.join(output_folder, f"{config_name}.joblib") joblib.dump(text_clf, config_model_path) logging.info(f"Model also saved as {config_model_path}") # Export model if requested if export_model: # Use format: _sentiment_.joblib run_id = os.path.basename(run_dir) dataset_prefix = dataset.lower() export_filename = f"{dataset_prefix}_sentiment_{run_id}.joblib" export_path = os.path.join(".", export_filename) joblib.dump(text_clf, export_path) logging.info(f"Model exported as {export_path}") print(f"Model exported for distribution: {export_filename}") # Save label mapping label_mapping_path = os.path.join(output_folder, "labels.txt") with open(label_mapping_path, "w", encoding="utf-8") as f: for label in unique_labels: f.write(f"{label}\n") logging.info(f"Label mapping saved to {label_mapping_path}") # Save metadata metadata = { "timestamp": timestamp, "dataset": dataset, "dataset_name": dataset_name, "config_name": config_name, "model_name": model_name, "classifier": clf_name, "max_features": max_features, "ngram_range": list(ngram_range), "split_ratio": split_ratio, "n_samples": n_samples, "train_samples": len(X_train), "test_samples": len(X_test), "unique_labels": len(unique_labels), "labels": unique_labels, "train_accuracy": float(train_accuracy), "test_accuracy": float(test_accuracy), "train_time": train_time, "prediction_time": prediction_time, "classification_report": report_dict, "confusion_matrix": cm.tolist(), } metadata_path = os.path.join(run_dir, "metadata.json") with open(metadata_path, "w", encoding="utf-8") as f: json.dump(metadata, f, indent=2, ensure_ascii=False) logging.info(f"Metadata saved to {metadata_path}") # Print summary print("\n" + "=" * 60) print("Training Summary") print("=" * 60) print(f"Model: {clf_name}") print(f"Training samples: {len(X_train)}") print(f"Test samples: {len(X_test)}") print(f"Number of classes: {len(unique_labels)}") print(f"Training accuracy: {train_accuracy:.4f}") print(f"Test accuracy: {test_accuracy:.4f}") print(f"Training time: {train_time:.2f} seconds") print(f"Model saved to: {model_path}") print("=" * 60) return metadata def train_all_configurations(dataset="vlsp2016", models=None, num_rows=None): """Train multiple model configurations and compare results""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") run_dir = setup_logging(timestamp) logging.info(f"Starting comparison run: {timestamp}") logging.info(f"Dataset: {dataset}") if num_rows: logging.info(f"Sample limit: {num_rows}") if models is None: # Define all available models for comparison available_models = get_available_models() models = list(available_models.keys()) logging.info(f"Models to compare: {models}") # Define configurations to test - focusing on best performing settings configurations = [] for model_name in models: if model_name in ["svc_rbf", "gradient_boost", "ada_boost", "mlp"]: # Use fewer features for computationally expensive models configurations.append({ "dataset": dataset, "model_name": model_name, "max_features": 10000, "ngram_range": (1, 2), "n_samples": num_rows }) else: # Use more features for faster models configurations.append({ "dataset": dataset, "model_name": model_name, "max_features": 20000, "ngram_range": (1, 2), "n_samples": num_rows }) results = [] for config in configurations: print(f"\nTraining configuration: {config}") try: result = train_model(**config) results.append(result) except Exception as e: logging.error(f"Failed to train with config {config}: {e}") print(f"Error training configuration: {e}") # Save comparison results comparison_path = os.path.join(run_dir, "comparison_results.json") with open(comparison_path, "w", encoding="utf-8") as f: json.dump(results, f, indent=2, ensure_ascii=False) # Print comparison table print("\n" + "=" * 80) print("Model Comparison Results") print("=" * 80) print( f"{'Model':<10} {'Features':<10} {'N-gram':<10} {'Train Acc':<12} {'Test Acc':<12}" ) print("-" * 80) for result in sorted(results, key=lambda x: x["test_accuracy"], reverse=True): model = result["classifier"][:8] features = f"{result['max_features'] // 1000}k" ngram = f"{result['ngram_range'][0]}-{result['ngram_range'][1]}" train_acc = result["train_accuracy"] test_acc = result["test_accuracy"] print( f"{model:<10} {features:<10} {ngram:<10} {train_acc:<12.4f} {test_acc:<12.4f}" ) print("=" * 80) # Find best model best_model = max(results, key=lambda x: x["test_accuracy"]) print(f"\nBest model: {best_model['config_name']}") print(f"Test accuracy: {best_model['test_accuracy']:.4f}") return results def train_notebook(dataset="vlsp2016", model_name="logistic", max_features=20000, ngram_min=1, ngram_max=2, split_ratio=0.2, n_samples=None, compare=False, export_model=False): """ Convenience function for training in Jupyter/Colab notebooks without argparse. Example usage: from train import train_notebook train_notebook(dataset="vlsp2016", model_name="logistic", max_features=20000, export_model=True) """ if compare: print(f"Training and comparing multiple configurations on {dataset}...") return train_all_configurations(dataset=dataset) else: print(f"Training {model_name} model on {dataset} dataset...") print(f"Configuration: max_features={max_features}, ngram=({ngram_min}, {ngram_max})") return train_model( dataset=dataset, model_name=model_name, max_features=max_features, ngram_range=(ngram_min, ngram_max), split_ratio=split_ratio, n_samples=n_samples, export_model=export_model, ) def main(): """Main function with argument parsing""" # Detect if running in Jupyter/Colab import sys in_notebook = hasattr(sys, 'ps1') or 'ipykernel' in sys.modules or 'google.colab' in sys.modules parser = argparse.ArgumentParser( description="Train Vietnamese sentiment classification model on various datasets" ) parser.add_argument( "--dataset", type=str, choices=["vlsp2016", "uts2017"], default="vlsp2016", help="Dataset to use for training (default: vlsp2016)", ) parser.add_argument( "--model", type=str, choices=["logistic", "svc_linear", "svc_rbf", "naive_bayes", "decision_tree", "random_forest", "gradient_boost", "ada_boost", "mlp"], default="logistic", help="Model type to train (default: logistic)", ) parser.add_argument( "--max-features", type=int, default=20000, help="Maximum number of features for TF-IDF (default: 20000)", ) parser.add_argument( "--ngram-min", type=int, default=1, help="Minimum n-gram range (default: 1)" ) parser.add_argument( "--ngram-max", type=int, default=2, help="Maximum n-gram range (default: 2)" ) parser.add_argument( "--split-ratio", type=float, default=0.2, help="Test split ratio (default: 0.2)" ) parser.add_argument( "--num-rows", type=int, default=None, help="Limit number of rows/samples for quick testing (default: None - use all data)", ) parser.add_argument( "--compare", action="store_true", help="Train and compare multiple configurations", ) parser.add_argument( "--compare-models", nargs="+", help="List of specific models to compare (e.g., --compare-models logistic random_forest svc_rbf)", choices=["logistic", "svc_linear", "svc_rbf", "naive_bayes", "decision_tree", "random_forest", "gradient_boost", "ada_boost", "mlp"] ) parser.add_argument( "--export-model", action="store_true", help="Export a copy of the trained model to project root for distribution/publishing" ) # Use parse_known_args to ignore Jupyter/Colab kernel arguments args, unknown = parser.parse_known_args() # If running in notebook and there are unknown args, inform user if in_notebook and unknown: print(f"Note: Running in Jupyter/Colab environment. Ignoring kernel arguments: {unknown}") if args.compare or args.compare_models: if args.compare_models: print(f"Training and comparing selected models: {args.compare_models}") print(f"Dataset: {args.dataset}") if args.num_rows: print(f"Using {args.num_rows} rows") train_all_configurations(dataset=args.dataset, models=args.compare_models, num_rows=args.num_rows) else: print("Training and comparing all available models...") print(f"Dataset: {args.dataset}") if args.num_rows: print(f"Using {args.num_rows} rows") train_all_configurations(dataset=args.dataset, num_rows=args.num_rows) else: print(f"Training {args.model} model on {args.dataset} dataset...") print( f"Configuration: max_features={args.max_features}, ngram=({args.ngram_min}, {args.ngram_max})" ) train_model( dataset=args.dataset, model_name=args.model, max_features=args.max_features, ngram_range=(args.ngram_min, args.ngram_max), split_ratio=args.split_ratio, n_samples=args.num_rows, export_model=args.export_model, ) if __name__ == "__main__": main()