Skip to content

Commit

Permalink
initial node_eval / node output caching decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
jmilloy committed Nov 26, 2018
1 parent e05e927 commit c902a6b
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 83 deletions.
16 changes: 7 additions & 9 deletions podpac/core/algorithm/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def _inputs(self):
}

@common_doc(COMMON_DOC)
@node_eval
def eval(self, coordinates, output=None, method=None):
"""Evalutes this nodes using the supplied coordinates.
Expand All @@ -73,8 +74,6 @@ def eval(self, coordinates, output=None, method=None):
{eval_return}
"""

self._requested_coordinates = coordinates

# evaluate input nodes and keep outputs in self.outputs
self.outputs = {}
for key, node in self._inputs.items():
Expand All @@ -87,16 +86,15 @@ def eval(self, coordinates, output=None, method=None):
result = self.algorithm()
if isinstance(result, np.ndarray):
if output is None:
output = self.create_output_array(output_coordinates)
output.data[:] = result
output = self.create_output_array(output_coordinates, data=result)
else:
output.data[:] = result
else:
if output is None:
output_coordinates = Coordinates.from_xarray(result.coords)
output = self.create_output_array(output_coordinates)
output[:] = result
output = output.transpose(*[dim for dim in coordinates.dims if dim in result.dims])
output = result
else:
output[:] = result

self._output = output
return output

def find_coordinates(self):
Expand Down
32 changes: 13 additions & 19 deletions podpac/core/algorithm/coord_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,6 @@ class ModifyCoordinates(Algorithm):
@tl.default('coordinates_source')
def _default_coordinates_source(self):
return self.source

This comment has been minimized.

Copy link
@mpu-creare

mpu-creare Nov 27, 2018

Contributor

Why did this block get removed?

This comment has been minimized.

Copy link
@mpu-creare

mpu-creare Nov 27, 2018

Contributor

Nevermind. I got it.

def algorithm(self):
"""Passthrough of the source data
Returns
-------
UnitDataArray
Source evaluated at the expanded coordinates
"""
return self.outputs['source']

@common_doc(COMMON_DOC)
def eval(self, coordinates, output=None, method=None):
Expand All @@ -76,23 +66,27 @@ def eval(self, coordinates, output=None, method=None):
"""

self._requested_coordinates = coordinates

modified_coordinates = Coordinates(
self.outputs = {}
self._modified_coordinates = Coordinates(
[self.get_modified_coordinates1d(coordinates, dim) for dim in coordinates.dims])
for dim in modified_coordinates.udims:
if modified_coordinates[dim].size == 0:

for dim in self._modified_coordinates.udims:
if self._modified_coordinates[dim].size == 0:
raise ValueError("Modified coordinates do not intersect with source data (dim '%s')" % dim)
output = super(ModifyCoordinates, self).eval(modified_coordinates, output=output, method=method)

# debugging
self._modified_coordinates = modified_coordinates
self._output = output
self.outputs['source'] = self.source.eval(self._modified_coordinates, output=output, method=method)

if output = None:
output = self.outputs['source']
else:
output[:] = self.outputs['source']

self._output = output
return output

class ExpandCoordinates(ModifyCoordinates):
"""Algorithm node used to expand requested coordinates. This is normally used in conjunction with a reduce operation
to calculate, for example, the average temperature over the last month. While this is simple to do when evaluating
to calculate, for example, the average temperature over the last month. While this is simple to do when evaluating
a single node (just provide the coordinates), this functionality is needed for nodes buried deeper in a pipeline.
lat, lon, time, alt : List
Expand Down
46 changes: 17 additions & 29 deletions podpac/core/algorithm/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class Convolution(Algorithm):
_full_kernel = tl.Instance(np.ndarray)

@common_doc(COMMON_DOC)
@cache_output
def eval(self, coordinates, output=None, method=None):
"""Evaluates this nodes using the supplied coordinates.
Expand All @@ -91,20 +92,18 @@ def eval(self, coordinates, output=None, method=None):
-------
{eval_return}
"""
self._requested_coordinates = coordinates

# This should be aligned with coordinates' dimension order
# The size of this kernel is used to figure out the expanded size
self._full_kernel = self.get_full_kernel(coordinates)
shape = self._full_kernel.shape

if len(shape) != len(coordinates.shape):
raise ValueError("Kernel shape does not match source data shape")
if len(self._full_kernel.shape) != len(coordinates.shape):
raise ValueError("shape mismatch, kernel does not match source data (%s != %s)" % (
self._full_kernel.shape, coordinates.shape))

# expand the coordinates
exp_coords = []
exp_slice = []
for dim, s in zip(coordinates.dims, shape):
for dim, s in zip(coordinates.dims, self._full_kernel.shape):
coord = coordinates[dim]
if s == 1 or not isinstance(coord, UniformCoordinates1d):
exp_coords.append(coord)
Expand All @@ -121,21 +120,25 @@ def eval(self, coordinates, output=None, method=None):
coord.step,
**coord.properties))
exp_slice.append(slice(-s_start, -s_end))
exp_coords = Coordinates(exp_coords)
exp_slice = tuple(exp_slice)
self._expanded_coordinates = Coordinates(exp_coords)

# evaluate source using expanded coordinates, convolve, and then slice out original coordinates
self.outputs['source'] = self.source.eval(self._expanded_coordinates, method=method)

if np.isnan(np.max(self.outputs['source'])):
method = 'direct'
else:
method = 'auto'

result scipy.signal.convolve(self.outputs['source'], self._full_kernel, mode='same', method=method)
result = result[exp_slice]

# evaluate using expanded coordinates and then reduce down to originally requested coordinates
out = super(Convolution, self).eval(exp_coords, method=method)
result = out[exp_slice]
if output is None:
output = result
else:
output[:] = result

# debugging
self._expanded_coordinates = exp_coords
self._output = output

return output

@tl.default('kernel')
Expand Down Expand Up @@ -163,21 +166,6 @@ def get_full_kernel(self, coordinates):
"""
return self.kernel

def algorithm(self):
"""Computes the convolution of the source and the kernel
Returns
-------
np.ndarray
Resultant array.
"""
if np.isnan(np.max(self.outputs['source'])):
method = 'direct'
else: method = 'auto'
res = scipy.signal.convolve(self.outputs['source'], self._full_kernel, mode='same', method=method)
return res


class TimeConvolution(Convolution):
"""Specialized convolution node that computes temporal convolutions only.
Expand Down
11 changes: 5 additions & 6 deletions podpac/core/algorithm/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def iteroutputs(self, coordinates, method=None):
yield self.source.eval(chunk, method=method)

@common_doc(COMMON_DOC)
@cache_output
def eval(self, coordinates, output=None, method=None):
"""Evaluates this nodes using the supplied coordinates.
Expand All @@ -185,9 +186,8 @@ def eval(self, coordinates, output=None, method=None):
{eval_return}
"""

self._requested_coordinates = coordinates
self.dims = self.get_dims(self._requested_coordinates)
self._reduced_coordinates = self._requested_coordinates.drop(self.dims)
self.dims = self.get_dims(coordinates)
self._reduced_coordinates = coordinates.drop(self.dims)

if output is None:
output = self.create_output_array(self._reduced_coordinates)
Expand All @@ -206,7 +206,6 @@ def eval(self, coordinates, output=None, method=None):
else:
output[:] = result

self._output = output
return output

def reduce(self, x):
Expand Down Expand Up @@ -911,6 +910,7 @@ def _get_source_coordinates(self, requested_coordinates):
return coords

@common_doc(COMMON_DOC)
@cache_output
def eval(self, coordinates, output=None, method=None):
"""Evaluates this nodes using the supplied coordinates.
Expand All @@ -932,7 +932,7 @@ def eval(self, coordinates, output=None, method=None):
ValueError
If source is not time-dependent (required by this node).
"""
self._requested_coordinates = coordinates

self._source_coordinates = self._get_source_coordinates(coordinates)

if output is None:
Expand All @@ -956,7 +956,6 @@ def eval(self, coordinates, output=None, method=None):
out = out.sel(**{self.groupby:E}).rename({self.groupby: 'time'})
output[:] = out.transpose(*output.dims).data

self._output = output
return output

def base_ref(self):
Expand Down
5 changes: 1 addition & 4 deletions podpac/core/compositor.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def f(src):
output[:] = np.nan

@common_doc(COMMON_DOC)
@node_eval
def eval(self, coordinates, output=None, method=None):
"""Evaluates this nodes using the supplied coordinates.
Expand All @@ -216,12 +217,8 @@ def eval(self, coordinates, output=None, method=None):
{eval_return}
"""

self._requested_coordinates = coordinates

outputs = self.iteroutputs(coordinates, method=method)
output = self.composite(outputs, output)

self._output = output
return output

def find_coordinates(self):
Expand Down
16 changes: 5 additions & 11 deletions podpac/core/data/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class will be used without modication.
# privates
_interpolation = tl.Instance(Interpolation)

_evaluated_coordinates = tl.Instance(Coordinates, allow_none=True)
_original_requested_coordinates = tl.Instance(Coordinates, allow_none=True)
_requested_source_coordinates = tl.Instance(Coordinates)
_requested_source_coordinates_index = tl.List()
_requested_source_data = tl.Instance(UnitsDataArray)
Expand Down Expand Up @@ -238,7 +238,7 @@ def eval(self, coordinates, output=None, method=None):
'`coordinate_index_type` is set to `numpy`', UserWarning)

# store requested coordinates for debugging
self._requested_coordinates = coordinates
self._original_requested_coordinates = coordinates

# check for missing dimensions
for c in self.native_coordinates.values():
Expand All @@ -264,8 +264,10 @@ def eval(self, coordinates, output=None, method=None):
extra.append(c.name)
coordinates = coordinates.drop(extra)

self._evaluated_coordinates = coordinates # TODO move this if WCS can be updated to allow that
return self._eval(coordinates, output=output, method=None)

@node_eval
def _eval(self, coordinates, output=None, method=None):
# intersect the native coordinates with requested coordinates
# to get native coordinates within requested coordinates bounds
# TODO: support coordinate_index_type parameter to define other index types
Expand All @@ -278,8 +280,6 @@ def eval(self, coordinates, output=None, method=None):
output = self.create_output_array(coordinates)
else:
output[:] = np.nan

self._output = output
return output

# reset interpolation
Expand All @@ -299,12 +299,6 @@ def eval(self, coordinates, output=None, method=None):
output = self.create_output_array(coordinates)
output = self._interpolate(coordinates, output)

# set the order of dims to be the same as that of requested_coordinates
# this is required in case the user supplied an output object with a different dims order
output = output.transpose(*coordinates.dims)

self._output = output

return output

def find_coordinates(self):
Expand Down
8 changes: 4 additions & 4 deletions podpac/core/data/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,14 +562,14 @@ def native_coordinates(self):
data wrangling for us...
"""

# TODO update so that we don't rely on _evaluated_coordinates
if not self._evaluated_coordinates:
# TODO update so that we don't rely on _requested_coordinates if possible
if not self._requested_coordinates:
return self.wcs_coordinates

cs = []
for dim in self.wcs_coordinates.dims:
if dim in self._evaluated_coordinates.dims:
c = self._evaluated_coordinates[dim]
if dim in self._requested_coordinates.dims:
c = self._requested_coordinates[dim]
if c.size == 1:
cs.append(ArrayCoordinates1d(c.coordinates[0], name=dim))
elif isinstance(c, UniformCoordinates1d):
Expand Down
46 changes: 45 additions & 1 deletion podpac/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class Node(tl.HasTraits):
cache_type = tl.Enum([None, 'disk', 'ram'], allow_none=True)
node_defaults = tl.Dict(allow_none=True)
style = tl.Instance(Style)
debug = tl.Bool(False) # TODO replace with a setting

@tl.default('style')
def _style_default(self):
Expand All @@ -101,6 +102,7 @@ def _style_default(self):
# debugging
_requested_coordinates = tl.Instance(Coordinates, allow_none=True)
_output = tl.Instance(UnitsDataArray, allow_none=True)
_from_cache = tl.Bool(allow_none=True, default_value=None)

# temporary messages
@property
Expand Down Expand Up @@ -677,4 +679,46 @@ def clear_disk_cache(self, attr='*', node_cache=False, all_cache=False):
shutil.rmtree(self.cache_dir)
else:
for f in glob.glob(self.cache_path(attr)):
os.remove(f)
os.remove(f)

def node_eval(fn):
    """
    Decorator for Node eval methods that handles caching and a user provided output argument.

    On a cache miss the wrapped method is called and its result is stored with
    ``self.put_cache``; on a cache hit the stored result is fetched with
    ``self.get_cache`` and, if the caller supplied ``output``, copied into it.
    Either way the returned data is transposed to follow the dimension order
    of the requested coordinates.

    Parameters
    ----------
    fn : function
        Node eval method to wrap. It is called as
        ``fn(self, coordinates, output=output, method=method)``.

    Returns
    -------
    wrapper : function
        Wrapped node eval method
    """

    # Function-scope import so this block does not depend on file-level imports.
    import functools

    cache_key = 'output'

    # functools.wraps keeps fn's __name__ and __doc__ on the wrapper, so any
    # decorator stacked on top still sees the original docstring.
    @functools.wraps(fn)
    def wrapper(self, coordinates, output=None, method=None):
        if self.debug:
            self._requested_coordinates = coordinates

        # NOTE(review): `transform` is called with the sorted dims to get an
        # order-agnostic cache key; confirm this is the intended Coordinates
        # API for reordering dimensions.
        cache_coordinates = coordinates.transform(sorted(coordinates.dims))  # order agnostic caching
        if self.has_cache(cache_key, cache_coordinates):
            data = self.get_cache(cache_key, cache_coordinates)
            if output is not None:
                # write through a view of `output` whose trailing dims line up with the cached data
                order = [dim for dim in output.dims if dim not in data.dims] + list(data.dims)
                output.transpose(*order)[:] = data
            self._from_cache = True
        else:
            # forward `method` so callers that pass it (as eval methods do) are honoured
            data = fn(self, coordinates, output=output, method=method)
            self.put_cache(cache_key, data, cache_coordinates)
            self._from_cache = False

        # transpose data to match the dims order of the requested coordinates
        order = [dim for dim in coordinates.dims if dim in data.dims]
        data = data.transpose(*order)

        if self.debug:
            self._output = data

        return data

    return wrapper

0 comments on commit c902a6b

Please sign in to comment.