Add pipeline property to track data lineage #3967

Merged · 25 commits · Sep 24, 2019

Commits
- b32522a: Add pipeline dataset property to track data lineage (jonmmease, Sep 17, 2019)
- 702e531: Add pipeline support to apply, redim, and opts accessors (jonmmease, Sep 18, 2019)
- 18d3548: Guard against accessors that wrap objects without pipeline support (jonmmease, Sep 18, 2019)
- 8ed7c5f: Fix dataset property histogram tests now that apply is added to pipeline (jonmmease, Sep 18, 2019)
- e016dc9: Copy docstrings to Metaclass wrapping methods (jonmmease, Sep 18, 2019)
- 11d8439: change metaclass arg name to mcs (jonmmease, Sep 18, 2019)
- 7404723: Override options method for pipeline support (jonmmease, Sep 18, 2019)
- 2adf704: Add pipeline support for Dataset.map (jonmmease, Sep 18, 2019)
- 7ea2ba5: standardize names of args to pipelined_call (jonmmease, Sep 18, 2019)
- 25d7674: Fix pipeline tests now that `map` is a pipeline step (jonmmease, Sep 18, 2019)
- f9e1f73: remove trailing whitespace (jonmmease, Sep 18, 2019)
- b7eef37: Revert "Fix pipeline tests now that `map` is a pipeline step" (jonmmease, Sep 18, 2019)
- 9657592: Handle pipeline functions that return the same element (jonmmease, Sep 18, 2019)
- cb7cf8d: Reset the dataset property and empty pipeline when clone replaces data (jonmmease, Sep 18, 2019)
- 931b796: Propagate dataset property through clone when _in_method (jonmmease, Sep 19, 2019)
- 2d1cbae: support relabel in pipeline (jonmmease, Sep 20, 2019)
- 388687a: Merge branch 'master' into pipeline (jonmmease, Sep 20, 2019)
- cfa3449: Merge branch 'master' into pipeline (jonmmease, Sep 20, 2019)
- fdfd538: Convert pipeline to be a `chain` operation (jonmmease, Sep 21, 2019)
- 1ddb237: Fix tests (jonmmease, Sep 21, 2019)
- a610829: Update pipeline docstring (jonmmease, Sep 22, 2019)
- 1ac15d3: Remove execute_pipeline from blacklist (jonmmease, Sep 22, 2019)
- c2a5a96: Use try/finally when setting _in_method to avoid inconsistent state (jonmmease, Sep 22, 2019)
- 50bd22a: Make chain operation default to group of element produced by last ope… (jonmmease, Sep 23, 2019)
- ba9b4df: unused import (jonmmease, Sep 23, 2019)
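
Taken together, these commits add two user-facing properties, dataset and pipeline. A minimal sketch of the intended behavior (inferred from the diff below; the data and variable names are illustrative):

import numpy as np
import holoviews as hv

# Build a Dataset and derive an element from it.
ds = hv.Dataset(np.random.rand(100, 2), kdims=['x'], vdims=['y'])
curve = ds.to(hv.Curve, 'x', 'y')

# `dataset` points back to the Dataset the element was derived from,
# and `pipeline` is a chain operation recording each derivation step,
# so replaying it on the source should reproduce the element.
source = curve.dataset
replayed = curve.pipeline(source)
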
80 changes: 76 additions & 4 deletions holoviews/core/accessors.py
@@ -5,13 +5,85 @@

from collections import OrderedDict
from types import FunctionType
import copy

import param
from param.parameterized import add_metaclass

from . import util
from .pprint import PrettyPrinter


class AccessorPipelineMeta(type):
    def __new__(mcs, classname, bases, classdict):
        if '__call__' in classdict:
            classdict['__call__'] = mcs.pipelined(classdict['__call__'])

        inst = type.__new__(mcs, classname, bases, classdict)
        return inst

    @classmethod
    def pipelined(mcs, __call__):
        def pipelined_call(*args, **kwargs):
            from ..operation.element import method as method_op, factory
            from .data import Dataset, MultiDimensionalMapping
            inst = args[0]
            if not hasattr(inst._obj, '_pipeline'):
                # Wrapped object doesn't support the pipeline property
                return __call__(*args, **kwargs)

            inst_pipeline = copy.copy(inst._obj._pipeline)
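            # Flag that we are inside a wrapped method so that only the
            # outermost call appends operations to the pipeline.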
            in_method = inst._obj._in_method
            if not in_method:
                inst._obj._in_method = True

            try:
                result = __call__(*args, **kwargs)

                if not in_method:
                    init_op = factory.instance(
                        output_type=type(inst),
                        kwargs={'mode': getattr(inst, 'mode', None)},
                    )
                    call_op = method_op.instance(
                        input_type=type(inst),
                        method_name='__call__',
                        args=list(args[1:]),
                        kwargs=kwargs,
                    )

                    if isinstance(result, Dataset):
                        result._pipeline = inst_pipeline.instance(
                            operations=inst_pipeline.operations + [
                                init_op, call_op
                            ],
                            output_type=type(result),
                        )
                    elif isinstance(result, MultiDimensionalMapping):
                        for key, element in result.items():
                            getitem_op = method_op.instance(
                                input_type=type(result),
                                method_name='__getitem__',
                                args=[key],
                            )
                            element._pipeline = inst_pipeline.instance(
                                operations=inst_pipeline.operations + [
                                    init_op, call_op, getitem_op
                                ],
                                output_type=type(result),
                            )
            finally:
                if not in_method:
                    inst._obj._in_method = False

            return result

        pipelined_call.__doc__ = __call__.__doc__

        return pipelined_call


@add_metaclass(AccessorPipelineMeta)
class Apply(object):
"""
Utility to apply a function or operation to all viewable elements
Expand Down Expand Up @@ -113,7 +185,7 @@ def function(object, **kwargs):
                    mapped.append((k, new_val))
            return self._obj.clone(mapped, link=link_inputs)

    def aggregate(self, dimensions=None, function=None, spreadfn=None, **kwargs):
        """Applies an aggregate function to all ViewableElements.

@@ -150,7 +222,7 @@ def select(self, **kwargs):
        return self.__call__('select', **kwargs)


@add_metaclass(AccessorPipelineMeta)
class Redim(object):
    """
    Utility that supports re-dimensioning any HoloViews object via the
@@ -177,7 +249,7 @@ def replace_dimensions(cls, dimensions, overrides):
            list: List of dimensions with replacements applied
        """
        from .dimension import Dimension

        replaced = []
        for d in dimensions:
            if d.name in overrides:
@@ -305,7 +377,7 @@ def values(self, specs=None, **ranges):
        return self._redim('values', specs, **ranges)


@add_metaclass(AccessorPipelineMeta)
class Opts(object):

    def __init__(self, obj, mode=None):
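
The wrapping pattern above is easier to see in isolation. The following is a stripped-down, hypothetical analogue (RecordingMeta and Doubler are illustrative names, not HoloViews API): like AccessorPipelineMeta, it swaps in a wrapped __call__ at class-creation time, delegates to the original, and copies the docstring onto the wrapper.

# Stripped-down analogue of AccessorPipelineMeta; names are hypothetical
# and not part of the HoloViews API.
class RecordingMeta(type):
    def __new__(mcs, classname, bases, classdict):
        # Swap in the wrapped __call__ when the class object is created.
        if '__call__' in classdict:
            classdict['__call__'] = mcs.recorded(classdict['__call__'])
        return type.__new__(mcs, classname, bases, classdict)

    @classmethod
    def recorded(mcs, __call__):
        def recorded_call(inst, *args, **kwargs):
            # Record the call, then delegate to the original __call__.
            inst.calls.append((args, kwargs))
            return __call__(inst, *args, **kwargs)
        recorded_call.__doc__ = __call__.__doc__
        return recorded_call


class Doubler(metaclass=RecordingMeta):
    def __init__(self):
        self.calls = []

    def __call__(self, x):
        """Double the input."""
        return x * 2


d = Doubler()
assert d(3) == 6
assert d.calls == [((3,), {})]
assert Doubler.__call__.__doc__ == "Double the input."
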
172 changes: 157 additions & 15 deletions holoviews/core/data/__init__.py
@@ -5,16 +5,21 @@
except ImportError:
    pass

+import types
+import copy
import numpy as np
import param
+from param.parameterized import add_metaclass, ParameterizedMetaclass

from .. import util
from ..accessors import Redim
-from ..dimension import Dimension, process_dimensions
+from ..dimension import (
+    Dimension, process_dimensions, Dimensioned, LabelledData
+)
from ..element import Element
-from ..ndmapping import OrderedDict
+from ..ndmapping import OrderedDict, MultiDimensionalMapping
from ..spaces import HoloMap, DynamicMap
-from .interface import Interface, iloc, ndloc, DataError
+from .interface import Interface, iloc, ndloc
from .array import ArrayInterface
from .dictionary import DictInterface
from .grid import GridInterface
@@ -155,6 +160,7 @@ def __call__(self, new_type, kdims=None, vdims=None, groupby=None,
        if len(kdims) == selected.ndims or not groupby:
            # Propagate dataset
            params['dataset'] = self._element.dataset
            params['pipeline'] = self._element._pipeline
            element = new_type(selected, **params)
            return element.sort() if sort else element
        group = selected.groupby(groupby, container_type=HoloMap,
@@ -165,7 +171,75 @@
        return group


class PipelineMeta(ParameterizedMetaclass):

    # Public methods that should not be wrapped
    blacklist = ['__init__', 'clone']

    def __new__(mcs, classname, bases, classdict):

        for method_name in classdict:
            method_fn = classdict[method_name]
            if method_name in mcs.blacklist or method_name.startswith('_'):
                continue
            elif isinstance(method_fn, types.FunctionType):
                classdict[method_name] = mcs.pipelined(method_fn, method_name)

        inst = type.__new__(mcs, classname, bases, classdict)
        return inst

    @staticmethod
    def pipelined(method_fn, method_name):
        def pipelined_fn(*args, **kwargs):
            from ...operation.element import method as method_op
            inst = args[0]
            inst_pipeline = copy.copy(getattr(inst, '_pipeline', None))
            in_method = inst._in_method
            if not in_method:
                inst._in_method = True

            try:
                result = method_fn(*args, **kwargs)
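                # The call is recorded below as a method operation so the
                # pipeline can replay it later with the same arguments.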

                op = method_op.instance(
                    input_type=type(inst),
                    method_name=method_name,
                    args=list(args[1:]),
                    kwargs=kwargs,
                )

                if not in_method:
                    if isinstance(result, Dataset):
                        result._pipeline = inst_pipeline.instance(
                            operations=inst_pipeline.operations + [op],
                            output_type=type(result),
                        )

                    elif isinstance(result, MultiDimensionalMapping):
                        for key, element in result.items():
                            if isinstance(element, Dataset):
                                getitem_op = method_op.instance(
                                    input_type=type(result),
                                    method_name='__getitem__',
                                    args=[key]
                                )
                                element._pipeline = inst_pipeline.instance(
                                    operations=inst_pipeline.operations + [
                                        op, getitem_op
                                    ],
                                    output_type=type(result),
                                )
            finally:
                if not in_method:
                    inst._in_method = False
            return result

        pipelined_fn.__doc__ = method_fn.__doc__

        return pipelined_fn


@add_metaclass(PipelineMeta)
class Dataset(Element):
"""
Dataset provides a general baseclass for Element types that
Expand Down Expand Up @@ -201,6 +275,15 @@ class Dataset(Element):
_kdim_reductions = {}

    def __init__(self, data, kdims=None, vdims=None, **kwargs):
        from ...operation.element import (
            chain as chain_op, factory
        )
        self._in_method = False
        input_dataset = kwargs.pop('dataset', None)
        input_pipeline = kwargs.pop(
            'pipeline', None
        )

        if isinstance(data, Element):
            pvals = util.get_param_values(data)
            kwargs.update([(l, pvals[l]) for l in ['group', 'label']
@@ -217,6 +300,50 @@ def __init__(self, data, kdims=None, vdims=None, **kwargs):

        self.redim = Redim(self, mode='dataset')

        # Handle _pipeline property
        if input_pipeline is None:
            input_pipeline = chain_op.instance()

        init_op = factory.instance(
            output_type=type(self),
            args=[],
            kwargs=kwargs,
        )
        self._pipeline = input_pipeline.instance(
            operations=input_pipeline.operations + [init_op],
            output_type=type(self),
        )

        # Handle initializing the dataset property.
        self._dataset = None
        if input_dataset is not None:
            self._dataset = input_dataset.clone(dataset=None, pipeline=None)

        elif type(self) is Dataset:
            self._dataset = self
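
        # Net effect: every Dataset's pipeline ends with a factory step
        # that rebuilds this element type (with the same kwargs) from the
        # output of the preceding operations.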

    @property
    def dataset(self):
        """
        The Dataset that this object was created from
        """
        from . import Dataset
        if self._dataset is None:
            dataset = Dataset(self, _validate_vdims=False)
            if hasattr(self, '_binned'):
                dataset._binned = self._binned
            return dataset
        else:
            return self._dataset

    @property
    def pipeline(self):
        """
        Chain operation that evaluates the sequence of operations that
        was used to create this object, starting with the Dataset stored
        in the dataset property.
        """
        return self._pipeline
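
    # Note: the chain starts from the raw source, so for a derived element
    # `elem`, `elem.pipeline(elem.dataset)` should reproduce `elem`, and
    # `elem.pipeline.operations` lists the recorded steps individually.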

    def closest(self, coords=[], **kwargs):
        """Snaps coordinate(s) to closest coordinate in Dataset
@@ -880,23 +1007,38 @@ def clone(self, data=None, shared_data=True, new_type=None, *args, **overrides):
        datatypes = [self.interface.datatype] + self.datatype
        overrides['datatype'] = list(util.unique_iterator(datatypes))

-        if 'dataset' in overrides:
-            dataset = overrides.pop('dataset')
-        else:
-            dataset = self.dataset
-        new_dataset = super(Dataset, self).clone(data, shared_data, new_type, *args, **overrides)
-        if dataset is not None:
-            try:
-                new_dataset._dataset = dataset.clone(data=new_dataset.data, dataset=None)
-            except DataError:
-                # New dataset doesn't have the necessary dimensions to
-                # propagate dataset. Do nothing
-                pass
+        if data is None:
+            overrides['_validate_vdims'] = False
+
+            if 'dataset' not in overrides:
+                overrides['dataset'] = self.dataset
+
+            if 'pipeline' not in overrides:
+                overrides['pipeline'] = self._pipeline
+        elif self._in_method:
+            if 'dataset' not in overrides:
+                overrides['dataset'] = self.dataset
+
+        new_dataset = super(Dataset, self).clone(
+            data, shared_data, new_type, *args, **overrides
+        )

        return new_dataset

    # Overrides of superclass methods that are needed so that PipelineMeta
    # will find them to wrap with pipeline support
    def options(self, *args, **kwargs):
        return super(Dataset, self).options(*args, **kwargs)
    options.__doc__ = Dimensioned.options.__doc__

    def map(self, *args, **kwargs):
        return super(Dataset, self).map(*args, **kwargs)
    map.__doc__ = LabelledData.map.__doc__

    def relabel(self, *args, **kwargs):
        return super(Dataset, self).relabel(*args, **kwargs)
    relabel.__doc__ = LabelledData.relabel.__doc__

    @property
    def iloc(self):
        """Returns iloc indexer with support for columnar indexing.