Merge pull request #51 from NREL/gb/silu
Gb/silu
grantbuster authored Jul 23, 2024
2 parents d5da2f1 + 7ae6ebb commit 3bdbabf
Showing 3 changed files with 261 additions and 2 deletions.
187 changes: 187 additions & 0 deletions phygnn/layers/custom_layers.py
@@ -931,3 +931,190 @@ def call(self, x):
"""
const = tf.constant(value=self.value, shape=x.shape, dtype=x.dtype)
return self.fun((x, const))


class SigLin(tf.keras.layers.Layer):
"""Sigmoid linear unit. This can be used to set a soft minimum on a range.
y = 1/(1+exp(-x)) where x < 0
y = x + 0.5 where x >= 0
"""

def call(self, x):
"""Operates on x with SigLin
Parameters
----------
x : tf.Tensor
Input tensor
Returns
-------
x : tf.Tensor
Output tensor with same shape as input x operated on by SigLin
"""

return tf.math.maximum(tf.math.sigmoid(x), x + 0.5)


class LogTransform(tf.keras.layers.Layer):
"""Log transform or inverse transform of data
``y = log(x + adder) * scalar`` or
``y = exp(x / scalar) - adder`` for the inverse
"""

def __init__(self, name=None, adder=0, scalar=1, inverse=False, idf=None):
"""
Parameters
----------
name : str | None
Name of the tensorflow layer
adder : float
Adder term for ``y = log(x + adder) * scalar``
scalar : float
Scalar term for ``y = log(x + adder) * scalar``
inverse : bool
Option to perform the inverse operation, i.e.
``y = exp(x / scalar) - adder``
idf : int | list | None
One or more feature channel indices to perform log transform on.
None will perform transform on all feature channels.
"""

super().__init__(name=name)
self.adder = adder
self.scalar = scalar
self.inverse = inverse
self.rank = None
self.idf = [idf] if isinstance(idf, int) else idf

def build(self, input_shape):
"""Custom implementation of the tf layer build method.
Parameters
----------
input_shape : tuple
Shape tuple of the input
"""
self.rank = len(input_shape)

def _logt(self, x):
if not self.inverse:
return tf.math.log(x + self.adder) * self.scalar
else:
return tf.math.exp(x / self.scalar) - self.adder

def call(self, x):
"""Operates on x with (inverse) log transform
Parameters
----------
x : tf.Tensor
Input tensor
Returns
-------
y : tf.Tensor
Log-transformed x tensor
"""

if self.idf is None:
return self._logt(x)
else:
out = []
for idf in range(x.shape[-1]):
if idf in self.idf:
out.append(self._logt(x[..., idf:idf + 1]))
else:
out.append(x[..., idf:idf + 1])

out = tf.concat(out, -1, name='concat')
return out


class UnitConversion(tf.keras.layers.Layer):
"""Layer to convert units per feature channel using the linear transform:
``y = x * scalar + adder``
Be sure to check how this will interact with normalization factors.
"""

def __init__(self, name=None, adder=0, scalar=1):
"""
Parameters
----------
name : str | None
Name of the tensorflow layer
adder : float | list
Adder term for ``y = x * scalar + adder``. If this is a float, the
same value will be used for all feature channels. If this is a
list, each value will be used for the corresponding feature channel
and the length must match the number of feature channels
scalar : float | list
Scalar term for ``y = x * scalar + adder``. If this is a float, the
same value will be used for all feature channels. If this is a
list, each value will be used for the corresponding feature channel
and the length must match the number of feature channels
"""

super().__init__(name=name)
self.adder = adder
self.scalar = scalar
self.rank = None

def build(self, input_shape):
"""Custom implementation of the tf layer build method.
Parameters
----------
input_shape : tuple
Shape tuple of the input
"""
self.rank = len(input_shape)
nfeat = input_shape[-1]

dtypes = (int, np.int64, np.int32, float, np.float32, np.float64)

if isinstance(self.adder, dtypes):
self.adder = np.ones(nfeat) * self.adder
self.adder = tf.convert_to_tensor(self.adder, dtype=tf.float32)
else:
msg = (f'UnitConversion layer `adder` array has length '
f'{len(self.adder)} but input shape has last dimension '
f'as {input_shape[-1]}')
assert len(self.adder) == input_shape[-1], msg

if isinstance(self.scalar, dtypes):
self.scalar = np.ones(nfeat) * self.scalar
self.scalar = tf.convert_to_tensor(self.scalar, dtype=tf.float32)
else:
msg = (f'UnitConversion layer `scalar` array has length '
f'{len(self.scalar)} but input shape has last dimension '
f'as {input_shape[-1]}')
assert len(self.scalar) == input_shape[-1], msg

def call(self, x):
"""Convert units
Parameters
----------
x : tf.Tensor
Input tensor
Returns
-------
y : tf.Tensor
Unit-converted x tensor
"""

if self.rank is None:
self.build(x.shape)

out = []
for idf, (adder, scalar) in enumerate(zip(self.adder, self.scalar)):
out.append(x[..., idf:idf + 1] * scalar + adder)

out = tf.concat(out, -1, name='concat')

return out
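
A minimal usage sketch of the three new layers (not part of the commit): it assumes only numpy, tensorflow, and this module, and the Celsius-to-Fahrenheit conversion values are purely illustrative.

import numpy as np

from phygnn.layers.custom_layers import LogTransform, SigLin, UnitConversion

x = np.random.uniform(0, 10, (4, 8, 8, 2)).astype(np.float32)

# SigLin follows the sigmoid below zero and x + 0.5 at or above zero, so
# outputs are strictly positive (a soft minimum near zero)
y = SigLin()(x).numpy()
assert (y > 0).all()

# LogTransform round-trip on feature channel 1 only
lt = LogTransform(adder=1, idf=1)
ilt = LogTransform(adder=1, inverse=True, idf=1)
assert np.allclose(x, ilt(lt(x)).numpy(), atol=1e-4)

# UnitConversion applies y = x * scalar + adder per channel; here channel 0
# is unchanged and channel 1 is converted as if Celsius -> Fahrenheit
uc = UnitConversion(adder=[0, 32], scalar=[1, 1.8])
y = uc(x).numpy()
assert np.allclose(y[..., 1], x[..., 1] * 1.8 + 32, atol=1e-4)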
2 changes: 1 addition & 1 deletion phygnn/version.py
@@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
"""Physics Guided Neural Network version."""

-__version__ = '0.0.28'
+__version__ = '0.0.29'
74 changes: 73 additions & 1 deletion tests/test_layers.py
@@ -16,6 +16,9 @@
TileLayer,
FunctionalLayer,
GaussianAveragePooling2D,
SigLin,
LogTransform,
UnitConversion,
)
from phygnn.layers.handlers import HiddenLayers, Layers
from phygnn import TfModel
@@ -109,7 +112,7 @@ def test_skip_connection():
{'class': 'SkipConnection', 'name': 'a'},
{'class': 'Conv2D', 'filters': 4, 'kernel_size': 3,
'activation': 'relu', 'padding': 'same'},
]
]
layers = HiddenLayers(hidden_layers)
assert len(layers.layers) == 5

@@ -504,3 +507,72 @@ def test_gaussian_pooling():
layer = model2.layers[0]
x_in = np.random.uniform(0, 1, (10, 24, 24, 3))
_ = model2.predict(x_in)


def test_siglin():
"""Test the sigmoid linear layer"""
n_points = 1000
mid = n_points // 2
sl = SigLin()
x = np.linspace(-10, 10, n_points + 1)
y = sl(x).numpy()
assert x.shape == y.shape
assert (y > 0).all()
assert np.allclose(y[mid:], x[mid:] + 0.5)


def test_logtransform():
"""Test the log transform layer"""
n_points = 1000
lt = LogTransform(adder=0)
x = np.linspace(0, 10, n_points + 1)
y = lt(x).numpy()
assert x.shape == y.shape
assert y[0] == -np.inf

lt = LogTransform(adder=1)
ilt = LogTransform(adder=1, inverse=True)
x = np.random.uniform(0, 10, (n_points + 1, 2))
y = lt(x).numpy()
xinv = ilt(y).numpy()
assert not np.isnan(y).any()
assert np.allclose(y, np.log(x + 1))
assert np.allclose(x, xinv)

lt = LogTransform(adder=1, idf=1)
ilt = LogTransform(adder=1, inverse=True, idf=1)
x = np.random.uniform(0, 10, (n_points + 1, 2))
y = lt(x).numpy()
xinv = ilt(y).numpy()
assert np.allclose(x[:, 0], y[:, 0])
assert not np.allclose(x[:, 1], y[:, 1])
assert not np.isnan(y).any()
assert np.allclose(y[:, 1], np.log(x[:, 1] + 1))
assert np.allclose(x, xinv)


def test_unit_conversion():
"""Test the custom unit conversion layer"""
x = np.random.uniform(0, 1, (1, 10, 10, 4)) # 4 features

layer = UnitConversion(adder=0, scalar=1)
y = layer(x).numpy()
assert np.allclose(x, y)

layer = UnitConversion(adder=1, scalar=1)
y = layer(x).numpy()
assert (y >= 1).all() and (y <= 2).all()

layer = UnitConversion(adder=1, scalar=100)
y = layer(x).numpy()
assert (y >= 1).all() and (y > 90).any() and (y <= 101).all()

layer = UnitConversion(adder=0, scalar=[100, 1, 1, 1])
y = layer(x).numpy()
assert (y[..., 0] > 90).any() and (y[..., 0] <= 100).all()
assert (y[..., 1:] >= 0).all() and (y[..., 1:] <= 1).all()

with pytest.raises(AssertionError):
# bad number of scalar values
layer = UnitConversion(adder=0, scalar=[100, 1, 1])
y = layer(x)
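
As a hedged aside (not part of the commit), the new layers should also be reachable through the hidden_layers config pattern used in test_skip_connection above, assuming HiddenLayers resolves phygnn custom layers by class name the way it does for SkipConnection:

def test_new_layers_in_config():
    """Sketch: build the new layers from a hidden_layers config; the
    kwargs mirror the constructors shown in this diff"""
    hidden_layers = [
        {'class': 'UnitConversion', 'adder': 0, 'scalar': 1},
        {'class': 'LogTransform', 'adder': 1},
        {'class': 'SigLin'},
    ]
    layers = HiddenLayers(hidden_layers)
    assert len(layers.layers) == 3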
