Skip to content

Commit

Permalink
Merge pull request #175 from creare-com/feature/eval_decorator
Browse files Browse the repository at this point in the history
Implemented disk caching as part of the Node.
  • Loading branch information
mpu-creare authored Nov 30, 2018
2 parents 77c7aef + 0cd6257 commit de12039
Show file tree
Hide file tree
Showing 15 changed files with 681 additions and 627 deletions.
21 changes: 21 additions & 0 deletions podpac/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,27 @@
Description
"""


# Monkey match os.makedirs for Python 2 compatibility
import sys
import os
_osmakedirs = os.makedirs
def makedirs(name, mode=511, exist_ok=False):
try:
_osmakedirs(name, mode)
except Exception as e:
if exist_ok:
pass
else:
raise e
if sys.version_info.major == 2:
makedirs.__doc__ = os.makedirs.__doc__
os.makedirs = makedirs
else:
del _osmakedirs
del os
del sys

# Public API
from podpac.core.coordinates import Coordinates, crange, clinspace
from podpac.core.node import Node, NodeException
Expand Down
25 changes: 17 additions & 8 deletions podpac/core/algorithm/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
from podpac.core.coordinates import Coordinates, union
from podpac.core.units import UnitsDataArray
from podpac.core.node import Node
from podpac.core.node import NodeException
from podpac.core.node import COMMON_NODE_DOC
from podpac.core.node import node_eval
from podpac.core.utils import common_doc

COMMON_DOC = COMMON_NODE_DOC.copy()
Expand Down Expand Up @@ -51,6 +53,7 @@ def _inputs(self):
}

@common_doc(COMMON_DOC)
@node_eval
def eval(self, coordinates, output=None):
"""Evalutes this nodes using the supplied coordinates.
Expand Down Expand Up @@ -79,16 +82,22 @@ def eval(self, coordinates, output=None):
result = self.algorithm(inputs)
if isinstance(result, np.ndarray):
if output is None:
output = self.create_output_array(output_coordinates)
output.data[:] = result
else:
output = self.create_output_array(output_coordinates, data=result)
else:
output.data[:] = result
elif isinstance(result, xr.DataArray):
if output is None:
output = self.create_output_array(Coordinates.from_xarray(result.coords), data=result.data)
else:
output[:] = result.data
elif isinstance(result, UnitsDataArray):
if output is None:
output_coordinates = Coordinates.from_xarray(result.coords)
output = self.create_output_array(output_coordinates)
output[:] = result
output = output.transpose(*[dim for dim in coordinates.dims if dim in result.dims])
output = result
else:
output[:] = result
else:
raise NodeException

self._output = output
return output

def find_coordinates(self):
Expand Down
38 changes: 14 additions & 24 deletions podpac/core/algorithm/coord_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,6 @@ class ModifyCoordinates(Algorithm):
@tl.default('coordinates_source')
def _default_coordinates_source(self):
return self.source

def algorithm(self, inputs):
"""Passthrough of the source data
Arguments
----------
inputs : dict
Evaluated output of the input nodes. The keys are the attribute names.
Returns
-------
UnitDataArray
Source evaluated at the expanded coordinates
"""
return inputs['source']

@common_doc(COMMON_DOC)
def eval(self, coordinates, output=None):
Expand All @@ -79,18 +64,23 @@ def eval(self, coordinates, output=None):
"""

self._requested_coordinates = coordinates

modified_coordinates = Coordinates(
self.outputs = {}
self._modified_coordinates = Coordinates(
[self.get_modified_coordinates1d(coordinates, dim) for dim in coordinates.dims])
for dim in modified_coordinates.udims:
if modified_coordinates[dim].size == 0:

for dim in self._modified_coordinates.udims:
if self._modified_coordinates[dim].size == 0:
raise ValueError("Modified coordinates do not intersect with source data (dim '%s')" % dim)
output = super(ModifyCoordinates, self).eval(modified_coordinates, output=output)

# debugging
self._modified_coordinates = modified_coordinates
self._output = output
self.outputs['source'] = self.source.eval(self._modified_coordinates, output=output)

if output is None:
output = self.outputs['source']
else:
output[:] = self.outputs['source']

if self.debug:
self._output = output
return output

class ExpandCoordinates(ModifyCoordinates):
Expand Down Expand Up @@ -207,4 +197,4 @@ def get_modified_coordinates1d(self, coords, dim):
else:
raise ValueError("Invalid selection attrs for '%s'" % dim)

return coords1d
return coords1d
55 changes: 19 additions & 36 deletions podpac/core/algorithm/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from podpac.core.node import Node
from podpac.core.algorithm.algorithm import Algorithm
from podpac.core.utils import common_doc
from podpac.core.node import COMMON_NODE_DOC
from podpac.core.node import COMMON_NODE_DOC, node_eval

COMMON_DOC = COMMON_NODE_DOC.copy()
COMMON_DOC['full_kernel'] = '''Kernel that contains all the dimensions of the input source, in the correct order.
Expand Down Expand Up @@ -77,6 +77,7 @@ class Convolution(Algorithm):
_full_kernel = tl.Instance(np.ndarray)

@common_doc(COMMON_DOC)
@node_eval
def eval(self, coordinates, output=None):
"""Evaluates this nodes using the supplied coordinates.
Expand All @@ -91,20 +92,18 @@ def eval(self, coordinates, output=None):
-------
{eval_return}
"""
self._requested_coordinates = coordinates

# This should be aligned with coordinates' dimension order
# The size of this kernel is used to figure out the expanded size
self._full_kernel = self.get_full_kernel(coordinates)
shape = self._full_kernel.shape

if len(shape) != len(coordinates.shape):
raise ValueError("Kernel shape does not match source data shape")
if len(self._full_kernel.shape) != len(coordinates.shape):
raise ValueError("shape mismatch, kernel does not match source data (%s != %s)" % (
self._full_kernel.shape, coordinates.shape))

# expand the coordinates
exp_coords = []
exp_slice = []
for dim, s in zip(coordinates.dims, shape):
for dim, s in zip(coordinates.dims, self._full_kernel.shape):
coord = coordinates[dim]
if s == 1 or not isinstance(coord, UniformCoordinates1d):
exp_coords.append(coord)
Expand All @@ -121,21 +120,25 @@ def eval(self, coordinates, output=None):
coord.step,
**coord.properties))
exp_slice.append(slice(-s_start, -s_end))
exp_coords = Coordinates(exp_coords)
exp_slice = tuple(exp_slice)
self._expanded_coordinates = Coordinates(exp_coords)

# evaluate source using expanded coordinates, convolve, and then slice out original coordinates
source = self.source.eval(self._expanded_coordinates)

if np.isnan(np.max(source)):
method = 'direct'
else:
method = 'auto'

# evaluate using expanded coordinates and then reduce down to originally requested coordinates
out = super(Convolution, self).eval(exp_coords)
result = out[exp_slice]
result = scipy.signal.convolve(source, self._full_kernel, mode='same', method=method)
result = result[exp_slice]

if output is None:
output = result
output = self.create_output_array(coordinates, data=result)
else:
output[:] = result

# debugging
self._expanded_coordinates = exp_coords
self._output = output

return output

@tl.default('kernel')
Expand Down Expand Up @@ -163,26 +166,6 @@ def get_full_kernel(self, coordinates):
"""
return self.kernel

def algorithm(self, inputs):
"""Computes the convolution of the source and the kernel
Arguments
----------
inputs : dict
evaluated outputs of the input nodes. The keys are the attribute names.
Returns
-------
np.ndarray
Resultant array.
"""
if np.isnan(np.max(inputs['source'])):
method = 'direct'
else:
method = 'auto'
res = scipy.signal.convolve(inputs['source'], self._full_kernel, mode='same', method=method)
return res


class TimeConvolution(Convolution):
"""Compute a temporal convolution over a source node.
Expand Down
8 changes: 4 additions & 4 deletions podpac/core/algorithm/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from podpac.core.node import Node
from podpac.core.algorithm.algorithm import Algorithm
from podpac.core.utils import common_doc
from podpac.core.node import COMMON_NODE_DOC
from podpac.core.node import COMMON_NODE_DOC, node_eval

COMMON_DOC = COMMON_NODE_DOC.copy()

Expand Down Expand Up @@ -162,6 +162,7 @@ def iteroutputs(self, coordinates):
yield self.source.eval(chunk)

@common_doc(COMMON_DOC)
@node_eval
def eval(self, coordinates, output=None):
"""Evaluates this nodes using the supplied coordinates.
Expand Down Expand Up @@ -204,7 +205,6 @@ def eval(self, coordinates, output=None):
else:
output[:] = result

self._output = output
return output

def reduce(self, x):
Expand Down Expand Up @@ -907,6 +907,7 @@ def _get_source_coordinates(self, requested_coordinates):
return coords

@common_doc(COMMON_DOC)
@node_eval
def eval(self, coordinates, output=None):
"""Evaluates this nodes using the supplied coordinates.
Expand All @@ -926,7 +927,7 @@ def eval(self, coordinates, output=None):
ValueError
If source it not time-depended (required by this node).
"""
self._requested_coordinates = coordinates

self._source_coordinates = self._get_source_coordinates(coordinates)

if output is None:
Expand All @@ -950,7 +951,6 @@ def eval(self, coordinates, output=None):
out = out.sel(**{self.groupby:E}).rename({self.groupby: 'time'})
output[:] = out.transpose(*output.dims).data

self._output = output
return output

def base_ref(self):
Expand Down
26 changes: 15 additions & 11 deletions podpac/core/cache/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
from glob import glob
import shutil
from hashlib import md5 as hash_alg

try:
import cPickle # Python 2.7
Expand Down Expand Up @@ -140,19 +141,20 @@ def has(self, node, key, coordinates=None, mode=None):
class CacheStore(object):

def get_hash_val(self, obj):
return hash(obj)
return hash_alg(obj).hexdigest()

def hash_node(self, node):
hashable_repr = cPickle.dumps(node.definition)
return self.get_hash_val(hashable_repr)
hashable_repr = 'None' if node is None else node.hash
return hashable_repr

def hash_coordinates(self, coordinates):
hashable_repr = None if coordinates is None else coordinates.json
return self.get_hash_val(hashable_repr)
hashable_repr = 'None' if coordinates is None else coordinates.hash
return hashable_repr

def hash_key(self, key):
hashable_repr = str(repr(key))
return self.get_hash_val(hashable_repr)
#hashable_repr = str(repr(key)).encode('utf-8')
#return self.get_hash_val(hashable_repr)
return key

def put(self, node, data, key, coordinates=None, update=False):
'''Cache data for specified node.
Expand Down Expand Up @@ -314,7 +316,7 @@ def cache_dir(self, node):
basedir = self._root_dir_path
subdir = str(node.__class__)[8:-2].split('.')
dirs = [basedir] + subdir
return os.path.join(*dirs)
return (os.path.join(*dirs)).replace(':', '_').replace('<', '_').replace('>', '_')

def cache_filename(self, node, key, coordinates):
pre = str(node.base_ref).replace('/', '_').replace('\\', '_').replace(':', '_')
Expand All @@ -329,7 +331,7 @@ def cache_filename(self, node, key, coordinates):
def cache_glob(self, node, key, coordinates):
pre = '*'
nKeY = 'nKeY{}'.format(self.hash_node(node))
kKeY = 'kKeY*' if key == '*' else 'kKeY{}'.format(self.hash_key(key))
kKeY = 'kKeY*' if key == '*' else 'kKeY{}'.format(self.cleanse_filename_str(self.hash_key(key)))
cKeY = 'cKeY*' if coordinates == '*' else 'cKeY{}'.format(self.hash_coordinates(coordinates))
filename = '_'.join([pre, nKeY, kKeY, cKeY])
filename = filename + '.' + self._extension
Expand All @@ -339,11 +341,12 @@ def cache_path(self, node, key, coordinates):
return os.path.join(self.cache_dir(node), self.cache_filename(node, key, coordinates))

def cleanse_filename_str(self, s):
s = s.replace('/', '_').replace('\\', '_').replace(':', '_')
s = s.replace('/', '_').replace('\\', '_').replace(':', '_').replace('<', '_').replace('>', '_')
s = s.replace('nKeY', 'xxxx').replace('kKeY', 'xxxx').replace('cKeY', 'xxxx')
return s

def put(self, node, data, key, coordinates=None, update=False):

self.make_cache_dir(node)
listing = CacheListing(node=node, key=key, coordinates=coordinates, data=data)
if self.has(node, key, coordinates): # a little inefficient but will do for now
Expand Down Expand Up @@ -425,4 +428,5 @@ def has(self, node, key, coordinates=None):
if c.has(listing):
return True
return False



Loading

0 comments on commit de12039

Please sign in to comment.