Commit
Merge branch 'master' of github.com:bystrogenomics/bystro
akotlar committed Oct 24, 2024
2 parents ea9470e + 79b8b87 commit 939810a
Showing 4 changed files with 214 additions and 22 deletions.
17 changes: 3 additions & 14 deletions python/python/bystro/supervised_ppca/_base.py
@@ -54,7 +54,6 @@
_entropy_subset,
_mutual_information,
inv_sherman_woodbury_fa,
_get_conditional_parameters_sherman_woodbury,
_conditional_score_sherman_woodbury,
_conditional_score_samples_sherman_woodbury,
_marginal_score_sherman_woodbury,
@@ -273,12 +272,7 @@ def transform(self, X: NDArray[np.float_], sherman_woodbury: bool = False):
prec = self.get_precision()
coefs = np.dot(self.W_, prec)
else:
Lambda = self.get_noise()
A = la.solve(Lambda, self.W_)
B = np.dot(A, self.W_.T)
IpB = np.eye(self.n_components) + B
end = la.solve(IpB, A)
coefs = A - np.dot(B, end)
raise NotImplementedError("Subclass PCA required for this Sherman Woodbury")

return np.dot(X, coefs.T)
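
For context, the branch deleted above implemented the Sherman-Woodbury (Woodbury matrix) identity, which lets the precision of Sigma = W^T W + Lambda be applied without inverting a p x p matrix: (Lambda + W^T W)^{-1} = Lambda^{-1} - Lambda^{-1} W^T (I_L + W Lambda^{-1} W^T)^{-1} W Lambda^{-1}, costing only an L x L solve. A minimal numerical sketch of the identity (shapes and names hypothetical, not the library's code):

    import numpy as np
    import numpy.linalg as la

    rng = np.random.default_rng(0)
    L, p = 3, 50
    W = rng.normal(size=(L, p))          # loadings, (n_components, p)
    lam = rng.uniform(0.5, 2.0, size=p)  # diagonal of the noise matrix Lambda

    # Direct inverse of Sigma = W^T W + Lambda: O(p^3)
    prec_direct = la.inv(W.T @ W + np.diag(lam))

    # Woodbury: only an L x L solve, O(p * L^2)
    Li_Wt = W.T / lam[:, None]            # Lambda^{-1} W^T, shape (p, L)
    core = np.eye(L) + W @ Li_Wt          # I_L + W Lambda^{-1} W^T, (L, L)
    prec_woodbury = np.diag(1.0 / lam) - Li_Wt @ la.solve(core, Li_Wt.T)

    assert np.allclose(prec_direct, prec_woodbury)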

@@ -308,20 +302,15 @@ def transform_subset(
if self.W_ is None:
raise ValueError("Model has not been fit yet")

Wo = self.W_[:, observed_feature_idxs]
if sherman_woodbury is False:
covariance = self.get_covariance()
cov_sub = covariance[
np.ix_(observed_feature_idxs, observed_feature_idxs)
]
Wo = self.W_[:, observed_feature_idxs]
coefs = np.dot(Wo, la.inv(cov_sub))
else:
noise = self.get_noise()
noise_sub = noise[observed_feature_idxs]
coef, _ = _get_conditional_parameters_sherman_woodbury(
noise_sub, self.W_, observed_feature_idxs
)

raise NotImplementedError("Subclass PCA required for this Sherman Woodbury")
return np.dot(X, coefs.T)

def conditional_score(
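
In the surviving (non-Sherman-Woodbury) path of transform_subset, latent coordinates are predicted from a subset of observed features via the marginal covariance of that subset, coefs = W_o Sigma_oo^{-1}, i.e. the posterior mean E[z | x_o] = W_o Sigma_oo^{-1} x_o. A toy sketch (identifiers hypothetical):

    import numpy as np
    import numpy.linalg as la

    rng = np.random.default_rng(1)
    L, p = 2, 10
    W = rng.normal(size=(L, p))           # loadings, (n_components, p)
    sigma2 = 0.5
    cov = W.T @ W + sigma2 * np.eye(p)    # model covariance, (p, p)

    observed_idx = np.array([0, 2, 3, 7])
    Wo = W[:, observed_idx]                            # (L, n_observed)
    cov_sub = cov[np.ix_(observed_idx, observed_idx)]  # Sigma_oo
    coefs = Wo @ la.inv(cov_sub)                       # (L, n_observed)

    x_o = rng.normal(size=len(observed_idx))
    z_hat = coefs @ x_o   # posterior mean of the latent factors given x_o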
4 changes: 2 additions & 2 deletions python/python/bystro/supervised_ppca/gf_dropout_pt.py
@@ -207,9 +207,9 @@ def fit( # type: ignore[override]
z_samples = mean_z + torch.matmul(eps, C1_2)

if task == "regression":
beta_ = softplus(beta_l)
beta_ = softplus(beta_l) + .01
y_hat = torch.squeeze(
torch.matmul(z_samples[:, : self.n_supervised], beta_)
torch.matmul(20*z_samples[:, : self.n_supervised], beta_)
)
loss_y = supervision_loss(y_hat, torch.squeeze(y_batch))
else:
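
Both edits above touch the regression head of the dropout model. softplus(beta_l) can sit arbitrarily close to zero, which starves the supervised loss of gradient, so the commit adds a 0.01 floor; the factor of 20 rescales the supervised latent scores before the regression. A small illustration of the floor (values approximate):

    import torch
    from torch.nn.functional import softplus

    beta_l = torch.tensor([-10.0, 0.0, 10.0])
    print(softplus(beta_l))          # ~[4.5e-05, 0.693, 10.0]
    print(softplus(beta_l) + .01)    # now bounded below by ~0.01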
2 changes: 1 addition & 1 deletion python/python/bystro/supervised_ppca/gf_generative_pt.py
@@ -39,7 +39,7 @@
from numpy.typing import NDArray
from typing import Any, Callable

from sklearn.decomposition import PCA
from sklearn.decomposition import PCA # type: ignore
from tqdm import trange
import torch
from torch import Tensor, nn
213 changes: 208 additions & 5 deletions python/python/bystro/supervised_ppca/ppca_augmented.py
@@ -5,7 +5,7 @@
from bystro.supervised_ppca._base import BaseGaussianFactorModel
from numpy.typing import NDArray

from sklearn.utils.extmath import randomized_range_finder
from sklearn.utils.extmath import randomized_range_finder # type: ignore

from bystro.covariance._covariance_np import (
EmpiricalCovariance,
@@ -678,8 +678,10 @@ def __init__(
eps: float = 1e-8,
n_oversamples: int = 20,
random_state: int = 2021,
regularization: str = "Linear",
):
super().__init__(n_components=n_components)
self.regularization = regularization
self.mu: float = mu
self.eps: float = eps
self.W_: NDArray[np.float_] | None = None
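
The new regularization argument is passed to _select_covariance_estimator inside fit (below). That helper is not shown in this diff; purely as an assumption based on the imported estimators, it presumably looks something like:

    # Hypothetical sketch -- the real _select_covariance_estimator is not in
    # this diff, and the estimator class names below are assumed.
    from bystro.covariance._covariance_np import (
        EmpiricalCovariance,
        LinearShrinkageCovariance,  # assumed name
    )

    def _select_covariance_estimator(regularization: str):
        if regularization == "Empirical":
            return EmpiricalCovariance()
        if regularization == "Linear":
            return LinearShrinkageCovariance()
        raise ValueError(f"Unrecognized regularization: {regularization}")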
@@ -725,9 +727,20 @@ def fit(
)

X_tilde = Q.T @ X_dm.T
X_tilde = X_tilde.T

model_cov = _select_covariance_estimator(self.regularization)
XX = np.zeros((N, n_random + q))
XX[:, :n_random] = X_tilde
if q == 1:
XX[:, -1] = np.squeeze(Y_dm)
else:
XX[:, n_random:] = Y_dm
model_cov.fit(XX)
cov = model_cov.covariance

B_11 = np.cov(X_tilde)
B_12 = 1 / N * np.dot(X_tilde, Y_dm)
B_11 = cov[:n_random, :n_random]
B_12 = cov[n_random:, :n_random].T
B_22 = B_12.T @ la.solve(B_11 + self.eps * np.eye(n_random), B_12)

B = np.zeros((n_random + q, n_random + q))
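
Here the commit replaces the bare np.cov and cross-covariance computations with a pluggable covariance estimator fit on the concatenated matrix [X_tilde | Y], from which the blocks are read back out: B_11 is the covariance of the projected data and B_12 the cross-covariance with Y. The block extraction is equivalent to the following (empirical estimator used as a stand-in):

    import numpy as np

    rng = np.random.default_rng(2)
    N, n_random, q = 200, 5, 2
    X_tilde = rng.normal(size=(N, n_random))
    Y_dm = rng.normal(size=(N, q))

    XX = np.hstack([X_tilde, Y_dm])      # (N, n_random + q)
    cov = np.cov(XX, rowvar=False)       # stand-in for model_cov.covariance

    B_11 = cov[:n_random, :n_random]     # Cov(X_tilde)
    B_12 = cov[n_random:, :n_random].T   # Cov(X_tilde, Y)
    # By symmetry this equals the upper-right block:
    assert np.allclose(B_12, cov[:n_random, n_random:])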
@@ -754,6 +767,7 @@
A = W.T @ W
B = W.T @ X_dm.T
S = la.solve(A, B).T
self.explained_variance_ = np.mean(S**2, axis=0)
X_recon = S @ W.T
var = np.mean((X - X_recon) ** 2)
self._store_instance_variables((W, var))
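
The new explained_variance_ attribute stores the per-component mean square of the least-squares scores S = X_dm W (W^T W)^{-1}; it feeds the rewritten get_covariance below. An equivalent sketch:

    import numpy as np
    import numpy.linalg as la

    rng = np.random.default_rng(3)
    N, p, k = 100, 20, 3
    X_dm = rng.normal(size=(N, p))
    W = la.qr(rng.normal(size=(p, k)))[0]    # (p, k), orthonormal columns

    S = la.solve(W.T @ W, W.T @ X_dm.T).T    # least-squares scores, (N, k)
    explained_variance = np.mean(S**2, axis=0)

    # With orthonormal columns, W^T W = I and the scores are plain projections:
    assert np.allclose(S, X_dm @ W)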
@@ -778,7 +792,195 @@ def get_covariance(self) -> NDArray[np.float_]:
if self.W_ is None or self.sigma2_ is None or self.p is None:
raise ValueError("Model has not been fit yet")

return np.dot(self.W_.T, self.W_) + self.sigma2_ * np.eye(self.p)
exp_var_diff = self.explained_variance_ - self.sigma2_
W_mod = self.W_.T * np.sqrt(exp_var_diff)
return np.dot(W_mod, W_mod.T) + self.sigma2_ * np.eye(self.p)
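
get_covariance no longer uses W_ at face value: the loadings come from an orthonormal range basis and carry no scale, so the covariance is reassembled by rescaling each component by its score variance in excess of noise, Sigma = W diag(explained_variance - sigma2) W^T + sigma2 * I, mirroring the convention of sklearn's PCA.get_covariance. A sketch with illustrative numbers:

    import numpy as np

    rng = np.random.default_rng(4)
    k, p = 3, 8
    W_ = rng.normal(size=(k, p))                      # loadings, (n_components, p)
    explained_variance_ = np.array([5.0, 3.0, 2.0])   # per-component score variance
    sigma2_ = 0.5

    exp_var_diff = explained_variance_ - sigma2_
    W_mod = W_.T * np.sqrt(exp_var_diff)              # (p, k), columns rescaled
    Sigma = W_mod @ W_mod.T + sigma2_ * np.eye(p)

Note this implicitly assumes explained_variance_ >= sigma2_ componentwise; a component whose score variance falls below the noise level would put a NaN into the square root.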

def get_noise(self):
"""
Returns the observational noise as a diagonal matrix
Parameters
----------
None
Returns
-------
Lambda : NDArray,(p,p)
The observational noise
"""
if self.sigma2_ is None or self.p is None:
raise ValueError("Model has not been fit yet")

return self.sigma2_ * np.eye(self.p)

def _store_instance_variables(
self,
trainable_variables: tuple[
NDArray[np.float_],
np.float_,
],
) -> None:
"""
Saves the learned variables
Parameters
----------
trainable_variables : list
List of variables saved
Sets
----
W_ : NDArray,(n_components,p)
The loadings
sigma2_ : float
The isotropic variance
"""
self.W_ = trainable_variables[0].T
self.sigma2_ = trainable_variables[1]

def _initialize_save_losses(self):
pass

def _save_losses(self):
pass

def _test_inputs(self):
pass

def _transform_training_data(self):
pass

def _save_variables(self):
pass


class PPCAsupervisedRandomized(BaseGaussianFactorModel):
def __init__(
self,
n_components: int = 2,
mu: float = 1.0,
eps: float = 1e-8,
n_oversamples: int = 20,
random_state: int = 2021,
regularization: str = "Linear",
):
super().__init__(n_components=n_components)
self.regularization = regularization
self.mu: float = mu
self.eps: float = eps
self.W_: NDArray[np.float_] | None = None
self.D_: NDArray[np.float_] | None = None
self.p: int | None = None
self.n_oversamples: int = n_oversamples
self.random_state: int = random_state
self.sigma2_: np.float_ | None = None

def fit(
self, X: NDArray[np.float_], Y: NDArray[np.float_]
) -> "PPCAsupervisedRandomized":
"""
Fits a model given covariates X
Parameters
----------
X : NDArray,(n_samples,n_covariates)
The data
Y : NDArray,(n_samples,n_outcomes)
The supervised outcome data
Returns
-------
self : PPCAsupervisedRandomized
The model
"""
self.mean_x = np.mean(X, axis=0)
self.mean_y = np.mean(Y, axis=0)
X_dm = X - self.mean_x
Y_dm = Y - self.mean_y
N, self.p = X.shape
q = Y.shape[1]

n_random = self.n_components + self.n_oversamples
Q = randomized_range_finder(
X_dm.T,
n_iter=7,
size=n_random,
power_iteration_normalizer="auto",
random_state=self.random_state,
)

X_tilde = Q.T @ X_dm.T
X_tilde = X_tilde.T

model_cov = _select_covariance_estimator(self.regularization)
XX = np.zeros((N, n_random + q))
XX[:, :n_random] = X_tilde
if q == 1:
XX[:, -1] = np.squeeze(Y_dm)
else:
XX[:, n_random:] = Y_dm
model_cov.fit(XX)
cov = model_cov.covariance

B_11 = cov[:n_random, :n_random]
B_12 = cov[n_random:, :n_random].T
B_22 = B_12.T @ la.solve(B_11 + self.eps * np.eye(n_random), B_12)

B = np.zeros((n_random + q, n_random + q))
B[:n_random, :n_random] = B_11
B[:n_random, n_random:] = B_12
B[n_random:, :n_random] = self.mu * B_12.T
B[n_random:, n_random:] = self.mu * B_22
B = B.T

eigvals, eigvecs = la.eig(B)
eigvals = np.real(eigvals)
eigvecs = np.real(eigvecs)
idx = eigvals.argsort()[::-1]

self.eigvals_ = eigvals[idx]
V_latent = eigvecs[:, idx]

self.V_latent = V_latent

W_latent = V_latent[:n_random, : self.n_components]

W = Q @ W_latent

A = W.T @ W
B = W.T @ X_dm.T
S = la.solve(A, B).T
self.explained_variance_ = np.mean(S**2, axis=0)
X_recon = S @ W.T
var = np.mean((X - X_recon) ** 2)
self._store_instance_variables((W, var))

return self
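
Since the mu weighting makes B non-symmetric, fit uses the general eigensolver la.eig rather than la.eigh, discards the (numerically) imaginary parts, and sorts eigenvalues in descending order before lifting the top n_components eigenvectors back through Q. The sorting idiom in isolation:

    import numpy as np
    import numpy.linalg as la

    B = np.array([[2.0, 1.0],
                  [0.5, 1.0]])          # non-symmetric toy matrix

    eigvals, eigvecs = la.eig(B)
    eigvals, eigvecs = np.real(eigvals), np.real(eigvecs)
    idx = eigvals.argsort()[::-1]       # indices of eigenvalues, descending
    eigvals, eigvecs = eigvals[idx], eigvecs[:, idx]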

def get_covariance(self) -> NDArray[np.float_]:
"""
Gets the covariance matrix
Sigma = W^TW + sigma2*I
Parameters
----------
None
Returns
-------
covariance : NDArray,(p,p)
The covariance matrix
"""
if self.W_ is None or self.sigma2_ is None or self.p is None:
raise ValueError("Model has not been fit yet")

exp_var_diff = self.explained_variance_ - self.sigma2_
W_mod = self.W_.T * np.sqrt(exp_var_diff)
return np.dot(W_mod, W_mod.T) + self.sigma2_ * np.eye(self.p)

def get_noise(self):
"""
@@ -801,7 +1003,8 @@ def get_noise(self):
def _store_instance_variables(
self,
trainable_variables: tuple[
NDArray[np.float_], np.float_,
NDArray[np.float_],
np.float_,
],
) -> None:
"""
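
Taken together, a minimal usage sketch for the new class (synthetic data, parameter values illustrative):

    import numpy as np
    from bystro.supervised_ppca.ppca_augmented import PPCAsupervisedRandomized

    rng = np.random.default_rng(0)
    X = rng.normal(size=(500, 100))   # covariates
    Y = rng.normal(size=(500, 1))     # supervised outcome

    model = PPCAsupervisedRandomized(n_components=2, mu=1.0, regularization="Linear")
    model.fit(X, Y)

    Sigma = model.get_covariance()    # (100, 100) PPCA-style covariance
    Lambda = model.get_noise()        # isotropic noise, sigma2_ * I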
