# Source code for himalaya.ridge._random_search

import warnings
import numbers

import numpy as np

from ..backend import get_backend
from ..backend._utils import _dtype_to_str
from ..progress_bar import bar
from ..scoring import l2_neg_loss
from ..validation import check_random_state
from ..validation import check_cv
from ..kernel_ridge import generate_dirichlet_samples
from ..kernel_ridge._random_search import _select_best_alphas






def _decompose_ridge(Xtrain, alphas, n_alphas_batch=None, method="svd",
                     negative_eigenvalues="zeros"):
    """Precompute resolution matrices for ridge predictions.

    To compute the prediction::

        Ytest_hat = Xtest @ (XTX + alphas * Id)^-1 @ Xtrain^T @ Ytrain

        where XTX = Xtrain^T @ Xtrain,

    this function precomputes::

        matrices = (XTX + alphas * Id)^-1 @ Xtrain^T.

    Parameters
    ----------
    Xtrain : array of shape (n_samples_train, n_features)
        Concatenated input features.
    alphas : float, or array of shape (n_alphas, )
        Range of ridge regularization parameter.
    n_alphas_batch : int or None
        If not None, returns a generator over batches of alphas.
    method : str in {"svd"}
        Method used to diagonalize the kernel.
    negative_eigenvalues : str in {"nan", "error", "zeros"}
        If the decomposition leads to negative eigenvalues (wrongly emerging
        from float32 errors):
            - "error" raises an error.
            - "zeros" replaces them with zeros.
            - "nan" returns nans if the regularization does not compensate
                twice the smallest negative value, else it ignores the problem.

    Returns
    -------
    matrices : array of shape (n_alphas, n_features, n_samples_train)
        Precomputed resolution matrices, one per alpha in the batch.
    alpha_batch : slice
        Slice of the batch of alphas.

    Notes
    -----
    This function contains a ``yield``, so calling it always creates a
    generator. When ``n_alphas_batch`` is None, the ``return matrices, batch``
    statement ends the generator and stashes the pair as the
    ``StopIteration`` value instead of returning it to a plain caller.
    NOTE(review): confirm callers iterate this generator (or pass
    ``n_alphas_batch``) rather than expecting a direct return value.
    """
    backend = get_backend()

    # Remember whether the caller asked for batching; when not, a single
    # batch covering all alphas is used below.
    use_alpha_batch = n_alphas_batch is not None
    if n_alphas_batch is None:
        n_alphas_batch = len(alphas)

    if method == "svd":
        # SVD: X = U @ np.diag(eigenvalues) @ Vt
        # NOTE(review): `eigenvalues` holds the singular values of Xtrain,
        # non-negative in exact arithmetic; the negative-value handling below
        # guards against float32 round-off in the backend implementation.
        U, eigenvalues, Vt = backend.svd(Xtrain, full_matrices=False)
    else:
        raise ValueError("Unknown method=%r." % (method, ))

    # Process alphas in batches to bound peak memory use.
    for start in range(0, len(alphas), n_alphas_batch):
        batch = slice(start, start + n_alphas_batch)

        # Ridge filter factors: s / (alpha + s^2), one row per alpha,
        # so that V @ diag(factors) @ U^T = (X^T X + alpha I)^-1 X^T.
        ev_weighting = eigenvalues / (alphas[batch, None] + eigenvalues ** 2)

        # negative eigenvalues can emerge from incorrect kernels,
        # or from float32
        # NOTE(review): only index 0 is tested here — presumably assuming a
        # specific ordering of singular values from backend.svd; confirm this
        # detects the intended "any negative value" condition.
        if eigenvalues[0] < 0:
            if negative_eigenvalues == "nan":
                # Invalidate alphas too small to compensate the negative
                # value; larger alphas are assumed safe and kept as-is.
                ev_weighting[alphas[batch] < -eigenvalues[0] * 2, :] = \
                    backend.asarray(backend.nan, type=ev_weighting.dtype)

            elif negative_eigenvalues == "zeros":
                # NOTE(review): this mutates `eigenvalues` in place, but
                # `ev_weighting` for the *current* batch was already computed
                # above — the zeroing only takes effect for later batches.
                # Confirm this is intended.
                eigenvalues[eigenvalues < 0] = 0

            elif negative_eigenvalues == "error":
                raise RuntimeError(
                    "Negative eigenvalues. Make sure the kernel is positive "
                    "semi-definite, increase the regularization alpha, or use"
                    "another solver.")
            else:
                raise ValueError("Unknown negative_eigenvalues=%r." %
                                 (negative_eigenvalues, ))

        # matrices[i] = V @ diag(ev_weighting[i]) @ U^T, for each alpha i.
        matrices = backend.matmul(Vt.T, ev_weighting[:, :, None] * U.T)

        if use_alpha_batch:
            yield matrices, batch
        else:
            # Inside a generator, this sets the StopIteration value and ends
            # iteration (see Notes above).
            return matrices, batch

        # Free the batch result before computing the next one.
        del matrices


#: Registry mapping each solver name to its group ridge solver function.
GROUP_RIDGE_SOLVERS = dict(
    random_search=solve_group_ridge_random_search,
)


def solve_ridge_cv_svd(X, Y, alphas=1.0, fit_intercept=False,
                       score_func=l2_neg_loss, cv=5, local_alpha=True,
                       n_targets_batch=None, n_targets_batch_refit=None,
                       n_alphas_batch=None, conservative=False,
                       Y_in_cpu=False, warn=True):
    """Solve ridge regression with a grid search over alphas.

    Parameters
    ----------
    X : array of shape (n_samples, n_features)
        Input features.
    Y : array of shape (n_samples, n_targets)
        Target data.
    alphas : float or array of shape (n_alphas, )
        Range of ridge regularization parameter.
    fit_intercept : boolean
        Whether to fit an intercept. If False, X and Y must be zero-mean
        over samples.
    score_func : callable
        Function used to compute the score of predictions versus Y.
    cv : int or scikit-learn splitter
        Cross-validation splitter. If an int, KFold is used.
    local_alpha : bool
        If True, alphas are selected per target, else shared over all
        targets.
    n_targets_batch : int or None
        Size of the batch for over targets during cross-validation.
        Used for memory reasons. If None, uses all n_targets at once.
    n_targets_batch_refit : int or None
        Size of the batch for over targets during refit.
        Used for memory reasons. If None, uses all n_targets at once.
    n_alphas_batch : int or None
        Size of the batch for over alphas. Used for memory reasons.
        If None, uses all n_alphas at once.
    conservative : bool
        If True, when selecting the hyperparameter alpha, take the largest
        one that is less than one standard deviation away from the best.
        If False, take the best.
    Y_in_cpu : bool
        If True, keep the target values ``Y`` in CPU memory (slower).
    warn : bool
        If True, warn if the number of samples is smaller than the number
        of features.

    Returns
    -------
    best_alphas : array of shape (n_targets, )
        Selected best hyperparameter alphas.
    coefs : array of shape (n_samples, n_targets)
        Ridge coefficients refit on the entire dataset, using selected
        best hyperparameters alpha. Always stored on CPU memory.
    cv_scores : array of shape (n_targets, )
        Cross-validation scores averaged over splits, for the best alpha.
    intercept : array of shape (n_targets,)
        Intercept. Only returned when fit_intercept is True.
    """
    backend = get_backend()

    # Kernel ridge scales better when the problem is under-determined;
    # point the user at the faster solver in that regime.
    n_samples, n_features = X.shape
    if n_samples < n_features and warn:
        warnings.warn(
            "Solving ridge is slower than solving kernel ridge when n_samples "
            f"< n_features (here {n_samples} < {n_features}). "
            "Using a linear kernel in himalaya.kernel_ridge.KernelRidgeCV or "
            "himalaya.kernel_ridge.solve_kernel_ridge_cv_eigenvalues would be "
            "faster. Use warn=False to silence this warning.", UserWarning)

    # Delegate to the group ridge solver with a single feature space and a
    # single fixed gamma sample (n_iter of shape (1, 1)), which reduces the
    # random search to a plain grid search over alphas.
    n_iter = backend.ones_like(X, shape=(1, 1))
    fixed_params = dict(return_weights=True, progress_bar=False,
                        concentration=None, jitter_alphas=False,
                        random_state=None, n_iter=n_iter, warn=False)

    copied_params = dict(alphas=alphas, score_func=score_func, cv=cv,
                         local_alpha=local_alpha,
                         fit_intercept=fit_intercept,
                         n_targets_batch=n_targets_batch,
                         n_targets_batch_refit=n_targets_batch_refit,
                         n_alphas_batch=n_alphas_batch,
                         conservative=conservative, Y_in_cpu=Y_in_cpu)

    tmp = solve_group_ridge_random_search([X], Y, **copied_params,
                                          **fixed_params)

    # The group solver returns log-regularizations (deltas); map them back
    # to alphas with alpha = exp(-delta).
    if fit_intercept:
        deltas, coefs, cv_scores, intercept = tmp
        best_alphas = backend.exp(-deltas[0])
        return best_alphas, coefs, cv_scores, intercept
    else:
        deltas, coefs, cv_scores = tmp
        best_alphas = backend.exp(-deltas[0])
        return best_alphas, coefs, cv_scores