# Source code for himalaya.kernel_ridge._random_search

import os
import warnings
import numbers

import numpy as np

from ..backend import get_backend
from ..backend._utils import _dtype_to_str
from ..progress_bar import bar
from ..scoring import l2_neg_loss
from ..validation import check_random_state
from ..validation import check_cv
from ._solvers import _helper_intercept
from ._kernels import KernelCenterer


def _select_best_alphas(scores, alphas, local_alpha, conservative):
    """Helper to select the best alphas

    Parameters
    ----------
    scores : array of shape (n_splits, n_alphas, n_targets)
        Cross validation scores.
    alphas : array of shape (n_alphas, )
        Regularization parameters.
    local_alpha : bool
        If True, alphas are selected per target, else shared over all targets.
    conservative : bool
        If True, when selecting the hyperparameter alpha, take the largest one
        that is less than one standard deviation away from the best.
        If False, take the best.

    Returns
    -------
    alphas_argmax : array of shape (n_targets, )
        Indices of the best alphas.
    best_scores_mean : array of shape (n_targets, )
        Scores, averaged over splits, and maximized over alphas.
    """
    backend = get_backend()

    # average scores over splits
    scores_mean = backend.mean_float64(scores, axis=0)
    # add epsilon slope to select larger alphas if scores are equal
    scores_mean += (backend.log(alphas) * 1e-10)[:, None]

    # compute the max over alphas
    axis = 0
    if local_alpha:
        alphas_argmax = backend.argmax(scores_mean, axis)

        if conservative:
            # take a conservative alpha, the largest that beats the best
            # score minus one standard deviation
            scores_std = scores.std(0)
            to_beat = backend.apply_argmax(scores_mean - scores_std,
                                           alphas_argmax, axis)
            does_beat = backend.asarray(scores_mean > to_beat, dtype="float32")
            # add bias toward large alphas (scaled to be << 1)
            does_beat += backend.log(alphas)[:, None] * 1e-4
            alphas_argmax = backend.argmax(does_beat, axis)
    else:
        if conservative:
            raise NotImplementedError()
        else:
            alphas_argmax = backend.argmax(scores_mean.mean(1))
            alphas_argmax = backend.full_like(alphas_argmax,
                                              shape=scores_mean.shape[1],
                                              fill_value=alphas_argmax)
    best_scores_mean = backend.apply_argmax(scores_mean, alphas_argmax, axis)

    return alphas_argmax, best_scores_mean
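

# A minimal usage sketch (illustrative only, not part of the library): how
# ``_select_best_alphas`` turns cross-validation scores into one selected
# alpha per target. Shapes and values are made up.
#
#     import numpy as np
#     from himalaya.backend import set_backend
#     backend = set_backend("numpy")
#
#     n_splits, n_alphas, n_targets = 5, 4, 3
#     scores = backend.asarray(np.random.randn(n_splits, n_alphas, n_targets))
#     alphas = backend.asarray(np.logspace(-2, 2, n_alphas))
#
#     alphas_argmax, best_scores = _select_best_alphas(
#         scores, alphas, local_alpha=True, conservative=False)
#     # alphas_argmax[ii] indexes the alpha selected for target ii, and
#     # best_scores[ii] is the corresponding score averaged over splits.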


def generate_dirichlet_samples(n_samples, n_kernels, concentration=[.1, 1.],
                               random_state=None):
    """Generate samples from a Dirichlet distribution.

    Parameters
    ----------
    n_samples : int
        Number of samples to generate.
    n_kernels : int
        Number of dimensions of the distribution.
    concentration : float, or list of float
        Concentration parameters of the Dirichlet distribution. A value of 1
        corresponds to uniform sampling over the simplex. A value of infinity
        corresponds to equal weights. If a list, samples cycle through the
        list.
    random_state : int, or None
        Random generator seed. Use an int for deterministic samples.

    Returns
    -------
    gammas : array of shape (n_samples, n_kernels)
        Dirichlet samples.
    """
    random_generator = check_random_state(random_state)

    concentration = np.atleast_1d(concentration)
    n_concentrations = len(concentration)
    n_samples_per_concentration = int(
        np.ceil(n_samples / float(n_concentrations)))

    # generate the gammas
    gammas = []
    for conc in concentration:
        if conc == np.inf:
            gamma = np.full(n_kernels, fill_value=1. / n_kernels)[None]
            gamma = np.tile(gamma, (n_samples_per_concentration, 1))
        else:
            gamma = random_generator.dirichlet([conc] * n_kernels,
                                               n_samples_per_concentration)
        gammas.append(gamma)
    gammas = np.vstack(gammas)

    # reorder the gammas to alternate between concentrations:
    # [a0, a1, a2, a0, a1, a2] instead of [a0, a0, a1, a1, a2, a2]
    gammas = gammas.reshape(n_concentrations, n_samples_per_concentration,
                            n_kernels)
    gammas = np.swapaxes(gammas, 0, 1)
    gammas = gammas.reshape(n_concentrations * n_samples_per_concentration,
                            n_kernels)

    # remove extra gammas
    gammas = gammas[:n_samples]

    # cast to current backend
    backend = get_backend()
    gammas = backend.asarray(gammas)

    return gammas
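

# A minimal usage sketch (illustrative only, not part of the library): with
# concentration=[.1, 1.], half of the samples concentrate near the corners of
# the simplex (concentration 0.1, sparse weights) and half cover the simplex
# uniformly (concentration 1.), in alternating order.
#
#     gammas = generate_dirichlet_samples(n_samples=6, n_kernels=3,
#                                         concentration=[.1, 1.],
#                                         random_state=0)
#     # gammas has shape (6, 3); each row sums to 1, and rows alternate
#     # between the two concentration values.

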
def _decompose_kernel_ridge(Ktrain, alphas, Ktest=None, n_alphas_batch=None,
                            method="eigh", negative_eigenvalues="zeros"):
    """Precompute resolution matrices for kernel ridge predictions.

    To compute the prediction::

        Ytest_hat = Ktest @ (Ktrain + alphas * Id)^-1 @ Ytrain

    this function precomputes::

        matrices = Ktest @ (Ktrain + alphas * Id)^-1

    or just::

        matrices = (Ktrain + alphas * Id)^-1

    if Ktest is None.

    Parameters
    ----------
    Ktrain : array of shape (n_samples_train, n_samples_train)
        Training kernel for one feature space.
    alphas : float, or array of shape (n_alphas, )
        Range of ridge regularization parameter.
    Ktest : array of shape (n_samples_test, n_samples_train)
        Testing kernel for one feature space.
    n_alphas_batch : int or None
        If not None, returns a generator over batches of alphas.
    method : str in {"eigh", "svd"}
        Method used to diagonalize the kernel.
    negative_eigenvalues : str in {"nan", "error", "zeros"}
        If the decomposition leads to negative eigenvalues (wrongly emerging
        from float32 errors):
            - "error" raises an error.
            - "zeros" replaces them with zeros.
            - "nan" returns nans if the regularization does not compensate
              twice the smallest negative value, else it ignores the problem.

    Returns
    -------
    matrices : array of shape (n_alphas, n_samples_train, n_samples_train) or \
        (n_alphas, n_samples_test, n_samples_train) if Ktest is not None
        Precomputed resolution matrices.
    alpha_batch : slice
        Slice of the batch of alphas.
    """
    backend = get_backend()

    use_alpha_batch = n_alphas_batch is not None
    if n_alphas_batch is None:
        n_alphas_batch = len(alphas)

    if method == "eigh":
        # diagonalization: K = V @ np.diag(eigenvalues) @ V.T
        eigenvalues, V = backend.eigh(Ktrain)
        # match SVD notations: K = U @ np.diag(eigenvalues) @ Vt
        U = V
        Vt = V.T
    elif method == "svd":
        # SVD: K = U @ np.diag(eigenvalues) @ Vt
        U, eigenvalues, Vt = backend.svd(Ktrain)
    else:
        raise ValueError("Unknown method=%r." % (method, ))

    if Ktest is not None:
        Ktest_V = backend.matmul(Ktest, Vt.T)

    for start in range(0, len(alphas), n_alphas_batch):
        batch = slice(start, start + n_alphas_batch)

        ev_weighting = (alphas[batch, None] + eigenvalues) ** -1

        # negative eigenvalues can emerge from incorrect kernels,
        # or from float32
        if eigenvalues[0] < 0:
            if negative_eigenvalues == "nan":
                ev_weighting[alphas[batch] < -eigenvalues[0] * 2, :] = \
                    backend.asarray(backend.nan, dtype=ev_weighting.dtype)
            elif negative_eigenvalues == "zeros":
                eigenvalues[eigenvalues < 0] = 0
            elif negative_eigenvalues == "error":
                raise RuntimeError(
                    "Negative eigenvalues. Make sure the kernel is positive "
                    "semi-definite, increase the regularization alpha, or "
                    "use another solver.")
            else:
                raise ValueError("Unknown negative_eigenvalues=%r." %
                                 (negative_eigenvalues, ))

        if Ktest is not None:
            matrices = backend.matmul(Ktest_V, ev_weighting[:, :, None] * U.T)
        else:
            matrices = backend.matmul(Vt.T, ev_weighting[:, :, None] * U.T)

        if use_alpha_batch:
            yield matrices, batch
        else:
            return matrices, batch

        del matrices
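

# A minimal usage sketch (illustrative only, not part of the library): when
# n_alphas_batch is given, ``_decompose_kernel_ridge`` is a generator, and
# each batch of precomputed matrices gives test predictions for several
# alphas in a single batched matmul. ``Ktrain``, ``Ktest`` and ``Ytrain``
# are assumed to be given.
#
#     alphas = backend.asarray(np.logspace(-2, 5, 8))
#     for matrices, batch in _decompose_kernel_ridge(Ktrain, alphas, Ktest,
#                                                    n_alphas_batch=4):
#         # matrices has shape (batch_size, n_samples_test, n_samples_train)
#         Ytest_hat = backend.matmul(matrices, Ytrain)
#         # Ytest_hat[ii] is the prediction using alphas[batch][ii]

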
def solve_kernel_ridge_cv_eigenvalues(K, Y, alphas=1.0, score_func=l2_neg_loss,
                                      cv=5, fit_intercept=False,
                                      local_alpha=True, n_targets_batch=None,
                                      n_targets_batch_refit=None,
                                      n_alphas_batch=None, conservative=False,
                                      Y_in_cpu=False):
    """Solve kernel ridge regression with a grid search over alphas.

    Parameters
    ----------
    K : array of shape (n_samples, n_samples)
        Input kernel.
    Y : array of shape (n_samples, n_targets)
        Target data.
    alphas : float or array of shape (n_alphas, )
        Range of ridge regularization parameter.
    score_func : callable
        Function used to compute the score of predictions versus Y.
    cv : int or scikit-learn splitter
        Cross-validation splitter. If an int, KFold is used.
    fit_intercept : boolean
        Whether to fit an intercept. If False, K should be centered
        (see KernelCenterer), and Y must be zero-mean over samples.
    local_alpha : bool
        If True, alphas are selected per target, else shared over all targets.
    n_targets_batch : int or None
        Size of the batch of targets during cross-validation. Used for memory
        reasons. If None, uses all n_targets at once.
    n_targets_batch_refit : int or None
        Size of the batch of targets during refit. Used for memory reasons.
        If None, uses all n_targets at once.
    n_alphas_batch : int or None
        Size of the batch of alphas. Used for memory reasons. If None, uses
        all n_alphas at once.
    conservative : bool
        If True, when selecting the hyperparameter alpha, take the largest one
        that is less than one standard deviation away from the best.
        If False, take the best.
    Y_in_cpu : bool
        If True, keep the target values ``Y`` in CPU memory (slower).

    Returns
    -------
    best_alphas : array of shape (n_targets, )
        Selected best hyperparameter alphas.
    dual_weights : array of shape (n_samples, n_targets)
        Kernel ridge coefficients refit on the entire dataset, using selected
        best hyperparameters alpha. Always stored on CPU memory.
    cv_scores : array of shape (n_targets, )
        Cross-validation scores averaged over splits, for the best alpha.
    intercept : array of shape (n_targets, )
        Intercept. Only returned when fit_intercept is True.
    """
    backend = get_backend()

    n_iter = backend.ones_like(K, shape=(1, 1))
    fixed_params = dict(return_weights="dual", Xs=None, progress_bar=False,
                        concentration=None, jitter_alphas=False,
                        random_state=None, n_iter=n_iter)

    copied_params = dict(alphas=alphas, score_func=score_func, cv=cv,
                         local_alpha=local_alpha, fit_intercept=fit_intercept,
                         n_targets_batch=n_targets_batch,
                         n_targets_batch_refit=n_targets_batch_refit,
                         n_alphas_batch=n_alphas_batch,
                         conservative=conservative, Y_in_cpu=Y_in_cpu)

    tmp = solve_multiple_kernel_ridge_random_search(K[None], Y,
                                                    **copied_params,
                                                    **fixed_params)

    if fit_intercept:
        deltas, dual_weights, cv_scores, intercept = tmp
        best_alphas = backend.exp(-deltas[0])
        dual_weights /= backend.to_cpu(best_alphas)
        return best_alphas, dual_weights, cv_scores, intercept
    else:
        deltas, dual_weights, cv_scores = tmp
        best_alphas = backend.exp(-deltas[0])
        dual_weights /= backend.to_cpu(best_alphas)
        return best_alphas, dual_weights, cv_scores
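

# A minimal usage sketch (illustrative only, not part of the library): solving
# kernel ridge with cross-validated alphas on a centered linear kernel.
# Shapes and values are made up; KernelCenterer is assumed to follow the
# usual scikit-learn fit_transform API.
#
#     import numpy as np
#     X = np.random.randn(50, 10).astype("float32")
#     Y = np.random.randn(50, 4).astype("float32")
#     Y -= Y.mean(0)  # targets must be zero-mean when fit_intercept=False
#     K = KernelCenterer().fit_transform(X @ X.T)  # centered linear kernel
#     alphas = np.logspace(-2, 5, 15)
#
#     best_alphas, dual_weights, cv_scores = solve_kernel_ridge_cv_eigenvalues(
#         K, Y, alphas=alphas, cv=5)
#     # in-sample predictions: K @ dual_weights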