from abc import ABC, abstractmethod
import warnings
from sklearn.base import BaseEstimator, RegressorMixin, MultiOutputMixin
from sklearn.utils.validation import check_is_fitted
from ._solvers import KERNEL_RIDGE_SOLVERS
from ._solvers import WEIGHTED_KERNEL_RIDGE_SOLVERS
from ._hyper_gradient import MULTIPLE_KERNEL_RIDGE_SOLVERS
from ._random_search import solve_kernel_ridge_cv_eigenvalues
from ._kernels import pairwise_kernels
from ._kernels import PAIRWISE_KERNEL_FUNCTIONS
from ._predictions import predict_weighted_kernel_ridge
from ._predictions import predict_and_score_weighted_kernel_ridge
from ._predictions import primal_weights_weighted_kernel_ridge
from ..validation import check_array
from ..validation import check_cv
from ..validation import issparse
from ..validation import _get_string_dtype
from ..backend import get_backend
from ..backend import force_cpu_backend
from ..scoring import r2_score
from ..scoring import r2_score_split
class _BaseKernelRidge(ABC, MultiOutputMixin, RegressorMixin, BaseEstimator):
"""Base class for kernel ridge estimators"""
ALL_KERNELS = PAIRWISE_KERNEL_FUNCTIONS
@property
@classmethod
@abstractmethod
def ALL_SOLVERS(cls):
...
def _call_solver(self, **direct_params):
"""Helper function common to all classes, merging solver parameters."""
# use self.solver_ if it exists, otherwise use self.solver
solver = getattr(self, "solver_", self.solver)
if solver not in self.ALL_SOLVERS:
raise ValueError("Unknown solver=%r." % solver)
function = self.ALL_SOLVERS[solver]
solver_params = self.solver_params or {}
# check duplicated parameters
intersection = set(direct_params.keys()).intersection(
set(solver_params.keys()))
if intersection:
raise ValueError(
'Parameters %s should not be given in solver_params, since '
'they are either fixed or have a direct parameter in %s.' %
(intersection, self.__class__.__name__))
return function(**direct_params, **solver_params)
def _more_tags(self):
return {'requires_y': True}
class KernelRidge(_BaseKernelRidge):
"""Kernel ridge regression.
Solve the kernel ridge regression::
w* = argmin_w ||K @ w - Y||^2 + alpha (w.T @ K @ w)
where K is a kernel computed on the input X.
The default kernel is linear (K = X @ X.T).
Parameters
----------
alpha : float, or array of shape (n_targets, )
L2 regularization parameter.
kernel : str or callable, default="linear"
Kernel mapping. Available kernels are: 'linear',
        'polynomial', 'poly', 'rbf', 'sigmoid', 'cosine', or 'precomputed'.
Set to 'precomputed' in order to pass a precomputed kernel matrix to
the estimator methods instead of samples.
A callable should accept two arguments and the keyword arguments passed
to this object as kernel_params, and should return a floating point
number.
kernel_params : dict or None
Additional parameters for the kernel function.
See more details in the docstring of the function:
``KernelRidge.ALL_KERNELS[kernel]``
solver : str
Algorithm used during the fit, "eigenvalues", "conjugate_gradient",
"gradient_descent", or "auto".
If "auto", use "eigenvalues" if ``alpha`` is a float, and
"conjugate_gradient" is ``alpha`` is an array.
solver_params : dict or None
Additional parameters for the solver.
See more details in the docstring of the function:
``KernelRidge.ALL_SOLVERS[solver]``
fit_intercept : boolean
Whether to fit an intercept.
If False, X and Y must be zero-mean over samples.
force_cpu : bool
If True, computations will be performed on CPU, ignoring the
current backend. If False, use the current backend.
warn : bool
If True, warn if the number of samples is larger than the number of
features, and if the kernel is linear.
Attributes
----------
    dual_coef_ : array of shape (n_samples,) or (n_samples, n_targets)
Representation of weight vectors in kernel space.
intercept_ : float or array of shape (n_targets, )
Intercept. Only present if fit_intercept is True.
X_fit_ : array of shape (n_samples, n_features)
Training data. If kernel == "precomputed" this is None.
n_features_in_ : int
Number of features (or number of samples if kernel == "precomputed")
used during the fit.
dtype_ : str
Dtype of input data.
Examples
--------
>>> from himalaya.kernel_ridge import KernelRidge
>>> import numpy as np
>>> n_samples, n_features, n_targets = 10, 5, 3
>>> X = np.random.randn(n_samples, n_features)
>>> Y = np.random.randn(n_samples, n_targets)
>>> model = KernelRidge()
>>> model.fit(X, Y)
KernelRidge()
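    >>> # A minimal usage sketch (random data, so the scores are arbitrary):
    >>> Y_hat = model.predict(X)    # shape (n_samples, n_targets)
    >>> scores = model.score(X, Y)  # R^2 per target, shape (n_targets,)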
"""
ALL_SOLVERS = KERNEL_RIDGE_SOLVERS
def __init__(self, alpha=1, kernel="linear", kernel_params=None,
solver="auto", solver_params=None, fit_intercept=False,
force_cpu=False, warn=True):
self.alpha = alpha
self.kernel = kernel
self.kernel_params = kernel_params
self.solver = solver
self.solver_params = solver_params
self.fit_intercept = fit_intercept
self.force_cpu = force_cpu
self.warn = warn
@force_cpu_backend
def fit(self, X, y=None, sample_weight=None):
"""Fit the model.
Parameters
----------
X : array of shape (n_samples, n_features)
Training data. If kernel == "precomputed" this is instead
a precomputed kernel array of shape (n_samples, n_samples).
y : array of shape (n_samples,) or (n_samples, n_targets)
Target values.
sample_weight : None, or array of shape (n_samples, )
Individual weights for each sample, ignored if None is passed.
Returns
-------
self : returns an instance of self.
"""
backend = get_backend()
accept_sparse = False if self.kernel == "precomputed" else ("csr",
"csc")
X = check_array(X, accept_sparse=accept_sparse, ndim=2)
self.dtype_ = _get_string_dtype(X)
y = check_array(y, dtype=self.dtype_, ndim=[1, 2])
if X.shape[0] != y.shape[0]:
raise ValueError("Inconsistent number of samples.")
if sample_weight is not None:
sample_weight = check_array(sample_weight, dtype=self.dtype_,
ndim=1)
if sample_weight.shape[0] != y.shape[0]:
raise ValueError("Inconsistent number of samples.")
n_samples, n_features = X.shape
if n_samples > n_features and self.kernel == "linear" and self.warn:
warnings.warn(
"Solving linear kernel ridge is slower than solving ridge when"
f" n_samples > n_features (here {n_samples} > {n_features}). "
"Using himalaya.ridge.Ridge would be faster. Use warn=False to"
" silence this warning.", UserWarning)
K = self._get_kernel(X)
self.X_fit_ = _to_cpu(X) if self.kernel != "precomputed" else None
self.n_features_in_ = X.shape[1]
del X
ravel = False
if y.ndim == 1:
y = y[:, None]
ravel = True
if sample_weight is not None:
# We need to support sample_weight directly because K might be a
# precomputed kernel.
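            # Scaling Y by sqrt(w) and K by sqrt(w) @ sqrt(w).T weights each
            # squared residual by w, as in weighted least squares.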
sw = backend.sqrt(sample_weight)[:, None]
y = y * sw
K *= sw @ sw.T
# select solver based on the presence of multiple alphas
if self.solver == "auto":
if backend.atleast_1d(self.alpha).shape[0] == 1:
self.solver_ = "eigenvalues"
else:
self.solver_ = "conjugate_gradient"
else:
self.solver_ = self.solver
# ------------------ call the solver
tmp = self._call_solver(K=K, Y=y, alpha=self.alpha,
fit_intercept=self.fit_intercept)
if self.fit_intercept:
self.dual_coef_, self.intercept_ = tmp
else:
self.dual_coef_ = tmp
if ravel:
self.dual_coef_ = self.dual_coef_[:, 0]
if self.fit_intercept:
self.intercept_ = self.intercept_[0]
return self
@force_cpu_backend
def predict(self, X):
"""Predict using the model.
Parameters
----------
X : array of shape (n_samples_test, n_features)
Samples. If kernel == "precomputed" this is instead a precomputed
kernel array of shape (n_samples_test, n_samples_train).
Returns
-------
Y_hat : array of shape (n_samples,) or (n_samples, n_targets)
Returns predicted values.
"""
check_is_fitted(self)
backend = get_backend()
accept_sparse = False if self.kernel == "precomputed" else ("csr",
"csc")
X = check_array(X, dtype=self.dtype_, accept_sparse=accept_sparse,
ndim=2)
if X.shape[1] != self.n_features_in_:
raise ValueError(
'Different number of features in X than during fit.')
K = self._get_kernel(X, self.X_fit_)
del X
Y_hat = backend.to_cpu(K) @ backend.to_cpu(self.dual_coef_)
if self.fit_intercept:
Y_hat += backend.to_cpu(self.intercept_)
return Y_hat
@force_cpu_backend
def score(self, X, y):
"""Return the coefficient of determination R^2 of the prediction.
Parameters
----------
X : array of shape (n_samples_test, n_features)
Samples. If kernel == "precomputed" this is instead a precomputed
kernel array of shape (n_samples_test, n_samples_train).
y : array-like of shape (n_samples,) or (n_samples, n_targets)
True values for X.
Returns
-------
score : array of shape (n_targets, )
R^2 of self.predict(X) versus y.
"""
y_pred = self.predict(X)
y_true = check_array(y, dtype=self.dtype_, ndim=self.dual_coef_.ndim)
if y_true.ndim == 1:
return r2_score(y_true[:, None], y_pred[:, None])[0]
else:
return r2_score(y_true, y_pred)
def _get_kernel(self, X, Y=None):
backend = get_backend()
kernel_params = self.kernel_params or {}
if Y is not None and not issparse(X):
Y = backend.asarray_like(Y, ref=X)
kernel = pairwise_kernels(X, Y, metric=self.kernel, **kernel_params)
return backend.asarray(kernel)
@property
def _pairwise(self):
return self.kernel == "precomputed"
def get_primal_coef(self, X_fit=None):
"""Returns the primal coefficients, assuming the kernel is linear.
When the kernel is linear, kernel ridge regression is equivalent to
        ridge regression, and the ridge regression (primal) coefficients can be
computed from the kernel ridge regression (dual) coefficients, using
the training features X_fit.
Parameters
----------
X_fit : array of shape (n_samples_fit, n_features) or None
Training features. Used only if self.kernel == "precomputed".
If you used a Kernelizer, you can use the method `get_X_fit`.
Returns
-------
primal_coef : array of shape (n_features, n_targets)
Coefficient of the equivalent ridge regression. The coefficients
are computed on CPU memory, since they can be large.
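
        Examples
        --------
        >>> # A hedged sketch: with a linear kernel, the primal coefficients
        >>> # reproduce the dual predictions (data here is purely illustrative).
        >>> from himalaya.kernel_ridge import KernelRidge
        >>> import numpy as np
        >>> X = np.random.randn(10, 5)
        >>> Y = np.random.randn(10, 3)
        >>> model = KernelRidge(kernel="linear", warn=False).fit(X, Y)
        >>> primal_coef = model.get_primal_coef()
        >>> np.allclose(X @ primal_coef, model.predict(X))
        True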
"""
check_is_fitted(self)
backend = get_backend()
if self.kernel == "linear":
X_fit_T = self.X_fit_.T
elif self.kernel == "precomputed":
if X_fit is None:
raise ValueError(
"get_primal_coef requires the training features `X_fit`. "
"If you used a Kernelizer, you can use the method "
"`get_X_fit`.")
X_fit_T = X_fit.T
else:
raise ValueError("The primal coefficients can only be computed "
"when using a linear kernel.")
X_fit_T = backend.to_cpu(X_fit_T)
dual_coef = backend.to_cpu(self.dual_coef_)
return X_fit_T @ dual_coef
class KernelRidgeCV(KernelRidge):
"""Kernel ridge regression with efficient cross-validation over alpha.
Parameters
----------
alphas : array of shape (n_alphas, )
        List of L2 regularization parameters to try.
kernel : str or callable, default="linear"
Kernel mapping. Available kernels are: 'linear',
        'polynomial', 'poly', 'rbf', 'sigmoid', 'cosine', or 'precomputed'.
Set to 'precomputed' in order to pass a precomputed kernel matrix to
the estimator methods instead of samples.
A callable should accept two arguments and the keyword arguments passed
to this object as kernel_params, and should return a floating point
number.
kernel_params : dict or None
Additional parameters for the kernel function.
See more details in the docstring of the function:
``KernelRidgeCV.ALL_KERNELS[kernel]``
solver : str
Algorithm used during the fit, "eigenvalues" only for now.
solver_params : dict or None
Additional parameters for the solver.
See more details in the docstring of the function:
``KernelRidgeCV.ALL_SOLVERS[solver]``
fit_intercept : boolean
Whether to fit an intercept.
If False, X and Y must be zero-mean over samples.
cv : int or scikit-learn splitter
Cross-validation splitter. If an int, KFold is used.
Y_in_cpu : bool
If True, keep the target values ``y`` in CPU memory (slower).
force_cpu : bool
If True, computations will be performed on CPU, ignoring the
current backend. If False, use the current backend.
warn : bool
If True, warn if the number of samples is larger than the number of
features, and if the kernel is linear.
Attributes
----------
    dual_coef_ : array of shape (n_samples,) or (n_samples, n_targets)
Representation of weight vectors in kernel space.
best_alphas_ : array of shape (n_targets, )
Selected best hyperparameter alphas.
cv_scores_ : array of shape (n_targets, )
Cross-validation scores averaged over splits, for the best alpha.
By default, the scores are computed with l2_neg_loss (in ]-inf, 0]).
The scoring function can be changed with solver_params["score_func"].
X_fit_ : array of shape (n_samples, n_features)
Training data. If kernel == "precomputed" this is None.
n_features_in_ : int
Number of features (or number of samples if kernel == "precomputed")
used during the fit.
Examples
--------
    >>> from himalaya.kernel_ridge import KernelRidgeCV
>>> import numpy as np
>>> n_samples, n_features, n_targets = 10, 5, 3
>>> X = np.random.randn(n_samples, n_features)
>>> Y = np.random.randn(n_samples, n_targets)
>>> clf = KernelRidgeCV()
>>> clf.fit(X, Y)
KernelRidgeCV()
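    >>> # A minimal sketch of inspecting the fitted attributes (values depend
    >>> # on the random data, so they are not shown here):
    >>> best_alphas = clf.best_alphas_  # best alpha per target, (n_targets,)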
"""
ALL_SOLVERS = dict(eigenvalues=solve_kernel_ridge_cv_eigenvalues)
def __init__(self, alphas=[0.1, 1], kernel="linear", kernel_params=None,
solver="eigenvalues", solver_params=None, fit_intercept=False,
cv=5, Y_in_cpu=False, force_cpu=False, warn=True):
self.alphas = alphas
self.kernel = kernel
self.kernel_params = kernel_params
self.solver = solver
self.solver_params = solver_params
self.fit_intercept = fit_intercept
self.cv = cv
self.Y_in_cpu = Y_in_cpu
self.force_cpu = force_cpu
self.warn = warn
@force_cpu_backend
def fit(self, X, y=None, sample_weight=None):
"""Fit kernel ridge regression model
Parameters
----------
        X : array of shape (n_samples, n_features)
Training data. If kernel == "precomputed" this is instead
a precomputed kernel array of shape (n_samples, n_samples).
y : array of shape (n_samples,) or (n_samples, n_targets)
Target values.
sample_weight : None, or array of shape (n_samples, )
Individual weights for each sample, ignored if None is passed.
Returns
-------
self : returns an instance of self.
"""
backend = get_backend()
X = check_array(X, accept_sparse=("csr", "csc"), ndim=2)
self.dtype_ = _get_string_dtype(X)
device = "cpu" if self.Y_in_cpu else None
y = check_array(y, dtype=self.dtype_, ndim=[1, 2], device=device)
if X.shape[0] != y.shape[0]:
raise ValueError("Inconsistent number of samples.")
if sample_weight is not None:
sample_weight = check_array(sample_weight, dtype=self.dtype_,
ndim=1)
if sample_weight.shape[0] != y.shape[0]:
raise ValueError("Inconsistent number of samples.")
alphas = check_array(self.alphas, dtype=self.dtype_, ndim=1)
n_samples, n_features = X.shape
if n_samples > n_features and self.kernel == "linear" and self.warn:
warnings.warn(
"Solving linear kernel ridge is slower than solving ridge when"
f" n_samples > n_features (here {n_samples} > {n_features}). "
"Using himalaya.ridge.RidgeCV would be faster. "
"Use warn=False to silence this warning.", UserWarning)
K = self._get_kernel(X)
self.X_fit_ = _to_cpu(X) if self.kernel != "precomputed" else None
self.n_features_in_ = X.shape[1]
del X
ravel = False
if y.ndim == 1:
y = y[:, None]
ravel = True
if sample_weight is not None:
# We need to support sample_weight directly because K might be a
            # precomputed kernel.
sw = backend.sqrt(sample_weight)[:, None]
y = y * backend.to_cpu(sw) if self.Y_in_cpu else y * sw
K *= sw @ sw.T
cv = check_cv(self.cv, y)
# ------------------ call the solver
tmp = self._call_solver(K=K, Y=y, cv=cv, alphas=alphas,
Y_in_cpu=self.Y_in_cpu,
fit_intercept=self.fit_intercept)
if self.fit_intercept:
self.best_alphas_, self.dual_coef_, self.cv_scores_ = tmp[:3]
self.intercept_, = tmp[3:]
else:
self.best_alphas_, self.dual_coef_, self.cv_scores_ = tmp
self.cv_scores_ = self.cv_scores_[0]
if ravel:
self.dual_coef_ = self.dual_coef_[:, 0]
if self.fit_intercept:
self.intercept_ = self.intercept_[0]
return self
def _more_tags(self):
return {
'_xfail_checks': {
'check_sample_weights_invariance':
'zero sample_weight is not equivalent to removing samples, '
'because of the cross-validation splits.',
}
}
###############################################################################
###############################################################################
###############################################################################
###############################################################################
class _BaseWeightedKernelRidge(_BaseKernelRidge):
"""Private class for shared implementations.
"""
@force_cpu_backend
def predict(self, X, split=False):
"""Predict using the model.
Parameters
----------
X : array of shape (n_samples_test, n_features)
Samples. If kernels == "precomputed" this is instead a precomputed
kernel array of shape (n_kernels, n_samples_test, n_samples_train).
split : bool
If True, the prediction is split on each kernel, and this method
returns an array with one extra dimension (first dimension).
The sum over this extra dimension corresponds to split=False.
Returns
-------
Y_hat : array of shape (n_samples,) or (n_samples, n_targets)
Returns predicted values.
If parameter split is True, the array is of shape
(n_kernels, n_samples,) or (n_kernels, n_samples, n_targets).
"""
check_is_fitted(self)
ndim = 3 if self.kernels == "precomputed" else 2
accept_sparse = False if self.kernels == "precomputed" else ("csr",
"csc")
X = check_array(X, dtype=self.dtype_, accept_sparse=accept_sparse,
ndim=ndim)
if X.shape[-1] != self.n_features_in_:
raise ValueError(
'Different number of features in X than during fit.')
Ks = self._get_kernels(X, self.X_fit_)
del X
if (self.solver_params is not None
and "n_targets_batch" in self.solver_params):
n_targets_batch = self.solver_params["n_targets_batch"]
else:
n_targets_batch = None
if self.dual_coef_.ndim == 1:
Y_hat = predict_weighted_kernel_ridge(
Ks=Ks, dual_weights=self.dual_coef_[:, None],
deltas=self.deltas_[:, None], split=split,
n_targets_batch=n_targets_batch)[..., 0]
else:
Y_hat = predict_weighted_kernel_ridge(
Ks=Ks, dual_weights=self.dual_coef_, deltas=self.deltas_,
split=split, n_targets_batch=n_targets_batch)
return Y_hat
@force_cpu_backend
def score(self, X, y, split=False):
"""Return the coefficient of determination R^2 of the prediction.
Parameters
----------
X : array of shape (n_samples_test, n_features)
Samples. If kernels == "precomputed" this is instead a precomputed
kernel array of shape (n_kernels, n_samples_test, n_samples_train).
y : array-like of shape (n_samples,) or (n_samples, n_targets)
True values for X.
split : bool
If True, the prediction is split on each kernel, and the R2 score
is decomposed over sub-predictions, adding an extra dimension
in the first axis. The sum over this extra dimension corresponds to
split=False.
Returns
-------
score : array of shape (n_targets, ) or (n_kernels, n_targets)
R^2 of self.predict(X) versus y.
If parameter split is True, the array is of shape
(n_kernels, n_targets).
"""
check_is_fitted(self)
ndim = 3 if self.kernels == "precomputed" else 2
accept_sparse = False if self.kernels == "precomputed" else ("csr",
"csc")
X = check_array(X, dtype=self.dtype_, accept_sparse=accept_sparse,
ndim=ndim)
y = check_array(y, dtype=self.dtype_, ndim=self.dual_coef_.ndim)
if X.shape[-1] != self.n_features_in_:
raise ValueError(
'Different number of features in X than during fit.')
Ks = self._get_kernels(X, self.X_fit_)
del X
if (self.solver_params is not None
and "n_targets_batch" in self.solver_params):
n_targets_batch = self.solver_params["n_targets_batch"]
else:
n_targets_batch = None
score_func = r2_score_split if split else r2_score
if self.dual_coef_.ndim == 1:
score = predict_and_score_weighted_kernel_ridge(
Ks=Ks, dual_weights=self.dual_coef_[:, None],
deltas=self.deltas_[:, None], Y=y[:, None], split=split,
score_func=score_func, n_targets_batch=n_targets_batch)[..., 0]
else:
score = predict_and_score_weighted_kernel_ridge(
Ks=Ks, dual_weights=self.dual_coef_, deltas=self.deltas_, Y=y,
split=split, score_func=score_func,
n_targets_batch=n_targets_batch)
return score
def _get_kernels(self, X, Y=None):
backend = get_backend()
if isinstance(self.kernels, str) and self.kernels == "precomputed":
kernels = backend.asarray(X)
elif not isinstance(self.kernels, (list, tuple)):
raise ValueError("Parameter 'kernels' has to be a list or a tuple "
"of kernel parameters. Got %r instead." %
(self.kernels, ))
else:
n_kernels = len(self.kernels)
kernels_params = self.kernels_params or [{}] * n_kernels
if Y is not None and not issparse(X):
Y = backend.asarray_like(Y, ref=X)
kernels = []
for metric, params in zip(self.kernels, kernels_params):
kernel = pairwise_kernels(X, Y, metric=metric, **params)
kernels.append(kernel)
kernels = backend.stack(kernels)
return kernels
@force_cpu_backend
def get_primal_coef(self, Xs_fit):
"""Returns the primal coefficients, assuming all kernels are linear.
When all kernels are linear, weighted kernel ridge regression is
        equivalent to weighted ridge regression, and the ridge regression
(primal) coefficients can be computed from the kernel ridge regression
(dual) coefficients, using the training features Xs_fit.
This currently only works when self.kernels == "precomputed".
Parameters
----------
Xs_fit : list of array of shape (n_samples_fit, n_features)
Training features. If you used a ColumnKernelizer, you can use the
method `get_X_fit` to get this list of arrays.
Returns
-------
primal_coef : list of array of shape (n_features, n_targets)
Coefficient of the equivalent ridge regression. The coefficients
are computed on CPU memory, since they can be large.
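
        Examples
        --------
        >>> # A hedged sketch, assuming a pipeline made of a ColumnKernelizer
        >>> # ``ck`` followed by this estimator (see the class example):
        >>> # Xs_fit = ck.get_X_fit()
        >>> # primal_coefs = pipe[-1].get_primal_coef(Xs_fit)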
"""
check_is_fitted(self)
if self.kernels == "precomputed":
if Xs_fit is None:
raise ValueError(
"get_primal_coef requires the training features `Xs_fit`. "
"If you used a ColumnKernelizer, you can use the method "
"`get_X_fit`.")
primal_coef = primal_weights_weighted_kernel_ridge(
self.dual_coef_, self.deltas_, Xs_fit)
else:
raise ValueError("The primal coefficients can only be computed "
"when using precomputed kernels.")
return primal_coef
class MultipleKernelRidgeCV(_BaseWeightedKernelRidge):
"""Multiple-kernel ridge regression with cross-validation.
Solve the kernel ridge regression::
w* = argmin_w ||K @ w - Y||^2 + (w.T @ K @ w)
where the kernel K is a weighted sum of multiple kernels Ks::
K = sum_i exp(deltas[i]) Ks[i]
The solver optimizes the log kernel weight ``deltas`` over
cross-validation, using random search (``solver="random_search"``), or
hyperparameter gradient descent (``solver="hyper_gradient"``).
Parameters
----------
kernels : list of (str or callable), default=["linear", "polynomial"]
        List of kernel mappings. Available kernels are: 'linear',
        'polynomial', 'poly', 'rbf', 'sigmoid', 'cosine'.
Set to 'precomputed' in order to pass a precomputed kernel matrix to
the estimator methods instead of samples.
A callable should accept two arguments and the keyword arguments passed
to this object as kernel_params, and should return a floating point
number.
kernels_params : list of dict, or None
Additional parameters for the kernel functions.
See more details in the docstring of the function:
``MultipleKernelRidgeCV.ALL_KERNELS[kernel]``
solver : str
Algorithm used during the fit, "random_search", or "hyper_gradient".
solver_params : dict or None
Additional parameters for the solver.
See more details in the docstring of the function:
``MultipleKernelRidgeCV.ALL_SOLVERS[solver]``
cv : int or scikit-learn splitter
Cross-validation splitter. If an int, KFold is used.
random_state : int, or None
Random generator seed. Use an int for deterministic search.
Y_in_cpu : bool
If True, keep the target values ``y`` in CPU memory (slower).
force_cpu : bool
If True, computations will be performed on CPU, ignoring the
current backend. If False, use the current backend.
Attributes
----------
    dual_coef_ : array of shape (n_samples,) or (n_samples, n_targets)
Representation of weight vectors in kernel space.
deltas_ : array of shape (n_kernels, n_targets)
Log of kernel weights.
cv_scores_ : array of shape (n_iter, n_targets)
Cross-validation scores, averaged over splits.
By default, the scores are computed with l2_neg_loss (in ]-inf, 0]).
The scoring function can be changed with solver_params["score_func"].
X_fit_ : array of shape (n_samples, n_features)
Training data. If ``kernels == "precomputed"`` this is None.
n_features_in_ : int
Number of features (or number of samples if
``kernels == "precomputed"``) used during the fit.
dtype_ : str
Dtype of input data.
best_alphas_ : array of shape (n_targets, )
Equal to ``1. / exp(self.deltas_).sum(0)``. For the "random_search"
solver, it corresponds to the best hyperparameter alphas, assuming that
each kernel weight vector sums to one (in particular, it is the case
when ``solver_params['n_iter']`` is an integer).
Examples
--------
>>> from himalaya.kernel_ridge import MultipleKernelRidgeCV
>>> from himalaya.kernel_ridge import ColumnKernelizer
>>> from himalaya.kernel_ridge import Kernelizer
>>> from sklearn.pipeline import make_pipeline
>>> # create a dataset
>>> import numpy as np
>>> n_samples, n_features, n_targets = 10, 5, 3
>>> X = np.random.randn(n_samples, n_features)
>>> Y = np.random.randn(n_samples, n_targets)
>>> # Kernelize separately the first three columns and the last two
>>> # columns, creating two kernels of shape (n_samples, n_samples).
>>> ck = ColumnKernelizer(
... [("kernel_1", Kernelizer(kernel="linear"), [0, 1, 2]),
... ("kernel_2", Kernelizer(kernel="polynomial"), slice(3, 5))])
>>> # A model with precomputed kernels, as output by ColumnKernelizer
>>> model = MultipleKernelRidgeCV(kernels="precomputed")
>>> pipe = make_pipeline(ck, model)
>>> _ = pipe.fit(X, Y)
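    >>> # A minimal sketch of inspecting the learned log kernel weights
    >>> # (values depend on the random data, so they are not shown here):
    >>> deltas = pipe[-1].deltas_  # shape (n_kernels, n_targets)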
"""
ALL_SOLVERS = MULTIPLE_KERNEL_RIDGE_SOLVERS
def __init__(self, kernels=["linear", "polynomial"], kernels_params=None,
solver="random_search", solver_params=None, cv=5,
random_state=None, Y_in_cpu=False, force_cpu=False):
self.kernels = kernels
self.kernels_params = kernels_params
self.solver = solver
self.solver_params = solver_params
self.cv = cv
self.random_state = random_state
self.Y_in_cpu = Y_in_cpu
self.force_cpu = force_cpu
@force_cpu_backend
def fit(self, X, y=None, sample_weight=None):
"""Fit the model.
Parameters
----------
X : array of shape (n_samples, n_features)
Training data. If kernels == "precomputed" this is instead
a precomputed kernel array of shape
(n_kernels, n_samples, n_samples).
y : array of shape (n_samples,) or (n_samples, n_targets)
Target values.
sample_weight : None, or array of shape (n_samples, )
Individual weights for each sample, ignored if None is passed.
Returns
-------
self : returns an instance of self.
"""
backend = get_backend()
ndim = 3 if self.kernels == "precomputed" else 2
accept_sparse = False if self.kernels == "precomputed" else ("csr",
"csc")
X = check_array(X, accept_sparse=accept_sparse, ndim=ndim)
self.dtype_ = _get_string_dtype(X)
device = "cpu" if self.Y_in_cpu else None
y = check_array(y, dtype=self.dtype_, ndim=[1, 2], device=device)
n_samples = X.shape[1] if self.kernels == "precomputed" else X.shape[0]
if n_samples != y.shape[0]:
raise ValueError("Inconsistent number of samples.")
if sample_weight is not None:
sample_weight = check_array(sample_weight, dtype=self.dtype_,
ndim=1)
if sample_weight.shape[0] != y.shape[0]:
raise ValueError("Inconsistent number of samples.")
Ks = self._get_kernels(X)
self.X_fit_ = _to_cpu(X) if self.kernels != "precomputed" else None
self.n_features_in_ = X.shape[-1]
del X
ravel = False
if y.ndim == 1:
y = y[:, None]
ravel = True
if sample_weight is not None:
# We need to support sample_weight directly because K might be a
# precomputed kernel.
sw = backend.sqrt(sample_weight)[:, None]
y = y * backend.to_cpu(sw) if self.Y_in_cpu else y * sw
Ks *= (sw @ sw.T)[None]
cv = check_cv(self.cv, y)
# ------------------ call the solver
tmp = self._call_solver(Ks=Ks, Y=y, cv=cv, return_weights="dual",
Xs=None, random_state=self.random_state,
Y_in_cpu=self.Y_in_cpu)
self.deltas_, self.dual_coef_, self.cv_scores_ = tmp
if self.solver == "random_search":
self.best_alphas_ = 1. / backend.exp(self.deltas_).sum(0)
else:
self.best_alphas_ = None
if ravel:
self.dual_coef_ = self.dual_coef_[:, 0]
self.deltas_ = self.deltas_[:, 0]
return self
def _more_tags(self):
return {
'_xfail_checks': {
'check_sample_weights_invariance':
'zero sample_weight is not equivalent to removing samples, '
'because of the cross-validation splits.',
}
}
class WeightedKernelRidge(_BaseWeightedKernelRidge):
"""Weighted kernel ridge regression.
Solve the kernel ridge regression::
w* = argmin_w ||K @ w - Y||^2 + alpha (w.T @ K @ w)
where the kernel K is a weighted sum of multiple kernels::
K = sum_i exp(deltas[i]) Ks[i]
    Unlike ``MultipleKernelRidgeCV``, this model does not optimize the log
    kernel-weights ``deltas``. However, it is not equivalent to
    ``KernelRidge``, since the log kernel-weights ``deltas`` can be different
    for each target, so the weighted kernel sum cannot be precomputed once
    for all targets.
Parameters
----------
alpha : float, or array of shape (n_targets, )
L2 regularization parameter.
deltas : array of shape (n_kernels, ) or (n_kernels, n_targets)
Kernel weights.
Default to "zeros", an array of shape (n_kernels, ) filled with zeros.
kernels : list of (str or callable), default=["linear", "polynomial"]
        List of kernel mappings. Available kernels are: 'linear',
        'polynomial', 'poly', 'rbf', 'sigmoid', 'cosine'.
Set to 'precomputed' in order to pass a precomputed kernel matrix to
the estimator methods instead of samples.
A callable should accept two arguments and the keyword arguments passed
to this object as kernel_params, and should return a floating point
number.
kernels_params : list of dict, or None
Additional parameters for the kernel functions.
See more details in the docstring of the function:
``WeightedKernelRidge.ALL_KERNELS[kernel]``
solver : str
Algorithm used during the fit, "conjugate_gradient", or
"gradient_descent".
solver_params : dict or None
Additional parameters for the solver.
See more details in the docstring of the function:
``WeightedKernelRidge.ALL_SOLVERS[solver]``
random_state : int, or None
Random generator seed. Use an int for deterministic search.
force_cpu : bool
If True, computations will be performed on CPU, ignoring the
current backend. If False, use the current backend.
Attributes
----------
    dual_coef_ : array of shape (n_samples,) or (n_samples, n_targets)
Representation of weight vectors in kernel space.
deltas_ : array of shape (n_kernels, n_targets) or (n_kernels, )
Log of kernel weights.
X_fit_ : array of shape (n_samples, n_features)
Training data. If kernels == "precomputed" this is None.
n_features_in_ : int
Number of features (or number of samples if kernels == "precomputed")
used during the fit.
dtype_ : str
Dtype of input data.
Examples
--------
>>> from himalaya.kernel_ridge import WeightedKernelRidge
>>> from himalaya.kernel_ridge import ColumnKernelizer
>>> from himalaya.kernel_ridge import Kernelizer
>>> from sklearn.pipeline import make_pipeline
>>> # create a dataset
>>> import numpy as np
>>> n_samples, n_features, n_targets = 10, 5, 3
>>> X = np.random.randn(n_samples, n_features)
>>> Y = np.random.randn(n_samples, n_targets)
>>> # Kernelize separately the first three columns and the last two
>>> # columns, creating two kernels of shape (n_samples, n_samples).
>>> ck = ColumnKernelizer(
... [("kernel_1", Kernelizer(kernel="linear"), [0, 1, 2]),
... ("kernel_2", Kernelizer(kernel="polynomial"), slice(3, 5))])
>>> # A model with precomputed kernels, as output by ColumnKernelizer
>>> model = WeightedKernelRidge(kernels="precomputed")
>>> pipe = make_pipeline(ck, model)
    >>> _ = pipe.fit(X, Y)
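    >>> # A minimal sketch: scikit-learn pipelines forward keyword arguments
    >>> # of ``predict`` to the final step, so the kernel-split prediction is:
    >>> Y_hat_split = pipe.predict(X, split=True)  # (n_kernels, n_samples, n_targets)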
"""
ALL_SOLVERS = WEIGHTED_KERNEL_RIDGE_SOLVERS
def __init__(self, alpha=1., deltas="zeros",
kernels=["linear", "polynomial"], kernels_params=None,
solver="conjugate_gradient", solver_params=None,
random_state=None, force_cpu=False):
self.alpha = alpha
self.deltas = deltas
self.kernels = kernels
self.kernels_params = kernels_params
self.solver = solver
self.solver_params = solver_params
self.random_state = random_state
self.force_cpu = force_cpu
@force_cpu_backend
def fit(self, X, y=None, sample_weight=None):
"""Fit kernel ridge regression model
Parameters
----------
X : array of shape (n_samples, n_features)
Training data. If kernels == "precomputed" this is instead
a precomputed kernel array of shape
(n_kernels, n_samples, n_samples).
y : array of shape (n_samples,) or (n_samples, n_targets)
Target values.
sample_weight : None, or array of shape (n_samples, )
Individual weights for each sample, ignored if None is passed.
Returns
-------
self : returns an instance of self.
"""
backend = get_backend()
ndim = 3 if self.kernels == "precomputed" else 2
accept_sparse = False if self.kernels == "precomputed" else ("csr",
"csc")
X = check_array(X, accept_sparse=accept_sparse, ndim=ndim)
self.dtype_ = _get_string_dtype(X)
y = check_array(y, dtype=self.dtype_, ndim=[1, 2])
n_samples = X.shape[1] if self.kernels == "precomputed" else X.shape[0]
if n_samples != y.shape[0]:
raise ValueError("Inconsistent number of samples.")
if sample_weight is not None:
sample_weight = check_array(sample_weight, dtype=self.dtype_,
ndim=1)
if sample_weight.shape[0] != y.shape[0]:
raise ValueError("Inconsistent number of samples.")
Ks = self._get_kernels(X)
self.X_fit_ = _to_cpu(X) if self.kernels != "precomputed" else None
self.n_features_in_ = X.shape[-1]
del X
ravel = False
if y.ndim == 1:
y = y[:, None]
ravel = True
if sample_weight is not None:
# We need to support sample_weight directly because K might be a
# precomputed kernel.
sw = backend.sqrt(sample_weight)[:, None]
y = y * sw
Ks *= (sw @ sw.T)[None]
n_kernels = Ks.shape[0]
if isinstance(self.deltas, str) and self.deltas == "zeros":
self.deltas_ = backend.zeros_like(Ks, shape=(n_kernels, 1))
else:
self.deltas_ = check_array(self.deltas, ndim=[1, 2])
if self.deltas_.shape[0] != n_kernels:
raise ValueError("Inconsistent number of kernels.")
            if (self.deltas_.ndim == 2 and y.ndim == 2
and self.deltas_.shape[1] != y.shape[1]):
raise ValueError("Inconsistent number of targets.")
if self.deltas_.ndim == 1:
self.deltas_ = self.deltas_[:, None]
# ------------------ call the solver
self.dual_coef_ = self._call_solver(Ks=Ks, Y=y, alpha=self.alpha,
deltas=self.deltas_,
random_state=self.random_state)
if ravel or self.deltas_.shape[1] != self.dual_coef_.shape[1]:
self.deltas_ = self.deltas_[:, 0]
if ravel:
self.dual_coef_ = self.dual_coef_[:, 0]
return self
def _to_cpu(X):
backend = get_backend()
if issparse(X):
return X
else:
return backend.to_cpu(X)