Source code for gpytorch.distributions.multitask_multivariate_normal

#!/usr/bin/env python3

import torch
from linear_operator import LinearOperator, to_linear_operator
from linear_operator.operators import (
    BlockDiagLinearOperator,
    BlockInterleavedLinearOperator,
    CatLinearOperator,
    DiagLinearOperator,
)

from .multivariate_normal import MultivariateNormal


[docs]class MultitaskMultivariateNormal(MultivariateNormal): """ Constructs a multi-output multivariate Normal random variable, based on mean and covariance Can be multi-output multivariate, or a batch of multi-output multivariate Normal Passing a matrix mean corresponds to a multi-output multivariate Normal Passing a matrix mean corresponds to a batch of multivariate Normals :param torch.Tensor mean: An `n x t` or batch `b x n x t` matrix of means for the MVN distribution. :param ~linear_operator.operators.LinearOperator covar: An `... x NT x NT` (batch) matrix. covariance matrix of MVN distribution. :param bool validate_args: (default=False) If True, validate `mean` and `covariance_matrix` arguments. :param bool interleaved: (default=True) If True, covariance matrix is interpreted as block-diagonal w.r.t. inter-task covariances for each observation. If False, it is interpreted as block-diagonal w.r.t. inter-observation covariance for each task. """ def __init__(self, mean, covariance_matrix, validate_args=False, interleaved=True): if not torch.is_tensor(mean) and not isinstance(mean, LinearOperator): raise RuntimeError("The mean of a MultitaskMultivariateNormal must be a Tensor or LinearOperator") if not torch.is_tensor(covariance_matrix) and not isinstance(covariance_matrix, LinearOperator): raise RuntimeError("The covariance of a MultitaskMultivariateNormal must be a Tensor or LinearOperator") if mean.dim() < 2: raise RuntimeError("mean should be a matrix or a batch matrix (batch mode)") # Ensure that shapes are broadcasted appropriately across the mean and covariance # Means can have singleton dimensions for either the `n` or `t` dimensions batch_shape = torch.broadcast_shapes(mean.shape[:-2], covariance_matrix.shape[:-2]) if mean.shape[-2:].numel() != covariance_matrix.size(-1): if covariance_matrix.size(-1) % mean.shape[-2:].numel(): raise RuntimeError( f"mean shape {mean.shape} is incompatible with covariance shape {covariance_matrix.shape}" ) elif mean.size(-2) == 1: mean = mean.expand(*batch_shape, covariance_matrix.size(-1) // mean.size(-1), mean.size(-1)) elif mean.size(-1) == 1: mean = mean.expand(*batch_shape, mean.size(-2), covariance_matrix.size(-2) // mean.size(-2)) else: raise RuntimeError( f"mean shape {mean.shape} is incompatible with covariance shape {covariance_matrix.shape}" ) else: mean = mean.expand(*batch_shape, *mean.shape[-2:]) self._output_shape = mean.shape # TODO: Instead of transpose / view operations, use a PermutationLinearOperator (see #539) # to handle interleaving self._interleaved = interleaved if self._interleaved: mean_mvn = mean.reshape(*mean.shape[:-2], -1) else: mean_mvn = mean.transpose(-1, -2).reshape(*mean.shape[:-2], -1) super().__init__(mean=mean_mvn, covariance_matrix=covariance_matrix, validate_args=validate_args) @property def base_sample_shape(self): """ Returns the shape of a base sample (without batching) that is used to generate a single sample. """ base_sample_shape = self.event_shape return base_sample_shape @property def event_shape(self): return self._output_shape[-2:]
[docs] @classmethod def from_batch_mvn(cls, batch_mvn, task_dim=-1): """ Reinterprate a batch of multivariate normal distributions as an (independent) multitask multivariate normal distribution. :param ~gpytorch.distributions.MultivariateNormal batch_mvn: The base MVN distribution. (This distribution should have at least one batch dimension). :param int task_dim: Which batch dimension should be interpreted as the dimension for the independent tasks. :returns: the independent multitask distribution :rtype: gpytorch.distributions.MultitaskMultivariateNormal Example: >>> # model is a gpytorch.models.VariationalGP >>> # likelihood is a gpytorch.likelihoods.Likelihood >>> mean = torch.randn(4, 2, 3) >>> covar_factor = torch.randn(4, 2, 3, 3) >>> covar = covar_factor @ covar_factor.transpose(-1, -2) >>> mvn = gpytorch.distributions.MultivariateNormal(mean, covar) >>> print(mvn.event_shape, mvn.batch_shape) >>> # torch.Size([3]), torch.Size([4, 2]) >>> >>> mmvn = MultitaskMultivariateNormal.from_batch_mvn(mvn, task_dim=-1) >>> print(mmvn.event_shape, mmvn.batch_shape) >>> # torch.Size([3, 2]), torch.Size([4]) """ orig_task_dim = task_dim task_dim = task_dim if task_dim >= 0 else (len(batch_mvn.batch_shape) + task_dim) if task_dim < 0 or task_dim > len(batch_mvn.batch_shape): raise ValueError( f"task_dim of {orig_task_dim} is incompatible with MVN batch shape of {batch_mvn.batch_shape}" ) num_dim = batch_mvn.mean.dim() res = cls( mean=batch_mvn.mean.permute(*range(0, task_dim), *range(task_dim + 1, num_dim), task_dim), covariance_matrix=BlockInterleavedLinearOperator(batch_mvn.lazy_covariance_matrix, block_dim=task_dim), ) return res
[docs] @classmethod def from_independent_mvns(cls, mvns): """ Convert an iterable of MVNs into a :obj:`~gpytorch.distributions.MultitaskMultivariateNormal`. The resulting distribution will have ``len(mvns)`` tasks, and the tasks will be independent. :param ~gpytorch.distributions.MultitaskNormal mvn: The base MVN distributions. :returns: the independent multitask distribution :rtype: gpytorch.distributions.MultitaskMultivariateNormal Example: >>> # model is a gpytorch.models.VariationalGP >>> # likelihood is a gpytorch.likelihoods.Likelihood >>> mean = torch.randn(4, 3) >>> covar_factor = torch.randn(4, 3, 3) >>> covar = covar_factor @ covar_factor.transpose(-1, -2) >>> mvn1 = gpytorch.distributions.MultivariateNormal(mean, covar) >>> >>> mean = torch.randn(4, 3) >>> covar_factor = torch.randn(4, 3, 3) >>> covar = covar_factor @ covar_factor.transpose(-1, -2) >>> mvn2 = gpytorch.distributions.MultivariateNormal(mean, covar) >>> >>> mmvn = MultitaskMultivariateNormal.from_independent_mvns([mvn1, mvn2]) >>> print(mmvn.event_shape, mmvn.batch_shape) >>> # torch.Size([3, 2]), torch.Size([4]) """ if len(mvns) < 2: raise ValueError("Must provide at least 2 MVNs to form a MultitaskMultivariateNormal") if any(isinstance(mvn, MultitaskMultivariateNormal) for mvn in mvns): raise ValueError("Cannot accept MultitaskMultivariateNormals") if not all(m.batch_shape == mvns[0].batch_shape for m in mvns[1:]): raise ValueError("All MultivariateNormals must have the same batch shape") if not all(m.event_shape == mvns[0].event_shape for m in mvns[1:]): raise ValueError("All MultivariateNormals must have the same event shape") mean = torch.stack([mvn.mean for mvn in mvns], -1) # TODO: To do the following efficiently, we don't want to evaluate the # covariance matrices. Instead, we want to use the lazies directly in the # BlockDiagLinearOperator. This will require implementing a new BatchLinearOperator: # https://github.com/cornellius-gp/gpytorch/issues/468 covar_blocks_lazy = CatLinearOperator( *[mvn.lazy_covariance_matrix.unsqueeze(0) for mvn in mvns], dim=0, output_device=mean.device ) covar_lazy = BlockDiagLinearOperator(covar_blocks_lazy, block_dim=0) return cls(mean=mean, covariance_matrix=covar_lazy, interleaved=False)
[docs] @classmethod def from_repeated_mvn(cls, mvn, num_tasks): """ Convert a single MVN into a :obj:`~gpytorch.distributions.MultitaskMultivariateNormal`, where each task shares the same mean and covariance. :param ~gpytorch.distributions.MultitaskNormal mvn: The base MVN distribution. :param int num_tasks: How many tasks to create. :returns: the independent multitask distribution :rtype: gpytorch.distributions.MultitaskMultivariateNormal Example: >>> # model is a gpytorch.models.VariationalGP >>> # likelihood is a gpytorch.likelihoods.Likelihood >>> mean = torch.randn(4, 3) >>> covar_factor = torch.randn(4, 3, 3) >>> covar = covar_factor @ covar_factor.transpose(-1, -2) >>> mvn = gpytorch.distributions.MultivariateNormal(mean, covar) >>> print(mvn.event_shape, mvn.batch_shape) >>> # torch.Size([3]), torch.Size([4]) >>> >>> mmvn = MultitaskMultivariateNormal.from_repeated_mvn(mvn, num_tasks=2) >>> print(mmvn.event_shape, mmvn.batch_shape) >>> # torch.Size([3, 2]), torch.Size([4]) """ return cls.from_batch_mvn(mvn.expand(torch.Size([num_tasks]) + mvn.batch_shape), task_dim=0)
def expand(self, batch_size): new_mean = self.mean.expand(torch.Size(batch_size) + self.mean.shape[-2:]) new_covar = self._covar.expand(torch.Size(batch_size) + self._covar.shape[-2:]) res = self.__class__(new_mean, new_covar, interleaved=self._interleaved) return res def get_base_samples(self, sample_shape=torch.Size()): base_samples = super().get_base_samples(sample_shape) if not self._interleaved: # flip shape of last two dimensions new_shape = sample_shape + self._output_shape[:-2] + self._output_shape[:-3:-1] return base_samples.view(new_shape).transpose(-1, -2).contiguous() return base_samples.view(*sample_shape, *self._output_shape) def log_prob(self, value): if not self._interleaved: # flip shape of last two dimensions new_shape = value.shape[:-2] + value.shape[:-3:-1] value = value.view(new_shape).transpose(-1, -2).contiguous() return super().log_prob(value.reshape(*value.shape[:-2], -1)) @property def mean(self): mean = super().mean if not self._interleaved: # flip shape of last two dimensions new_shape = self._output_shape[:-2] + self._output_shape[:-3:-1] return mean.view(new_shape).transpose(-1, -2).contiguous() return mean.view(self._output_shape) @property def num_tasks(self): return self._output_shape[-1] def rsample(self, sample_shape=torch.Size(), base_samples=None): if base_samples is not None: # Make sure that the base samples agree with the distribution mean_shape = self.mean.shape base_sample_shape = base_samples.shape[-self.mean.ndimension() :] if mean_shape != base_sample_shape: raise RuntimeError( "The shape of base_samples (minus sample shape dimensions) should agree with the shape " "of self.mean. Expected ...{} but got {}".format(mean_shape, base_sample_shape) ) sample_shape = base_samples.shape[: -self.mean.ndimension()] base_samples = base_samples.view(*sample_shape, *self.loc.shape) samples = super().rsample(sample_shape=sample_shape, base_samples=base_samples) if not self._interleaved: # flip shape of last two dimensions new_shape = sample_shape + self._output_shape[:-2] + self._output_shape[:-3:-1] return samples.view(new_shape).transpose(-1, -2).contiguous() return samples.view(sample_shape + self._output_shape)
[docs] def to_data_independent_dist(self, jitter_val=1e-4): """ Convert a multitask MVN into a batched (non-multitask) MVNs The result retains the intertask covariances, but gets rid of the inter-data covariances. The resulting distribution will have ``len(mvns)`` tasks, and the tasks will be independent. :returns: the bached data-independent MVN :rtype: gpytorch.distributions.MultivariateNormal """ # Create batch distribution where all data are independent, but the tasks are dependent full_covar = self.lazy_covariance_matrix num_data, num_tasks = self.mean.shape[-2:] if self._interleaved: data_indices = torch.arange(0, num_data * num_tasks, num_tasks, device=full_covar.device).view(-1, 1, 1) task_indices = torch.arange(num_tasks, device=full_covar.device) else: data_indices = torch.arange(num_data, device=full_covar.device).view(-1, 1, 1) task_indices = torch.arange(0, num_data * num_tasks, num_data, device=full_covar.device) task_covars = full_covar[ ..., data_indices + task_indices.unsqueeze(-2), data_indices + task_indices.unsqueeze(-1) ] return MultivariateNormal(self.mean, to_linear_operator(task_covars).add_jitter(jitter_val=jitter_val))
@property def variance(self): var = super().variance if not self._interleaved: # flip shape of last two dimensions new_shape = self._output_shape[:-2] + self._output_shape[:-3:-1] return var.view(new_shape).transpose(-1, -2).contiguous() return var.view(self._output_shape)
[docs] def __getitem__(self, idx) -> MultivariateNormal: """ Constructs a new MultivariateNormal that represents a random variable modified by an indexing operation. The mean and covariance matrix arguments are indexed accordingly. :param Any idx: Index to apply to the mean. The covariance matrix is indexed accordingly. :returns: If indices specify a slice for samples and tasks, returns a MultitaskMultivariateNormal, else returns a MultivariateNormal. """ # Normalize index to a tuple if not isinstance(idx, tuple): idx = (idx,) if ... in idx: # Replace ellipsis '...' with explicit indices ellipsis_location = idx.index(...) if ... in idx[ellipsis_location + 1 :]: raise IndexError("Only one ellipsis '...' is supported!") prefix = idx[:ellipsis_location] suffix = idx[ellipsis_location + 1 :] infix_length = self.mean.dim() - len(prefix) - len(suffix) if infix_length < 0: raise IndexError(f"Index {idx} has too many dimensions") idx = prefix + (slice(None),) * infix_length + suffix elif len(idx) == self.mean.dim() - 1: # Normalize indices ignoring the task-index to include it idx = idx + (slice(None),) new_mean = self.mean[idx] # We now create a covariance matrix appropriate for new_mean if len(idx) <= self.mean.dim() - 2: # We are only indexing the batch dimensions in this case return MultitaskMultivariateNormal( mean=new_mean, covariance_matrix=self.lazy_covariance_matrix[idx], interleaved=self._interleaved, ) elif len(idx) > self.mean.dim(): raise IndexError(f"Index {idx} has too many dimensions") else: # We have an index that extends over all dimensions batch_idx = idx[:-2] if self._interleaved: row_idx = idx[-2] col_idx = idx[-1] num_rows = self._output_shape[-2] num_cols = self._output_shape[-1] else: row_idx = idx[-1] col_idx = idx[-2] num_rows = self._output_shape[-1] num_cols = self._output_shape[-2] if isinstance(row_idx, int) and isinstance(col_idx, int): # Single sample with single task row_idx = _normalize_index(row_idx, num_rows) col_idx = _normalize_index(col_idx, num_cols) new_cov = DiagLinearOperator( self.lazy_covariance_matrix.diagonal()[batch_idx + (row_idx * num_cols + col_idx,)] ) return MultivariateNormal(mean=new_mean, covariance_matrix=new_cov) elif isinstance(row_idx, int) and isinstance(col_idx, slice): # A block of the covariance matrix row_idx = _normalize_index(row_idx, num_rows) col_idx = _normalize_slice(col_idx, num_cols) new_slice = slice( col_idx.start + row_idx * num_cols, col_idx.stop + row_idx * num_cols, col_idx.step, ) new_cov = self.lazy_covariance_matrix[batch_idx + (new_slice, new_slice)] return MultivariateNormal(mean=new_mean, covariance_matrix=new_cov) elif isinstance(row_idx, slice) and isinstance(col_idx, int): # A block of the reversely interleaved covariance matrix row_idx = _normalize_slice(row_idx, num_rows) col_idx = _normalize_index(col_idx, num_cols) new_slice = slice(row_idx.start + col_idx, row_idx.stop * num_cols + col_idx, row_idx.step * num_cols) new_cov = self.lazy_covariance_matrix[batch_idx + (new_slice, new_slice)] return MultivariateNormal(mean=new_mean, covariance_matrix=new_cov) elif ( isinstance(row_idx, slice) and isinstance(col_idx, slice) and row_idx == col_idx == slice(None, None, None) ): new_cov = self.lazy_covariance_matrix[batch_idx] return MultitaskMultivariateNormal( mean=new_mean, covariance_matrix=new_cov, interleaved=self._interleaved, validate_args=False, ) elif isinstance(row_idx, slice) or isinstance(col_idx, slice): # slice x slice or indices x slice or slice x indices if isinstance(row_idx, slice): row_idx = torch.arange(num_rows)[row_idx] if isinstance(col_idx, slice): col_idx = torch.arange(num_cols)[col_idx] row_grid, col_grid = torch.meshgrid(row_idx, col_idx, indexing="ij") indices = (row_grid * num_cols + col_grid).reshape(-1) new_cov = self.lazy_covariance_matrix[batch_idx + (indices,)][..., indices] return MultitaskMultivariateNormal( mean=new_mean, covariance_matrix=new_cov, interleaved=self._interleaved, validate_args=False ) else: # row_idx and col_idx have pairs of indices indices = row_idx * num_cols + col_idx new_cov = self.lazy_covariance_matrix[batch_idx + (indices,)][..., indices] return MultivariateNormal( mean=new_mean, covariance_matrix=new_cov, )
def __repr__(self) -> str: return f"MultitaskMultivariateNormal(mean shape: {self._output_shape})"
def _normalize_index(i: int, dim_size: int) -> int: if i < 0: return dim_size + i else: return i def _normalize_slice(s: slice, dim_size: int) -> slice: start = s.start if start is None: start = 0 elif start < 0: start = dim_size + start stop = s.stop if stop is None: stop = dim_size elif stop < 0: stop = dim_size + stop step = s.step if step is None: step = 1 return slice(start, stop, step)