Source code for cyclic_boosting.binning.ecdf_transformer

from __future__ import absolute_import, division, print_function

import logging
from collections import defaultdict
from typing import Union, Optional, List

import numpy as np
import pandas as pd
import sklearn.base as sklearnb

from cyclic_boosting import flags

from ._binary_search import eq_multi, ge_lim, le_interp_multi
from ._utils import (
    _read_feature_property,
    check_frame_empty,
    get_column_index,
    minimal_difference,
)

_logger = logging.getLogger(__name__)


class ConstFunction(object):
    def __init__(self, val):
        self.val = val

    def __call__(self):
        return self.val
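
# Illustrative sketch (not part of the original module): ConstFunction is a
# zero-argument callable, which makes it usable as the default_factory of a
# collections.defaultdict, as done in ECdfTransformer._normalize_bins below.
# The feature names here are hypothetical.
def _demo_const_function():
    nbins_per_feature = defaultdict(ConstFunction(100))
    nbins_per_feature["feature b"] = 20  # explicit per-feature override
    # unknown features fall back to the constant: returns (100, 20)
    return nbins_per_feature["feature a"], nbins_per_feature["feature b"]
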
class ECdfTransformer(sklearnb.BaseEstimator, sklearnb.TransformerMixin):
    r"""Transform features to the empirical CDF scale of the training data.

    CDF = :math:`P\left(X \leq x\right)` = cumulative distribution function.
    See `CDF on wikipedia
    <http://en.wikipedia.org/wiki/Cumulative_distribution_function>`_

    Each feature found in ``feature_properties`` is considered separately.

    In :meth:`fit`, (up to) ``n_bins`` bin boundaries with approximately
    equal numbers of data points are determined. For discrete values, the
    complete CDF is stored and ``n_bins`` is ignored.

    In :meth:`transform`, each feature value is associated with the
    corresponding bin by binary search. For features with
    :obj:`cyclic_boosting.flags.IS_CONTINUOUS` set, the empirical CDF is then
    interpolated between the left and the right bin boundary. For
    out-of-range values, the bin boundary values are taken.

    For features with :obj:`cyclic_boosting.flags.IS_ORDERED` or
    :obj:`cyclic_boosting.flags.IS_UNORDERED`, only values that have been
    seen in the fit are transformed to the corresponding empirical CDF
    values. For all values not within epsilon of the values seen in the fit,
    :obj:`numpy.nan` is returned.

    Missing values (:obj:`numpy.nan`) stay missing values and are not
    transformed, regardless of the ``feature_properties`` set and the
    feature values seen in :meth:`fit`. For all features, the feature
    property :obj:`cyclic_boosting.flags.HAS_MISSING` is assumed.

    Parameters
    ----------
    n_bins: int, dict
        Maximum number of bins used to estimate the empirical CDF.
        ``n_bins`` is ignored for features with discrete preprocessing. If a
        dict is passed, the feature names/indices are the keys and the bin
        counts are the values, e.g. ``{'feature a': 150, 'feature b': 20}``.

    feature_properties: dict
        Dictionary listing the names of all features as keys and their
        preprocessing flags as values. When using a numpy feature matrix X
        with no column names, the keys of the feature properties are the
        column indices. If no ``feature_properties`` are passed, all columns
        in ``X`` are treated as :obj:`cyclic_boosting.flags.IS_CONTINUOUS`.

        For more information about feature properties:

        .. seealso:: :mod:`cyclic_boosting.flags`

    weight_column
        Optional column label or column index of the weight column. If not
        set, all samples receive the same weight 1.

    epsilon: float
        Thresholds used for the comparison of float values:

        * ``epsilon * 1.0`` for the comparison of CDF values
        * ``epsilon * minimal_bin_width`` for the comparison with bin
          boundaries of a given feature

        Default value for epsilon: 1e-9

    tolerance: double
        Relative tolerance of the minimum bin weight. (E.g. if you specify
        100 bins and a tolerance of 0.05, each bin is only required to hold
        0.95% of the total bin weight instead of 1.0%.)

    **Guarantees for continuous features**
    (:obj:`cyclic_boosting.flags.IS_CONTINUOUS` set for feature)

    * The estimated number of bins :math:`n_\text{bins\_estimated}` is
      always smaller than or equal to the number of bins requested by the
      user :math:`n_\text{bins}`:

      .. math::
         n_\text{bins\_estimated} \leq n_\text{bins}

    * The bin boundaries are chosen such that each bin contains at least a
      fraction of :math:`\frac{1}{n_\text{bins}}` of all values.

    **Guarantees for discrete features**
    (:obj:`cyclic_boosting.flags.IS_UNORDERED` or
    :obj:`cyclic_boosting.flags.IS_ORDERED` set for feature)

    * The estimated number of bins :math:`n_\text{bins\_estimated}` is equal
      to the number of unique values :math:`n_\text{unique\_values}` found:

      .. math::
         n_\text{bins\_estimated} = n_\text{unique\_values}

    **Estimated parameters**

    Attributes
    ----------
    bins_and_cdfs_
        For each feature, a tuple containing

        * the column name or index,
        * the epsilon used for comparisons to bin boundaries; it is the
          constructor parameter ``epsilon`` multiplied by the smallest bin
          width,
        * and a :class:`numpy.ndarray` of shape ``(at most n_bins + 1, 2)``
          containing the **bin boundaries** (column 0) and the
          **corresponding cumulative probabilities** (column 1) learned in
          the fit. For one feature ``x``, the matrix looks like this:

          .. math::
             \begin{pmatrix}
             x_\text{min} & P\left(X < x_\text{min}\right) = 0 \\
             x_\text{boundary1} & P\left(X \leq x_\text{boundary1}\right) \\
             x_\text{boundary2} & P\left(X \leq x_\text{boundary2}\right) \\
             \ldots & \ldots \\
             x_\text{max} & P\left(X \leq x_\text{max}\right) = 1 \\
             \end{pmatrix}

        For mixed discrete and continuous features, there might be fewer
        than ``n_bins`` bins. For discrete features, ``n_bins`` is ignored
        and the CDF is calculated for each unique value.

        Type of ``bins_and_cdfs_``: :obj:`list` of :obj:`tuple`

    Examples
    --------

    >>> feature_1 = np.asarray([2.1, 2.2, 2.5, 3.1, 3.3, 3.7, 4.1, 4.4])
    >>> X = np.c_[feature_1]
    >>> eps = 1e-8
    >>> from cyclic_boosting.binning import ECdfTransformer
    >>> trans = ECdfTransformer(n_bins=4, epsilon=eps)
    >>> trans = trans.fit(X)

    >>> # only one input column
    >>> column, epsilon, bins_cdfs = trans.bins_and_cdfs_[0]
    >>> assert column == 0 and np.allclose(epsilon, eps * 0.1)
    >>> bins_cdfs
    array([[ 2.1 ,  0.  ],
           [ 2.2 ,  0.25],
           [ 3.1 ,  0.5 ],
           [ 3.7 ,  0.75],
           [ 4.4 ,  1.  ]])

    >>> X_test = np.c_[[1.9, 2.4, 2.2, 3.6, 3.5, 4.3, 5.1]]
    >>> trans.transform(X_test)
    array([[ 0.        ],
           [ 0.30555556],
           [ 0.25      ],
           [ 0.70833333],
           [ 0.66666667],
           [ 0.96428571],
           [ 1.        ]])
    """

    def __init__(
        self,
        n_bins=100,
        feature_properties=None,
        weight_column=None,
        epsilon=1e-9,
        tolerance=0.1,
    ):
        self.n_bins = n_bins
        self.feature_properties = feature_properties
        self.weight_column = weight_column
        self.epsilon = epsilon
        self.tolerance = tolerance
        self.bins_and_cdfs_ = None

    @staticmethod
    def _normalize_bins(n_bins):
        if isinstance(n_bins, int):
            return defaultdict(ConstFunction(n_bins))
        else:
            return n_bins
    def fit(self, X, y=None):
        self._nbins_per_feature = self._normalize_bins(self.n_bins)
        self.bins_and_cdfs_ = []

        if check_frame_empty(X):
            raise ValueError("Your input matrix for the binning is empty.")

        feature_columns = get_feature_column_names_or_indices(X, exclude_columns=[self.weight_column])
        weights = get_weight_column(X, self.weight_column)

        for col in feature_columns:
            _logger.info("{0} column: {1}".format(self.__class__.__name__, col))
            x_col = get_X_column(X, col)

            feature_prop = _read_feature_property(col, self.feature_properties)
            if feature_prop is None:
                continue

            bins_x, cdf_x, _wsum, _n_nan = calculate_cdf_from_weighted_data(x_col.astype(float), weights)

            if len(bins_x) == 0 or len(cdf_x) == 0:
                # No valid values seen for this feature: no binning possible.
                self.bins_and_cdfs_.append((col, self.epsilon, None))
                continue

            if flags.is_ordered_set(feature_prop) or flags.is_unordered_set(feature_prop):
                # Discrete feature: keep the complete CDF. The first unique
                # value is duplicated so that row 0 carries P(X < x_min) = 0.
                bin_boundaries = np.r_[bins_x[0], bins_x]
                cdf = np.r_[0.0, cdf_x]
            else:
                # Continuous feature: reduce the event-wise CDF to at most
                # n_bins bins of approximately equal statistics.
                bin_boundaries, cdf = reduce_cdf_and_boundaries_to_nbins(
                    bins_x,
                    cdf_x,
                    self._nbins_per_feature[col],
                    self.epsilon,
                    self.tolerance,
                )

            n = len(cdf)
            bins_and_cdfs = np.empty((n, 2))
            bins_and_cdfs[:, 0] = bin_boundaries
            bins_and_cdfs[:, 1] = cdf

            epsilon = self.epsilon * minimal_difference(bin_boundaries)
            self.bins_and_cdfs_.append((col, epsilon, bins_and_cdfs))

        return self
    def _check_input_for_transform(self, X):
        if self.bins_and_cdfs_ is None:
            raise RuntimeError("fit must be called before transform.")

        columns = get_feature_column_names_or_indices(X, exclude_columns=[self.weight_column])
        if self.feature_properties is not None:
            columns = [col for col in columns if col in self.feature_properties]

        n_cols = len(columns)
        if n_cols != len(self.bins_and_cdfs_):
            raise ValueError(
                "The input matrix X does not have the same number of feature"
                " columns (%s) as the matrix in the fit (%s)." % (n_cols, len(self.bins_and_cdfs_))
            )
    def transform(self, X, y=None):
        self._check_input_for_transform(X)

        if check_frame_empty(X):
            return X

        Xnp = np.asarray(X, dtype=float)
        Xt = Xnp

        for col, epsilon, bins_and_cdfs in self.bins_and_cdfs_:
            j = get_column_index(X, col)

            feature_property = _read_feature_property(col, self.feature_properties)
            if feature_property is None:
                continue

            if bins_and_cdfs is not None:
                if flags.is_continuous_set(feature_property):
                    # Interpolate the CDF between left and right bin boundary.
                    le_interp_multi(
                        bins_and_cdfs[:, 0],
                        Xnp[:, j],
                        bins_and_cdfs[:, 1],
                        0.0,
                        epsilon,
                        Xt[:, j],
                    )
                else:
                    # Discrete feature: exact match (within epsilon) or NaN.
                    eq_multi(
                        bins_and_cdfs[:, 0],
                        Xnp[:, j],
                        bins_and_cdfs[:, 1],
                        epsilon,
                        Xt[:, j],
                    )
            else:
                # No bins were learned for this feature in fit.
                Xnp[:, j] = np.nan

        if isinstance(X, pd.DataFrame):
            return pd.DataFrame(Xt, columns=X.columns)
        else:
            return Xt
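
# Usage sketch for the discrete code path described in the class docstring
# (hypothetical data; the NaN behaviour for values unseen in fit is taken
# from the docstring above, so treat the expected outputs as an assumption):
def _demo_discrete_ecdf():
    X = np.c_[[1.0, 1.0, 2.0, 4.0]]
    trans = ECdfTransformer(feature_properties={0: flags.IS_ORDERED})
    trans.fit(X)
    # expected: 1.0 -> 0.5, 4.0 -> 1.0, and 3.0 (never seen in fit) -> NaN
    return trans.transform(np.c_[[1.0, 4.0, 3.0]])
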
def get_feature_column_names_or_indices(
    X: Union[pd.DataFrame, np.ndarray], exclude_columns: Optional[Union[List[str], List[int]]] = None
) -> Union[List[str], List[int]]:
    """
    Extract the column names from `X`. If `X` is a numpy matrix, each column
    is labeled with an integer starting from zero.

    :param X: input matrix
    :type X: numpy.ndarray(dim=2) or pandas.DataFrame

    :param exclude_columns: column names or indices to omit
    :type exclude_columns: list of int or str

    :rtype: list

    >>> X = np.c_[[0, 1], [1, 0], [3, 5]]
    >>> from cyclic_boosting.binning import get_feature_column_names_or_indices
    >>> get_feature_column_names_or_indices(X)
    [0, 1, 2]
    >>> get_feature_column_names_or_indices(X, exclude_columns=[1])
    [0, 2]
    >>> get_feature_column_names_or_indices(X, exclude_columns=[1, 1])
    [0, 2]
    >>> get_feature_column_names_or_indices(X, exclude_columns=[0, 1, 2])
    []

    >>> X = pd.DataFrame(X, columns=['b', 'c', 'a'])
    >>> get_feature_column_names_or_indices(X, exclude_columns=['a'])
    ['b', 'c']
    >>> get_feature_column_names_or_indices(X, exclude_columns=['d'])
    ['b', 'c', 'a']
    """
    if isinstance(X, pd.DataFrame):
        columns = list(X.columns)
    elif isinstance(X, np.ndarray):
        assert X.ndim == 2, "X must be a 2D matrix"
        columns = list(range(0, X.shape[1]))
    else:
        raise ValueError("X must be a pandas.DataFrame or a numpy.ndarray")

    if exclude_columns is not None:
        exclude_columns = set(exclude_columns)
        return [x for x in columns if x not in exclude_columns]
    else:
        return columns
def get_weight_column(X, weight_column=None):
    """
    Check if a weight column is present and return it if possible. If no
    weight column is present in `X`, a weight column containing only
    ``ones`` of the same length as `X` is created and returned.

    :param X: Samples feature matrix.
    :type X: numpy.ndarray(dim=2) or pandas.DataFrame

    :param weight_column: Name or index of the weight column or None.
    :type weight_column: int or string or ``NoneType``

    :rtype: numpy.ndarray

    >>> X = np.c_[[0., 1], [1, 0], [3, 5]]
    >>> from cyclic_boosting.binning import get_weight_column
    >>> get_weight_column(X)
    array([ 1.,  1.])
    >>> get_weight_column(X, 0)
    array([ 0.,  1.])
    >>> get_weight_column(X, 2)
    array([ 3.,  5.])

    >>> X = pd.DataFrame(X, columns=['b', 'c', 'a'])
    >>> get_weight_column(X)
    array([ 1.,  1.])
    >>> get_weight_column(X, 'c')
    array([ 1.,  0.])
    """
    if weight_column is not None:
        if isinstance(X, pd.DataFrame):
            try:
                return np.asarray(X[weight_column])
            except KeyError:
                raise ValueError("Weight column {} not found in X.".format(str(weight_column)))
        else:
            try:
                return X[:, weight_column]
            except IndexError:
                raise ValueError("Index {} defining weight column not found in X.".format(str(weight_column)))
    else:
        return np.ones(X.shape[0], dtype=np.float64)
def reduce_cdf_and_boundaries_to_nbins(bins_x, cdf_x, n_bins, epsilon, tolerance):
    """
    Section the CDF spectrum into `n_bins` parts of equal statistics, and
    find all events belonging to these bins by filtering all suitable events
    in the event-wise `cdf_x` array.

    Often, events cannot be distributed exactly with equal statistics over
    all bins; therefore the ``tolerance`` argument allows for bins with a
    weight below 1.0 / n_bins. A minimum weight of
    (1.0 - tolerance) / n_bins per bin is guaranteed.

    This function is used internally by
    :class:`cyclic_boosting.binning.ECdfTransformer`.

    Parameters
    ----------
    bins_x: np.ndarray
        Strictly increasing array containing all bin boundaries; its length
        is the number of events.

    cdf_x: np.ndarray
        Strictly increasing array containing the CDF values corresponding to
        the bin boundaries in `bins_x`. Contains one value for each event.

    n_bins: int
        Maximum number of bins that ought to be returned. This also
        determines the minimum weight per bin, which is 1 / n_bins.

    epsilon: double
        Threshold for the comparison of CDF values.

    tolerance: double
        Relative tolerance of the minimum bin weight. (E.g. if you specify
        100 bins and a tolerance of 0.05, each bin is only required to hold
        0.95% of the total bin weight instead of 1.0%.)

    Returns
    -------
    tuple of numpy.ndarrays(dim=1)
        The **reduced** input arrays `bins_x` and `cdf_x`, now with a
        maximum length of ``n_bins + 1`` (the boundaries include the
        minimum value).
    """
    if n_bins < 2:
        raise ValueError("n_bins = %s has to be greater than 1!" % n_bins)

    n_cdf = n_bins + 1
    bin_boundaries = np.zeros(n_cdf, dtype=np.float64)
    cdf = np.zeros(n_cdf, dtype=np.float64)
    bin_boundaries[0] = bins_x[0]

    n = cdf_x.shape[0]
    index = 0
    cdf_share = 1.0 / n_bins
    previous_cdf = 0.0

    for i in range(1, n_bins + 1):
        cdf_rest = previous_cdf % cdf_share
        cdf_searched = previous_cdf + cdf_share
        if cdf_rest <= tolerance * cdf_share:
            cdf_searched -= cdf_rest

        if cdf_searched <= 1.0 + tolerance * cdf_share:
            # Find the first event whose CDF reaches the searched share.
            index = ge_lim(cdf_x, cdf_searched - epsilon, 1, index, n)
            previous_cdf = cdf_x[index]
            cdf[i] = cdf_x[index]
            bin_boundaries[i] = bins_x[index]
        else:
            # The remaining share does not fill a bin: close with the maximum.
            cdf[i - 1] = 1.0
            bin_boundaries[i - 1] = bins_x[n - 1]
            n_cdf = i
            break

    return np.array(bin_boundaries[:n_cdf]), np.array(cdf[:n_cdf])
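
# Worked sketch (hypothetical numbers): reducing an event-wise CDF of eight
# equally weighted events to n_bins=4 reproduces the quartile boundaries of
# the ECdfTransformer docstring example. The expected outputs assume ge_lim
# performs the usual "first index with cdf >= threshold" binary search.
def _demo_reduce_cdf():
    bins_x = np.array([2.1, 2.2, 2.5, 3.1, 3.3, 3.7, 4.1, 4.4])
    cdf_x = np.arange(1.0, 9.0) / 8.0  # 0.125, 0.25, ..., 1.0
    boundaries, cdf = reduce_cdf_and_boundaries_to_nbins(bins_x, cdf_x, 4, 1e-9, 0.1)
    # expected: boundaries [2.1, 2.2, 3.1, 3.7, 4.4],
    #           cdf        [0.0, 0.25, 0.5, 0.75, 1.0]
    return boundaries, cdf
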
def get_X_column(X, column, array_for_1_dim=True):
    """
    Pick columns from a :class:`pandas.DataFrame` or :class:`numpy.ndarray`.

    Parameters
    ----------
    X: :class:`pandas.DataFrame` or :class:`numpy.ndarray`
        Data source from which columns are picked.

    column:
        The format depends on the type of X. For :class:`pandas.DataFrame`
        you can give a string or a list/tuple of strings naming the columns.
        For :class:`numpy.ndarray` an integer or a list/tuple of integers
        indexing the columns.

    array_for_1_dim: bool
        In default mode (set to True) the return type for one-dimensional
        access is a np.ndarray with shape (n,). If set to False, it is a
        np.ndarray with shape (n, 1).
    """
    if isinstance(column, tuple):
        column = list(column)

    if not array_for_1_dim:
        if not isinstance(column, list):
            column = [column]
    else:
        if isinstance(column, list) and len(column) == 1:
            column = column[0]

    if isinstance(X, pd.DataFrame):
        return X[column].values
    else:
        return X[:, column]
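
# Quick sketch of the shape behaviour documented above (hypothetical data):
def _demo_get_x_column():
    X = np.c_[[1.0, 2.0], [3.0, 4.0]]
    a = get_X_column(X, 0)                         # 1-dim access: shape (2,)
    b = get_X_column(X, 0, array_for_1_dim=False)  # kept 2-dim: shape (2, 1)
    df = pd.DataFrame(X, columns=["u", "v"])
    c = get_X_column(df, ["u", "v"])               # multi-column: shape (2, 2)
    return a.shape, b.shape, c.shape
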
def calculate_cdf_from_weighted_data(z, w):
    """
    Calculate the CDF value for each unique value in `z`, weighted with the
    sample weights in `w`. All non-finite values in `z` and unique values of
    `z` with weight zero are ignored.

    Parameters
    ----------
    z: numpy.ndarray of float64
        input array

    w: numpy.ndarray
        sample weights

    Returns
    -------
    tuple of two :class:`numpy.ndarray`, a double and an int
        Tuple consisting of an array containing the valid unique `z` values,
        an array containing the CDF values for the valid `z` values, the
        total weight sum, and the number of non-finite values in `z`.

    Examples
    --------

    >>> z = np.array([1., 2., 3., 4., 5., 6., np.nan, 6.])
    >>> w = np.array([4., 2., 2., 1., 0., 1., 1., 0.])
    >>> z_unique, cdfs, wsum, n_nan = calculate_cdf_from_weighted_data(z, w)
    >>> wsum
    10.0
    >>> n_nan
    1
    >>> z_unique  # unique values of z with nonzero weight
    array([ 1.,  2.,  3.,  4.,  6.])
    >>> cdfs  # corresponding cdf values to z_unique
    array([ 0.4,  0.6,  0.8,  0.9,  1. ])
    """
    if z.shape[0] != w.shape[0]:
        raise ValueError("input vectors must be of same shape")

    n_nan = np.count_nonzero(np.isnan(z))
    wsum = np.nansum(w[~np.isnan(z)])

    # Accumulate the weights for the unique values; NaN keys are dropped by
    # groupby, and unique values with weight zero are filtered out.
    uniques = pd.DataFrame({"z": z, "w": w}).groupby(["z"]).agg({"w": "sum"}).reset_index()
    uniques = uniques.loc[uniques["w"] != 0]

    # Take the unique values from the filtered frame so that zero-weight
    # values are excluded, consistent with the doctest above.
    z_unique = uniques["z"].values
    cdf = np.nancumsum(uniques["w"]) / wsum

    return z_unique, cdf, wsum, n_nan