feat: started introducing backpropagation in tests
BrunoLiegiBastonLiegi committed Sep 27, 2024
1 parent 1b6ee4a commit e1fd300
Showing 3 changed files with 135 additions and 26 deletions.
2 changes: 2 additions & 0 deletions src/qiboml/models/pytorch.py
@@ -41,6 +41,8 @@ def __post_init__(self):

for j, layer in enumerate(self._trainable_layers):
for i, params in enumerate(layer.parameters):
if self.backend.name != "pytorch":
params = [p.tolist() for p in params]
params = torch.as_tensor(params)
params.requires_grad = True
setattr(
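The two lines added above guard the case where the circuit's parameters come from a non-PyTorch backend: they are first converted to plain Python lists and only then wrapped into a trainable torch tensor. A minimal sketch of the same conversion, not part of the commit and assuming NumPy-backed parameters:

import numpy as np
import torch

# parameters as a non-torch backend might return them (plain NumPy scalars)
params = [np.array(0.1), np.array(0.2)]
# detach them from the foreign array type before handing them to torch
params = [p.tolist() for p in params]
params = torch.as_tensor(params)
params.requires_grad = True  # now trainable by torch optimizers
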
68 changes: 51 additions & 17 deletions tests/test_backprop.py
@@ -1,3 +1,4 @@
import matplotlib.pyplot as plt
import pytest
import torch
from qibo import hamiltonians
@@ -10,12 +11,30 @@
from qiboml.models import encoding_decoding as ed


@pytest.mark.parametrize("backend", [JaxBackend(), PyTorchBackend()])
def eval_model(model, data, target, loss_f):
with torch.no_grad():
loss = 0.0
for x, y in zip(data, target):
out = model(x)
loss += loss_f(out.ravel(), y.ravel())
avg_loss = loss / data.shape[0]
print(f"Avg loss: {avg_loss}")
plt.scatter(data[:, 0], target, alpha=0.5)
plt.scatter(data[:, 0], [model(x) for x in data], alpha=0.5)
# plt.show()
return avg_loss


@pytest.mark.parametrize(
"backend",
[
PyTorchBackend(),
],
)
@pytest.mark.parametrize("differentiation", ["Jax", "PSR"])
def test_backpropagation(backend, differentiation):
nqubits = 3
dim = 2
training_layer = ans.ReuploadingLayer(nqubits, backend=backend)
training_layers = [ans.ReuploadingLayer(nqubits, backend=backend) for _ in range(3)]
encoding_layer = ed.PhaseEncodingLayer(nqubits, backend=backend)
kwargs = {"backend": backend}
decoding_qubits = range(nqubits)
@@ -30,30 +49,45 @@ def test_backpropagation(backend, differentiation):
q_model = pt.QuantumModel(
layers=[
encoding_layer,
training_layer,
*training_layers,
decoding_layer,
],
differentiation=differentiation,
)
encoding = torch.nn.Linear(1, nqubits)
model = torch.nn.Sequential(encoding, q_model)
classical_encoding = torch.nn.Linear(1, nqubits).double()
classical_decoding = torch.nn.Linear(1, 1).double()
"""
model = torch.nn.Sequential(
classical_encoding,
torch.nn.ReLU(),
q_model,
torch.nn.ReLU(),
classical_decoding
)"""
model = q_model
# try to fit a parabola
x = torch.randn(100, 1)
y = x**2
x = torch.randn(100, 3).double()
y = torch.sin(x.sum(-1))
loss_f = torch.nn.MSELoss()

initial_loss = eval_model(model, x, y, loss_f)

optimizer = torch.optim.SGD(model.parameters(), lr=1e-1, momentum=0.9)
for input, target in zip(x, y):
optimizer = torch.optim.Adam(model.parameters())
cum_loss = 0.0

for i, (input, target) in enumerate(zip(x, y)):
optimizer.zero_grad()
output = model(input)
loss = (target - output) ** 2
print(list(model.named_parameters()))
print(f"> loss: {loss}")
loss = loss_f(target, output)
loss.backward()
optimizer.step()
print(list(model.named_parameters()))

# print(
# f"> Parameters delta: {torch.cat(tuple(p.ravel() for p in model.parameters())) - params_bkp}"
# )
cum_loss += loss
if i % 50 == 0 and i != 0:
print(f"Loss: {cum_loss / 50}")
cum_loss = 0.0

final_loss = eval_model(model, x, y, loss_f)

assert initial_loss > final_loss
assert False
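
The test is parametrized over "Jax" and "PSR" differentiation modes; "PSR" presumably stands for the parameter-shift rule, in which the derivative of an expectation value with respect to a rotation angle is built from two evaluations at shifted angles. An illustrative, self-contained sketch of that rule (an assumption about the naming, not the qiboml implementation):

import numpy as np

def parameter_shift(expectation, theta, shift=np.pi / 2):
    # For a gate generated by a Pauli operator:
    # d<H>/dtheta = ( <H>(theta + pi/2) - <H>(theta - pi/2) ) / 2
    return 0.5 * (expectation(theta + shift) - expectation(theta - shift))

# toy check with <Z>(theta) = cos(theta), whose exact derivative is -sin(theta)
grad = parameter_shift(np.cos, 0.3)
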
91 changes: 82 additions & 9 deletions tests/test_models_interfaces.py
@@ -47,25 +47,89 @@ def build_linear_layer(frontend, input_dim, output_dim):


def build_sequential_model(frontend, layers):
import keras

if frontend.__name__ == "qiboml.models.pytorch":
return torch.nn.Sequential(*layers)
return frontend.torch.nn.Sequential(*layers)
elif frontend.__name__ == "qiboml.models.keras":
return keras.Sequential(layers)
return frontend.keras.Sequential(layers)
else:
raise_error(RuntimeError, f"Unknown frontend {frontend}.")


def random_tensor(frontend, shape):
import tensorflow as tf

def random_tensor(frontend, shape, binary=False):
if frontend.__name__ == "qiboml.models.pytorch":
return torch.randn(shape)
tensor = frontend.torch.randint(0, 2, shape) if binary else torch.randn(shape)
elif frontend.__name__ == "qiboml.models.keras":
return tf.random.uniform(shape)
tensor = frontend.tf.random.uniform(shape)
else:
raise_error(RuntimeError, f"Unknown frontend {frontend}.")
return tensor


def train_model(frontend, model, data, target):
if frontend.__name__ == "qiboml.models.pytorch":

optimizer = torch.optim.LBFGS(model.parameters(), lr=1e-1, tolerance_grad=1e-4)
loss_f = torch.nn.MSELoss()

def closure():
optimizer.zero_grad()
out = frontend.torch.vstack([model(x) for x in data])
loss = loss_f(out, target)
loss.backward()
return loss

optimizer.step(closure)
elif frontend.__name__ == "qiboml.models.keras":
pass


def eval_model(frontend, model, data, target=None):
loss = None
outputs = []
if frontend.__name__ == "qiboml.models.pytorch":
loss_f = torch.nn.MSELoss()
with torch.no_grad():
for x in data:
outputs.append(model(x))
outputs = frontend.torch.vstack(outputs)
elif frontend.__name__ == "qiboml.models.keras":
pass
if target is not None:
loss = loss_f(target, outputs)
return outputs, loss


def random_parameters(frontend, model):
if frontend.__name__ == "qiboml.models.pytorch":
new_params = {}
for k, v in model.state_dict().items():
new_params.update({k: frontend.torch.randn(v.shape)})
elif frontend.__name__ == "qiboml.models.keras":
pass
return new_params


def get_parameters(frontend, model):
if frontend.__name__ == "qiboml.models.pytorch":
return {k: v.clone() for k, v in model.state_dict().items()}
elif frontend.__name__ == "qiboml.models.keras":
pass


def set_parameters(frontend, model, params):
if frontend.__name__ == "qiboml.models.pytorch":
model.load_state_dict(params)
elif frontend.__name__ == "qiboml.models.keras":
pass


def prepare_targets(frontend, model, data):
target_params = random_parameters(frontend, model)
init_params = get_parameters(frontend, model)
set_parameters(frontend, model, target_params)
target, _ = eval_model(frontend, model, data)
set_parameters(frontend, model, init_params)
return target


@pytest.mark.parametrize("layer", ENCODING_LAYERS)
@@ -86,6 +150,15 @@ def test_encoding(backend, frontend, layer):
decoding_layer,
]
)

binary = True if layer.__class__.__name__ == "BinaryEncodingLayer" else False
data = random_tensor(frontend, (100, dim), binary)
target = prepare_targets(frontend, q_model, data)
_, loss_untrained = eval_model(frontend, q_model, data, target)
train_model(frontend, q_model, data, target)
_, loss_trained = eval_model(frontend, q_model, data, target)
assert loss_untrained > loss_trained

model = build_sequential_model(
frontend,
[
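
The new helpers above (random_parameters, get_parameters, set_parameters, prepare_targets) set up a self-consistency check: targets are produced by a randomly re-parametrised copy of the model, so a parameter setting that fits them exactly is known to exist and training should lower the loss. A condensed sketch of the same pattern on a plain torch model (a hypothetical example, not the qiboml interface):

import torch

model = torch.nn.Linear(3, 1)
data = torch.randn(100, 3)
loss_f = torch.nn.MSELoss()

# build targets from a randomly re-parametrised copy of the model
init_params = {k: v.clone() for k, v in model.state_dict().items()}
target_params = {k: torch.randn(v.shape) for k, v in model.state_dict().items()}
model.load_state_dict(target_params)
with torch.no_grad():
    target = model(data)
model.load_state_dict(init_params)

with torch.no_grad():
    loss_untrained = loss_f(model(data), target)

# LBFGS needs a closure that re-evaluates the loss at each iteration
optimizer = torch.optim.LBFGS(model.parameters(), lr=1e-1)

def closure():
    optimizer.zero_grad()
    loss = loss_f(model(data), target)
    loss.backward()
    return loss

optimizer.step(closure)

with torch.no_grad():
    loss_trained = loss_f(model(data), target)
assert loss_trained < loss_untrained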
