Source code for gpytorch.variational.lmc_variational_strategy

#!/usr/bin/env python3

from typing import Optional, Union

import torch
from linear_operator.operators import KroneckerProductLinearOperator, RootLinearOperator
from linear_operator.utils.interpolation import left_interp
from torch import LongTensor, Tensor

from .. import settings
from ..distributions import MultitaskMultivariateNormal, MultivariateNormal
from ..module import Module
from ._variational_strategy import _VariationalStrategy

def _select_lmc_coefficients(lmc_coefficients: torch.Tensor, indices: torch.LongTensor) -> torch.Tensor:
    Given a list of indices for ... x N datapoints,
      select the row from lmc_coefficient that corresponds to each datapoint

    lmc_coefficients: torch.Tensor ... x num_latents x ... x num_tasks
    indices: torch.Tesnor ... x N
    batch_shape = torch.broadcast_shapes(lmc_coefficients.shape[:-1], indices.shape[:-1])

    # We will use the left_interp helper to do the indexing
    lmc_coefficients = lmc_coefficients.expand(*batch_shape, lmc_coefficients.shape[-1])[..., None]
    indices = indices.expand(*batch_shape, indices.shape[-1])[..., None]
    res = left_interp(
        torch.ones(indices.shape, dtype=torch.long, device=indices.device),
    return res

[docs]class LMCVariationalStrategy(_VariationalStrategy): r""" LMCVariationalStrategy is an implementation of the "Linear Model of Coregionalization" for multitask GPs. This model assumes that there are :math:`Q` latent functions :math:`\mathbf g(\cdot) = [g^{(1)}(\cdot), \ldots, g^{(q)}(\cdot)]`, each of which is modelled by a GP. The output functions (tasks) are linear combination of the latent functions: .. math:: f_{\text{task } i}( \mathbf x) = \sum_{q=1}^Q a_i^{(q)} g^{(q)} ( \mathbf x ) LMCVariationalStrategy wraps an existing :obj:`~gpytorch.variational.VariationalStrategy`. The output will either be a :obj:`~gpytorch.distributions.MultitaskMultivariateNormal` distribution (if we wish to evaluate all tasks for each input) or a :obj:`~gpytorch.distributions.MultivariateNormal` (if we wish to evaluate a single task for each input). The base variational strategy is assumed to operate on a multi-batch of GPs, where one of the batch dimensions corresponds to the latent function dimension. .. note:: The batch shape of the base :obj:`~gpytorch.variational.VariationalStrategy` does not necessarily have to correspond to the batch shape of the underlying GP objects. For example, if the base variational strategy has a batch shape of `[3]` (corresponding to 3 latent functions), the GP kernel object could have a batch shape of `[3]` or no batch shape. This would correspond to each of the latent functions having different kernels or the same kernel, respectivly. Example: >>> class LMCMultitaskGP(gpytorch.models.ApproximateGP): >>> ''' >>> 3 latent functions >>> 5 output dimensions (tasks) >>> ''' >>> def __init__(self): >>> # Each latent function shares the same inducing points >>> # We'll have 32 inducing points, and let's assume the input dimensionality is 2 >>> inducing_points = torch.randn(32, 2) >>> >>> # The variational parameters have a batch_shape of [3] - for 3 latent functions >>> variational_distribution = gpytorch.variational.MeanFieldVariationalDistribution( >>> inducing_points.size(-1), batch_shape=torch.Size([3]), >>> ) >>> variational_strategy = gpytorch.variational.LMCVariationalStrategy( >>> gpytorch.variational.VariationalStrategy( >>> self, inducing_points, variational_distribution, learn_inducing_locations=True, >>> ), >>> num_tasks=5, >>> num_latents=3, >>> latent_dim=-1, >>> ) >>> >>> # Each latent function has its own mean/kernel function >>> super().__init__(variational_strategy) >>> self.mean_module = gpytorch.means.ConstantMean(batch_shape=torch.Size([3])) >>> self.covar_module = gpytorch.kernels.ScaleKernel( >>> gpytorch.kernels.RBFKernel(batch_shape=torch.Size([3])), >>> batch_shape=torch.Size([3]), >>> ) >>> :param base_variational_strategy: Base variational strategy :param num_tasks: The total number of tasks (output functions) :param num_latents: The total number of latent functions in each group :param latent_dim: (Default: -1) Which batch dimension corresponds to the latent function batch. **Must be negative indexed** :param jitter_val: Amount of diagonal jitter to add for Cholesky factorization numerical stability """ def __init__( self, base_variational_strategy: _VariationalStrategy, num_tasks: int, num_latents: int = 1, latent_dim: int = -1, jitter_val: Optional[float] = None, ): Module.__init__(self) self.base_variational_strategy = base_variational_strategy self.num_tasks = num_tasks batch_shape = self.base_variational_strategy._variational_distribution.batch_shape # Check if no functions if latent_dim >= 0: raise RuntimeError(f"latent_dim must be a negative indexed batch dimension: got {latent_dim}.") if not (batch_shape[latent_dim] == num_latents or batch_shape[latent_dim] == 1): raise RuntimeError( f"Mismatch in num_latents: got a variational distribution of batch shape {batch_shape}, " f"expected the function dim {latent_dim} to be {num_latents}." ) self.num_latents = num_latents self.latent_dim = latent_dim # Make the batch_shape self.batch_shape = list(batch_shape) del self.batch_shape[self.latent_dim] self.batch_shape = torch.Size(self.batch_shape) # LCM coefficients lmc_coefficients = torch.randn(*batch_shape, self.num_tasks) self.register_parameter("lmc_coefficients", torch.nn.Parameter(lmc_coefficients)) if jitter_val is None: self.jitter_val = settings.variational_cholesky_jitter.value( self.base_variational_strategy.inducing_points.dtype ) else: self.jitter_val = jitter_val @property def prior_distribution(self) -> MultivariateNormal: return self.base_variational_strategy.prior_distribution @property def variational_distribution(self) -> MultivariateNormal: return self.base_variational_strategy.variational_distribution @property def variational_params_initialized(self) -> bool: return self.base_variational_strategy.variational_params_initialized def kl_divergence(self) -> Tensor: return super().kl_divergence().sum(dim=self.latent_dim)
[docs] def __call__( self, x: Tensor, prior: bool = False, task_indices: Optional[LongTensor] = None, **kwargs ) -> Union[MultitaskMultivariateNormal, MultivariateNormal]: r""" Computes the variational (or prior) distribution :math:`q( \mathbf f \mid \mathbf X)` (or :math:`p( \mathbf f \mid \mathbf X)`). There are two modes: 1. Compute **all tasks** for all inputs. If this is the case, the task_indices attribute should be None. The return type will be a (... x N x num_tasks) :class:`~gpytorch.distributions.MultitaskMultivariateNormal`. 2. Compute **one task** per inputs. If this is the case, the (... x N) task_indices tensor should contain the indices of each input's assigned task. The return type will be a (... x N) :class:`~gpytorch.distributions.MultivariateNormal`. :param x: (... x N x D) Input locations to evaluate variational strategy :param task_indices: (Default: None) Task index associated with each input. If this **is not** provided, then the returned distribution evaluates every input on every task (returns :class:`~gpytorch.distributions.MultitaskMultivariateNormal`). If this **is** provided, then the returned distribution evaluates each input only on its assigned task. (returns :class:`~gpytorch.distributions.MultivariateNormal`). :param prior: (Default: False) If False, returns the variational distribution :math:`q( \mathbf f \mid \mathbf X)`. If True, returns the prior distribution :math:`p( \mathbf f \mid \mathbf X)`. :return: :math:`q( \mathbf f \mid \mathbf X)` (or the prior), either for all tasks (if `task_indices == None`) or for a specific task (if `task_indices != None`). :rtype: ~gpytorch.distributions.MultitaskMultivariateNormal (... x N x num_tasks) or ~gpytorch.distributions.MultivariateNormal (... x N) """ latent_dist = self.base_variational_strategy(x, prior=prior, **kwargs) num_batch = len(latent_dist.batch_shape) latent_dim = num_batch + self.latent_dim if task_indices is None: num_dim = num_batch + len(latent_dist.event_shape) # Every data point will get an output for each task # Therefore, we will set up the lmc_coefficients shape for a matmul lmc_coefficients = self.lmc_coefficients.expand(*latent_dist.batch_shape, self.lmc_coefficients.size(-1)) # Mean: ... x N x num_tasks latent_mean = latent_dist.mean.permute(*range(0, latent_dim), *range(latent_dim + 1, num_dim), latent_dim) mean = latent_mean @ lmc_coefficients.permute( *range(0, latent_dim), *range(latent_dim + 1, num_dim - 1), latent_dim, -1 ) # Covar: ... x (N x num_tasks) x (N x num_tasks) latent_covar = latent_dist.lazy_covariance_matrix lmc_factor = RootLinearOperator(lmc_coefficients.unsqueeze(-1)) covar = KroneckerProductLinearOperator(latent_covar, lmc_factor).sum(latent_dim) # Add a bit of jitter to make the covar PD covar = covar.add_jitter(self.jitter_val) # Done! function_dist = MultitaskMultivariateNormal(mean, covar) else: # Each data point will get a single output corresponding to a single task # Therefore, we will select the appropriate lmc coefficients for each task lmc_coefficients = _select_lmc_coefficients(self.lmc_coefficients, task_indices) # Mean: ... x N mean = (latent_dist.mean * lmc_coefficients).sum(latent_dim) # Covar: ... x N x N latent_covar = latent_dist.lazy_covariance_matrix lmc_factor = RootLinearOperator(lmc_coefficients.unsqueeze(-1)) covar = (latent_covar * lmc_factor).sum(latent_dim) # Add a bit of jitter to make the covar PD covar = covar.add_jitter(self.jitter_val) # Done! function_dist = MultivariateNormal(mean, covar) return function_dist