diff --git a/src/probnum/quad/integration_measures/__init__.py b/src/probnum/quad/integration_measures/__init__.py index a23a65c93..2ae11a905 100644 --- a/src/probnum/quad/integration_measures/__init__.py +++ b/src/probnum/quad/integration_measures/__init__.py @@ -1,6 +1,8 @@ """Integration measures for Bayesian quadrature methods.""" -from ._integration_measures import GaussianMeasure, IntegrationMeasure, LebesgueMeasure +from ._gaussian_measure import GaussianMeasure +from ._integration_measure import IntegrationMeasure +from ._lebesgue_measure import LebesgueMeasure # Public classes and functions. Order is reflected in documentation. __all__ = [ diff --git a/src/probnum/quad/integration_measures/_gaussian_measure.py b/src/probnum/quad/integration_measures/_gaussian_measure.py new file mode 100644 index 000000000..a7459fe1b --- /dev/null +++ b/src/probnum/quad/integration_measures/_gaussian_measure.py @@ -0,0 +1,76 @@ +"""The Gaussian measure.""" + + +from __future__ import annotations + +from typing import Optional, Union + +import numpy as np + +from probnum.randvars import Normal +from probnum.typing import IntLike + +from ._integration_measure import IntegrationMeasure + + +# pylint: disable=too-few-public-methods +class GaussianMeasure(IntegrationMeasure): + """Gaussian measure on Euclidean space with given mean and covariance. + + If ``mean`` and ``cov`` are scalars but ``input_dim`` is larger than one, ``mean`` + and ``cov`` are extended to a constant vector and diagonal matrix, respectively, + of appropriate dimensions. + + Parameters + ---------- + mean + *shape=(input_dim,)* -- Mean of the Gaussian measure. + cov + *shape=(input_dim, input_dim)* -- Covariance matrix of the Gaussian measure. + input_dim + Dimension of the integration domain. + """ + + def __init__( + self, + mean: Union[float, np.floating, np.ndarray], + cov: Union[float, np.floating, np.ndarray], + input_dim: Optional[IntLike] = None, + ) -> None: + + # Extend scalar mean and covariance to higher dimensions if input_dim has been + # supplied by the user + if ( + (np.isscalar(mean) or mean.size == 1) + and (np.isscalar(cov) or cov.size == 1) + and input_dim is not None + ): + mean = np.full((input_dim,), mean) + cov = cov * np.eye(input_dim) + + # Set dimension based on the mean vector + if np.isscalar(mean): + input_dim = 1 + else: + input_dim = mean.size + + # Set domain as whole R^n + domain = (np.full((input_dim,), -np.Inf), np.full((input_dim,), np.Inf)) + super().__init__(input_dim=input_dim, domain=domain) + + # Exploit random variables to carry out mean and covariance checks + # squeezes are needed due to the way random variables are currently implemented + # pylint: disable=no-member + self.random_variable = Normal(mean=np.squeeze(mean), cov=np.squeeze(cov)) + self.mean = np.reshape(self.random_variable.mean, (self.input_dim,)) + self.cov = np.reshape( + self.random_variable.cov, (self.input_dim, self.input_dim) + ) + + # Set diagonal_covariance flag + if input_dim == 1: + self.diagonal_covariance = True + else: + self.diagonal_covariance = ( + np.count_nonzero(self.cov - np.diag(np.diagonal(self.cov))) == 0 + ) diff --git a/src/probnum/quad/integration_measures/_integration_measure.py b/src/probnum/quad/integration_measures/_integration_measure.py new file mode 100644 index 000000000..2bf8c0410 --- /dev/null +++ b/src/probnum/quad/integration_measures/_integration_measure.py @@ -0,0 +1,77 @@ +"""Base class of an integration measure for Bayesian quadrature.""" + +from __future__ import annotations + +import abc +from typing import Optional, Union + +import numpy as np + +from probnum.quad._utils import as_domain +from probnum.quad.typing import DomainLike +from probnum.typing import FloatLike, IntLike + + +class IntegrationMeasure(abc.ABC): + """An abstract class for a measure against which a target function is integrated. + + Child classes implement specific integration measures and, if available, make use + of random variables for sampling and evaluation of the density function. + + Parameters + ---------- + domain + Domain of integration. Contains lower and upper bound as a scalar or + ``np.ndarray``. + input_dim + Dimension of the integration domain. + """ + + def __init__( + self, + domain: DomainLike, + input_dim: Optional[IntLike], + ) -> None: + + self.domain, self.input_dim = as_domain(domain, input_dim) + + def __call__(self, points: Union[FloatLike, np.ndarray]) -> np.ndarray: + """Evaluate the density function of the integration measure. + + Parameters + ---------- + points + *shape=(n_points, input_dim)* -- Input locations. + + Returns + ------- + density_evals : + *shape=(n_points,)* -- Density evaluated at given locations. + """ + # pylint: disable=no-member + return self.random_variable.pdf(points).reshape(-1) + + def sample( + self, + n_sample: IntLike, + rng: Optional[np.random.Generator] = np.random.default_rng(), + ) -> np.ndarray: + """Sample ``n_sample`` points from the integration measure. + + Parameters + ---------- + n_sample + Number of points to be sampled + rng + Random number generator. Optional. Default is `np.random.default_rng()`. + + Returns + ------- + points : + *shape=(n_sample,input_dim)* -- Sampled points + """ + # pylint: disable=no-member + return np.reshape( + self.random_variable.sample(size=n_sample, rng=rng), + newshape=(n_sample, self.input_dim), + ) diff --git a/src/probnum/quad/integration_measures/_integration_measures.py b/src/probnum/quad/integration_measures/_integration_measures.py deleted file mode 100644 index 6fb210c2b..000000000 --- a/src/probnum/quad/integration_measures/_integration_measures.py +++ /dev/null @@ -1,199 +0,0 @@ -"""Contains integration measures.""" -from __future__ import annotations - -import abc -from typing import Optional, Union - -import numpy as np -import scipy.stats - -from probnum.quad._utils import as_domain -from probnum.quad.typing import DomainLike -from probnum.randvars import Normal -from probnum.typing import FloatLike, IntLike - - -class IntegrationMeasure(abc.ABC): - """An abstract class for a measure against which a target function is integrated. - - Child classes implement specific integration measures and, if available, make use - of random variables for sampling and evaluation of the density function. - - Parameters - ---------- - domain - Domain of integration. Contains lower and upper bound as a scalar or - ``np.ndarray``. - input_dim - Dimension of the integration domain. - """ - - def __init__( - self, - domain: DomainLike, - input_dim: Optional[IntLike], - ) -> None: - - self.domain, self.input_dim = as_domain(domain, input_dim) - - def __call__(self, points: Union[FloatLike, np.ndarray]) -> np.ndarray: - """Evaluate the density function of the integration measure. - - Parameters - ---------- - points - *shape=(n_points, input_dim)* -- Input locations. - - Returns - ------- - density_evals : - *shape=(n_points,)* -- Density evaluated at given locations. - """ - # pylint: disable=no-member - return self.random_variable.pdf(points).reshape(-1) - - def sample( - self, - n_sample: IntLike, - rng: Optional[np.random.Generator] = np.random.default_rng(), - ) -> np.ndarray: - """Sample ``n_sample`` points from the integration measure. - - Parameters - ---------- - n_sample - Number of points to be sampled - rng - Random number generator. Optional. Default is `np.random.default_rng()`. - - Returns - ------- - points : - *shape=(n_sample,input_dim)* -- Sampled points - """ - # pylint: disable=no-member - return np.reshape( - self.random_variable.sample(size=n_sample, rng=rng), - newshape=(n_sample, self.input_dim), - ) - - -class LebesgueMeasure(IntegrationMeasure): - """Lebesgue measure on a hyper-rectangle. - - Parameters - ---------- - domain - Domain of integration. Contains lower and upper bound as a scalar or - ``np.ndarray``. - input_dim - Dimension of the integration domain. If not given, inferred from ``domain``. - normalized - Boolean which controls whether the measure is normalized (i.e., - integral over the domain is one). Defaults to ``False``. - """ - - def __init__( - self, - domain: DomainLike, - input_dim: Optional[IntLike] = None, - normalized: bool = False, - ) -> None: - super().__init__(input_dim=input_dim, domain=domain) - - # Set normalization constant - if normalized: - normalization_constant = 1.0 / np.prod(self.domain[1] - self.domain[0]) - else: - normalization_constant = 1.0 - - if normalization_constant in [0, np.Inf, -np.Inf]: - raise ValueError( - "Normalization constant is too small or too large. " - "Consider setting normalized = False." - ) - - self.normalized = normalized - self.normalization_constant = normalization_constant - - # Use scipy's uniform random variable since uniform random variables are not - # yet implemented in probnum - self.random_variable = scipy.stats.uniform( - loc=self.domain[0], scale=self.domain[1] - self.domain[0] - ) - - def __call__(self, points: np.ndarray) -> np.ndarray: - num_dat = points.shape[0] - return np.full((num_dat,), self.normalization_constant) - - def sample( - self, - n_sample: IntLike, - rng: Optional[np.random.Generator] = np.random.default_rng(), - ) -> np.ndarray: - return self.random_variable.rvs( - size=(n_sample, self.input_dim), random_state=rng - ) - - -# pylint: disable=too-few-public-methods -class GaussianMeasure(IntegrationMeasure): - """Gaussian measure on Euclidean space with given mean and covariance. - - If ``mean`` and ``cov`` are scalars but ``input_dim`` is larger than one, ``mean`` - and ``cov`` are extended to a constant vector and diagonal matrix, respectively, - of appropriate dimensions. - - Parameters - ---------- - mean - *shape=(input_dim,)* -- Mean of the Gaussian measure. - cov - *shape=(input_dim, input_dim)* -- Covariance matrix of the Gaussian measure. - input_dim - Dimension of the integration domain. - """ - - def __init__( - self, - mean: Union[float, np.floating, np.ndarray], - cov: Union[float, np.floating, np.ndarray], - input_dim: Optional[IntLike] = None, - ) -> None: - - # Extend scalar mean and covariance to higher dimensions if input_dim has been - # supplied by the user - if ( - (np.isscalar(mean) or mean.size == 1) - and (np.isscalar(cov) or cov.size == 1) - and input_dim is not None - ): - mean = np.full((input_dim,), mean) - cov = cov * np.eye(input_dim) - - # Set dimension based on the mean vector - if np.isscalar(mean): - input_dim = 1 - else: - input_dim = mean.size - - # Set domain as whole R^n - domain = (np.full((input_dim,), -np.Inf), np.full((input_dim,), np.Inf)) - super().__init__(input_dim=input_dim, domain=domain) - - # Exploit random variables to carry out mean and covariance checks - # squeezes are needed due to the way random variables are currently implemented - # pylint: disable=no-member - self.random_variable = Normal(mean=np.squeeze(mean), cov=np.squeeze(cov)) - self.mean = np.reshape(self.random_variable.mean, (self.input_dim,)) - self.cov = np.reshape( - self.random_variable.cov, (self.input_dim, self.input_dim) - ) - - # Set diagonal_covariance flag - if input_dim == 1: - self.diagonal_covariance = True - else: - self.diagonal_covariance = ( - np.count_nonzero(self.cov - np.diag(np.diagonal(self.cov))) == 0 - ) diff --git a/src/probnum/quad/integration_measures/_lebesgue_measure.py b/src/probnum/quad/integration_measures/_lebesgue_measure.py new file mode 100644 index 000000000..0d96da846 --- /dev/null +++ b/src/probnum/quad/integration_measures/_lebesgue_measure.py @@ -0,0 +1,72 @@ +"""The Lebesgue measure.""" + + +from __future__ import annotations + +from typing import Optional + +import numpy as np +import scipy.stats + +from probnum.quad.typing import DomainLike +from probnum.typing import IntLike + +from ._integration_measure import IntegrationMeasure + + +class LebesgueMeasure(IntegrationMeasure): + """Lebesgue measure on a hyper-rectangle. + + Parameters + ---------- + domain + Domain of integration. Contains lower and upper bound as a scalar or + ``np.ndarray``. + input_dim + Dimension of the integration domain. If not given, inferred from ``domain``. + normalized + Boolean which controls whether the measure is normalized (i.e., + integral over the domain is one). Defaults to ``False``. + """ + + def __init__( + self, + domain: DomainLike, + input_dim: Optional[IntLike] = None, + normalized: bool = False, + ) -> None: + super().__init__(input_dim=input_dim, domain=domain) + + # Set normalization constant + if normalized: + normalization_constant = 1.0 / np.prod(self.domain[1] - self.domain[0]) + else: + normalization_constant = 1.0 + + if normalization_constant in [0, np.Inf, -np.Inf]: + raise ValueError( + "Normalization constant is too small or too large. " + "Consider setting normalized = False." + ) + + self.normalized = normalized + self.normalization_constant = normalization_constant + + # Use scipy's uniform random variable since uniform random variables are not + # yet implemented in probnum + self.random_variable = scipy.stats.uniform( + loc=self.domain[0], scale=self.domain[1] - self.domain[0] + ) + + def __call__(self, points: np.ndarray) -> np.ndarray: + num_dat = points.shape[0] + return np.full((num_dat,), self.normalization_constant) + + def sample( + self, + n_sample: IntLike, + rng: Optional[np.random.Generator] = np.random.default_rng(), + ) -> np.ndarray: + return self.random_variable.rvs( + size=(n_sample, self.input_dim), random_state=rng + ) diff --git a/src/probnum/quad/solvers/belief_updates/__init__.py b/src/probnum/quad/solvers/belief_updates/__init__.py index 422a5f8a5..62ec1d75c 100644 --- a/src/probnum/quad/solvers/belief_updates/__init__.py +++ b/src/probnum/quad/solvers/belief_updates/__init__.py @@ -1,6 +1,7 @@ """Belief updates for Bayesian quadrature.""" -from ._belief_update import BQBeliefUpdate, BQStandardBeliefUpdate +from ._belief_update import BQBeliefUpdate +from ._standard_update import BQStandardBeliefUpdate # Public classes and functions. Order is reflected in documentation. __all__ = [ diff --git a/src/probnum/quad/solvers/belief_updates/_belief_update.py b/src/probnum/quad/solvers/belief_updates/_belief_update.py index a04bb5356..4f9d3dc42 100644 --- a/src/probnum/quad/solvers/belief_updates/_belief_update.py +++ b/src/probnum/quad/solvers/belief_updates/_belief_update.py @@ -1,22 +1,19 @@ -"""Belief updates for Bayesian quadrature.""" +"""Base class of belief update for Bayesian quadrature.""" from __future__ import annotations import abc -from typing import Optional, Tuple +from typing import Tuple import numpy as np from scipy.linalg import cho_factor, cho_solve -from probnum.quad.kernel_embeddings import KernelEmbedding from probnum.quad.solvers._bq_state import BQState -from probnum.randprocs.kernels import Kernel from probnum.randvars import Normal from probnum.typing import FloatLike -# pylint: disable=too-few-public-methods, too-many-locals - +# pylint: disable=too-few-public-methods class BQBeliefUpdate(abc.ABC): """Abstract class for the inference scheme. @@ -87,110 +84,3 @@ def _gram_cho_solve(self, gram_cho_factor: np.ndarray, z: np.ndarray) -> np.ndar """Wrapper for scipy.linalg.cho_solve. Meant to be used for linear systems of the gram matrix. Requires the solution of scipy.linalg.cho_factor as input.""" return cho_solve(gram_cho_factor, z) - - -class BQStandardBeliefUpdate(BQBeliefUpdate): - """Updates integral belief and state using standard Bayesian quadrature based on - standard Gaussian process inference. - - Parameters - ---------- - jitter - Non-negative jitter to numerically stabilise kernel matrix inversion. - scale_estimation - Estimation method to use to compute the scale parameter. - """ - - def __init__(self, jitter: FloatLike, scale_estimation: Optional[str]) -> None: - super().__init__(jitter=jitter) - self.scale_estimation = scale_estimation - - def __call__( - self, - bq_state: BQState, - new_nodes: np.ndarray, - new_fun_evals: np.ndarray, - *args, - **kwargs, - ) -> Tuple[Normal, BQState]: - - # Update nodes and function evaluations - nodes = np.concatenate((bq_state.nodes, new_nodes), axis=0) - fun_evals = np.append(bq_state.fun_evals, new_fun_evals) - - # Estimate intrinsic kernel parameters - new_kernel, kernel_was_updated = self._estimate_kernel(bq_state.kernel) - new_kernel_embedding = KernelEmbedding(new_kernel, bq_state.measure) - - # Update gram matrix and kernel mean vector. Recompute everything from - # scratch if the kernel was updated or if these are the first nodes. - if kernel_was_updated or bq_state.nodes.size == 0: - gram = new_kernel.matrix(nodes) - kernel_means = new_kernel_embedding.kernel_mean(nodes) - else: - gram_new_new = new_kernel.matrix(new_nodes) - gram_old_new = new_kernel.matrix(new_nodes, bq_state.nodes) - gram = np.hstack( - ( - np.vstack((bq_state.gram, gram_old_new)), - np.vstack((gram_old_new.T, gram_new_new)), - ) - ) - kernel_means = np.concatenate( - ( - bq_state.kernel_means, - new_kernel_embedding.kernel_mean(new_nodes), - ) - ) - - # Cholesky factorisation of the Gram matrix - gram_cho_factor = self._compute_gram_cho_factor(gram) - - # Estimate scaling parameter - new_scale_sq = self._estimate_scale(fun_evals, gram_cho_factor, bq_state) - - # Integral mean and variance - weights = self._gram_cho_solve(gram_cho_factor, kernel_means) - integral_mean = weights @ fun_evals - initial_integral_variance = new_kernel_embedding.kernel_variance() - integral_variance = new_scale_sq * ( - initial_integral_variance - weights @ kernel_means - ) - - new_belief = Normal(integral_mean, integral_variance) - new_state = BQState.from_new_data( - kernel=new_kernel, - scale_sq=new_scale_sq, - nodes=nodes, - fun_evals=fun_evals, - integral_belief=new_belief, - prev_state=bq_state, - gram=gram, - kernel_means=kernel_means, - ) - - return new_belief, new_state - - # pylint: disable=no-self-use - def _estimate_kernel(self, kernel: Kernel) -> Tuple[Kernel, bool]: - """Estimate the intrinsic kernel parameters. That is, all parameters except the - scale.""" - new_kernel = kernel - kernel_was_updated = False - return new_kernel, kernel_was_updated - - def _estimate_scale( - self, fun_evals: np.ndarray, gram_cho_factor: np.ndarray, bq_state: BQState - ) -> FloatLike: - """Estimate the scale parameter.""" - if self.scale_estimation is None: - new_scale_sq = bq_state.scale_sq - elif self.scale_estimation == "mle": - new_scale_sq = ( - fun_evals - @ self._gram_cho_solve(gram_cho_factor, fun_evals) - / fun_evals.shape[0] - ) - else: - raise ValueError(f"Scale estimation ({self.scale_estimation}) is unknown.") - return new_scale_sq diff --git a/src/probnum/quad/solvers/belief_updates/_standard_update.py b/src/probnum/quad/solvers/belief_updates/_standard_update.py new file mode 100644 index 000000000..2991d0b48 --- /dev/null +++ b/src/probnum/quad/solvers/belief_updates/_standard_update.py @@ -0,0 +1,124 @@ +"""Belief update for conjugate Bayesian quadrature.""" + +from __future__ import annotations + +from typing import Optional, Tuple + +import numpy as np + +from probnum.quad.kernel_embeddings import KernelEmbedding +from probnum.quad.solvers._bq_state import BQState +from probnum.randprocs.kernels import Kernel +from probnum.randvars import Normal +from probnum.typing import FloatLike + +from ._belief_update import BQBeliefUpdate + + +# pylint: disable=too-few-public-methods +class BQStandardBeliefUpdate(BQBeliefUpdate): + """Updates integral belief and state using standard Bayesian quadrature based on + standard Gaussian process inference. + + Parameters + ---------- + jitter + Non-negative jitter to numerically stabilise kernel matrix inversion. + scale_estimation + Estimation method to use to compute the scale parameter. + """ + + def __init__(self, jitter: FloatLike, scale_estimation: Optional[str]) -> None: + super().__init__(jitter=jitter) + self.scale_estimation = scale_estimation + + # pylint: disable=too-many-locals + def __call__( + self, + bq_state: BQState, + new_nodes: np.ndarray, + new_fun_evals: np.ndarray, + *args, + **kwargs, + ) -> Tuple[Normal, BQState]: + + # Update nodes and function evaluations + nodes = np.concatenate((bq_state.nodes, new_nodes), axis=0) + fun_evals = np.append(bq_state.fun_evals, new_fun_evals) + + # Estimate intrinsic kernel parameters + new_kernel, kernel_was_updated = self._estimate_kernel(bq_state.kernel) + new_kernel_embedding = KernelEmbedding(new_kernel, bq_state.measure) + + # Update gram matrix and kernel mean vector. Recompute everything from + # scratch if the kernel was updated or if these are the first nodes. + if kernel_was_updated or bq_state.nodes.size == 0: + gram = new_kernel.matrix(nodes) + kernel_means = new_kernel_embedding.kernel_mean(nodes) + else: + gram_new_new = new_kernel.matrix(new_nodes) + gram_old_new = new_kernel.matrix(new_nodes, bq_state.nodes) + gram = np.hstack( + ( + np.vstack((bq_state.gram, gram_old_new)), + np.vstack((gram_old_new.T, gram_new_new)), + ) + ) + kernel_means = np.concatenate( + ( + bq_state.kernel_means, + new_kernel_embedding.kernel_mean(new_nodes), + ) + ) + + # Cholesky factorisation of the Gram matrix + gram_cho_factor = self._compute_gram_cho_factor(gram) + + # Estimate scaling parameter + new_scale_sq = self._estimate_scale(fun_evals, gram_cho_factor, bq_state) + + # Integral mean and variance + weights = self._gram_cho_solve(gram_cho_factor, kernel_means) + integral_mean = weights @ fun_evals + initial_integral_variance = new_kernel_embedding.kernel_variance() + integral_variance = new_scale_sq * ( + initial_integral_variance - weights @ kernel_means + ) + + new_belief = Normal(integral_mean, integral_variance) + new_state = BQState.from_new_data( + kernel=new_kernel, + scale_sq=new_scale_sq, + nodes=nodes, + fun_evals=fun_evals, + integral_belief=new_belief, + prev_state=bq_state, + gram=gram, + kernel_means=kernel_means, + ) + + return new_belief, new_state + + # pylint: disable=no-self-use + def _estimate_kernel(self, kernel: Kernel) -> Tuple[Kernel, bool]: + """Estimate the intrinsic kernel parameters. That is, all parameters except the + scale.""" + new_kernel = kernel + kernel_was_updated = False + return new_kernel, kernel_was_updated + + def _estimate_scale( + self, fun_evals: np.ndarray, gram_cho_factor: np.ndarray, bq_state: BQState + ) -> FloatLike: + """Estimate the scale parameter.""" + if self.scale_estimation is None: + new_scale_sq = bq_state.scale_sq + elif self.scale_estimation == "mle": + new_scale_sq = ( + fun_evals + @ self._gram_cho_solve(gram_cho_factor, fun_evals) + / fun_evals.shape[0] + ) + else: + raise ValueError(f"Scale estimation ({self.scale_estimation}) is unknown.") + return new_scale_sq diff --git a/src/probnum/quad/solvers/policies/_random_policy.py b/src/probnum/quad/solvers/policies/_random_policy.py index 6110ca465..cae7f297e 100644 --- a/src/probnum/quad/solvers/policies/_random_policy.py +++ b/src/probnum/quad/solvers/policies/_random_policy.py @@ -22,6 +22,8 @@ class RandomPolicy(Policy): shape (batch_size, n_dim). batch_size Size of batch of nodes when calling the policy once. + rng + A random number generator. """ def __init__( diff --git a/src/probnum/quad/solvers/policies/_van_der_corput_policy.py b/src/probnum/quad/solvers/policies/_van_der_corput_policy.py index 1e7edfe85..3d78bc0e9 100644 --- a/src/probnum/quad/solvers/policies/_van_der_corput_policy.py +++ b/src/probnum/quad/solvers/policies/_van_der_corput_policy.py @@ -40,7 +40,7 @@ def __init__(self, measure: IntegrationMeasure, batch_size: int) -> None: domain_a = measure.domain[0] domain_b = measure.domain[1] - if np.Inf in [abs(domain_a), abs(domain_b)]: + if np.Inf in np.hstack([abs(measure.domain[0]), abs(measure.domain[1])]): raise ValueError("Policy 'vdc' works only for bounded domains.") self.domain_a = domain_a