import warnings
from .backend import get_backend
[docs]def l2_neg_loss(y_true, y_pred):
"""L2 negative loss, computed for multiple predictions.
Parameters
----------
y_true : array or Tensor of shape (n_samples, n_targets)
Ground truth.
y_pred : array or Tensor of shape (n_predictions, n_samples, n_targets) or\
(n_samples, n_targets)
Predictions.
Returns
-------
l2 : array of shape (n_predictions, n_targets) or (n_targets, )
L2 negative losses.
"""
backend = get_backend()
y_true, y_pred = backend.check_arrays(y_true, y_pred)
y_pred = _check_finite(y_pred)
# broadcasting if y_pred has more dimensions
axis = 1 if len(y_pred.shape) > len(y_true.shape) else 0
n_samples = y_true.shape[0]
error = ((y_true - y_pred) ** 2).sum(axis)
l2 = -error / n_samples
return l2
[docs]def r2_score(y_true, y_pred):
"""R2 score, computed for multiple predictions (e.g. multiple alphas).
Parameters
----------
y_true : array or Tensor of shape (n_samples, n_targets)
Ground truth.
y_pred : array or Tensor of shape (n_predictions, n_samples, n_targets) or\
(n_samples, n_targets)
Predictions.
Returns
-------
r2 : array of shape (n_predictions, n_targets) or (n_targets, )
R2 scores.
"""
backend = get_backend()
y_true, y_pred = backend.check_arrays(y_true, y_pred)
y_pred = _check_finite(y_pred)
# broadcasting if y_pred has more dimensions
axis = 1 if len(y_pred.shape) > len(y_true.shape) else 0
error = ((y_true - y_pred) ** 2).sum(axis)
var = ((y_true - y_true.mean(0)) ** 2).sum(0)
r2 = 1. - error / var
if axis == 0:
r2[var == 0] = 0
else:
r2[:, var == 0] = 0
return r2
[docs]def correlation_score(y_true, y_pred):
"""Correlation score, computed for multiple predictions.
Parameters
----------
y_true : array or Tensor of shape (n_samples, n_targets)
Ground truth.
y_pred : array or Tensor of shape (n_predictions, n_samples, n_targets) or\
(n_samples, n_targets)
Predictions.
Returns
-------
correlations : array of shape (n_predictions, n_targets) or (n_targets, )
Correlation scores.
"""
backend = get_backend()
y_true, y_pred = backend.check_arrays(y_true, y_pred)
y_pred = _check_finite(y_pred)
# broadcasting if y_pred has more dimensions
axis = 1 if len(y_pred.shape) > len(y_true.shape) else 0
product = _zscore(y_true, axis=0) * _zscore(y_pred, axis=axis)
correlations = product.mean(axis)
return correlations
[docs]def r2_score_split(y_true, y_pred, include_correlation=True):
"""Split the R2 score into individual components using the product measure.
When estimating a linear joint model, the predictions of each feature space
are summed::
Yhat_joint = Yhat_A + Yhat_B + ... + Yhat_Z
The joint model R2 can be computed as::
R2_joint = R2(Yhat_joint, Y)
This function estimates the contribution of each feature space to the joint
model R2 such that::
R2_joint = R2_A + R2_B + ... + R2_Z
Mathematically, this is achieved by taking into account the correlations
between predictions (i.e. Yhat_A*Yhat_B,..., Yhat_A*Yhat_Z). The function
can also returns an estimate that ignores these correlations.
This function differs from r2_score_split_svd in the method used to
decompose the variance. The function r2_score_split is based on the product
measure method, while the function r2_score_split_svd is based on the
relative weights method.
This function assumes that y_true is zero-mean over samples.
Parameters
----------
y_true : array of shape (n_samples, n_targets)
Observed data. Has to be zero-mean over samples.
y_pred : array of shape (n_kernels, n_samples, n_targets) or \
(n_samples, n_targets)
Predictions.
include_correlation : bool
Whether to include correlation between feature spaces.
If True, individual feature space R2 sum is equivalent to the joint
model R2 (i.e. from `y_pred.sum(0)`).
Returns
-------
r2 : array (n_kernels, n_targets) or (n_targets, )
Individual feature space R2 scores.
"""
backend = get_backend()
y_true, y_pred = backend.check_arrays(y_true, y_pred)
y_pred = _check_finite(y_pred)
if backend.any(backend.abs(y_true.mean(0)) > 1e-6):
warnings.warn(
'y_true has to be zero-mean over samples to compute '
'the split r2 scores.', UserWarning)
sst = (y_true ** 2).sum(0)
no_split = y_pred.ndim == 2
if no_split:
y_pred = y_pred[None]
n_kernels, n_samples, n_targets = y_pred.shape
r2 = backend.zeros_like(y_pred[:, 0, :])
for fsi in range(n_kernels):
if include_correlation:
asst = (y_pred[fsi] * y_pred).sum(0)
else:
asst = (y_pred[fsi] ** 2)
inter = y_true * y_pred[fsi]
r2[fsi, :] = ((2 * inter - asst) / sst).sum(0)
del inter, asst
if no_split:
r2 = r2[0]
return r2
[docs]def r2_score_split_svd(y_true, y_pred):
"""Split the R2 score into individual components using relative weights.
When estimating a linear joint model, the predictions of each feature space
are summed::
Yhat_joint = Yhat_A + Yhat_B + ... + Yhat_Z
The joint model R2 can be computed as::
R2_joint = R2(Yhat_joint, Y)
This function estimates the contribution of each feature space to the joint
model R2 such that::
R2_joint = R2_A + R2_B + ... + R2_Z
This function differs from r2_score_split in the method used to decompose
the variance. The function r2_score_split is based on the product measure
method, while the function r2_score_split_svd is based on the relative
weights method.
This function assumes that y_true is zero-mean over samples.
Parameters
----------
y_true : array of shape (n_samples, n_targets)
Observed data. Has to be zero-mean over samples.
y_pred : array of shape (n_kernels, n_samples, n_targets) or \
(n_samples, n_targets)
Predictions.
Returns
-------
r2 : array (n_kernels, n_targets) or (n_targets, )
Individual feature space R2 scores.
"""
backend = get_backend()
y_true, y_pred = backend.check_arrays(y_true, y_pred)
y_pred = _check_finite(y_pred)
if backend.any(backend.abs(y_true.mean(0)) > 1e-6):
warnings.warn(
'y_true has to be zero-mean over samples to compute '
'the split r2 scores.', UserWarning)
no_split = y_pred.ndim == 2
if no_split:
y_pred = y_pred[None]
# compute the R2 score on the sum of sub-predictions
y_pred_sum = y_pred.sum(0)
full_r2 = r2_score(y_true, y_pred_sum)
# normalize the prediction sum
norms_pred_sum = backend.norm(y_pred_sum, axis=0, keepdims=True)
norms_pred_sum[norms_pred_sum == 0] = 1
y_pred_sum = y_pred_sum / norms_pred_sum
y_pred = y_pred / norms_pred_sum
norms_pred = backend.norm(y_pred, axis=1)
norms_pred[norms_pred == 0] = 1
# Compute the singular value decomposition. It is done on non-normalized
# y_pred to preserve the scaling of sub-predictions.
U, s, Vt = backend.svd(backend.transpose(y_pred, (2, 1, 0)),
full_matrices=False)
V = backend.transpose(Vt, (0, 2, 1))
Ut = backend.transpose(U, (0, 2, 1))
# regression of y_pred over V.U^T
VsVt = V @ (s[:, :, None] * Vt)
# regression of y_pred_sum over V.U^T
beta = (V @ Ut) @ y_pred_sum.T[:, :, None]
# Aggregates VsVt and beta to compute the relative weights. It uses a
# scaling by norms_pred because the SVD was computed on non-normalized
# y_pred.
r2 = backend.matmul(VsVt ** 2 / norms_pred.T[:, None, :] ** 2,
beta ** 2)[:, :, 0].T
# fix rounding errors
r2 /= r2.sum(0)[None, :]
# scale by the total R2
r2 *= full_r2[None, :]
if no_split:
r2 = r2[0]
return r2
[docs]def correlation_score_split(y_true, y_pred):
"""Split the correlation score into individual components.
When estimating a linear joint model, the predictions of each feature space
are summed::
Yhat_joint = Yhat_A + Yhat_B + ... + Yhat_Z
The joint model correlation score r can be computed as::
r_joint = r(Y, Yhat_joint)
This function estimates the contribution of each feature space to the joint
model correlation score r such that::
r_joint = r_A + r_B + ... + r_Z
Parameters
----------
y_true : array or Tensor of shape (n_samples, n_targets)
Ground truth.
y_pred : array or Tensor of shape (n_predictions, n_samples, n_targets) or\
(n_samples, n_targets)
Predictions.
Returns
-------
correlations : array of shape (n_predictions, n_targets) or (n_targets, )
Contributions of each individual feature space to the joint correlation
score.
"""
backend = get_backend()
y_true, y_pred = backend.check_arrays(y_true, y_pred)
y_pred = _check_finite(y_pred)
# broadcasting if y_pred has more dimensions
axis = 1 if len(y_pred.shape) > len(y_true.shape) else 0
# demean to take the covariance
y_true = y_true - y_true.mean(0, keepdims=True)
y_pred = y_pred - y_pred.mean(axis, keepdims=True)
y_true_std = backend.std_float64(y_true, 0, demean=False, keepdims=False)
y_true_std[y_true_std == 0] = 1
split = y_pred.ndim == 3
if split:
y_true = y_true[None]
y_pred_sum = y_pred.sum(0, keepdims=True)
y_pred_std = backend.std_float64(y_pred_sum, axis, demean=True,
keepdims=False)
else:
y_pred_std = backend.std_float64(y_pred, axis, demean=True,
keepdims=True)
y_pred_std[y_pred_std == 0] = 1
correlations = (y_true * y_pred).mean(axis) / (y_true_std * y_pred_std)
if not split:
correlations = correlations[0]
return correlations
###############################################################################
def _check_finite(y_pred):
backend = get_backend()
is_nan = backend.isnan(y_pred)
if backend.any(is_nan):
warnings.warn('nan in y_pred.')
y_pred[is_nan] = 0
isinf = backend.isinf(y_pred)
if backend.any(isinf):
warnings.warn('inf in y_pred.')
y_pred[isinf] = 0
return y_pred
def _zscore(X, axis):
backend = get_backend()
Xz = X - X.mean(axis, keepdims=True)
std = backend.std_float64(Xz, axis, demean=False, keepdims=True)
std[std == 0] = 1
Xz /= std
return Xz