Source code for reverse_feature_selection.reverse_random_forests

# Copyright (c) 2024 Sigrun May,
# Ostfalia Hochschule für angewandte Wissenschaften
#
# This software is distributed under the terms of the MIT license
# which is available at https://opensource.org/licenses/MIT

"""Reverse feature selection with random forest regressors."""

import logging
import math
import multiprocessing

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from scipy.stats import mannwhitneyu
from sklearn import clone
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def remove_features_correlated_to_target_feature(
    train_df: pd.DataFrame, correlation_matrix_df: pd.DataFrame, target_feature: str, meta_data: dict
) -> pd.DataFrame:
    """Remove features from the training data that are correlated to the target feature.

    This function creates a mask for uncorrelated features based on the correlation threshold
    specified in the metadata. It then uses this mask to select the uncorrelated features from the training data.

    Args:
        train_df: The training data.
        correlation_matrix_df: The correlation matrix of the training data.
        target_feature: The name of the target feature.
        meta_data: The metadata related to the dataset and experiment. Must contain "train_correlation_threshold" to
            define the threshold for the absolute correlation to the target feature.

    Returns:
        The training data including the label with only the features uncorrelated to the target feature remaining.

    Raises:
        AssertionError: If no features uncorrelated to the target feature are found.
    """
    # Create a mask for uncorrelated features based on the correlation threshold
    uncorrelated_features_mask = (
        correlation_matrix_df[target_feature]
        .abs()
        .le(meta_data["train_correlation_threshold"], axis="index")
        # For a correlation matrix filled only with the lower half,
        # the first elements up to the diagonal would have to be read
        # with axis="index" and the further elements after the diagonal
        # with axis="column".
    )
    # Remove correlated features from the training data
    uncorrelated_train_df = train_df[train_df.columns[uncorrelated_features_mask]]

    assert len(uncorrelated_train_df.columns) > 1, "No features uncorrelated to the target feature found."

    # insert the 'label' as the first column if it is not already there
    if uncorrelated_train_df.columns[0] != "label":
        uncorrelated_train_df.insert(0, "label", train_df["label"])

    # Return the data frame with uncorrelated features
    return uncorrelated_train_df


def calculate_oob_errors(
    target_feature_name: str,
    train_df: pd.DataFrame,
    corr_matrix_df: pd.DataFrame,
    meta_data: dict,
) -> tuple[list, list | None, int]:
    """Calculate out-of-bag (OOB) error for training data first including the label and again excluding the label.

    Args:
        target_feature_name: The name of the target feature.
        train_df: The training data.
        corr_matrix_df: The correlation matrix of the training data.
        meta_data: The metadata related to the dataset and experiment.

    Returns:
        A tuple containing lists of OOB scores for labeled and unlabeled training data and the number of features in
        the training data.
    """
    # Prepare training data
    y_train = train_df[target_feature_name].to_numpy()

    # Remove features correlated to the target feature
    x_train = remove_features_correlated_to_target_feature(train_df, corr_matrix_df, target_feature_name, meta_data)
    number_of_features_in_training_data = x_train.shape[1]

    oob_errors_labeled: list[float] = []
    oob_errors_unlabeled = []

    # chose 1/10 of the available CPUs for nested parallel processing
    n_jobs = max(1, int(meta_data["n_cpus"] / 10))

    # Perform validation using different random seeds
    for seed in meta_data["random_seeds"]:
        assert isinstance(seed, int)
        # Create a RandomForestRegressor model with specified parameters
        clf1 = RandomForestRegressor(
            oob_score=mean_squared_error,  # Use out-of-bag error for evaluation
            random_state=seed,
            min_samples_leaf=2,
            n_jobs=n_jobs,
        )
        # Create an exact clone of the RandomForestRegressor (clf1)
        # https://scikit-learn.org/stable/modules/generated/sklearn.base.clone.html
        clf2 = clone(clf1)

        # Fit the first model with training data including the label
        clf1.fit(x_train, y_train)
        label_importance_zero = math.isclose(clf1.feature_importances_[0], 0.0)

        # If the feature importance of the label feature is zero, it means the label was not considered in the model
        if label_importance_zero:
            return oob_errors_labeled, None, number_of_features_in_training_data

        # Store the OOB score for the labeled model
        oob_errors_labeled.append(clf1.oob_score_)

        # Fit the second model to the unlabeled training data (excluding 'label' column)
        unlabeled_x_train = x_train.loc[:, x_train.columns != "label"]
        clf2.fit(unlabeled_x_train, y_train)

        # Store the OOB score for the unlabeled model
        oob_errors_unlabeled.append(clf2.oob_score_)
    return oob_errors_labeled, oob_errors_unlabeled, number_of_features_in_training_data


def validate_and_initialize_meta_data(meta_data: dict | None) -> dict:
    """Validate and initialize the metadata dictionary.

    This function ensures that the metadata dictionary contains the required keys with valid values.
    If the metadata is not provided or is missing any required keys, default values are assigned.

    Args:
        meta_data: The metadata dictionary to validate and initialize. If None, a new dictionary
            with default values is created.

    Returns:
        The validated and initialized metadata dictionary.

    Raises:
        AssertionError: If any of the required keys in the metadata dictionary have invalid types or values.

    Default Values:
        - "n_cpus": The number of available CPUs (`multiprocessing.cpu_count()`).
        - "random_seeds": A list of 30 random integers between 1 and 10000.
        - "train_correlation_threshold": A float value of 0.7, representing the absolute correlation threshold.
    """
    # initialize numpy random number generator
    rng = np.random.default_rng()

    # Set default values if meta_data is not defined
    if meta_data is None:
        meta_data = {
            # Use all available CPUs
            "n_cpus": multiprocessing.cpu_count(),
            # Generate a list of 30 random integers to initialize random forests multiple times
            "random_seeds": rng.integers(1, 10001, size=30).tolist(),
            # Absolute correlation threshold for removing features correlated to the target feature
            "train_correlation_threshold": 0.7,
        }
        logger.info(
            f"\nMeta data have been set to the default values of \n"
            f"1. {meta_data['n_cpus']} CPUs for parallel calculation \n"
            f"2. 30 random integers as seeds for random forests \n"
            f"3. 0.7 as absolute correlation threshold for removing features from the training data correlated to the "
            f"target feature"
        )
    # Ensure "n_cpus" is defined, otherwise set to the number of available CPUs
    if "n_cpus" not in meta_data:
        meta_data["n_cpus"] = multiprocessing.cpu_count()
        logger.info(f"Number of CPUs is set to {meta_data['n_cpus']}")
    # Ensure "random_seeds" is defined, otherwise generate a list of 30 random integers
    if "random_seeds" not in meta_data:
        meta_data["random_seeds"] = rng.integers(1, 10001, size=30).tolist()
        logger.info("Seeds for random forests have been set to the default of 30 random integers")
    # Ensure "train_correlation_threshold" is defined, otherwise set to 0.7
    if "train_correlation_threshold" not in meta_data:
        meta_data["train_correlation_threshold"] = 0.7
        logger.info("Absolute correlation threshold has been set to the default of 0.7")

    # Validate the types and values of the metadata keys
    assert isinstance(meta_data["n_cpus"], int), "Number of available CPUs is not an integer."
    assert isinstance(meta_data["random_seeds"], list), "Random seeds are not a list."
    assert all(
        isinstance(seed_random_forest, int) for seed_random_forest in meta_data["random_seeds"]
    ), "Random seeds are not integers."
    assert isinstance(meta_data["train_correlation_threshold"], float), "Correlation threshold is not a float."
    assert 0 <= meta_data["train_correlation_threshold"] <= 1, "Correlation threshold is not between 0 and 1."

    return meta_data


[docs] def select_feature_subset( data_df: pd.DataFrame, train_indices: np.ndarray, label_column_name: str = "label", meta_data: dict | None = None, ) -> pd.DataFrame: """Selects a subset of features based on the mean out-of-bag (OOB) errors for random forests regressors. Calculate the mean out-of-bag (OOB) errors for random forests regressors with different random seeds for training data including the label and without the label for each feature. It then selects a subset of features based on the Mann-Whitney U test which determines whether there is a significant difference between the two error distributions. The test is configured for the hypothesis that the distribution of labeled_error_distribution is shifted to the left of the unlabeled_error_distribution. The Mann-Whitney U test is used to calculate the p-value based on the Out-of-Bag (OOB) scores of the labeled and unlabeled error distributions. The p-value is a measure of the probability that an observed difference could have occurred just by random chance. The smaller the p-value, the greater the statistical evidence to reject the null hypothesis (conclude that both error distributions differ). Args: data_df: The training data. The data must contain the label and features. train_indices: Indices for the training split. The indices are used to select the training data from the data_df DataFrame. label_column_name: The name of the label column in the training data. Default is "label". meta_data: The metadata related to the dataset and experiment. If `meta_data` is `None`, default values for the required keys (`"n_cpus"`, `"random_seeds"`, and `"train_correlation_threshold"`) are used. 1. `n_cpus`: The number of available CPUs is required as an integer and defaults to `multiprocessing.cpu_count()`. 2. `random_seeds`: A list of different seeds to initalize random forests is used to generate comparable error distributions. Define list of random seeds for reproducibility. Default is generating a random list of 30 seeds (int). 3. `train_correlation_threshold`: The absolute correlation threshold for removing features from the training data correlated to the target feature is a float between 0 and 1. The higher the threshold, the more features are deselected. The default value is set to `0.7` and should be adjusted if the results are not satisfactory. Returns: A pandas DateFrame with the selected features in the "feature_subset_selection" column. The feature_subset_selection column contains the fraction difference based on the mean of OOB score distributions, where the p_value is smaller or equal to 0.05. Features with values greater than 0 in this column are selected. The remaining columns provide additional information: * `feature_subset_selection_median`: Contains the feature subset based on the median fraction difference. * `unlabeled_errors`: Lists the OOB scores for the unlabeled training data. * `labeled_errors`: Lists the OOB scores for the labeled training data. * `p_value`: Contains the p-values from the Mann-Whitney U test. * `fraction_mean`: Shows the fraction difference based on the mean of the distributions. * `fraction_median`: Shows the fraction difference based on the median of the distributions. * `train_features_count`: Indicates the number of uncorrelated features in the training data. The index of the DataFrame is the feature names. Raises: AssertionError: If the meta_data dictionary does not contain the required keys or if the values are not of the expected type. Also, if the training data does not contain any features or if the label column is not found in the training data. ValueError: If no features uncorrelated to the target feature are found. """ assert label_column_name in data_df.columns, f"Label column {label_column_name} not found in the training data." if label_column_name != "label": # rename label column data_df[label_column_name] = data_df["label"] assert "label" in data_df.columns, "Label column not found in the training data." # check if feature importance calculation is possible if data_df.shape[1] < 2: raise ValueError("No features found in the training data.") meta_data = validate_and_initialize_meta_data(meta_data) # Split the data and select the training data train_df = data_df.iloc[train_indices, :] # Calculate the correlation matrix of the training data correlation_matrix_df = train_df.corr(method="spearman") # parallel version out = Parallel(n_jobs=meta_data["n_cpus"], verbose=-1)( delayed(calculate_oob_errors)(target_feature_name, train_df, correlation_matrix_df, meta_data) for target_feature_name in data_df.columns[1:] ) p_value_list: list[float | None] = [] fraction_list_mean: list[float | None] = [] fraction_list_median: list[float | None] = [] calculated_seeds_list: list[int] = [] # Unpack the results from the parallel processing labeled_error_distribution_list, unlabeled_error_distribution_list, features_count_in_train_df = zip( *out, strict=True ) # Calculate the p-value and fraction differences based on the mean and median of the distributions for labeled_error_distribution, unlabeled_error_distribution in zip( labeled_error_distribution_list, unlabeled_error_distribution_list, strict=True ): if unlabeled_error_distribution is not None: assert len(labeled_error_distribution) == len(unlabeled_error_distribution) calculated_seeds_list.append(len(unlabeled_error_distribution)) else: # the current target feature was deselected because the feature importance of the label was zero p_value_list.append(None) fraction_list_mean.append(None) fraction_list_median.append(None) calculated_seeds_list.append(len(labeled_error_distribution)) continue # calculate p-value based on the OOB scores and the mann-whitney u test # https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.mannwhitneyu.html p_value = mannwhitneyu(labeled_error_distribution, unlabeled_error_distribution, alternative="less").pvalue p_value_list.append(p_value) # calculate the fraction difference of the two means of the distributions fraction_mean_based = (np.mean(unlabeled_error_distribution) - np.mean(labeled_error_distribution)) / np.mean( unlabeled_error_distribution ) fraction_list_mean.append(fraction_mean_based) # calculate the fraction difference of the two medians of the distributions fraction_median_based = ( np.median(unlabeled_error_distribution) - np.median(labeled_error_distribution) ) / np.median(unlabeled_error_distribution) fraction_list_median.append(fraction_median_based) feature_subset_selection_list = [] feature_subset_selection_list_median = [] # extract feature selection array for p_value, fraction_mean, fraction_median in zip( p_value_list, fraction_list_mean, fraction_list_median, strict=True ): # check if the value of the "p_value" column is smaller or equal than 0.05 if p_value is not None and p_value <= 0.05: feature_subset_selection_list.append(fraction_mean) feature_subset_selection_list_median.append(fraction_median) else: feature_subset_selection_list.append(0.0) feature_subset_selection_list_median.append(0.0) assert ( len(labeled_error_distribution_list) == len(unlabeled_error_distribution_list) == len(p_value_list) == len(fraction_list_mean) == len(fraction_list_median) == len(feature_subset_selection_list) == len(feature_subset_selection_list_median) == len(calculated_seeds_list) == data_df.shape[1] - 1 # exclude label column ) result_df = pd.DataFrame(index=data_df.columns[1:]) result_df["feature_subset_selection"] = feature_subset_selection_list result_df["feature_subset_selection_median"] = feature_subset_selection_list_median result_df["unlabeled_errors"] = unlabeled_error_distribution_list result_df["labeled_errors"] = labeled_error_distribution_list result_df["p_value"] = p_value_list result_df["fraction_mean"] = fraction_list_mean result_df["fraction_median"] = fraction_list_median result_df["train_features_count"] = np.full(len(feature_subset_selection_list), features_count_in_train_df) result_df["number_of_repeated_trainings"] = calculated_seeds_list return result_df