#!/usr/bin/env python3
from typing import Optional, Tuple
import torch
from .. import settings
from ..distributions import Delta, MultivariateNormal
from ..lazy import DiagLazyTensor, MatmulLazyTensor, SumLazyTensor, lazify
from ..module import Module
from ..utils import linear_cg
from ..utils.broadcasting import _mul_broadcast_shape
from ..utils.memoize import cached
from ._variational_strategy import _VariationalStrategy
from .natural_variational_distribution import NaturalVariationalDistribution
class _NgdInterpTerms(torch.autograd.Function):
"""
This function takes in
- the kernel interpolation term K_ZZ^{-1/2} k_ZX
- the natural parameters of the variational distribution
and returns
- the predictive distribution mean/covariance
- the inducing KL divergence KL( q(u) || p(u))
However, the gradients will be with respect to the **cannonical parameters**
of the variational distribution, rather than the **natural parameters**.
This corresponds to performing natural gradient descent on the variational distribution.
"""
@staticmethod
def forward(
ctx, interp_term: torch.Tensor, natural_vec: torch.Tensor, natural_mat: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
# Compute precision
prec = natural_mat.mul(-2.0)
diag = prec.diagonal(dim1=-1, dim2=-2).unsqueeze(-1)
# Make sure that interp_term and natural_vec are the same batch shape
batch_shape = _mul_broadcast_shape(interp_term.shape[:-2], natural_vec.shape[:-1])
expanded_interp_term = interp_term.expand(*batch_shape, *interp_term.shape[-2:])
expanded_natural_vec = natural_vec.expand(*batch_shape, natural_vec.size(-1))
# Compute necessary solves with the precision. We need
# m = expec_vec = S * natural_vec
# S K^{-1/2} k
solves = linear_cg(
prec.matmul,
torch.cat([expanded_natural_vec.unsqueeze(-1), expanded_interp_term], dim=-1),
n_tridiag=0,
max_iter=settings.max_cg_iterations.value(),
tolerance=min(settings.eval_cg_tolerance.value(), settings.cg_tolerance.value()),
max_tridiag_iter=settings.max_lanczos_quadrature_iterations.value(),
preconditioner=lambda x: x / diag,
)
expec_vec = solves[..., 0]
s_times_interp_term = solves[..., 1:]
# Compute the interpolated mean
# k^T K^{-1/2} m
interp_mean = (s_times_interp_term.transpose(-1, -2) @ natural_vec.unsqueeze(-1)).squeeze(-1)
# Compute the interpolated variance
# k^T K^{-1/2} S K^{-1/2} k = k^T K^{-1/2} (expec_mat - expec_vec expec_vec^T) K^{-1/2} k
interp_var = (s_times_interp_term * interp_term).sum(dim=-2)
# Let's not bother actually computing the KL-div in the foward pass
# 1/2 ( -log | S | + tr(S) + m^T m - len(m) )
# = 1/2 ( -log | expec_mat - expec_vec expec_vec^T | + tr(expec_mat) - len(m) )
kl_div = torch.zeros_like(interp_mean[..., 0])
# We're done!
ctx.save_for_backward(interp_term, s_times_interp_term, interp_mean, natural_vec, expec_vec, prec)
return interp_mean, interp_var, kl_div
@staticmethod
def backward(
ctx, interp_mean_grad: torch.Tensor, interp_var_grad: torch.Tensor, kl_div_grad: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
# Get the saved terms
interp_term, s_times_interp_term, interp_mean, natural_vec, expec_vec, prec = ctx.saved_tensors
# Expand data-depenedent gradients
interp_mean_grad = interp_mean_grad.unsqueeze(-2)
interp_var_grad = interp_var_grad.unsqueeze(-2)
# Compute gradient of interp term (K^{-1/2} k)
# interp_mean component: m
# interp_var component: S K^{-1/2} k
# kl component: 0
interp_term_grad = (interp_var_grad * s_times_interp_term).mul(2.0) + (
interp_mean_grad * expec_vec.unsqueeze(-1)
)
# Compute gradient of expected vector (m)
# interp_mean component: K^{-1/2} k
# interp_var component: (k^T K^{-1/2} m) K^{-1/2} k
# kl component: S^{-1} m
expec_vec_grad = sum(
[
(interp_var_grad * interp_mean.unsqueeze(-2) * interp_term).sum(dim=-1).mul(-2),
(interp_mean_grad * interp_term).sum(dim=-1),
(kl_div_grad.unsqueeze(-1) * natural_vec),
]
)
# Compute gradient of expected matrix (mm^T + S)
# interp_mean component: 0
# interp_var component: K^{-1/2} k k^T K^{-1/2}
# kl component: 1/2 ( I - S^{-1} )
eye = torch.eye(expec_vec.size(-1), device=expec_vec.device, dtype=expec_vec.dtype)
expec_mat_grad = torch.add(
(interp_var_grad * interp_term) @ interp_term.transpose(-1, -2),
(kl_div_grad.unsqueeze(-1).unsqueeze(-1) * (eye - prec).mul(0.5)),
)
# We're done!
return interp_term_grad, expec_vec_grad, expec_mat_grad, None # Extra "None" for the kwarg
[docs]class CiqVariationalStrategy(_VariationalStrategy):
r"""
Similar to :class:`~gpytorch.variational.VariationalStrategy`,
except the whitening operation is performed using Contour Integral Quadrature
rather than Cholesky (see `Pleiss et al. (2020)`_ for more info).
See the `CIQ-SVGP tutorial`_ for an example.
Contour Integral Quadrature uses iterative matrix-vector multiplication to approximate
the :math:`\mathbf K_{\mathbf Z \mathbf Z}^{-1/2}` matrix used for the whitening operation.
This can be more efficient than the standard variational strategy for large numbers
of inducing points (e.g. :math:`M > 1000`) or when the inducing points have structure
(e.g. they lie on an evenly-spaced grid).
.. note::
It is recommended that this object is used in conjunction with
:obj:`~gpytorch.variational.NaturalVariationalDistribution` and
`natural gradient descent`_.
:param ~gpytorch.models.ApproximateGP model: Model this strategy is applied to.
Typically passed in when the VariationalStrategy is created in the
__init__ method of the user defined model.
:param torch.Tensor inducing_points: Tensor containing a set of inducing
points to use for variational inference.
:param ~gpytorch.variational.VariationalDistribution variational_distribution: A
VariationalDistribution object that represents the form of the variational distribution :math:`q(\mathbf u)`
:param learn_inducing_locations: (Default True): Whether or not
the inducing point locations :math:`\mathbf Z` should be learned (i.e. are they
parameters of the model).
:type learn_inducing_locations: `bool`, optional
.. _Pleiss et al. (2020):
https://arxiv.org/pdf/2006.11267.pdf
.. _CIQ-SVGP tutorial:
examples/04_Variational_and_Approximate_GPs/SVGP_CIQ.html
.. _natural gradient descent:
examples/04_Variational_and_Approximate_GPs/Natural_Gradient_Descent.html
"""
def _ngd(self):
return isinstance(self._variational_distribution, NaturalVariationalDistribution)
@property
@cached(name="prior_distribution_memo")
def prior_distribution(self):
zeros = torch.zeros(
self._variational_distribution.shape(),
dtype=self._variational_distribution.dtype,
device=self._variational_distribution.device,
)
ones = torch.ones_like(zeros)
res = MultivariateNormal(zeros, DiagLazyTensor(ones))
return res
@property
@cached(name="variational_distribution_memo")
def variational_distribution(self):
if self._ngd():
raise RuntimeError(
"Variational distribution for NGD-CIQ should be computed during forward calls. "
"This is probably a bug in GPyTorch."
)
return super().variational_distribution
def forward(
self,
x: torch.Tensor,
inducing_points: torch.Tensor,
inducing_values: torch.Tensor,
variational_inducing_covar: Optional[MultivariateNormal] = None,
**kwargs,
) -> MultivariateNormal:
# Compute full prior distribution
full_inputs = torch.cat([inducing_points, x], dim=-2)
full_output = self.model.forward(full_inputs)
full_covar = full_output.lazy_covariance_matrix
# Covariance terms
num_induc = inducing_points.size(-2)
test_mean = full_output.mean[..., num_induc:]
induc_induc_covar = full_covar[..., :num_induc, :num_induc].evaluate_kernel().add_jitter(1e-2)
induc_data_covar = full_covar[..., :num_induc, num_induc:].evaluate()
data_data_covar = full_covar[..., num_induc:, num_induc:].add_jitter(1e-4)
# Compute interpolation terms
# K_XZ K_ZZ^{-1} \mu_z
# K_XZ K_ZZ^{-1/2} \mu_Z
with settings.max_preconditioner_size(0): # Turn off preconditioning for CIQ
interp_term = lazify(induc_induc_covar).sqrt_inv_matmul(induc_data_covar)
# Compute interpolated mean and variance terms
# We have separate computation rules for NGD versus standard GD
if self._ngd():
interp_mean, interp_var, kl_div = _NgdInterpTerms.apply(
interp_term,
self._variational_distribution.natural_vec,
self._variational_distribution.natural_mat,
)
# Compute the covariance of q(f)
predictive_var = data_data_covar.diag() - interp_term.pow(2).sum(dim=-2) + interp_var
predictive_var = torch.clamp_min(predictive_var, settings.min_variance.value(predictive_var.dtype))
predictive_covar = DiagLazyTensor(predictive_var)
# Also compute and cache the KL divergence
if not hasattr(self, "_memoize_cache"):
self._memoize_cache = dict()
self._memoize_cache["kl"] = kl_div
else:
# Compute interpolated mean term
interp_mean = torch.matmul(
interp_term.transpose(-1, -2), (inducing_values - self.prior_distribution.mean).unsqueeze(-1)
).squeeze(-1)
# Compute the covariance of q(f)
middle_term = self.prior_distribution.lazy_covariance_matrix.mul(-1)
if variational_inducing_covar is not None:
middle_term = SumLazyTensor(variational_inducing_covar, middle_term)
predictive_covar = SumLazyTensor(
data_data_covar.add_jitter(1e-4),
MatmulLazyTensor(interp_term.transpose(-1, -2), middle_term @ interp_term),
)
# Compute the mean of q(f)
# k_XZ K_ZZ^{-1/2} (m - K_ZZ^{-1/2} \mu_Z) + \mu_X
predictive_mean = interp_mean + test_mean
# Return the distribution
return MultivariateNormal(predictive_mean, predictive_covar)
[docs] def kl_divergence(self):
"""
Compute the KL divergence between the variational inducing distribution :math:`q(\mathbf u)`
and the prior inducing distribution :math:`p(\mathbf u)`.
:rtype: torch.Tensor
"""
if self._ngd():
if hasattr(self, "_memoize_cache") and "kl" in self._memoize_cache:
return self._memoize_cache["kl"]
else:
raise RuntimeError(
"KL divergence for NGD-CIQ should be computed during forward calls."
"This is probably a bug in GPyTorch."
)
else:
return super().kl_divergence()
def __call__(self, x: torch.Tensor, prior: bool = False, **kwargs) -> MultivariateNormal:
# This is mostly the same as _VariationalStrategy.__call__()
# but with special rules for natural gradient descent (to prevent O(M^3) computation)
# If we're in prior mode, then we're done!
if prior:
return self.model.forward(x)
# Delete previously cached items from the training distribution
if self.training:
self._clear_cache()
# (Maybe) initialize variational distribution
if not self.variational_params_initialized.item():
if self._ngd():
noise = torch.randn_like(self.prior_distribution.mean).mul_(1e-3)
eye = torch.eye(noise.size(-1), dtype=noise.dtype, device=noise.device).mul(-0.5)
self._variational_distribution.natural_vec.data.copy_(noise)
self._variational_distribution.natural_mat.data.copy_(eye)
self.variational_params_initialized.fill_(1)
else:
prior_dist = self.prior_distribution
self._variational_distribution.initialize_variational_distribution(prior_dist)
self.variational_params_initialized.fill_(1)
# Ensure inducing_points and x are the same size
inducing_points = self.inducing_points
if inducing_points.shape[:-2] != x.shape[:-2]:
x, inducing_points = self._expand_inputs(x, inducing_points)
# Get q(f)
if self._ngd():
return Module.__call__(
self,
x,
inducing_points,
inducing_values=None,
variational_inducing_covar=None,
**kwargs,
)
else:
# Get p(u)/q(u)
variational_dist_u = self.variational_distribution
if isinstance(variational_dist_u, MultivariateNormal):
return Module.__call__(
self,
x,
inducing_points,
inducing_values=variational_dist_u.mean,
variational_inducing_covar=variational_dist_u.lazy_covariance_matrix,
**kwargs,
)
elif isinstance(variational_dist_u, Delta):
return Module.__call__(
self,
x,
inducing_points,
inducing_values=variational_dist_u.mean,
variational_inducing_covar=None,
ngd=False,
**kwargs,
)
else:
raise RuntimeError(
f"Invalid variational distribuition ({type(variational_dist_u)}). "
"Expected a multivariate normal or a delta distribution."
)