Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: improve utils/test_graph.py module #420

Merged
merged 2 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deeprankcore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from glob import glob
from multiprocessing import Pool
from os.path import basename
from random import randrange
from types import ModuleType
from typing import Dict, Iterator, List, Optional, Union

Expand Down Expand Up @@ -210,7 +211,7 @@ def _process_one_query( # pylint: disable=too-many-arguments

for _ in range(grid_augmentation_count):
# repeat with random augmentation
axis, angle = pdb2sql.transform.get_rot_axis_angle() # insert numpy random seed once implemented
axis, angle = pdb2sql.transform.get_rot_axis_angle(randrange(100))
augmentation = Augmentation(axis, angle)
graph.write_as_grid_to_hdf5(output_path, grid_settings, grid_map_method, augmentation)

Expand Down
226 changes: 165 additions & 61 deletions tests/utils/test_graph.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import os
import shutil
import tempfile
from random import randrange

import h5py
import numpy as np
import pytest
from pdb2sql import pdb2sql
from pdb2sql.transform import get_rot_axis_angle

Expand All @@ -17,20 +19,26 @@
from deeprankcore.utils.graph import Edge, Graph, Node
from deeprankcore.utils.grid import Augmentation, GridSettings, MapMethod

# Shared fixtures for all tests in this module.
entry_id = "test"  # HDF5 entry-group name used by the graph fixture
node_feature_narray = "node_feat1"  # multi-valued (3-element array) node feature
edge_feature_narray = "edge_feat1"  # single-element array edge feature
node_feature_singleton = "node_feat2"  # scalar (single-value) node feature
# target name and value
target_name = "target1"
target_value = 1.0

def test_graph_build_and_export(): # pylint: disable=too-many-locals

@pytest.fixture
def graph():
"""Build a simple graph of two nodes and one edge in between them.
Test that the export methods can be called without failure.
"""

entry_id = "test"

# load the structure
pdb = pdb2sql("tests/data/pdb/101M/101M.pdb")
try:
structure = get_structure(pdb, entry_id)
finally:
pdb._close() # pylint: disable=protected-access
pdb._close() # pylint: disable=protected-access

# build a contact from two residues
residue0 = structure.chains[0].residues[0]
Expand All @@ -43,99 +51,195 @@ def test_graph_build_and_export(): # pylint: disable=too-many-locals
edge01 = Edge(contact01)

# add features to the nodes and edge
node_feature_name = "node_features"
edge_feature_name = "edge_features"
node_feature_singlevalue_name = "singlevalue_features"

node0.features[node_feature_name] = np.array([0.1])
node1.features[node_feature_name] = np.array([1.0])
edge01.features[edge_feature_name] = np.array([2.0])
node0.features[node_feature_singlevalue_name] = 1.0
node1.features[node_feature_singlevalue_name] = 0.0
node0.features[node_feature_narray] = np.array([0.1, 0.1, 0.5])
node1.features[node_feature_narray] = np.array([1.0, 0.9, 0.5])
edge01.features[edge_feature_narray] = np.array([2.0])
node0.features[node_feature_singleton] = 1.0
node1.features[node_feature_singleton] = 0.0

# set node positions, for the grid mapping
node0.features[Nfeat.POSITION] = get_residue_center(residue0)
node1.features[Nfeat.POSITION] = get_residue_center(residue1)

# init the graph
graph = Graph(structure.id)
graph.center = np.mean(
[node0.features[Nfeat.POSITION], node1.features[Nfeat.POSITION]],
axis=0)
graph.targets[target_name] = target_value

graph.add_node(node0)
graph.add_node(node1)
graph.add_edge(edge01)
return graph


def test_graph_write_to_hdf5(graph):
"""Test that the graph is correctly written to hdf5 file.
"""

# create a temporary hdf5 file to write to
tmp_dir_path = tempfile.mkdtemp()

hdf5_path = os.path.join(tmp_dir_path, "101m.hdf5")

# target name and value
target_name = "target1"
target_value = 1.0
try:
# init the graph
graph = Graph(structure.id)
graph.center = np.mean([node0.features[Nfeat.POSITION], node1.features[Nfeat.POSITION]], axis=0)
graph.targets[target_name] = target_value

graph.add_node(node0)
graph.add_node(node1)
graph.add_edge(edge01)

# export graph to hdf5
graph.write_to_hdf5(hdf5_path)

# export grid to hdf5
grid_settings = GridSettings([20, 20, 20], [20.0, 20.0, 20.0])
assert np.all(grid_settings.resolutions == np.array((1.0, 1.0, 1.0)))

axis, angle = get_rot_axis_angle(412346587)
augmentation = Augmentation(axis, angle)

graph.write_as_grid_to_hdf5(hdf5_path, grid_settings, MapMethod.GAUSSIAN)
graph.write_as_grid_to_hdf5(hdf5_path, grid_settings, MapMethod.GAUSSIAN, augmentation)

# check the contents of the hdf5 file
with h5py.File(hdf5_path, "r") as f5:
entry_group = f5[entry_id]

# check for graph values
# nodes
assert Nfeat.NODE in entry_group
node_features_group = entry_group[Nfeat.NODE]
assert node_feature_name in node_features_group
assert len(np.nonzero(node_features_group[node_feature_name][()])) > 0

assert node_feature_narray in node_features_group
assert len(np.nonzero(
node_features_group[node_feature_narray][()])) > 0
assert node_features_group[node_feature_narray][()].shape == (2, 3)
assert node_features_group[node_feature_singleton][()].shape == (
2, )

# edges
assert Efeat.EDGE in entry_group
edge_features_group = entry_group[Efeat.EDGE]
assert edge_feature_name in edge_features_group
assert len(np.nonzero(edge_features_group[edge_feature_name][()])) > 0

assert edge_feature_narray in edge_features_group
assert len(np.nonzero(
edge_features_group[edge_feature_narray][()])) > 0
assert edge_features_group[edge_feature_narray][()].shape == (1, 1)
assert Efeat.INDEX in edge_features_group
assert len(np.nonzero(edge_features_group[Efeat.INDEX][()])) > 0

# check for grid-mapped values
# target
assert entry_group[Target.VALUES][target_name][()] == target_value

finally:
shutil.rmtree(tmp_dir_path) # clean up after the test


def test_graph_write_as_grid_to_hdf5(graph):
"""Test that the graph is correctly written to hdf5 file as a grid.
"""

# create a temporary hdf5 file to write to
tmp_dir_path = tempfile.mkdtemp()

hdf5_path = os.path.join(tmp_dir_path, "101m.hdf5")

try:

# export grid to hdf5
grid_settings = GridSettings([20, 20, 20], [20.0, 20.0, 20.0])
assert np.all(grid_settings.resolutions == np.array((1.0, 1.0, 1.0)))

graph.write_as_grid_to_hdf5(hdf5_path, grid_settings,
MapMethod.GAUSSIAN)

# check the contents of the hdf5 file
with h5py.File(hdf5_path, "r") as f5:
entry_group = f5[entry_id]

# mapped features
assert gridstorage.MAPPED_FEATURES in entry_group
mapped_group = entry_group[gridstorage.MAPPED_FEATURES]

for feature_name in (node_feature_name, edge_feature_name):
feature_name = f"{feature_name}_000"
## narray features
for feature_name in [
f"{node_feature_narray}_000", f"{node_feature_narray}_001",
f"{node_feature_narray}_002", f"{edge_feature_narray}_000"
]:

assert (
feature_name in mapped_group
), f"missing mapped feature {feature_name}"
assert feature_name in mapped_group
feature_name
in mapped_group), f"missing mapped feature {feature_name}"
data = mapped_group[feature_name][()]
assert len(np.nonzero(data)) > 0, f"{feature_name}: all zero"
assert np.all(data.shape == tuple(grid_settings.points_counts))
## single value features
data = mapped_group[node_feature_singleton][()]
assert len(np.nonzero(data)) > 0, f"{feature_name}: all zero"
assert np.all(data.shape == tuple(grid_settings.points_counts))

# check that the feature value is preserved after augmentation
unaugmented_data = mapped_group[node_feature_singlevalue_name][:]

# Check the value
# target
assert entry_group[Target.VALUES][target_name][()] == target_value

# check that the augmented data is the same, just different orientation
entry_group = f5[f"{entry_id}_000"]
mapped_group = entry_group[gridstorage.MAPPED_FEATURES]
augmented_data = mapped_group[node_feature_singlevalue_name][:]
finally:
shutil.rmtree(tmp_dir_path) # clean up after the test

# Check the value
assert entry_group[Target.VALUES][target_name][()] == target_value

assert np.abs(np.sum(augmented_data) - np.sum(unaugmented_data)).item() < 0.1
def test_graph_augmented_write_as_grid_to_hdf5(graph):
    """Test that the graph is correctly written to hdf5 file as a grid,
    including randomly rotated augmentations.

    Writes the unaugmented grid plus two augmented copies, then checks that:
    - the file contains exactly the base entry and one group per augmentation,
    - every mapped feature is present, non-zero and has the grid shape,
    - grid centers and target values are preserved in the augmented entries,
    - the summed singleton feature is preserved under rotation (the grid
      mapping is approximately rotation-invariant in total mass).
    """

    # create a temporary hdf5 file to write to
    tmp_dir_path = tempfile.mkdtemp()

    hdf5_path = os.path.join(tmp_dir_path, "101m.hdf5")

    try:
        # export grid to hdf5
        grid_settings = GridSettings([20, 20, 20], [20.0, 20.0, 20.0])
        assert np.all(grid_settings.resolutions == np.array((1.0, 1.0, 1.0)))

        # save to hdf5
        graph.write_as_grid_to_hdf5(hdf5_path, grid_settings,
                                    MapMethod.GAUSSIAN)

        # two data points augmentation
        axis, angle = get_rot_axis_angle(randrange(100))
        augmentation = Augmentation(axis, angle)
        graph.write_as_grid_to_hdf5(hdf5_path, grid_settings,
                                    MapMethod.GAUSSIAN, augmentation)
        axis, angle = get_rot_axis_angle(randrange(100))
        augmentation = Augmentation(axis, angle)
        graph.write_as_grid_to_hdf5(hdf5_path, grid_settings,
                                    MapMethod.GAUSSIAN, augmentation)

        # check the contents of the hdf5 file
        with h5py.File(hdf5_path, "r") as f5:
            assert list(
                f5.keys()) == [entry_id, f"{entry_id}_000", f"{entry_id}_001"]
            entry_group = f5[entry_id]
            mapped_group = entry_group[gridstorage.MAPPED_FEATURES]
            # check that the feature value is preserved after augmentation
            unaugmented_data = mapped_group[node_feature_singleton][:]

            for aug_id in [f"{entry_id}_000", f"{entry_id}_001"]:
                entry_group = f5[aug_id]

                # mapped features
                assert gridstorage.MAPPED_FEATURES in entry_group
                mapped_group = entry_group[gridstorage.MAPPED_FEATURES]
                ## narray features
                for feature_name in [
                        f"{node_feature_narray}_000",
                        f"{node_feature_narray}_001",
                        f"{node_feature_narray}_002",
                        f"{edge_feature_narray}_000"
                ]:
                    assert (feature_name in mapped_group
                            ), f"missing mapped feature {feature_name}"
                    data = mapped_group[feature_name][()]
                    assert len(
                        np.nonzero(data)) > 0, f"{feature_name}: all zero"
                    assert np.all(
                        data.shape == tuple(grid_settings.points_counts))
                ## single value features
                data = mapped_group[node_feature_singleton][()]
                # fix: the failure message previously reused the stale loop
                # variable `feature_name`, naming the wrong feature on failure
                assert len(np.nonzero(data)) > 0, \
                    f"{node_feature_singleton}: all zero"
                assert np.all(data.shape == tuple(grid_settings.points_counts))
                # check that the augmented data is the same, just different orientation
                assert (f5[f"{entry_id}/grid_points/center"][(
                )] == f5[f"{aug_id}/grid_points/center"][()]).all()
                assert np.abs(np.sum(data) -
                              np.sum(unaugmented_data)).item() < 0.11

                # target
                assert entry_group[Target.VALUES][target_name][(
                )] == target_value

    finally:
        shutil.rmtree(tmp_dir_path)  # clean up after the test