import copy
import logging
import warnings
import decorator
import numba as nb
import numpy as np
import pandas as pd
import six
from typing import Iterable, List, Optional
from dataclasses import dataclass
_logger = logging.getLogger(__name__)
LOWER_1_SIGMA_QUANTILE = (1.0 - 0.6827) / 2
LOWER_2_SIGMA_QUANTILE = (1.0 - 0.9545) / 2
[docs]
class ConvergenceError(RuntimeError):
    """Raised when an estimator fails to converge; in that case its result
    cannot be trusted."""
[docs]
def not_seen_events(x, wx, n_bins):
    """
    Return a boolean array that slices x so that only values with
    a non-finite weightsum `wx` are used.

    Parameters
    ----------
    x: :class:`pandas.DataFrame` or :class:`numpy.ndarray`
        Array can be of any dimension. The array has to consist
        out of contigous integers.
    wx: :class:`numpy.ndarray` weightsum of all bins occuring in x
    n_bins: :class:`numpy.ndarray` of shape ``(M,)``
        number of bins in each dimension

    Example
    -------
    >>> x = np.c_[np.array([0, 1, 2, 3, 4, np.nan, -1])]
    >>> wx = np.array([3, 2, 0, 1, 0])
    >>> nbins = np.array([4, 1])
    >>> from cyclic_boosting.utils import not_seen_events
    >>> not_seen_events(x, wx, nbins)
    array([False, False, True, False, True, True, True], dtype=bool)
    >>> x = pd.DataFrame({"a": [0, 1, 0, 1, np.nan, 0, -1],
    ...                   "b": [0, 0, 1, 1, 1, np.nan, -1]})
    >>> wx = np.array([1, 0, 1, 0, 1])
    >>> nbins = np.array([2, 2])
    >>> not_seen_events(x, wx, nbins)
    array([False, False, True, True, False, False, False], dtype=bool)
    """
    # np.prod replaces the deprecated np.product (removed in NumPy 2.0).
    max_lex_bins = int(np.prod(n_bins))
    not_seen = wx == 0.0
    if len(not_seen) == max_lex_bins:
        # Add nan-bin if not present yet.
        not_seen = np.r_[not_seen, True]
    finite_x = slice_finite_semi_positive(x)
    if x.shape[1] > 1:
        # Collapse multidimensional bin tuples to lexicographic bin numbers.
        x, n_bins = multidim_binnos_to_lexicographic_binnos(x, n_bins)
    else:
        x = np.asarray(x).reshape(-1)
    x = np.round(x)
    x = np.asarray(x, dtype=int)
    # Route non-finite/negative or out-of-range bin numbers to the nan-bin
    # (index max_lex_bins), which is always marked as "not seen".
    x = np.where(~finite_x | (x > max_lex_bins), max_lex_bins, x)
    res = not_seen[x].reshape(-1)
    return res
[docs]
def get_X_column(X, column, array_for_1_dim=True):
    """
    Pick one or several columns from a :class:`pandas.DataFrame` or a
    :class:`numpy.ndarray`.

    Parameters
    ----------
    X: :class:`pandas.DataFrame` or :class:`numpy.ndarray`
        Data source from which columns are picked.
    column:
        The format depends on the type of X. For :class:`pandas.DataFrame`
        a string or a list/tuple of strings naming the columns, for
        :class:`numpy.ndarray` an integer or a list/tuple of integers
        indexing the columns.
    array_for_1_dim: bool
        In default mode (set to True) the return type for a one dimensional
        access is a np.ndarray with shape (n, ). If set to False it is a
        np.ndarray with shape (1, n).
    """
    if isinstance(column, tuple):
        column = list(column)
    if array_for_1_dim:
        # A single-element list is collapsed to a scalar key, yielding
        # a one-dimensional result of shape (n,).
        if isinstance(column, list) and len(column) == 1:
            column = column[0]
    else:
        # Force a list key so that the result stays two-dimensional.
        if not isinstance(column, list):
            column = [column]
    if isinstance(X, pd.DataFrame):
        return X[column].values
    return X[:, column]
[docs]
def slice_finite_semi_positive(x):
    """
    Return a boolean mask selecting all finite and semi-positive rows of x.

    Parameters
    ----------
    x: :class:`pandas.DataFrame` or :class:`numpy.ndarray`
        Array can be of any dimension.

    Example
    -------
    >>> x = np.array([1, np.nan, 3, -2])
    >>> from cyclic_boosting.utils import slice_finite_semi_positive
    >>> slice_finite_semi_positive(x)
    array([ True, False, True, False], dtype=bool)
    >>> X = pd.DataFrame({'a': [1, np.nan, 3], 'b': [-1, 2, 3]})
    >>> slice_finite_semi_positive(X)
    array([False, False, True], dtype=bool)
    """
    if isinstance(x, pd.DataFrame):
        x = x.values
    if x.ndim > 1:
        # Rows must be finite in every column ...
        mask = np.isfinite(x).all(axis=1)
        # ... and non-negative in every column.
        nonnegative = (x[mask] >= 0).all(axis=1)
        mask[mask] &= nonnegative
    else:
        mask = np.isfinite(x) & (x >= 0)
    return np.asarray(mask)
[docs]
def nans(shape):
    """Create a new numpy array filled with NaNs.

    :param shape: shape of the :class:`numpy.ndarray`
    :type shape: :obj:`int` or :obj:`tuple`
    :rtype: :class:`numpy.ndarray` (numpy.nan, dim=1)

    >>> from cyclic_boosting.utils import nans
    >>> nans(3)
    array([ nan,  nan,  nan])
    >>> nans((2, 2))
    array([[ nan,  nan],
           [ nan,  nan]])
    """
    # np.full allocates and fills in one step.
    return np.full(shape, np.nan)
[docs]
def multidim_binnos_to_lexicographic_binnos(binnos, n_bins=None, binsteps=None):
    """Map index-tuples of ``M``-dimensional features to integers.

    In the cyclic boosting algorithm there is a one-dimensional array of
    factors for each one-dimensional feature. For processing of
    multi-dimensional variables (i.e. combinations of several one-dimensional
    feature variables) we have bins for all bin combinations. For example, if
    you have two one-dimensional features ``p`` and ``q`` with ``n_p`` and
    ``n_q`` bins, the two-dimensional feature ``(p, q)`` will have a
    two-dimensional factor array of shape ``(n_p, n_q)``.

    The internal representation of this array, however, is that of a
    one-dimensional array. Thus, a function is needed that maps index-tuples
    ``(p, q,...)`` to integer indices. This is the purpose of this function.

    This function performs this mapping for ``N`` rows of index ``M``-tuples.
    If there are any missing values in a row, the returned ordinal number is
    set to ``np.prod(n_bins)``.

    Parameters
    ----------
    binnos: :class:`numpy.ndarray` of shape ``(N, M)``
        multidimensional bin numbers
    n_bins: :class:`numpy.ndarray` of shape ``(M,)`` or `None`
        number of bins in each dimension; if `None`, it will be determined from
        ``binnos``.
    binsteps: :class:`numpy.ndarray` of type `int64` and shape ``(M,)``
        bin steps as returned by :func:`bin_steps` when called on ``n_bins``;
        if `None`, it will be determined from ``binnos``.

    Returns
    -------
    tuple
        ordinal numbers of the bins in lexicographic order as
        :class:`numpy.ndarray` of type `int64` and maximal bin numbers as
        :class:`numpy.ndarray` of shape ``(M,)``

    Examples
    --------
    >>> binnos = np.c_[[1, 1, 0, 0, np.nan, 1, np.nan],
    ...                [0, 1, 2, 1, 1, 2, np.nan]]
    >>> n_bins = np.array([2, 3])
    >>> binsteps = bin_steps(n_bins)
    >>> from cyclic_boosting.utils import multidim_binnos_to_lexicographic_binnos
    >>> lex_binnos, n_finite = multidim_binnos_to_lexicographic_binnos(
    ...     binnos, n_bins=n_bins, binsteps=binsteps)
    >>> lex_binnos
    array([3, 4, 2, 1, 6, 5, 6])
    >>> n_finite
    array([2, 3])
    >>> lex_binnos, n_finite = multidim_binnos_to_lexicographic_binnos(binnos)
    >>> lex_binnos
    array([3, 4, 2, 1, 6, 5, 6])
    >>> n_finite
    array([2, 3])
    """
    if isinstance(binnos, pd.DataFrame):
        binnos = binnos.values
    if n_bins is None:
        # Infer the number of bins per dimension from the finite rows.
        finite_bins = binnos[slice_finite_semi_positive(binnos)]
        if len(finite_bins) == 0:
            n_bins = np.zeros(binnos.shape[1], dtype=np.int64)
        else:
            n_bins = np.max(finite_bins, axis=0).astype(np.int64) + 1
    if binsteps is None:
        binsteps = bin_steps(n_bins)
    is_valid = np.isfinite(binnos).all(axis=1)
    is_valid[is_valid] &= ((binnos[is_valid] >= 0) & (binnos[is_valid] < n_bins[None, :])).all(axis=1)
    if not np.any(is_valid):
        result = np.repeat(0, len(binnos))
    else:
        # Invalid rows are mapped to the extra "nan bin" np.prod(n_bins).
        # np.prod replaces the deprecated np.product (removed in NumPy 2.0).
        result = np.repeat(np.prod(n_bins), len(binnos))
        result[is_valid] = np.dot(np.floor(binnos[is_valid]), binsteps[1:])
    if n_bins.prod() >= np.iinfo(np.int32).max:
        _logger.warning(
            "Enumerating multidimensional bins: A multidimensional feature of "
            "shape {n_bins} requires {total_bins} unique bins in total. "
            "Please consider whether this is what you want.".format(n_bins=n_bins, total_bins=n_bins.prod())
        )
    result = result.astype(np.int64)
    return result, n_bins
[docs]
@nb.njit()
def bin_steps(n_bins: nb.int64[:]):
    """
    Multidimensional bin steps for lexicographical order

    :param n_bins: number of bins for each dimension
    :type n_bins: ``numpy.ndarray`` (element-type ``float64``, shape ``(M,)``)

    :return: in slot ``i + 1`` for dimension ``i``, the number of steps needed
        for iterating through the following dimensions in lexicographical order
        until the bin number can be increased for dimension ``i``.
        See the doctests of :func:`arange_multi`: It's the number of rows
        between changes in column ``i``.
        In slot 0 this is the number steps needed to iterate through all
        dimensions.
    :rtype: ``numpy.ndarray`` (element-type ``int64``, shape ``(M + 1,)``)

    >>> from cyclic_boosting.utils import bin_steps
    >>> bin_steps(np.array([3, 2]))
    array([6, 2, 1])
    >>> bin_steps(np.array([3, 2, 4, 1, 2]))
    array([48, 16, 8, 2, 2, 1])
    """
    n_dims = n_bins.shape[0]
    steps = np.empty(n_dims + 1, dtype=np.int64)
    # Build cumulative products from the last dimension backwards:
    # steps[n_dims] == 1 and steps[0] == prod(n_bins).
    steps[n_dims] = 1
    for dim in range(n_dims - 1, -1, -1):
        steps[dim] = steps[dim + 1] * n_bins[dim]
    return steps
[docs]
@nb.njit()
def arange_multi(stops) -> np.ndarray:
    """
    Multidimensional generalization of :func:`numpy.arange`

    :param stops: upper limits (exclusive) for the corresponding dimensions in
        the result
    :type stops: sequence of numbers

    :return: matrix of combinations of range values in lexicographic order
    :rtype: ``numpy.ndarray`` (element-type ``int64``, shape
        ``(n_bins, len(stops))``)

    >>> from cyclic_boosting.utils import arange_multi
    >>> arange_multi([2, 3])
    array([[0, 0],
           [0, 1],
           [0, 2],
           [1, 0],
           [1, 1],
           [1, 2]])

    In this example, the last dimension has only one bin:

    >>> arange_multi([3, 4, 1])
    array([[0, 0, 0],
           [0, 1, 0],
           [0, 2, 0],
           [0, 3, 0],
           [1, 0, 0],
           [1, 1, 0],
           [1, 2, 0],
           [1, 3, 0],
           [2, 0, 0],
           [2, 1, 0],
           [2, 2, 0],
           [2, 3, 0]])

    >>> arange_multi([2])
    array([[0],
           [1]])
    """
    limits = np.asarray(stops)
    n_dims = limits.shape[0]
    steps = bin_steps(limits)
    n_rows = steps[0]
    out = np.zeros((n_rows, n_dims), dtype=np.int64)
    for dim in range(n_dims):
        # Within column `dim` the value stays constant for `step` rows,
        # then increments, wrapping around at limits[dim].
        step = steps[dim + 1]
        value = 0
        row = 0
        while row < n_rows:
            for offset in range(step):
                out[row + offset, dim] = value
            value += 1
            if value >= limits[dim]:
                value = 0
            row += step
    return out
[docs]
def calc_linear_bins(x, nbins):
    """Calculate a linear binning for all values in x with nbins

    :param x: Input array.
    :type x: :obj:`list` or :class:`numpy.ndarray` (float64, dim=1)

    :param nbins: number of bins desired
    :type nbins: int

    **Example**

    >>> x = np.array([0.0, 0.1, 0.3, 0.4, 0.8, 0.9, 0.95, 1.1, 1.3, 1.5])
    >>> nbins = 3
    >>> from cyclic_boosting.utils import calc_linear_bins
    >>> bin_boundaries, bin_centers = calc_linear_bins(x, nbins)
    >>> bin_centers
    array([ 0.25,  0.75,  1.25])
    >>> bin_boundaries
    array([ 0. ,  0.5,  1. ,  1.5])
    """
    # nbins equal-width intervals spanning the data range.
    lower, upper = np.min(x), np.max(x)
    boundaries = np.linspace(lower, upper, nbins + 1)
    centers = (boundaries[:-1] + boundaries[1:]) / 2.0
    return boundaries, centers
[docs]
def digitize(x, bins):
    """
    This is an alternative version of :func:`numpy.digitize`. It puts x values
    greater or equal to bins.max() on the last index of `bins`.
    Values smaller than bins.min() are put on the first index of `bins`.

    :param x: Input array to be binned. It has to be 1-dimensional.
    :type x: :obj:`list` or :class:`numpy.ndarray` (float64, dim=1)

    :param bins: Array of bins. It has to be 1-dimensional and monotonic.
    :type bins: :obj:`list` or :class:`numpy.ndarray` (float64, dim=1)

    :return: Output array of indices, of same shape as `x`.
    :rtype: :class:`numpy.ndarray` (int, dim=1)

    **Raises**

    ValueError
        If the input is not 1-dimensional, or if `bins` is not monotonic.
    TypeError
        If the type of the input is complex.

    **See Also**

    :func:`numpy.digitize`

    **Examples**

    >>> x = np.array([-1000, -0.2, 0.2, 6.4, 3.0, 10, 11, 1000])
    >>> bins = np.array([0.0, 1.0, 2.5, 4.0, 10.0])
    >>> from cyclic_boosting.utils import digitize
    >>> inds = digitize(x, bins)
    >>> inds
    array([0, 0, 1, 4, 3, 4, 4, 4])
    """
    # Convert first: the documented list inputs would otherwise fail on
    # `bins.max()` and the elementwise comparison below.
    x = np.asarray(x)
    bins = np.asarray(bins)
    bin_numbers = np.digitize(x, bins)
    # Pull values at or beyond the top edge back onto the last bin index.
    is_max = x >= bins.max()
    bin_numbers[is_max] -= 1
    return bin_numbers
[docs]
def calc_weighted_quantile(binnumbers, y, weights, quantile):
    """Compute the weighted `quantile` of y for each group of `binnumbers`."""
    data = pd.DataFrame({"y": y, "weights": weights, "binnumbers": binnumbers})
    quantile_func = _weighted_quantile_of_dataframe(quantile)
    return data.groupby("binnumbers").apply(quantile_func)
def _calc_means_medians_evenly_weighted(binnumbers, y):
    """Calculate the means, medians, counts, and errors for y grouped over the
    binnumbers.

    The errors are the 1- and 2-sigma quantiles relative to the median.

    Parameters
    ----------
    binnumbers: :obj:`list` or :class:`numpy.ndarray` (float64, dim=1)
        binnumbers
    y: :obj:`list` or :class:`numpy.ndarray` (float64, dim=1)
        target values
    """
    grouped = pd.Series(y).groupby(binnumbers)
    means = grouped.mean()
    medians = grouped.median()
    counts = grouped.size()
    quantile_levels = (
        LOWER_1_SIGMA_QUANTILE,
        1 - LOWER_1_SIGMA_QUANTILE,
        LOWER_2_SIGMA_QUANTILE,
        1 - LOWER_2_SIGMA_QUANTILE,
    )
    errors = [grouped.quantile(level) - medians for level in quantile_levels]
    return means, medians, counts, errors
def _calc_means_medians_with_weights(binnumbers, y, weights):
    """Calculate the means, medians, counts, and errors for y grouped over the
    binnumbers using weights.

    The errors are the weighted 1- and 2-sigma quantiles relative to the
    weighted median.

    Parameters
    ----------
    binnumbers: :obj:`list` or :class:`numpy.ndarray` (float64, dim=1)
        binnumbers
    y: :obj:`list` or :class:`numpy.ndarray` (float64, dim=1)
        target values
    weights: :obj:`list` or :class:`numpy.ndarray` (float64, dim=1)
        array of event weights
    """
    data = pd.DataFrame({"y": y, "weights": weights, "binnumbers": binnumbers})
    grouped = data.groupby("binnumbers")
    means = grouped.apply(_weighted_mean_of_dataframe)
    medians = grouped.apply(_weighted_median_of_dataframe)
    counts = grouped.apply(_weighted_size_of_dataframe)
    quantile_levels = (
        LOWER_1_SIGMA_QUANTILE,
        1 - LOWER_1_SIGMA_QUANTILE,
        LOWER_2_SIGMA_QUANTILE,
        1 - LOWER_2_SIGMA_QUANTILE,
    )
    errors = [grouped.apply(_weighted_quantile_of_dataframe(level)) - medians for level in quantile_levels]
    return means, medians, counts, errors
def _weighted_quantile_of_dataframe(quantile):
"""
Generates a function which calculates the weighted quantile of a given pandas.DataFrame
Parameters
----------
quantile: float between 0 and 1
"""
def quantile_calculation(x):
"""
Calculates the weighted quantile of a given pandas.DataFrame.
Expects two columns: y and weights
Parameters
----------
x: :class:`pandas.DataFrame` with (at least) two columns y and weights
"""
x = x.sort_values("y")
cumsum = x.weights.cumsum()
cutoff = x.weights.sum() * quantile
try:
return x.y[cumsum >= cutoff].iloc[0]
except IndexError:
return min(x.y)
return quantile_calculation
def _weighted_median_of_dataframe(x):
"""
Calculates the weighted size of a given pandas.DataFrame.
Expects two columns: y and weights
Parameters
----------
x: :class:`pandas.DataFrame` with (at least) two columns y and weights
"""
x = x.sort_values("y")
cumsum = x.weights.cumsum()
cutoff = x.weights.sum() / 2.0
try:
return x.y[cumsum >= cutoff].iloc[0]
except IndexError:
return min(x.y)
def _weighted_mean_of_dataframe(x):
"""
Calculates the weighted mean of a given pandas.DataFrame.
Expects two columns: y and weights
Parameters
----------
x: :class:`pandas.DataFrame` with (at least) two columns y and weights
"""
weighted_sum = (x.y * x.weights).sum()
sum_of_weights = x.weights.sum()
return weighted_sum / sum_of_weights
def _weighted_size_of_dataframe(x):
"""
Calculates the weighted size of a given pandas.DataFrame.
Expects one column: weights
Parameters
----------
x: :class:`pandas.DataFrame` with (at least) one column called weights
"""
return x.weights.sum()
[docs]
def weighted_stddev(values, weights):
    r"""Calculates the weighted standard deviation :math:`\sigma`:

    .. math::

        \sigma = \sqrt{\frac{\sum_i w_i \cdot (x_i - \bar{x}_w)^2}
            {\sum_i w_i}}, \qquad
        \bar{x}_w = \frac{\sum_i w_i \, x_i}{\sum_i w_i}

    Parameters
    ----------
    values: :class:`numpy.ndarray` (float64, dim=1)
        Values of the samples :math:`x_i`.
    weights: :class:`numpy.ndarray` (float64, dim=1)
        Weights of the samples :math:`w_i`.
    """
    weighted_mean = np.sum(weights * values) / np.sum(weights)
    variance = np.sum(weights * (values - weighted_mean) ** 2) / np.sum(weights)
    return np.sqrt(variance)
[docs]
def clone(estimator, safe=True):
    """Constructs a new estimator with the same constructor parameters.

    A better name for this function would be 'reconstruct'.

    This is a reimplemenation of :func:`sklearn.base.clone` respecting wishes of
    estimators and their subestimators to avoid reconstructing certain
    attributes.

    Clone performs a deep copy of the model in an estimator
    without actually copying attached data. It yields a new estimator
    with the same parameters that has not been fit on any data.

    :param estimator: estimator (or container of estimators) to reconstruct
    :param safe: if False, objects without ``get_params`` are deep-copied
        instead of raising :class:`TypeError`
    :type safe: bool
    """
    estimator_type = type(estimator)
    if estimator_type in (list, tuple, set, frozenset):
        # Clone each element, keeping the container type.
        return estimator_type([clone(e, safe=safe) for e in estimator])
    elif not hasattr(estimator, "get_params"):
        if not safe:
            return copy.deepcopy(estimator)
        else:
            raise TypeError(
                "Cannot clone object '%s' (type %s): "
                "it does not seem to be a scikit-learn estimator as "
                "it does not implement a 'get_params' methods." % (repr(estimator), type(estimator))
            )
    klass = estimator.__class__
    new_object_params = estimator.get_params(deep=False)
    # dict.items() replaces the Python-2-era six.iteritems; rebinding
    # existing keys while iterating is safe.
    if hasattr(estimator, "no_deepcopy"):
        # Parameters listed in `no_deepcopy` are passed through unchanged.
        no_deepcopy = estimator.no_deepcopy
        for name, param in new_object_params.items():
            if name not in no_deepcopy:
                new_object_params[name] = clone(param, safe=False)
    else:
        for name, param in new_object_params.items():
            new_object_params[name] = clone(param, safe=False)
    new_object = klass(**new_object_params)
    return new_object
[docs]
def regularize_to_prior_expectation(values, uncertainties, prior_expectation, threshold=2.5):
    r"""Regularize values with uncertainties to a prior expectation.

    :param values: measured values
    :type values: :class:`numpy.ndarray` (float64, dim=1)
    :param uncertainties: uncertainties for the values.
    :type uncertainties: :class:`numpy.ndarray` (float64, dim=1)
    :param prior_expectation: The prior expectations dominate
        the regularized value if the uncertainties are large.
    :type prior_expectation: :class:`numpy.ndarray` (float64, dim=1) or float
    :param threshold: Threshold in terms of sigma. If the significance of a
        value:

        .. math::

            \text{sig}(x_i) = \frac{x_i - \text{prior\_expectation}_i}
            {\text{uncertainty}_i}

        is below the threshold, the prior expectation replaces the value.
    :type threshold: float

    **Doctests**

    >>> values = np.array([0, 1, 2, 3])
    >>> uncertainties = np.ones_like(values)*0.1
    >>> prior_expectation = 1.
    >>> from cyclic_boosting.utils import regularize_to_prior_expectation
    >>> regularized_values = regularize_to_prior_expectation(
    ... values, uncertainties, prior_expectation)
    >>> regularized_values
    array([ 0.03175416,  1.        ,  1.96824584,  2.98431348])
    """
    # Significance ("pull") of each value relative to the prior.
    pull = (values - prior_expectation) / uncertainties
    # Below-threshold pulls collapse onto the prior; the rest are shrunk
    # in quadrature by the threshold.
    shrunk_pull = np.where(
        np.abs(pull) > threshold,
        np.sign(pull) * np.sqrt(np.abs(pull**2.0 - threshold**2.0)),
        0,
    )
    return prior_expectation + uncertainties * shrunk_pull
[docs]
def regularize_to_error_weighted_mean(values, uncertainties, prior_prediction=None):
    r"""Regularize values with uncertainties to its error-weighted mean.

    :param values: measured values
    :type values: :class:`numpy.ndarray` (float64, dim=1)
    :param uncertainties: uncertainties for the values.
    :type uncertainties: :class:`numpy.ndarray` (float64, dim=1)
    :param prior_prediction: If the `prior_prediction` is specified, all values
        are regularized with it and not with the error weighted mean.
    :type prior_prediction: float
    :returns: regularized values
    :rtype: :class:`numpy.ndarray` (float64, dim=1)

    The error weighted mean is defined as:

    .. math::

        \bar{x}_{\sigma} = \frac{\sum\limits_{i} \frac{1}
        {\sigma^2_i} \cdot x_i}{\sum\limits_{i} \frac{1}{\sigma^2_i}}

    with the uncertainties :math:`\sigma_i` and values :math:`x_i`.
    The uncertainty :math:`\sigma_{\bar{x}}` for :math:`\bar{x}`
    is calculated as:

    .. math::

        \sigma^2_{\bar{x}} = \frac{\sum\limits_{i} \frac{1}{\sigma^2_i}
        (x - \bar{x}_{\sigma})^2}{\sum\limits_{i} \frac{1}{\sigma^2_i}}

    The regularized values are calculated as follows:

    .. math::

        x_i^{'} = \frac{\frac{1}{\sigma^2_i} \cdot x_i +
        \frac{1}{\sigma^2_{\bar{x}}} \cdot \bar{x}}
        {\frac{1}{\sigma^2_i} + \frac{1}{\sigma^2_{\bar{x}}}}

    >>> values = np.array([100., 100., 100., 90., 110., 180., 180., 20., 20.])
    >>> uncertainties = np.array([10., 10., 10., 10., 10., 10., 15., 10., 15.])
    >>> from cyclic_boosting.utils import regularize_to_error_weighted_mean
    >>> regularize_to_error_weighted_mean(values, uncertainties)
    array([ 100.        ,  100.        ,  100.        ,   90.40501997,
            109.59498003,  176.75984027,  173.06094747,   23.24015973,
             26.93905253])

    :raises ValueError: if ``values`` and ``uncertainties`` differ in shape
    """
    if values.shape != uncertainties.shape:
        raise ValueError("values and uncertainties must have the same shape")
    # Nothing to regularize for empty input or a single value without prior.
    if len(values) < 1 or (prior_prediction is None and len(values) == 1):
        return values
    precisions = 1.0 / np.square(uncertainties)
    total_precision = np.sum(precisions)
    if prior_prediction is None:
        if np.allclose(values, values[0]):
            # if all values are the same,
            # regularizing to the mean makes no sense
            return values
        center = np.sum(precisions * values) / total_precision
    else:
        if np.allclose(values, prior_prediction):
            return values
        center = prior_prediction
    # Precision of the center, from the precision-weighted spread.
    center_precision = 1.0 / (np.sum(precisions * np.square(values - center)) / total_precision)
    return (precisions * values + center_precision * center) / (precisions + center_precision)
[docs]
def regularize_to_error_weighted_mean_neighbors(
    values: np.ndarray, uncertainties: np.ndarray, window_size: Optional[int] = 3
) -> np.ndarray:
    """
    Regularize values with uncertainties to its error-weighted mean, using a
    sliding window.

    Parameters
    ----------
    values : np.ndarray
        data (`float` type) to be regularized
    uncertainties : np.ndarray
        uncertainties (`float` type) of values
    window_size : int
        size of the sliding window to be used (e.g., 3 means include direct
        left and right neighbors)

    Returns
    -------
    np.ndarray
        regularized values

    Raises
    ------
    ValueError
        if ``values`` and ``uncertainties`` differ in shape
    """
    if values.shape != uncertainties.shape:
        raise ValueError("values and uncertainties must have the same shape")
    if len(values) < 3:
        # Too few points for a sliding window: fall back to the global version.
        return regularize_to_error_weighted_mean(values, uncertainties)
    window = np.ones(window_size)
    # Zero uncertainty gets zero weight instead of a division error.
    precisions = np.where(uncertainties > 0, 1.0 / np.square(uncertainties), 0.0)
    precision_sums = np.convolve(precisions, window, "same")
    local_means = np.convolve(precisions * values, window, "same") / precision_sums
    mean_precisions = np.ones(len(values))
    for i in range(len(values)):
        # Precision of the local mean at i, from the windowed weighted spread.
        spread = np.convolve(precisions * np.square(values - local_means[i]), window, "same")
        mean_precisions[i] = (precision_sums / spread)[i]
    return (precisions * values + mean_precisions * local_means) / (precisions + mean_precisions)
[docs]
def neutralize_one_dim_influence(values, uncertainties):
    """
    Neutralize influence of one dimensional features in a two-dimensional
    factor matrix

    :param values: measured values
    :type values: :class:`numpy.ndarray` (float64, dim=2)
    :param uncertainties: uncertainties for the values.
    :type uncertainties: :class:`numpy.ndarray` (float64, dim=2)

    returns an updated 2D-table that has no net shift on any of the 2
    projections
    """
    row_weight_totals = np.einsum("ij->i", uncertainties)
    col_weight_totals = np.einsum("ij->j", uncertainties)
    row_sums = np.einsum("ij->i", values * uncertainties)
    deviation = 100.0
    iteration = 0
    # Alternate row and column corrections (at most 4 passes) until the
    # weighted row and column sums are close enough to zero.
    while deviation > 0.01 and iteration < 4:
        iteration += 1
        # remove the net shift of every row
        values = values - np.outer(row_sums / row_weight_totals, np.ones(values.shape[1]))
        # recompute column sums after the row correction
        col_sums = np.sum(values * uncertainties, axis=0)
        # remove the net shift of every column
        values = values - np.outer(np.ones(values.shape[0]), col_sums / col_weight_totals)
        # recompute both projections after the column correction
        row_sums = np.sum(values * uncertainties, axis=1)
        col_sums = np.sum(values * uncertainties, axis=0)
        # total absolute row and column sums determine convergence
        deviation = abs(row_sums).sum() + abs(col_sums).sum()
    return values
[docs]
def get_bin_bounds(binners, feat_group):
    """
    Gets the bin boundaries for each feature group.

    Parameters
    ----------
    binners: list
        List of binners.
    feat_group: str or tuple of str
        A feature property for which the bin boundaries should be extracted
        from the binners.
    """
    if binners is None:
        return None
    boundaries = {}
    for binner in binners:
        # Later binners override earlier ones for the same feature group.
        boundaries.update(binner.get_feature_bin_boundaries())
    feat_bounds = boundaries.get(feat_group)
    if feat_bounds is None:
        return None
    # First column holds the bin boundary values.
    return feat_bounds[:, 0]
[docs]
def generator_to_decorator(gen):
    """Turn a generator into a decorator.

    The mechanism is similar to :func:`contextlib.contextmanager` which turns
    a generator into a contextmanager. :mod:`decorator` is used internally.
    Thanks to :mod:`decorator`, this function preserves the docstring and the
    signature of the function to be decorated.

    The docstring of the resulting decorator will include the original
    docstring of the generator and an additional remark stating
    that this is the corresponding decorator.

    :param gen: wrapping the function call.
    :type gen: generator function
    :return: decorator

    For a detailed example, see
    :func:`generator_to_decorator_and_contextmanager`.
    """

    @decorator.decorator
    def wrapper(func, *args, **kwargs):
        # Advance the generator to its yield, run the wrapped function,
        # then close the generator (running its cleanup code).
        gen_instance = gen()
        try:
            next(gen_instance)
            return func(*args, **kwargs)
        finally:
            gen_instance.close()

    wrapper.__doc__ = (gen.__doc__ or "") + "\n This is the corresponding decorator."
    return wrapper
[docs]
def linear_regression(x, y, w):
    r"""Performs a linear regression allowing uncertainties in y.

    :math:`f(x) = y = \alpha + \beta \cdot x`

    :param x: x vector
    :type x: :class:`numpy.ndarray`
    :param y: y vector
    :type y: :class:`numpy.ndarray`
    :param w: weight vector
    :type w: :class:`numpy.ndarray`
    :returns: The coefficients `alpha` and `beta`.
    :rtype: :obj:`tuple` of 2 :obj:`float`
    """
    # Weighted normal equations solved via Cramer's rule.
    sum_w = np.sum(w)
    sum_wx = np.dot(w, x)
    sum_wxx = np.dot(np.square(x), w)
    sum_wy = np.dot(w, y)
    sum_wxy = np.sum(w * x * y)
    determinant = sum_w * sum_wxx - sum_wx * sum_wx
    alpha = (sum_wxx * sum_wy - sum_wx * sum_wxy) / determinant
    beta = (sum_w * sum_wxy - sum_wx * sum_wy) / determinant
    return alpha, beta
[docs]
def get_feature_column_names(X, exclude_columns=None):
    """Return the column names of ``X`` without the excluded ones.

    Parameters
    ----------
    X: :class:`pandas.DataFrame`
        data frame whose column names are listed
    exclude_columns: list, optional
        column names to drop from the result; default keeps all columns
        (``None`` replaces the former mutable ``[]`` default)
    """
    features = list(X.columns)
    # Remove one occurrence per excluded name (list.remove semantics).
    for col in exclude_columns or []:
        if col in features:
            features.remove(col)
    return features
[docs]
def continuous_quantile_from_discrete_pdf(y, quantile, weights):
    """
    Calculates a continous quantile value approximation for a given quantile
    from an array of potentially discrete values.

    Parameters
    ----------
    y : np.ndarray
        containing data with `float` type (potentially discrete)
    quantile : float
        desired quantile
    weights : np.ndarray
        event weights for the values in ``y``

    Returns
    -------
    float
        calculated quantile value
    """
    y = np.asarray(y)
    weights = np.asarray(weights)
    order = y.argsort()
    y_sorted = y[order]
    weight_cumsum = weights[order].cumsum()
    target = weights.sum() * quantile
    # First value whose cumulative weight reaches the target.
    quantile_value = y_sorted[weight_cumsum >= target][0]
    # Interpolate within the plateau of equal (discrete) values, if any.
    plateau = np.where(y_sorted == quantile_value)[0]
    first, last = plateau[0], plateau[-1]
    if last > first:
        quantile_value += (int(target) - first) / (last - first)
    return quantile_value
[docs]
@dataclass
class ConvergenceParameters:
    """Class for registering the convergence parameters"""

    # Defaults are large sentinels so the first convergence check never
    # terminates the loop prematurely.
    loss_change: float = 1e20
    delta: float = 100.0

    def set_loss_change(self, updated_loss_change: float) -> None:
        """Store the given loss change."""
        self.loss_change = updated_loss_change

    def set_delta(self, updated_delta: float) -> None:
        """Store the given delta."""
        self.delta = updated_delta
[docs]
def get_normalized_values(values: Iterable) -> List[float]:
    """Normalize ``values`` so that they sum up to 1.

    If the total is (approximately) zero, the values are returned unchanged.

    Parameters
    ----------
    values : Iterable
        numeric values; any iterable is accepted (it is materialized once,
        so generators are handled correctly)

    Returns
    -------
    List[float]
        normalized values (or an unnormalized copy for zero total)
    """
    # Materialize first: the previous implementation iterated `values`
    # twice, silently returning an empty list for generator inputs.
    values = list(values)
    values_total = sum(values)
    if round(values_total, 6) != 0.0:
        return [value / values_total for value in values]
    return values
[docs]
def smear_discrete_cdftruth(cdf_func: callable, y: int) -> float:
    """
    Smearing of the CDF value of a sample from a discrete random variable. Main
    usage is for a histogram of CDF values to check an estimated individual
    probability distribution (should be flat).

    Parameters
    ----------
    y : int
        value from discrete random variable
    cdf_func : callable
        cumulative distribution function

    Returns
    -------
    float
        smeared CDF value for y
    """
    cdf_high = cdf_func(y)
    # Below y == 1 there is no predecessor value, so the lower CDF is 0.
    cdf_low = cdf_func(y - 1) if y >= 1 else 0.0
    # Draw uniformly between the CDF values of y - 1 and y.
    return cdf_low + np.random.uniform(0, 1) * (cdf_high - cdf_low)
[docs]
def smear_discrete_cdftruth_qpd(qpds: np.ndarray, y: np.ndarray) -> np.ndarray:
    """
    Smearing of the CDF values of samples from discrete random variables. Main
    usage is for a histogram of CDF values to check an estimated individual
    probability distribution (should be flat).

    Parameters
    ----------
    y : np.ndarray
        values from discrete random variables
    qpds : np.ndarray
        array of QPDs

    Returns
    -------
    np.ndarray
        smeared CDF values for y
    """
    cdf_high = qpds.cdf(y, inner=True)
    # Entries below 1 have no predecessor value, so their lower CDF is 0.
    cdf_low = np.where(y >= 1, qpds.cdf(y - 1, inner=True), 0.0)
    # Draw uniformly between the CDF values of y - 1 and y, per entry.
    smear = np.random.uniform(0, 1, len(y))
    return cdf_low + smear * (cdf_high - cdf_low)