from __future__ import absolute_import, division, print_function
import abc
import logging
import warnings
import numexpr
import numpy as np
import pandas as pd
from pandas.api.extensions import ExtensionArray
import scipy.special
import six
from sklearn import base as sklearnb
from cyclic_boosting import common_smoothers, learning_rate, link
from cyclic_boosting.binning import get_feature_column_names_or_indices
from cyclic_boosting.common_smoothers import SmootherChoice
from cyclic_boosting.features import create_features, Feature, FeatureList, FeatureTypes, create_feature_id
from cyclic_boosting.link import IdentityLinkMixin, LogLinkMixin
from cyclic_boosting.utils import (
    slice_finite_semi_positive,
    nans,
    get_X_column,
    get_normalized_values,
    ConvergenceError,
    ConvergenceParameters,
)
from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union
_logger = logging.getLogger(__name__)
def get_influence_category(feature: Feature, influence_categories: dict) -> Optional[str]:
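    """Look up the influence category assigned to ``feature``.

    Returns ``None`` if no ``influence_categories`` mapping is given; raises
    a ``KeyError`` if a mapping is given but does not contain the feature
    group. A hypothetical mapping (names purely illustrative)::

        influence_categories = {
            ("store", "weekday"): "calendar",
            "price_ratio": "price",
        }
    """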
influence_category = None
if influence_categories is not None:
influence_category = influence_categories.get(feature.feature_group, None)
        if influence_category is None:
            # Fallback for scalar keys: create_features also accepts non-tuple
            # feature groups (arguably it should only allow tuples).
            if len(feature.feature_group) == 1:
                fg = feature.feature_group[0]
                influence_category = influence_categories.get(fg, None)
if influence_category is None:
raise KeyError(f"Please add {feature.feature_group} to influence_categories")
return influence_category
class UpdateMixin(object):
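    """Mixin that accumulates per-sample feature contributions in link space.

    ``df`` holds the running aggregates: a ``factors`` column for regular
    features, an ``exponents`` column for external (e.g. price) features, and
    optional influence-category columns collecting grouped contributions.
    """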
def __init__(self, df: Optional[pd.DataFrame] = None, price_feature_seen: Optional[bool] = None):
self.df = df
self.price_feature_seen = price_feature_seen
def include_price_contrib(self, pred: np.ndarray) -> None:
self.df["exponents"] += pred
self.price_feature_seen = True
    def include_factor_contrib(self, pred: np.ndarray, influence_category: Optional[str]) -> None:
self.df["factors"] += pred
if influence_category is not None:
if influence_category in self.df.columns:
self.df[influence_category] += pred
else:
self.df[influence_category] = pred
    def remove_price_contrib(self, pred: np.ndarray) -> None:
self.df["exponents"] -= pred
    def remove_factor_contrib(self, pred: np.ndarray, influence_category: Optional[str]) -> None:
self.df["factors"] -= pred
if influence_category is not None:
if influence_category in self.df.columns:
self.df[influence_category] -= pred
            else:
                self.df[influence_category] = -pred  # removing an unseen contribution nets to -pred
def update_predictions(
self, pred: np.ndarray, feature: Feature, influence_categories: Optional[dict] = None
) -> None:
if feature.feature_type == FeatureTypes.external:
self.include_price_contrib(pred)
else:
self.include_factor_contrib(pred, get_influence_category(feature, influence_categories))
def remove_predictions(
self, pred: np.ndarray, feature: Feature, influence_categories: Optional[dict] = None
) -> None:
if feature.feature_type == FeatureTypes.external:
self.remove_price_contrib(pred)
else:
self.remove_factor_contrib(pred, get_influence_category(feature, influence_categories))
class CBLinkPredictionsFactors(UpdateMixin):
"""Support for prediction of type log(p) = factors"""
def __init__(self, predictions: np.ndarray):
super().__init__(df=pd.DataFrame({"factors": predictions}))
def predict_link(self) -> Union[ExtensionArray, np.ndarray]:
return self.factors()
def factors(self) -> Union[ExtensionArray, np.ndarray]:
return self.df["factors"].values
class ZeroSmoother(sklearnb.BaseEstimator, sklearnb.RegressorMixin):
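    """Smoother that predicts the neutral value 0 in link space for all bins.

    Used in :meth:`CyclicBoostingBase.visit_factors` to switch off a feature
    whose factors make the loss diverge.
    """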
def fit(self, X_for_smoother: np.ndarray, y: np.ndarray) -> Union[sklearnb.BaseEstimator, sklearnb.RegressorMixin]:
return self
def predict(self, X: np.ndarray) -> np.ndarray:
return np.repeat(0.0, len(X))
def _predict_factors(feature: Feature, X_for_smoother: np.ndarray, neutral_factor: float) -> np.ndarray:
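    """Apply the feature's smoother, scaled by its learning rate, in link space.

    Rows with non-finite feature values fall back to the factor of the last
    (missing-value) bin; any results that are still non-finite are set to
    ``neutral_factor``.
    """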
isfinite_all = slice_finite_semi_positive(X_for_smoother)
prediction_smoother = nans(len(X_for_smoother))
if isfinite_all.any() and (feature.smoother is not None):
prediction_smoother[isfinite_all] = feature.learn_rate * feature.smoother.predict(X_for_smoother[isfinite_all])
prediction_smoother[~isfinite_all] = feature.factors_link[-1]
prediction_smoother[~np.isfinite(prediction_smoother)] = neutral_factor
return prediction_smoother
def _factors_deviation(features: FeatureList) -> float:
"""Calculate mean absolute deviation of all factors.
Parameters
----------
features: :class:`~cyclic_boosting.base.FeatureList`
Collection of all features.
"""
mean_factor_devs = [np.mean(np.abs(feature.factors_link - feature.factors_link_old)) for feature in features]
return np.mean(mean_factor_devs)
@six.add_metaclass(abc.ABCMeta)
class CyclicBoostingBase(
link.LinkFunction,
sklearnb.BaseEstimator,
):
r"""The ``CyclicBoostingBase`` class implements the core skeleton for all
Cyclic Boosting algorithms. It learns parameters (e.g., factors) for each
feature-bin combination and applies smoothing algorithms (supplied by the
``smoother`` argument) on the parameters.
By virtue of the ``feature_group`` parameter, each
:mod:`cyclic_boosting` estimator can consider one- or higher-dimensional
feature combinations.
The parameter estimates are iteratively optimized until one of the stop
criteria (``min_loss_change``, ``minimal_factor_change``,
``maximal_iterations``) is reached.
Parameters
----------
feature_groups: sequence of column labels
(:obj:`str` or :obj:`int`) or tuples of such labels or
:class:`cyclic_boosting.base.FeatureID`.
For each feature or feature tuple in the sequence, a
one- or multidimensional factor profile will be determined,
respectively, and used in the prediction.
If this argument is omitted, all columns except a possible
``weight_column`` are considered as one-dimensional feature_groups.
hierarchical_feature_groups: sequence of column labels
(:obj:`str` or :obj:`int`) or tuples of such labels or
:class:`cyclic_boosting.base.FeatureID`.
In the first three iterations of the training, only the feature groups
defined here are used, i.e., all other feature groups are excluded.
From the fourth iteration onwards, all feature groups are used. The
idea of such hierarchical iterations is to support the modeling of
hierarchical or causal effects (e.g., mitigate confounding).
If this argument is not explicitly set, no such hierarchical iterations
are run.
feature_properties: :obj:`dict` of :obj:`int`
Dictionary listing the names of all features for the training as keys
and their pre-processing flags as values. When using a numpy feature
matrix X with no column names the keys of the feature properties are
the column indices.
By default, ``flags.HAS_MISSING`` is assumed.
If ``flags.MISSING_NOT_LEARNED`` is set for a feature group,
the neutral factor 1 is used for all non-finite values.
    weight_column: string or int
        Column name or index of a weight column in ``X``. If
        ``weight_column`` is ``None`` (default), equal weights are used
        for all samples.
prior_prediction_column: string, int or None
Column name or index used to include prior predictions. Instead of
the target mean (``prior_prediction_column=None``), the predictions
found in the ``prior_prediction_column`` column are used as a base
model in :meth:`fit` and :meth:`predict`.
minimal_loss_change: float
Stop if the relative loss change of current
and previous prediction falls below this value.
minimal_factor_change: float
Stop if the mean of the factor change falls below this value. The
mean of the factor change is the mean of all absolute factor
differences between the current and the previous iteration for each
factor.
maximal_iterations: int
        Maximal number of iterations.
observers : list
list of observer objects from the
:mod:`~cyclic_boosting.observers` module.
    smoother_choice: subclass of :class:`cyclic_boosting.SmootherChoice`
        Selects the smoothers to be used for the individual feature groups.
    output_column: string
        When the estimator is used as a transformer, this column is added to
        ``X`` and filled with the predictions.
learn_rate: Functor or None
Functor that defines the learning rate of each cyclic boosting iteration.
It has to satisfy the interface of type :func:`learning_rate.constant_learn_rate_one`.
If None is specified, which is the default, the learning rate is always 1.
    aggregate: boolean or None
        If ``True`` (default), the factor updates of the individual
        iterations are aggregated with the factors learned so far; if
        ``False``, the factors of each feature are recomputed from scratch
        in every iteration after removing the feature's previous
        contribution from the predictions.
Notes
-----
**Preconditions**
* Non-negative integer values starting at 0 **without gaps** are expected
in each feature column.
* The number of unique values in each feature column ``n_unique_feature``
should be much smaller than the number of samples ``n``.
Features transformed by
:class:`cyclic_boosting.binning.BinNumberTransformer` satisfy these
preconditions.
    * The target ``y`` has to be non-negative.
Attributes
----------
global_scale_link_ : float
The global scale of the target ``y`` in the linked space.
    stop_criteria_ : tuple
        Tuple recording which of the three stop criteria were satisfied in
        the last iteration:
        ``(stop_iterations, stop_factor_change, stop_loss_change)``
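
    Examples
    --------
    A minimal usage sketch (``CBPoissonRegressor`` is one concrete subclass;
    the column names and flags here are purely illustrative)::

        from cyclic_boosting import flags
        from cyclic_boosting.binning import BinNumberTransformer
        from cyclic_boosting.regression import CBPoissonRegressor

        feature_properties = {
            "store": flags.IS_UNORDERED,
            "weekday": flags.IS_ORDERED,
        }
        binner = BinNumberTransformer(feature_properties=feature_properties)
        est = CBPoissonRegressor(
            feature_groups=["store", "weekday", ("store", "weekday")],
            feature_properties=feature_properties,
        )
        est.fit(binner.fit_transform(X), y)  # X, y: training data
        yhat = est.predict(binner.transform(X_new))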
"""
supports_pandas = True
inc_fitting = False
no_deepcopy = {"feature_properties"}
def __init__(
self,
feature_groups=None,
hierarchical_feature_groups=None,
training_iterations_hierarchical_features=3,
feature_properties: Optional[Dict[int, int]] = None,
        weight_column: Optional[Union[str, int]] = None,
        prior_prediction_column: Optional[Union[str, int]] = None,
minimal_loss_change: Optional[float] = 1e-6,
minimal_factor_change: Optional[float] = 1e-4,
maximal_iterations: Optional[int] = 10,
observers: Optional[list] = None,
smoother_choice: Optional[SmootherChoice] = None,
output_column: Optional[str] = None,
        learn_rate: Optional[Callable] = None,
aggregate: Optional[bool] = True,
):
        if smoother_choice is None:
            self.smoother_choice = common_smoothers.SmootherChoiceWeightedMean()
        else:
            if not isinstance(smoother_choice, common_smoothers.SmootherChoice):
                raise ValueError("smoother_choice needs to be of type SmootherChoice")
            self.smoother_choice = smoother_choice
self.feature_groups = feature_groups
self.hierarchical_feature_groups = hierarchical_feature_groups
self.feature_properties = feature_properties
self.features = None
self.hierarchical_features = []
if self.hierarchical_feature_groups is not None:
for fg in self.hierarchical_feature_groups:
hierarchical_feature = create_feature_id(fg)
self.hierarchical_features.append(hierarchical_feature.feature_group)
self.training_iterations_hierarchical_features = training_iterations_hierarchical_features
self.feature_importances = {}
self.aggregate = aggregate
self.weight_column = weight_column
self.weights = None
self.prior_prediction_column = prior_prediction_column
self.prior_pred_link_offset_ = 0
self.global_scale_link_ = None
self._check_parameters()
self.minimal_loss_change = minimal_loss_change
self.minimal_factor_change = minimal_factor_change
self.maximal_iterations = maximal_iterations
self.observers = observers
if self.observers is None:
self.observers = []
self.iteration_ = None
self.insample_loss_ = None
self.stop_criteria_ = None
self.neutral_factor_link = 0.0
self.output_column = output_column
if learn_rate is None:
self.learn_rate = learning_rate.half_linear_learn_rate
else:
self.learn_rate = learn_rate
if hierarchical_feature_groups is not None:
self.learn_rate = learning_rate.constant_learn_rate_one
self._init_features()
def loss(self, prediction: np.ndarray, y: np.ndarray, weights: np.ndarray) -> np.ndarray:
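        """Compute the in-sample loss as the weighted mean squared error.

        For reference, an equivalent (but slower) NumPy form of the numexpr
        expression below is
        ``np.sum(weights * (prediction - y) ** 2) / np.sum(weights)``.
        """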
        if len(y) == 0:
            raise ValueError("Loss cannot be computed on empty data")
        sum_weighted_error = numexpr.evaluate("sum((prediction - y) * (prediction - y) * weights)")
        sum_weights = numexpr.evaluate("sum(weights)")
        return sum_weighted_error / sum_weights
@property
def global_scale_(self) -> np.ndarray:
"""
Obtain the global scale in the space of the original target ``y``.
"""
return self.unlink_func(self.global_scale_link_)
def learning_rate(self, feature: Feature) -> float:
lr = self.learn_rate(self.iteration_ + 1, self.maximal_iterations, feature)
return lr
def _init_weight_column(self, X: Union[pd.DataFrame, np.ndarray]) -> None:
"""Sets the ``weights``.
If a ``weight_column`` is given, it is retrieved
from ``X``, else all weights are set to one.
Parameters
----------
X: np.ndarray
samples features matrix
"""
if self.weight_column is None:
self.weights = np.ones(len(X))
else:
self.weights = get_X_column(X, self.weight_column)
def _init_external_column(self, X: Union[pd.DataFrame, np.ndarray], is_fit: bool) -> None:
"""Init the external column.
Parameters
----------
X: np.ndarray
samples features matrix
        is_fit: bool
            whether the method is called from :meth:`fit`
"""
def _init_default_feature_groups(self, X: Union[pd.DataFrame, np.ndarray]) -> None:
"""
Initializes the ``feature_groups`` to be a list of all
columns available in ``X`` except the ``prior_prediction_column`` and
the ``weight_column``.
"""
exclude_columns = []
if self.prior_prediction_column is not None:
exclude_columns.append(self.prior_prediction_column)
if self.weight_column is not None:
exclude_columns.append(self.weight_column)
self.feature_groups = get_feature_column_names_or_indices(X, exclude_columns=exclude_columns)
def _init_global_scale(self, X: Union[pd.DataFrame, np.ndarray], y: np.ndarray) -> None:
"""
Initializes the ``global_scale_link_`` and the
``prior_pred_link_offset_``. The ``global_scale_link_`` is set to the
weighted mean of the target ``y`` transformed into the link space.
The ``prior_pred_link_offset_`` is the difference between the
``global_scale_link_`` and the weighted mean of the prior prediction
transformed to the link space.
"""
if self.weights is None:
raise RuntimeError("The weights have to be initialized.")
minimal_prediction = 0.001
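        # small offset so the link function never receives an exactly zero
        # mean (e.g. log(0) for log-link estimators)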
self.global_scale_link_ = self.link_func(np.sum(y * self.weights) / np.sum(self.weights) + minimal_prediction)
if self.prior_prediction_column is not None:
prior_pred = get_X_column(X, self.prior_prediction_column)
finite = np.isfinite(prior_pred)
if not np.all(finite):
_logger.warning(
"Found a total number of {} non-finite values in the prior prediction column".format(
np.sum(~finite)
)
)
prior_pred_mean = np.sum(prior_pred[finite] * self.weights[finite]) / np.sum(self.weights[finite])
prior_pred_link_mean = self.link_func(prior_pred_mean)
if np.isfinite(prior_pred_link_mean):
self.prior_pred_link_offset_ = self.global_scale_link_ - prior_pred_link_mean
else:
warnings.warn(
"The mean prior prediction in link-space is not finite. "
"Therefore no individualization is done "
"and no prior mean subtraction is necessary."
)
self.prior_pred_link_offset_ = float(self.global_scale_link_)
def _check_weights(self) -> None:
if self.weights is None:
raise RuntimeError("The weights have to be initialized.")
def _init_features(self) -> None:
"""
Initializes the ``features`` and binds the data belonging to a feature
to it.
"""
self.features = create_features(self.feature_groups, self.feature_properties, self.smoother_choice)
def _get_prior_predictions(self, X: Union[pd.DataFrame, np.ndarray]) -> np.ndarray:
if self.prior_prediction_column is None:
prior_prediction_link = np.repeat(self.global_scale_link_, X.shape[0])
else:
prior_pred = get_X_column(X, self.prior_prediction_column)
prior_prediction_link = self.link_func(prior_pred) + self.prior_pred_link_offset_
finite = np.isfinite(prior_prediction_link)
prior_prediction_link[~finite] = self.global_scale_link_
return prior_prediction_link
def _feature_contribution(self, feature: Feature, contribution: np.ndarray) -> np.ndarray:
"""The contribution of a feature group to the full model in link space.
The link spaces have been chosen such that contributions in link space
are always additive.
Parameters
----------
feature: :class:`Feature`
feature providing the contribution
contribution: ndarray
the smoothed values in link space for input samples and the given
feature group
Returns
-------
ndarray
            the contribution to the full model in link space.
"""
return contribution
def is_diverged(self, loss) -> bool:
return loss > self.initial_loss_ * 2
def _check_convergence(self) -> None:
        msg = (
            "Your cyclic boosting training seems to be diverging. In "
            "iteration {0}, the current loss {1} is greater than the "
            "trivial loss {2} obtained with just mean predictions.".format(
                self.iteration_ + 1, self.insample_loss_, self.initial_loss_
            )
        )
if self.is_diverged(self.insample_loss_):
self.is_diverging = True
self.diverging += 1
warnings.warn(msg)
if self.diverging >= 5:
raise ConvergenceError(msg)
else:
self.is_diverging = False
def _update_loss(self, prediction: np.ndarray, y: np.ndarray) -> float:
insample_loss_old = self.insample_loss_
self.insample_loss_ = self.loss(prediction, y, self.weights)
self.insample_msd_ = self.insample_loss_
loss_change = insample_loss_old - self.insample_loss_
self._check_convergence()
if insample_loss_old != 0:
loss_change /= insample_loss_old
return loss_change
def _log_iteration_info(self, convergence_parameters: ConvergenceParameters) -> None:
if self.iteration_ == 0:
_logger.info("Factor model iteration 1")
else:
_logger.info(
"Factor model iteration: {iteration}, "
"factors_deviation = {deviation}, "
"loss_change = {loss_change}".format(
iteration=self.iteration_ + 1,
loss_change=convergence_parameters.loss_change,
deviation=convergence_parameters.delta,
)
)
def _call_observe_feature_iterations(
self, iteration: int, i: int, X: np.ndarray, y: np.ndarray, prediction: np.ndarray
) -> None:
for observer in self.observers:
observer.observe_feature_iterations(iteration, i, X, y, prediction, self.weights, self.get_state())
def _call_observe_iterations(self, iteration, X, y, prediction, delta) -> None:
for observer in self.observers:
observer.observe_iterations(iteration, X, y, prediction, self.weights, self.get_state(), delta)
def get_state(self) -> Dict[str, Any]:
est_state = {
"link_function": self,
"features": self.features,
"globale_scale": self.global_scale_,
"insample_loss": self.insample_loss_,
}
if (
self.hierarchical_feature_groups is not None
and self.iteration_ < self.training_iterations_hierarchical_features
):
est_state["features"] = [
feature for feature in self.features if feature.feature_group in self.hierarchical_features
]
return est_state
def remove_preds(self, pred: CBLinkPredictionsFactors, X: np.ndarray) -> None:
for feature in self.features:
if feature.feature_type is None:
weights = self.weights
else:
weights = self.weights_external
feature.bind_data(X, weights)
feature_predictions = self._pred_feature(X, feature, True)
pred.remove_predictions(feature_predictions, feature)
feature.fitted_aggregated -= feature.factors_link
feature.unbind_data()
    def _set_factors_per_feature(self, feature: Feature) -> Tuple[float, float]:
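        """Return the allowed ``(min_f, max_f)`` factor range for ``feature``
        in the original space, depending on its type (external vs. regular)
        and the dimensionality of its feature group."""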
if feature.feature_type is not None:
if len(feature.feature_group) == 1:
min_f = 0.05
max_f = 20
else:
min_f = 0.1
max_f = 5
else:
if len(feature.feature_group) == 1:
min_f = 0.01
max_f = 1000
else:
min_f = 0.1
max_f = 100
return min_f, max_f
def visit_factors(
self, feature: Feature, unfitted_factors, X: np.ndarray, y: np.ndarray, pred: CBLinkPredictionsFactors
) -> None:
clip = isinstance(self, LogLinkMixin)
if clip and self.aggregate:
# Maximal/Minimal updates
feature.factors_link = np.clip(feature.factors_link, np.log(0.5), np.log(2), out=feature.factors_link)
# Absolute size of factors/exponents per feature
if feature.is_fitted:
min_f, max_f = self._set_factors_per_feature(feature=feature)
feature.factors_link = np.clip(
feature.factors_link,
np.log(min_f) - feature.fitted_aggregated,
np.log(max_f) - feature.fitted_aggregated,
out=feature.factors_link,
)
if self.is_diverging:
def check_diverged() -> bool:
feature_predictions = self._pred_feature(X, feature, True)
pred.update_predictions(feature_predictions, feature)
prediction = self.unlink_func(pred.predict_link())
loss = self.loss(prediction, y, self.weights)
pred.remove_predictions(feature_predictions, feature)
return self.is_diverged(loss)
if self.diverging > 1:
if check_diverged():
msg = "Feature factors for {} will be ignored due to diverging loss".format(
feature.feature_group
)
warnings.warn(msg)
feature.factors_link = -feature.fitted_aggregated
feature.smoother = ZeroSmoother()
elif check_diverged():
msg = "Feature factors for {} will be clipped due to diverging loss".format(feature.feature_group)
warnings.warn(msg)
np.clip(
feature.factors_link,
unfitted_factors + np.log(0.8),
unfitted_factors + np.log(1.2),
out=feature.factors_link,
)
if feature.is_fitted:
if self.aggregate:
feature.fitted_aggregated += feature.factors_link
else:
feature.fitted_aggregated = feature.factors_link.copy()
else:
feature.fitted_aggregated = feature.factors_link.copy()
feature.is_fitted = True
def feature_iteration(
self, X: np.ndarray, y: np.ndarray, feature: Feature, pred: CBLinkPredictionsFactors, prefit_data
):
if not self.aggregate:
feature_predictions = self._pred_feature(X, feature, True)
pred.remove_predictions(feature_predictions, feature)
factors_link, uncertainties_link = self.calc_parameters(feature, y, pred, prefit_data=prefit_data)
X_for_smoother = feature.update_factors(
factors_link.copy(),
uncertainties_link,
self.neutral_factor_link,
self.learning_rate(feature),
)
feature.factors_link = _predict_factors(
feature=feature, X_for_smoother=X_for_smoother[:, : feature.dim], neutral_factor=self.neutral_factor_link
)
feature.factors_link = self.calibrate_to_weighted_mean(feature)
self.visit_factors(feature, factors_link, X, y, pred)
feature_predictions = self._pred_feature(X, feature, is_fit=True)
pred.update_predictions(feature_predictions, feature)
return pred
    def cb_features(
        self, X: np.ndarray, y: np.ndarray, pred: CBLinkPredictionsFactors, prefit_data
    ) -> Generator[Tuple[int, Feature, Any], None, None]:
for i, feature in enumerate(self.features):
if feature.feature_type is None:
weights = self.weights
else:
weights = self.weights_external
if self.iteration_ == 0:
feature.bind_data(X, weights)
feature.factors_link = np.ones(feature.n_bins) * self.neutral_factor_link
if prefit_data is not None:
prefit_data[i] = self.precalc_parameters(feature, y, pred)
else:
feature.bind_data(X, weights)
yield i, feature, prefit_data[i]
def fit(
self, X: Union[pd.DataFrame, np.ndarray], y: Optional[np.ndarray] = None
) -> Union[link.LinkFunction, sklearnb.BaseEstimator]:
_ = self._fit_predict(X, y)
return self
def _init_fit(self, X: pd.DataFrame, y: np.ndarray) -> None:
self._check_len_data(X, y)
self._check_y(y)
self._init_weight_column(X)
self._init_external_column(X, True)
if self.feature_groups is None:
self._init_default_feature_groups(X)
self._check_weights()
self._init_features()
self._init_global_scale(X, y)
def _fit_main(self, X: np.ndarray, y: np.ndarray, pred: CBLinkPredictionsFactors) -> np.ndarray:
self.diverging = 0
self.is_diverging = False
_logger.info("Cyclic Boosting global scale {}".format(self.global_scale_))
prediction = self.unlink_func(pred.predict_link())
self.insample_loss_ = self.loss(prediction, y, self.weights)
self.initial_loss_ = self.insample_loss_
self.initial_msd_ = self.insample_loss_
self.iteration_ = 0
prefit_data = [None for _ in self.features]
convergence_parameters = ConvergenceParameters()
while (not self._check_stop_criteria(self.iteration_, convergence_parameters)) or self.is_diverging:
self._call_observe_iterations(self.iteration_, X, y, prediction, convergence_parameters.delta)
self._log_iteration_info(convergence_parameters)
for i, feature, pf_data in self.cb_features(X, y, pred, prefit_data):
if (
self.hierarchical_feature_groups is not None
and self.iteration_ < self.training_iterations_hierarchical_features
and feature.feature_group not in self.hierarchical_features
):
feature.factors_link_old = feature.factors_link.copy()
continue
pred = self.feature_iteration(X, y, feature, pred, pf_data)
self._call_observe_feature_iterations(self.iteration_, i, X, y, prediction)
if feature.factor_sum is None:
feature.factor_sum = [np.sum(np.abs(feature.fitted_aggregated))]
else:
feature.factor_sum.append(np.sum(np.abs(feature.fitted_aggregated)))
prediction = self.unlink_func(pred.predict_link())
updated_loss_change = self._update_loss(prediction, y)
convergence_parameters.set_loss_change(updated_loss_change=updated_loss_change)
updated_delta = _factors_deviation(self.features)
convergence_parameters.set_delta(updated_delta=updated_delta)
if self.is_diverging:
self.remove_preds(pred, X)
self.iteration_ += 1
# compute feature importances
self.set_feature_importances()
if len(self.observers) > 0:
self.prepare_plots(X, y, prediction)
self._call_observe_iterations(-1, X, y, prediction, convergence_parameters.delta)
self._check_convergence()
_logger.info("Cyclic Boosting, final global scale {}".format(self.global_scale_))
for feature in self.features:
feature.clear_feature_reference(observers=self.observers)
return prediction
def prepare_plots(self, X: np.ndarray, y: np.ndarray, prediction: np.ndarray) -> None:
for feature in self.features:
if feature.feature_type is None:
weights = self.weights
else:
weights = self.weights_external
feature.bind_data(X, weights)
sum_w, sum_yw, sum_pw = (
np.bincount(feature.lex_binned_data, weights=w, minlength=feature.n_bins)
for w in [weights, weights * y, weights * prediction]
)
mean_target_binned = (sum_yw + 1) / (sum_w + 1)
if feature.n_bins > 1:
mean_y_finite = np.sum(sum_yw[:-1]) / np.sum(sum_w[:-1])
mean_prediction_finite = np.sum(sum_pw[:-1]) / np.sum(sum_w[:-1])
else:
mean_y_finite = np.mean(y)
mean_prediction_finite = np.mean(prediction)
mean_prediction_binned = (sum_pw + 1) / (sum_w + 1)
if isinstance(self, IdentityLinkMixin):
feature.mean_dev = mean_prediction_binned - mean_target_binned
feature.y = mean_target_binned - mean_y_finite
feature.prediction = mean_prediction_binned - mean_prediction_finite
else:
feature.mean_dev = np.log(mean_prediction_binned + 1e-12) - np.log(mean_target_binned + 1e-12)
feature.y = np.log(mean_target_binned / mean_y_finite + 1e-12)
feature.prediction = np.log(mean_prediction_binned / mean_prediction_finite + 1e-12)
feature.y_finite = mean_y_finite
feature.prediction_finite = mean_prediction_finite
feature.learn_rate = 1.0
def _fit_predict(self, X: Union[pd.DataFrame, np.ndarray], y: Optional[np.ndarray] = None) -> np.ndarray:
"""Fit the estimator to the training samples.
Iterate to calculate the factors and the global scale.
"""
y = np.asarray(y)
self._init_fit(X, y)
pred = CBLinkPredictionsFactors(self._get_prior_predictions(X))
prediction = self._fit_main(X, y, pred)
del self.weights
return prediction
def _pred_feature(self, X: Union[pd.DataFrame, np.ndarray], feature: Feature, is_fit: bool):
if is_fit:
return feature.factors_link[feature.lex_binned_data]
else:
X_for_predict = get_X_column(X, feature.feature_group, array_for_1_dim=False)
pred = _predict_factors(feature, X_for_predict, self.neutral_factor_link)
return pred
def predict(self, X: Union[pd.DataFrame, np.ndarray], y: Optional[np.ndarray] = None) -> np.ndarray:
pred = self.predict_extended(X, None)
return self.unlink_func(pred.predict_link())
def predict_extended(
self, X: Union[pd.DataFrame, np.ndarray], influence_categories: Optional[dict] = None
) -> CBLinkPredictionsFactors:
self._check_fitted()
self._init_external_column(X, False)
prediction_link = self._get_prior_predictions(X)
pred = CBLinkPredictionsFactors(prediction_link)
for feature in self.features:
feature_predictions = self._pred_feature(X, feature, is_fit=False)
pred.update_predictions(feature_predictions, feature, influence_categories)
return pred
def _check_stop_criteria(self, iterations: int, convergence_parameters: ConvergenceParameters) -> bool:
"""
Checks the stop criteria and returns True if at least one is satisfied.
You can check the stop criteria in the estimated parameter
`stop_criteria_`.
:rtype: bool
"""
stop_iterations = False
stop_factor_change = False
stop_loss_change = False
veto_hierarchical = False
delta = convergence_parameters.delta
loss_change = convergence_parameters.loss_change
if iterations >= self.maximal_iterations:
_logger.info(
"Cyclic Boosting stopped because the number of "
"iterations reached the maximum of {}.".format(self.maximal_iterations)
)
stop_iterations = True
if delta <= self.minimal_factor_change:
_logger.info(
"Cyclic Boosting stopped because the change of "
"the factors {0} was lower than the "
"required minimum {1}.".format(delta, self.minimal_factor_change)
)
stop_factor_change = True
if abs(loss_change) <= self.minimal_loss_change:
_logger.info(
"Cyclic Boosting stopped because the change of "
"the loss {0} was lower "
"than the required minimum {1}.".format(loss_change, self.minimal_loss_change)
)
stop_loss_change = True
if loss_change < 0:
_logger.warning(
"Encountered negative change of loss. "
"This might not be a problem, as long as the "
"model converges. Check the LOSS changes in the "
"analysis plots."
)
if (
iterations <= self.training_iterations_hierarchical_features
and self.hierarchical_feature_groups is not None
):
veto_hierarchical = True
self.stop_criteria_ = (stop_iterations, stop_factor_change, stop_loss_change)
return (stop_iterations or stop_factor_change or stop_loss_change) and not veto_hierarchical
def _check_parameters(self) -> None:
if self.feature_groups is not None and len(self.feature_groups) == 0:
raise ValueError("Please add some elements to `feature_groups`")
def _check_fitted(self) -> None:
"""Check if fit was called"""
if self.features is None:
raise ValueError("fit must be called first.")
def _check_len_data(self, X: Union[pd.DataFrame, np.ndarray], y: np.ndarray) -> None:
"""Check that input arrays are not empty"""
        if len(X) == 0 or len(y) == 0:
raise ValueError("Estimator should be applied on non-empty data")
def _check_y(self, y: np.ndarray) -> None:
"""Check that y has the correct values. Has to be
implemented by the child class.
"""
        raise NotImplementedError(
            "You have to implement the function _check_y to ensure your target has correct values."
        )
def get_subestimators_as_items(self, prototypes=True) -> List[Tuple]:
"""
For prototypes=False, the clones are indexed by the column name
or index.
"""
if prototypes:
return []
else:
self._check_fitted()
return [(feature.feature_id, feature.smoother) for feature in self.features]
def set_feature_importances(self) -> None:
for feature in self.features:
feature.prepare_feature()
feature.set_feature_bin_deviations_from_neutral(self.neutral_factor_link)
self.feature_importances[feature] = feature.bin_weighted_average
def get_feature_importances(self) -> Dict[str, float]:
"""
Returns the relative importance of each input feature.
"""
        if not self.feature_importances:
            raise ValueError("fit has to be called first to compute the feature importances.")
else:
normalized_values = get_normalized_values(self.feature_importances.values())
norm_feature_importances = {
"_".join(feature.feature_id.feature_group): normalized_values[i]
for i, feature in enumerate(self.feature_importances.keys())
}
return norm_feature_importances
def get_feature_contributions(self, X: Union[pd.DataFrame, np.ndarray]) -> Dict[str, np.ndarray]:
"""Returns the contributions of each feature for each individual
prediction on a given data set, offering individual explainability of
the model. That means the learned parameter, e.g., factor for
multiplicative modes, of the corresponding feature bin is accessed.
These contributions should be interpreted in respect to the neutral
element of the mode, i.e., 1 for multiplicative and 0 for additive
modes.
Parameters
----------
X: np.ndarray
data set to be predicted, needs to include the features of the
model
Returns
-------
dict
contributions from each feature for each sample of X
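
        Example (estimator and feature names purely illustrative)::

            contributions = est.get_feature_contributions(X_binned)
            # {"store": array([...]), "weekday": array([...]), ...}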
"""
contribution = {}
for feature in self.features:
ft = "" if feature.feature_type is None else "_{}".format(feature.feature_type)
fg = " ".join(feature.feature_group)
contribution[fg + ft] = self.unlink_func(self._pred_feature(X, feature, is_fit=False))
return contribution
@abc.abstractmethod
def calc_parameters(self, feature: Feature, y: np.ndarray, pred: CBLinkPredictionsFactors, prefit_data):
"""Calculates factors and uncertainties of the bins of a feature group
in the original space (not the link space) and transforms them to the
link space afterwards
The factors and uncertainties cannot be determined in link space, not
least because target values like 0 diverge in link spaces like `log`
or `logit`.
The main loop will call the smoothers on the results returned by this
method. Smoothing is always done in link space.
This function must be implemented by the subclass according to its
statistical model.
Parameters
----------
        feature: :class:`~.Feature`
            the feature to be estimated
y: np.ndarray
target, truth
pred
(in-sample) predictions from all other features (excluding the one
at hand)
prefit_data
data returned by :meth:`~.precalc_parameters` during fit
Returns
-------
tuple
This method must return a tuple of ``factors`` and
``uncertainties`` in the **link space**.
"""
raise NotImplementedError("implement in subclass")
@abc.abstractmethod
def precalc_parameters(self, feature: Feature, y: np.ndarray, pred: CBLinkPredictionsFactors):
"""Calculations that are not dependent on intermediate predictions. If
these are not needed, return :obj:`None` in the subclass.
        Results returned by this method will be served to
        :meth:`calc_parameters` as the ``prefit_data`` argument.
Parameters
----------
        feature: :class:`~.Feature`
            the feature to be processed
y: np.ndarray
target, truth
pred
(in-sample) predictions from all other features (excluding the one
at hand)
"""
return None
def required_columns(self) -> Set:
required_columns = set()
if self.weight_column is not None:
required_columns.add(self.weight_column)
if self.prior_prediction_column is not None:
required_columns.add(self.prior_prediction_column)
for feature_id in self.features:
for col in feature_id.feature_group:
required_columns.add(col)
return required_columns
def calibrate_to_weighted_mean(self, feature: Feature) -> np.ndarray:
return feature.factors_link
def _coefficient_for_gaussian_matching_by_quantiles(perc: float) -> float:
"""The coefficient in the linear system of equations for the Gaussian
matching by quantiles
For details, see :func:`gaussian_matching_by_quantiles`.
"""
return np.sqrt(2) * scipy.special.erfinv(2 * perc - 1)
def gaussian_matching_by_quantiles(
dist, link_func, perc1: Union[float, np.ndarray], perc2: Union[float, np.ndarray]
) -> Tuple[float, float]:
r"""Gaussian matching of the distribution transformed by the link function
by demanding the equality of two quantiles in link space
Since each quantile of a distribution is transformed just by the link
function itself (by contrast to the mean or variance of the distribution),
the Gaussian matching is quite easy and general::
link_func(dist.ppf(perc<i>)) = gaussian_quantile<i>
The connection between a Gaussian quantile and the distribution parameters
is:
.. math::
\text{gaussian\_quantile}_i = \mu + \sigma \sqrt{2} \text{erf}^{-1}(
2 \cdot \text{perc}_i - 1)
Applied to both ``perc<i>``, this leads to a system of two linear equations
that can be solved for ``mu`` and ``sigma``.
Parameters
----------
dist: scipy.stats distribution object
the assumed distribution in the original space, e.g. a
``scipy.stats.beta`` in the case of
:class:`~cyclic_boosting.CBClassifier`; it is assumed that this
distribution has been initialized with vectorized parameters with one
value for each cyclic boosting bin
link_func: callable
the transformation function to link space, e.g. the method
:meth:`~cyclic_boosting.link.LinkFunction.link_func` inherited in
the cyclic boosting class
perc1: float or ndarray of floats
percentage (a fixed value or an individual value for each of the bins)
of the first quantile to match
perc2: float or ndarray of floats
percentage (a fixed value or an individual value for each of the bins)
of the second quantile to match; it must be different from perc1.
Returns
-------
tuple
mu, sigma of the matched Gaussian, each being a ndarray with one value
for each bin
Example
-------
    As an example, consider the case of a beta distribution with the logit
    link function, which is used in :mod:`cyclic_boosting.classification`.
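
    A minimal self-check (not part of the original example): matching a
    Gaussian through the identity link must recover its own parameters::

        >>> import scipy.stats
        >>> mu, sigma = gaussian_matching_by_quantiles(
        ...     scipy.stats.norm(loc=1.0, scale=2.0), lambda x: x,
        ...     0.1587, 0.8413)
        >>> round(float(mu), 2), round(float(sigma), 2)
        (1.0, 2.0)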
Derivation of the transformation of the beta distribution to logit space:
    .. math::
        y = \text{logit}(x) = \log(x / (1 - x))
        \Leftrightarrow x = \frac{1}{1 + e^{-y}}

        \Rightarrow \frac{\mathrm{d} x}{\mathrm{d} y} =
        \frac{e^{-y}}{\left(1 + e^{-y}\right)^2}

        p_{\text{beta}'(\alpha, \beta)}(y)\,\mathrm{d} y
        = p_{\text{beta}(\alpha, \beta)}(x)\,\mathrm{d} x
        = p_{\text{beta}(\alpha, \beta)}(x(y))
        \,\frac{\mathrm{d} x}{\mathrm{d} y}\,\mathrm{d} y
        = p_{\text{beta}(\alpha, \beta)}\left(\frac{1}{1 + e^{-y}}\right)
        \frac{e^{-y}}{\left(1 + e^{-y}\right)^2}\,\mathrm{d} y
"""
    quant1_link = link_func(dist.ppf(perc1))
    quant2_link = link_func(dist.ppf(perc2))
a1 = _coefficient_for_gaussian_matching_by_quantiles(perc1)
a2 = _coefficient_for_gaussian_matching_by_quantiles(perc2)
sigma = (quant1_link - quant2_link) / (a1 - a2)
mu = quant1_link - sigma * a1
return mu, sigma
def calc_factors_generic(
lex_binnumbers: np.ndarray,
w_x: np.ndarray,
w: np.ndarray,
w_x2: np.ndarray,
external_weights: np.ndarray,
minlength: int,
x0: float,
w0: float,
) -> Tuple[float, float]:
r"""Generic calculation of factors and uncertainties
It is always possible to reparametrise :math:`\chi^2`
of the different cyclic boosting models to the following standard form
.. math::
\chi^2 =\sum_i w_i \cdot (x_i - \hat{x})^2
where the sum goes over the samples in one specific feature bin and
:math:`\hat{x}` is the parameter to be estimated.
**Multiplicative Ansatz:**
Here we have
.. math::
\chi^2=\sum_i v_i \cdot (y_i - \hat{x} \cdot p_i)^2 / \sigma^2_{y_i}
=\sum_i v_i p_i^2 \cdot (y_i/p_i - \hat{x})^2 / \sigma^2_{y_i}
where :math:`v_i` is the external weight, :math:`y_i` is the target,
:math:`p_i` is the prediction without the contribution of this feature
and :math:`\sigma^2_{y_i}` is the variance of the target.
By setting
.. math::
x_i = y_i / p_i
.. math::
w_i = v_i \cdot p_i^2 / \sigma^2_{y_i}
we get the standard form.
**Sum Ansatz:**
Here we have
.. math::
\chi^2=\sum_i v_i \cdot (y_i - (\hat{x} - p_i))^2 / \sigma^2_{y_i}
=\sum_i v_i \cdot ((y_i - p_i) - \hat{x})^2 / \sigma^2_{y_i}
By setting
.. math::
x_i = y_i - p_i
.. math::
w_i = v_i / \sigma^2_{y_i}
we get the standard form.
**Slope Ansatz:**
Here we have
.. math::
\chi^2=\sum_i v_i \cdot (y_i -
(\hat{x} \cdot u_i + p_i))^2 / \sigma^2_{y_i}
=\sum_i v_i u_i^2 \cdot ((y_i - p_i)/u_i - \hat{x})^2 / \sigma^2_{y_i}
where :math:`u_i` is the external column, e.g. a price ratio for dynamic
pricing models. By setting
.. math::
x_i = (y_i- p_i) / u_i
.. math::
w_i = v_i \cdot u_i^2 / \sigma^2_{y_i}
we get the standard form.
All models above fit in the **general linear ansatz:**
.. math::
\chi^2=\sum_i v_i \cdot (y_i -
(\hat{x} \cdot m_i + b_i))^2 / \sigma^2_{y_i}
=\sum_i v_i m_i^2 \cdot ((y_i - b_i)/m_i - \hat{x})^2 / \sigma^2_{y_i}
By setting
.. math::
x_i = (y_i - b_i) / m_i
.. math::
w_i = v_i \cdot m_i^2 / \sigma^2_{y_i}
= \frac{v_i}{\sigma^2_{y_i} \cdot (\frac{d x_i}{d y_i})^2}
we get the standard form.
**Solution for the general linear ansatz:**
If we solve for :math:`\hat{x}`, we get the formula for the weighted
mean:
.. math::
\hat{x} = \frac{\sum_i w_i \cdot x_i} {\sum_i w_i}
The variance of :math:`\hat{x}` is
.. math::
\sigma^2_{\hat{x}} =
\sum_i \left(\frac{d \hat{x}}{d x_i}\right)^2 \cdot \sigma^2_{x_i}
= \sum_i \left(\frac{w_i}{\sum_i w_i}\right)^2 \cdot \sigma^2_{x_i}
with
.. math::
w_i \cdot \sigma^2_{x_i} = \frac{v_i}{\sigma^2_{y_i}
\cdot (\frac{d x_i}{d y_i})^2} \cdot \sigma^2_{y_i}
\cdot \left(\frac{d x_i}{d y_i}\right)^2 = v_i
we can write the variance as
.. math::
        \sigma^2_{\hat{x}} = \frac{\sum_i w_i \cdot v_i} {(\sum_i w_i)^2}
**Solution for the general linear ansatz with a prior on:** :math:`\hat{x}`
    To handle bins with low statistics or empty bins we introduce
    a Gaussian prior with precision :math:`w_0` and mean :math:`x_0`.
Our :math:`\chi^2` now has the following form:
.. math::
\chi^2 =\sum_i w_i \cdot (x_i - \hat{x})^2 + w_0 \cdot (\hat{x} - x_0)^2
Solving for :math:`\hat{x}` gives
.. math::
\hat{x} = \frac{x_0 \cdot w_0 + \sum_i w_i \cdot x_i} {w_0 + \sum_i w_i}
The variance of :math:`\hat{x}` is
.. math::
\sigma^2_{\hat{x}} =
\left(\frac{d \hat{x}}{d x_0}\right)^2 \cdot \sigma^2_{x_0} +
\sum_i \left(\frac{d \hat{x}}{d x_i}\right)^2 \cdot \sigma^2_{x_i}
= \frac{w_0 + \sum_i w_i \cdot v_i} {(w_0 + \sum_i w_i)^2}
Note
----
The quadratic sum is split in summands because parts of it may not be
finite (e.g. in slope estimation).
Parameters
----------
lex_binnumbers: :class:`numpy.ndarray`
1-dimensional numpy array containing the bin numbers.
w_x: :class:`numpy.ndarray`
:math:`w_i \cdot x_i` for each array element i
w: :class:`numpy.ndarray`
:math:`w_i` for each array element i
w_x2: :class:`numpy.ndarray`
:math:`w_i \cdot x_i^2` for each array element i
external_weights: :class:`numpy.ndarray`
:math:`v_i` for each array element i
minlength: int
number of bins for this feature including the `nan` bin
x0: float
mean of the prior on the mean
w0: float
prior precision of the prior on the mean
Returns
-------
tuple
A tuple of ``factors`` and ``uncertainties``
in the original space.
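
    A small numeric sketch (values purely illustrative): two samples fall
    into bin 0, one into bin 1, with unit weights and no prior::

        >>> import numpy as np
        >>> x = np.array([1.0, 3.0, 5.0])
        >>> bins = np.array([0, 0, 1])
        >>> w = np.ones(3)
        >>> mean, unc = calc_factors_generic(
        ...     bins, w * x, w, w * x ** 2, np.ones(3),
        ...     minlength=2, x0=0.0, w0=0.0)
        >>> mean
        array([2., 5.])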
"""
    sum_w_x, sum_w, sum_vw, sum_w_x2 = (
        np.bincount(lex_binnumbers, weights=weights_i, minlength=minlength)
        for weights_i in [w_x, w, external_weights * w, w_x2]
    )
sum_w_x += w0 * x0
sum_w += w0
sum_w_x2 += w0 * x0**2
sum_vw += w0
weighted_mean = sum_w_x / sum_w
variance_weighted_mean = sum_vw / sum_w**2
return weighted_mean, np.sqrt(variance_weighted_mean)
__all__ = [
"_factors_deviation",
"CyclicBoostingBase",
"gaussian_matching_by_quantiles",
"calc_factors_generic",
]