Added dask data interface #974

Merged 8 commits on Nov 16, 2016
8 changes: 7 additions & 1 deletion holoviews/core/data/__init__.py
@@ -46,6 +46,12 @@
except ImportError:
    pass

try:
    from .dask import DaskInterface
    datatypes.append('dask')
except ImportError:
    pass

from ..dimension import Dimension
from ..element import Element
from ..spaces import HoloMap, DynamicMap
@@ -311,7 +317,7 @@ def __getitem__(self, slices):
        selection = dict(zip(self.dimensions(label=True), slices))
        data = self.select(**selection)
        if value_select:
-           if len(data) == 1:
+           if data.shape[0] == 1:
                return data[value_select][0]
            else:
                return data.reindex(vdims=[value_select])
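For reference, a minimal sketch of what this registration enables (assuming the branch is installed and imported as hv; data and names here are illustrative, not taken from the PR):

import pandas as pd
import dask.dataframe as dd
import holoviews as hv

ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2], 'b': [3, 4]}), npartitions=1)

# A dask DataFrame is matched against DaskInterface.types automatically
ds = hv.Dataset(ddf, kdims=['a'], vdims=['b'])

# In-memory data can also be coerced onto the dask backend explicitly,
# since 'dask' now appears in the datatypes list
ds2 = hv.Dataset(pd.DataFrame({'a': [1], 'b': [2]}),
                 kdims=['a'], vdims=['b'], datatype=['dask'])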
241 changes: 241 additions & 0 deletions holoviews/core/data/dask.py
@@ -0,0 +1,241 @@
from __future__ import absolute_import

try:
    # Python 2: substitute the lazy izip for the builtin zip
    from itertools import izip as zip
except ImportError:
    pass

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.dataframe import DataFrame
from dask.dataframe.core import Scalar

from .. import util
from ..element import Element
from ..ndmapping import NdMapping, item_check
from .interface import Interface
from .pandas import PandasInterface

class DaskInterface(PandasInterface):
    """
    The DaskInterface allows a Dataset object to wrap a dask
    DataFrame object. Using dask allows loading data lazily
    and performing out-of-core operations on the data, making
    it possible to work on datasets larger than memory.

    The DaskInterface covers almost the complete API exposed
    by the PandasInterface with three notable exceptions:

    1) Sorting is not supported and any attempt at sorting will
       be ignored with a warning.
    2) Dask does not easily support adding a new column to an
       existing dataframe unless it is a scalar, add_dimension
       will therefore error when supplied a non-scalar value.
    3) Not all functions can be easily applied to a dask dataframe
       so some functions applied with aggregate and reduce will not
       work.
    """

    types = (DataFrame,)

    datatype = 'dask'

    default_partitions = 100

    @classmethod
    def init(cls, eltype, data, kdims, vdims):
        data, kdims, vdims = PandasInterface.init(eltype, data, kdims, vdims)
        if not isinstance(data, DataFrame):
            data = dd.from_pandas(data, npartitions=cls.default_partitions, sort=False)
        return data, kdims, vdims

    @classmethod
    def shape(cls, dataset):
        return (len(dataset.data), len(dataset.data.columns))

    @classmethod
    def range(cls, columns, dimension):
        column = columns.data[columns.get_dimension(dimension).name]
        if column.dtype.kind == 'O':
            column = np.sort(column[column.notnull()].compute())
            return column[0], column[-1]
        else:
            return dd.compute(column.min(), column.max())

    @classmethod
    def sort(cls, columns, by=[]):
        columns.warning('Dask dataframes do not support sorting')
        return columns.data

    @classmethod
    def values(cls, columns, dim, expanded=True, flat=True):
        data = columns.data[dim]
        if not expanded:
            data = data.unique()
        return data.compute().values

    @classmethod
    def select_mask(cls, dataset, selection):
        """
        Given a Dataset object and a dictionary with dimension keys and
        selection keys (i.e. tuple ranges, slices, sets, lists or literals)
        return a boolean mask over the rows in the Dataset object that
        have been selected.
        """
        select_mask = None
        for dim, k in selection.items():
            if isinstance(k, tuple):
                k = slice(*k)
            masks = []
            series = dataset.data[dim]
            if isinstance(k, slice):
                if k.start is not None:
                    masks.append(k.start <= series)
                if k.stop is not None:
                    masks.append(series < k.stop)
            elif isinstance(k, (set, list)):
                iter_slc = None
                for ik in k:
                    mask = series == ik
                    if iter_slc is None:
                        iter_slc = mask
                    else:
                        iter_slc |= mask
                masks.append(iter_slc)
            elif callable(k):
                masks.append(k(series))
            else:
                masks.append(series == k)
            for mask in masks:
                # Truth-testing a dask Series is ambiguous; compare to None
                if select_mask is not None:
                    select_mask &= mask
                else:
                    select_mask = mask
        return select_mask
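    # Illustrative (not part of the diff): a selection such as
    #   {'x': (0, 5), 'y': ['a', 'b']}
    # composes (0 <= x) & (x < 5) & ((y == 'a') | (y == 'b')) into a
    # single lazy boolean Series; nothing is computed until the mask
    # is applied and the result materialized.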

    @classmethod
    def select(cls, columns, selection_mask=None, **selection):
        df = columns.data
        if selection_mask is not None:
            return df[selection_mask]
        selection_mask = cls.select_mask(columns, selection)
        indexed = cls.indexed(columns, selection)
        df = df if selection_mask is None else df[selection_mask]
        if indexed and len(df) == 1:
            return df[columns.vdims[0].name].compute().iloc[0]
        return df

    @classmethod
    def groupby(cls, columns, dimensions, container_type, group_type, **kwargs):
        index_dims = [columns.get_dimension(d) for d in dimensions]
        element_dims = [kdim for kdim in columns.kdims
                        if kdim not in index_dims]

        group_kwargs = {}
        if group_type != 'raw' and issubclass(group_type, Element):
            group_kwargs = dict(util.get_param_values(columns),
                                kdims=element_dims)
        group_kwargs.update(kwargs)

        data = []
        groupby = columns.data.groupby(dimensions)
        ind_array = columns.data[dimensions].compute().values
        indices = (tuple(ind) for ind in ind_array)
        for coord in util.unique_iterator(indices):
            if any(isinstance(c, float) and np.isnan(c) for c in coord):
                continue
            if len(coord) == 1:
                coord = coord[0]
            group = group_type(groupby.get_group(coord), **group_kwargs)
            data.append((coord, group))
        if issubclass(container_type, NdMapping):
            with item_check(False):
                return container_type(data, kdims=index_dims)
        else:
            return container_type(data)
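    # Note: materializing the unique key combinations requires computing
    # the key columns once, but each group returned by groupby.get_group
    # stays a lazy dask DataFrame until the group_type element evaluates it.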

    @classmethod
    def aggregate(cls, columns, dimensions, function, **kwargs):
        data = columns.data
        cols = [d.name for d in columns.kdims if d in dimensions]
        vdims = columns.dimensions('value', True)
        dtypes = data.dtypes
        numeric = [c for c, dtype in zip(dtypes.index, dtypes.values)
                   if dtype.kind in 'iufc' and c in vdims]
        reindexed = data[cols+numeric]

        inbuilts = {'amin': 'min', 'amax': 'max', 'mean': 'mean',
                    'std': 'std', 'sum': 'sum', 'var': 'var'}
        if len(dimensions):
            groups = reindexed.groupby(cols, sort=False)
            if function.__name__ in inbuilts:
                agg = getattr(groups, inbuilts[function.__name__])()
            else:
                agg = groups.apply(function)
            return agg.reset_index()
        else:
            if function.__name__ in inbuilts:
                agg = getattr(reindexed, inbuilts[function.__name__])()
            else:
                raise NotImplementedError
            return pd.DataFrame(agg.compute()).T
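    # Note: only numeric value dimensions are carried through aggregation,
    # and functions without a dask built-in equivalent fall back to
    # groups.apply, which dask may execute far less efficiently.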

    @classmethod
    def unpack_scalar(cls, columns, data):
        """
        Given a columns object and data in the appropriate format for
        the interface, return a simple scalar.
        """
        if len(data.columns) > 1 or len(data) != 1:
            return data
        if isinstance(data, dd.DataFrame):
            data = data.compute()
        return data.iat[0, 0]

    @classmethod
    def sample(cls, columns, samples=[]):
        data = columns.data
        dims = columns.dimensions('key', label=True)
        mask = None
        for sample in samples:
            if np.isscalar(sample): sample = [sample]
            for i, (c, v) in enumerate(zip(dims, sample)):
                dim_mask = data[c] == v
                if mask is None:
                    mask = dim_mask
                else:
                    mask |= dim_mask
        return data[mask]

    @classmethod
    def add_dimension(cls, columns, dimension, dim_pos, values, vdim):
        data = columns.data
        if dimension.name not in data.columns:
            if not np.isscalar(values):
                err = ('Dask dataframe does not support assigning '
                       'non-scalar values.')
                raise NotImplementedError(err)
            data = data.assign(**{dimension.name: values})
        return data

    @classmethod
    def concat(cls, columns_objs):
        cast_objs = cls.cast(columns_objs)
        return dd.concat([col.data for col in cast_objs])

    @classmethod
    def dframe(cls, columns, dimensions):
        return columns.data.compute()

    @classmethod
    def length(cls, dataset):
        """
        Computing the length of a dask dataframe is expensive, so this
        always returns 1; use shape to obtain the actual number of rows.
        """
        return 1


Interface.register(DaskInterface)
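For context, a short sketch of how the new interface is exercised end to end (assuming this branch is installed; column names and sizes are illustrative):

import numpy as np
import pandas as pd
import dask.dataframe as dd
import holoviews as hv

df = pd.DataFrame({'x': np.arange(1000) % 10,
                   'y': np.random.rand(1000)})
ds = hv.Dataset(dd.from_pandas(df, npartitions=4),
                kdims=['x'], vdims=['y'])

ds.range('y')               # computes (min, max) lazily via dd.compute
ds.select(x=(2, 5))         # builds a lazy boolean mask over partitions
ds.aggregate('x', np.mean)  # np.mean dispatches to dask's built-in .mean()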
21 changes: 15 additions & 6 deletions holoviews/core/data/pandas.py
@@ -79,6 +79,16 @@ def init(cls, eltype, data, kdims, vdims):
        return data, {'kdims':kdims, 'vdims':vdims}, {}


    @classmethod
    def validate(cls, dataset):
        not_found = [d for d in dataset.dimensions(label=True)
                     if d not in dataset.data.columns]
        if not_found:
            raise ValueError("Supplied data does not contain the specified "
                             "dimensions; the following dimensions were "
                             "not found: %s" % repr(not_found))


    @classmethod
    def range(cls, columns, dimension):
        column = columns.data[columns.get_dimension(dimension).name]
@@ -125,9 +135,10 @@ def aggregate(cls, columns, dimensions, function, **kwargs):
        data = columns.data
        cols = [d.name for d in columns.kdims if d in dimensions]
        vdims = columns.dimensions('value', True)
-       reindexed = data.reindex(columns=cols+vdims)
+       reindexed = data[cols+vdims]
        if len(dimensions):
-           return reindexed.groupby(cols, sort=False).aggregate(function, **kwargs).reset_index()
+           grouped = reindexed.groupby(cols, sort=False)
+           return grouped.aggregate(function, **kwargs).reset_index()
        else:
            agg = reindexed.apply(function, **kwargs)
            return pd.DataFrame.from_items([(col, [v]) for col, v in
@@ -185,11 +196,9 @@ def select(cls, columns, selection_mask=None, **selection):
    @classmethod
    def values(cls, columns, dim, expanded=True, flat=True):
        data = columns.data[dim]
-       if util.dd and isinstance(data, util.dd.Series):
-           data = data.compute()
        if not expanded:
-           return util.unique_array(data)
-       return np.array(data)
+           return data.unique()
+       return data.values


    @classmethod
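The new validate hook makes dimension mismatches fail fast at construction time; a small sketch (names illustrative):

import pandas as pd
import holoviews as hv

df = pd.DataFrame({'x': [0, 1]})
# Raises ValueError naming ['missing'], since that column is absent
hv.Dataset(df, kdims=['x'], vdims=['missing'])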
2 changes: 1 addition & 1 deletion holoviews/element/comparison.py
@@ -447,7 +447,7 @@ def compare_bounds(cls, el1, el2, msg='Bounds'):
    @classmethod
    def compare_dataset(cls, el1, el2, msg='Dataset'):
        cls.compare_dimensioned(el1, el2)
-       if len(el1) != len(el2):
+       if el1.shape[0] != el2.shape[0]:
            raise AssertionError("%s not of matching length." % msg)
        dimension_data = [(d, el1[d], el2[d]) for d in el1.dimensions()]
        for dim, d1, d2 in dimension_data:
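The switch from len to shape here matters because DaskInterface.length deliberately returns 1 rather than computing the frame; a sketch of the distinction (assumed behavior under this PR):

import pandas as pd
import dask.dataframe as dd
import holoviews as hv

ddf = dd.from_pandas(pd.DataFrame({'x': [0, 1, 2], 'y': [3, 4, 5]}),
                     npartitions=1)
ds = hv.Dataset(ddf, kdims=['x'], vdims=['y'])

len(ds)       # 1 -- placeholder, avoids computing the dask graph
ds.shape[0]   # 3 -- triggers an actual row count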