diff --git a/holoviews/core/data/__init__.py b/holoviews/core/data/__init__.py
index f6a0493f1e..76eb9e912a 100644
--- a/holoviews/core/data/__init__.py
+++ b/holoviews/core/data/__init__.py
@@ -46,6 +46,12 @@
 except ImportError:
     pass
 
+try:
+    from .dask import DaskInterface
+    datatypes.append('dask')
+except ImportError:
+    pass
+
 from ..dimension import Dimension
 from ..element import Element
 from ..spaces import HoloMap, DynamicMap
@@ -311,7 +317,7 @@ def __getitem__(self, slices):
         selection = dict(zip(self.dimensions(label=True), slices))
         data = self.select(**selection)
         if value_select:
-            if len(data) == 1:
+            if data.shape[0] == 1:
                 return data[value_select][0]
             else:
                 return data.reindex(vdims=[value_select])
diff --git a/holoviews/core/data/dask.py b/holoviews/core/data/dask.py
new file mode 100644
index 0000000000..1871051d8e
--- /dev/null
+++ b/holoviews/core/data/dask.py
@@ -0,0 +1,241 @@
+from __future__ import absolute_import
+
+try:
+    from itertools import izip as zip
+except ImportError:
+    pass
+
+import numpy as np
+import pandas as pd
+import dask.dataframe as dd
+from dask.dataframe import DataFrame
+from dask.dataframe.core import Scalar
+
+from .. import util
+from ..element import Element
+from ..ndmapping import NdMapping, item_check
+from .interface import Interface
+from .pandas import PandasInterface
+
+
+class DaskInterface(PandasInterface):
+    """
+    The DaskInterface allows a Dataset object to wrap a dask
+    DataFrame object. Using dask allows loading data lazily
+    and performing out-of-core operations on the data, making
+    it possible to work on datasets larger than memory.
+
+    The DaskInterface covers almost the complete API exposed
+    by the PandasInterface with three notable exceptions:
+
+    1) Sorting is not supported and any attempt at sorting will
+       be ignored with a warning.
+    2) Dask does not easily support adding a new column to an existing
+       dataframe unless it is a scalar, add_dimension will therefore
+       raise an error when supplied a non-scalar value.
+    3) Not all functions can be easily applied to a dask dataframe so
+       some functions applied with aggregate and reduce will not work.
+    """
+
+    types = (DataFrame,)
+
+    datatype = 'dask'
+
+    default_partitions = 100
+
+    @classmethod
+    def init(cls, eltype, data, kdims, vdims):
+        data, kdims, vdims = PandasInterface.init(eltype, data, kdims, vdims)
+        if not isinstance(data, DataFrame):
+            data = dd.from_pandas(data, npartitions=cls.default_partitions, sort=False)
+        return data, kdims, vdims
+
+    @classmethod
+    def shape(cls, dataset):
+        return (len(dataset.data), len(dataset.data.columns))
+
+    @classmethod
+    def range(cls, columns, dimension):
+        column = columns.data[columns.get_dimension(dimension).name]
+        if column.dtype.kind == 'O':
+            column = np.sort(column[column.notnull()].compute())
+            return column[0], column[-1]
+        else:
+            return dd.compute(column.min(), column.max())
+
+    @classmethod
+    def sort(cls, columns, by=[]):
+        columns.warning('Dask dataframes do not support sorting')
+        return columns.data
+
+    @classmethod
+    def values(cls, columns, dim, expanded=True, flat=True):
+        data = columns.data[dim]
+        if not expanded:
+            data = data.unique()
+        return data.compute().values
+
+    @classmethod
+    def select_mask(cls, dataset, selection):
+        """
+        Given a Dataset object and a dictionary with dimension keys and
+        selection keys (i.e. tuple ranges, slices, sets, lists or literals)
+        return a boolean mask over the rows in the Dataset object that
+        have been selected.
+        """
+        select_mask = None
+        for dim, k in selection.items():
+            if isinstance(k, tuple):
+                k = slice(*k)
+            masks = []
+            series = dataset.data[dim]
+            if isinstance(k, slice):
+                if k.start is not None:
+                    masks.append(k.start <= series)
+                if k.stop is not None:
+                    masks.append(series < k.stop)
+            elif isinstance(k, (set, list)):
+                iter_slc = None
+                for ik in k:
+                    mask = series == ik
+                    if iter_slc is None:
+                        iter_slc = mask
+                    else:
+                        iter_slc |= mask
+                masks.append(iter_slc)
+            elif callable(k):
+                masks.append(k(series))
+            else:
+                masks.append(series == k)
+            for mask in masks:
+                if select_mask is not None:
+                    select_mask &= mask
+                else:
+                    select_mask = mask
+        return select_mask
+
+    @classmethod
+    def select(cls, columns, selection_mask=None, **selection):
+        df = columns.data
+        if selection_mask is not None:
+            return df[selection_mask]
+        selection_mask = cls.select_mask(columns, selection)
+        indexed = cls.indexed(columns, selection)
+        df = df if selection_mask is None else df[selection_mask]
+        if indexed and len(df) == 1:
+            return df[columns.vdims[0].name].compute().iloc[0]
+        return df
+
+    @classmethod
+    def groupby(cls, columns, dimensions, container_type, group_type, **kwargs):
+        index_dims = [columns.get_dimension(d) for d in dimensions]
+        element_dims = [kdim for kdim in columns.kdims
+                        if kdim not in index_dims]
+
+        group_kwargs = {}
+        if group_type != 'raw' and issubclass(group_type, Element):
+            group_kwargs = dict(util.get_param_values(columns),
+                                kdims=element_dims)
+        group_kwargs.update(kwargs)
+
+        data = []
+        groupby = columns.data.groupby(dimensions)
+        ind_array = columns.data[dimensions].compute().values
+        indices = (tuple(ind) for ind in ind_array)
+        for coord in util.unique_iterator(indices):
+            if any(isinstance(c, float) and np.isnan(c) for c in coord):
+                continue
+            if len(coord) == 1:
+                coord = coord[0]
+            group = group_type(groupby.get_group(coord), **group_kwargs)
+            data.append((coord, group))
+        if issubclass(container_type, NdMapping):
+            with item_check(False):
+                return container_type(data, kdims=index_dims)
+        else:
+            return container_type(data)
+
+    @classmethod
+    def aggregate(cls, columns, dimensions, function, **kwargs):
+        data = columns.data
+        cols = [d.name for d in columns.kdims if d in dimensions]
+        vdims = columns.dimensions('value', True)
+        dtypes = data.dtypes
+        numeric = [c for c, dtype in zip(dtypes.index, dtypes.values)
+                   if dtype.kind in 'iufc' and c in vdims]
+        reindexed = data[cols+numeric]
+
+        inbuilts = {'amin': 'min', 'amax': 'max', 'mean': 'mean',
+                    'std': 'std', 'sum': 'sum', 'var': 'var'}
+        if len(dimensions):
+            groups = reindexed.groupby(cols, sort=False)
+            if function.__name__ in inbuilts:
+                agg = getattr(groups, inbuilts[function.__name__])()
+            else:
+                agg = groups.apply(function)
+            return agg.reset_index()
+        else:
+            if function.__name__ in inbuilts:
+                agg = getattr(reindexed, inbuilts[function.__name__])()
+            else:
+                raise NotImplementedError
+            return pd.DataFrame(agg.compute()).T
+
+    @classmethod
+    def unpack_scalar(cls, columns, data):
+        """
+        Given a columns object and data in the appropriate format for
+        the interface, return a simple scalar.
+        """
+        if len(data.columns) > 1 or len(data) != 1:
+            return data
+        if isinstance(data, dd.DataFrame):
+            data = data.compute()
+        return data.iat[0, 0]
+
+    @classmethod
+    def sample(cls, columns, samples=[]):
+        data = columns.data
+        dims = columns.dimensions('key', label=True)
+        mask = None
+        for sample in samples:
+            if np.isscalar(sample): sample = [sample]
+            sample_mask = None
+            for c, v in zip(dims, sample):
+                dim_mask = data[c] == v
+                sample_mask = dim_mask if sample_mask is None else sample_mask & dim_mask
+            # AND matches across the dimensions of one sample, OR across samples
+            mask = sample_mask if mask is None else mask | sample_mask
+        return data[mask]
+
+    @classmethod
+    def add_dimension(cls, columns, dimension, dim_pos, values, vdim):
+        data = columns.data
+        if dimension.name not in data.columns:
+            if not np.isscalar(values):
+                err = ('Dask dataframe does not support assigning '
+                       'non-scalar value.')
+                raise NotImplementedError(err)
+            data = data.assign(**{dimension.name: values})
+        return data
+
+    @classmethod
+    def concat(cls, columns_objs):
+        cast_objs = cls.cast(columns_objs)
+        return dd.concat([col.data for col in cast_objs])
+
+    @classmethod
+    def dframe(cls, columns, dimensions):
+        return columns.data.compute()
+
+    @classmethod
+    def length(cls, dataset):
+        """
+        Computing the length of a dask dataframe requires a full pass
+        over the data, so for performance this always returns 1; use
+        shape to compute the actual number of rows.
+        """
+        return 1
+
+
+Interface.register(DaskInterface)
diff --git a/holoviews/core/data/pandas.py b/holoviews/core/data/pandas.py
index f2b04de1e0..08591f6129 100644
--- a/holoviews/core/data/pandas.py
+++ b/holoviews/core/data/pandas.py
@@ -79,6 +79,16 @@ def init(cls, eltype, data, kdims, vdims):
 
         return data, {'kdims':kdims, 'vdims':vdims}, {}
 
+    @classmethod
+    def validate(cls, dataset):
+        not_found = [d for d in dataset.dimensions(label=True)
+                     if d not in dataset.data.columns]
+        if not_found:
+            raise ValueError("Supplied data does not contain specified "
+                             "dimensions, the following dimensions were "
+                             "not found: %s" % repr(not_found))
+
+
     @classmethod
     def range(cls, columns, dimension):
         column = columns.data[columns.get_dimension(dimension).name]
@@ -125,9 +135,10 @@ def aggregate(cls, columns, dimensions, function, **kwargs):
         data = columns.data
         cols = [d.name for d in columns.kdims if d in dimensions]
         vdims = columns.dimensions('value', True)
-        reindexed = data.reindex(columns=cols+vdims)
+        reindexed = data[cols+vdims]
         if len(dimensions):
-            return reindexed.groupby(cols, sort=False).aggregate(function, **kwargs).reset_index()
+            grouped = reindexed.groupby(cols, sort=False)
+            return grouped.aggregate(function, **kwargs).reset_index()
         else:
             agg = reindexed.apply(function, **kwargs)
             return pd.DataFrame.from_items([(col, [v]) for col, v in
@@ -185,11 +196,9 @@ def select(cls, columns, selection_mask=None, **selection):
     @classmethod
     def values(cls, columns, dim, expanded=True, flat=True):
         data = columns.data[dim]
-        if util.dd and isinstance(data, util.dd.Series):
-            data = data.compute()
         if not expanded:
-            return util.unique_array(data)
-        return np.array(data)
+            return data.unique()
+        return data.values
 
 
     @classmethod
diff --git a/holoviews/element/comparison.py b/holoviews/element/comparison.py
index d03a491b19..f3a70dc4e2 100644
--- a/holoviews/element/comparison.py
+++ b/holoviews/element/comparison.py
@@ -447,7 +447,7 @@ def compare_bounds(cls, el1, el2, msg='Bounds'):
     @classmethod
     def compare_dataset(cls, el1, el2, msg='Dataset'):
         cls.compare_dimensioned(el1, el2)
-        if len(el1) != len(el2):
+        if el1.shape[0] != el2.shape[0]:
             raise AssertionError("%s not of matching length."
                                  % msg)
         dimension_data = [(d, el1[d], el2[d]) for d in el1.dimensions()]
         for dim, d1, d2 in dimension_data:
diff --git a/holoviews/operation/datashader.py b/holoviews/operation/datashader.py
index 7453dd87a0..4a1e61492c 100644
--- a/holoviews/operation/datashader.py
+++ b/holoviews/operation/datashader.py
@@ -8,19 +8,22 @@
 import xarray as xr
 import datashader as ds
 import datashader.transfer_functions as tf
+import dask.dataframe as dd
 
 from datashader.core import bypixel
 from datashader.pandas import pandas_pipeline
+from datashader.dask import dask_pipeline
 from datashape.dispatch import dispatch
 from datashape import discover as dsdiscover
 
 from ..core import (ElementOperation, Element, Dimension, NdOverlay,
                     Overlay, CompositeOverlay, Dataset)
-from ..core.data import ArrayInterface, PandasInterface
+from ..core.data import ArrayInterface, PandasInterface, DaskInterface
 from ..core.util import get_param_values, basestring
 from ..element import GridImage, Path, Curve, Contours, RGB
 from ..streams import RangeXY
 
+DF_INTERFACES = [PandasInterface, DaskInterface]
 
 @dispatch(Element)
 def discover(dataset):
@@ -28,7 +31,7 @@
     Allows datashader to correctly discover the dtypes of the data
     in a holoviews Element.
     """
-    if dataset.interface in [PandasInterface, ArrayInterface]:
+    if dataset.interface in DF_INTERFACES:
         return dsdiscover(dataset.data)
     else:
         return dsdiscover(dataset.dframe())
@@ -54,8 +57,13 @@ def dataset_pipeline(dataset, schema, canvas, glyph, summary):
     vdims = [dataset.get_dimension(column)(name) if column
             else Dimension('Count')]
 
-    agg = pandas_pipeline(dataset.data, schema, canvas,
-                          glyph, summary)
+    if dataset.interface is PandasInterface:
+        agg = pandas_pipeline(dataset.data, schema, canvas,
+                              glyph, summary)
+    elif dataset.interface is DaskInterface:
+        agg = dask_pipeline(dataset.data, schema, canvas,
+                            glyph, summary)
+
     agg = agg.rename({'x_axis': kdims[0].name,
                       'y_axis': kdims[1].name})
     return agg
@@ -125,7 +133,7 @@ def get_agg_data(cls, obj, category=None):
         kdims = obj.kdims
         vdims = obj.vdims
         x, y = obj.dimensions(label=True)[:2]
-        is_df = lambda x: isinstance(x, Dataset) and x.interface is PandasInterface
+        is_df = lambda x: isinstance(x, Dataset) and x.interface in DF_INTERFACES
         if isinstance(obj, Path):
             glyph = 'line'
             for p in obj.data:
@@ -145,12 +153,20 @@
         elif isinstance(obj, Element):
             glyph = 'line' if isinstance(obj, Curve) else 'points'
             paths.append(obj.data if is_df(obj) else obj.dframe())
-        if glyph == 'line':
-            empty = paths[0][:1].copy()
-            empty.loc[0, :] = (np.NaN,) * empty.shape[1]
-            paths = [elem for path in paths for elem in (path, empty)][:-1]
         if len(paths) > 1:
-            df = pd.concat(paths).reset_index(drop=True)
+            if glyph == 'line':
+                path = paths[0][:1]
+                if isinstance(path, dd.DataFrame):
+                    path = path.compute()
+                empty = path.copy()
+                empty.iloc[0, :] = (np.NaN,) * empty.shape[1]
+                paths = [elem for path in paths for elem in (path, empty)][:-1]
+            if all(isinstance(path, dd.DataFrame) for path in paths):
+                df = dd.concat(paths)
+            else:
+                paths = [path.compute() if isinstance(path, dd.DataFrame) else path
+                         for path in paths]
+                df = pd.concat(paths)
         else:
             df = paths[0]
         if category and df[category].dtype.name != 'category':
diff --git a/tests/testdataset.py b/tests/testdataset.py
index 5505fb3eb6..7df37e718f 100644
--- a/tests/testdataset.py
+++ b/tests/testdataset.py
@@ -15,6 +15,10 @@
 except:
     pd = None
 
+try:
+    import dask.dataframe as dd
+except:
+    dd = None
 
 class HomogeneousColumnTypes(object):
@@ -105,7 +109,7 @@ def test_dataset_array_hm(self):
 
     def test_dataset_add_dimensions_value_hm(self):
         table = self.dataset_hm.add_dimension('z', 1, 0)
         self.assertEqual(table.kdims[1], 'z')
-        self.compare_arrays(table.dimension_values('z'), np.zeros(len(table)))
+        self.compare_arrays(table.dimension_values('z'), np.zeros(table.shape[0]))
 
     def test_dataset_add_dimensions_values_hm(self):
         table = self.dataset_hm.add_dimension('z', 1, range(1,12))
@@ -259,7 +263,7 @@ def test_dataset_2D_partial_reduce_ht(self):
                          kdims=['x'], vdims=['z'])
         self.assertEqual(dataset.reduce(['y'], np.mean), reduced)
 
-    def test_column_aggregate_ht(self):
+    def test_dataset_aggregate_ht(self):
         aggregated = Dataset({'Gender':['M', 'F'], 'Weight':[16.5, 10], 'Height':[0.7, 0.8]},
                              kdims=self.kdims[:1], vdims=self.vdims)
         self.compare_dataset(self.table.aggregate(['Gender'], np.mean), aggregated)
@@ -290,7 +294,7 @@ def test_dataset_groupby_dynamic(self):
     def test_dataset_add_dimensions_value_ht(self):
         table = self.dataset_ht.add_dimension('z', 1, 0)
         self.assertEqual(table.kdims[1], 'z')
-        self.compare_arrays(table.dimension_values('z'), np.zeros(len(table)))
+        self.compare_arrays(table.dimension_values('z'), np.zeros(table.shape[0]))
 
     def test_dataset_add_dimensions_values_ht(self):
         table = self.dataset_ht.add_dimension('z', 1, range(1,12))
@@ -384,6 +388,52 @@ def setUp(self):
         self.init_data()
 
 
+class DaskDatasetTest(HeterogeneousColumnTypes, ComparisonTestCase):
+    """
+    Test of the dask DaskDataset interface.
+    """
+
+    def setUp(self):
+        if dd is None:
+            raise SkipTest("dask not available")
+        self.restore_datatype = Dataset.datatype
+        Dataset.datatype = ['dask']
+        self.data_instance_type = dd.DataFrame
+        self.init_data()
+
+    def test_dataset_reduce_ht(self):
+        reduced = Dataset({'Age': self.age,
+                           'Weight':self.weight,
+                           'Height':self.height},
+                          kdims=self.kdims[1:], vdims=self.vdims,
+                          datatype=['dataframe']).sort()
+        self.assertEqual(self.table.reduce(['Gender'], np.mean), reduced)
+
+    def test_dataset_aggregate_ht(self):
+        aggregated = Dataset({'Gender':['F', 'M'], 'Weight':[10, 16.5],
+                              'Height':[0.8, 0.7]},
+                             kdims=self.kdims[:1], vdims=self.vdims)
+        self.compare_dataset(self.table.aggregate(['Gender'], np.mean), aggregated)
+
+    # Disabled tests for NotImplemented methods
+    def test_dataset_add_dimensions_values_hm(self):
+        raise SkipTest("Not supported")
+
+    def test_dataset_add_dimensions_values_ht(self):
+        raise SkipTest("Not supported")
+
+    def test_dataset_sort_vdim_ht(self):
+        raise SkipTest("Not supported")
+
+    def test_dataset_sort_vdim_hm(self):
+        raise SkipTest("Not supported")
+
+    def test_dataset_sort_string_ht(self):
+        raise SkipTest("Not supported")
+
+    def test_dataset_boolean_index(self):
+        raise SkipTest("Not supported")
+
 
 class DictDatasetTest(HeterogeneousColumnTypes, ComparisonTestCase):
     """
@@ -624,22 +674,22 @@ def setUp(self):
 
     # Disabled tests for NotImplemented methods
     def test_dataset_add_dimensions_values_hm(self):
-        pass
+        raise SkipTest("Not supported")
 
     def test_dataset_sort_vdim_hm(self):
-        pass
+        raise SkipTest("Not supported")
 
     def test_dataset_1D_reduce_hm(self):
-        pass
+        raise SkipTest("Not supported")
 
     def test_dataset_2D_reduce_hm(self):
-        pass
+        raise SkipTest("Not supported")
 
     def test_dataset_2D_aggregate_partial_hm(self):
-        pass
+        raise SkipTest("Not supported")
 
     def test_dataset_sample_hm(self):
-        pass
+        raise SkipTest("Not supported")
 
 class XArrayDatasetTest(GridDatasetTest):
     """
@@ -655,10 +705,10 @@ def setUp(self):
 
     # Disabled tests for NotImplemented methods
     def test_dataset_add_dimensions_values_hm(self):
-        pass
+        raise SkipTest("Not supported")
 
     def test_dataset_sort_vdim_hm(self):
-        pass
+        raise SkipTest("Not supported")
 
     def test_dataset_sample_hm(self):
-        pass
+        raise SkipTest("Not supported")
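
Usage sketch (not part of the patch): a minimal example of the new 'dask'
datatype added above, assuming holoviews with this patch applied and dask
are both installed; the column names, value ranges and partition count are
illustrative only.

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd
    import holoviews as hv

    # Wrap a dask DataFrame in a Dataset, forcing the new 'dask' interface
    df = pd.DataFrame({'x': np.arange(1000) % 10,
                       'y': np.random.rand(1000)})
    ddf = dd.from_pandas(df, npartitions=4)
    dataset = hv.Dataset(ddf, kdims=['x'], vdims=['y'], datatype=['dask'])

    # Selections and aggregations dispatch to the DaskInterface methods
    # defined in the patch and stay lazy until concrete values are needed
    subset = dataset.select(x=(0, 5))        # range select via boolean mask
    means = dataset.aggregate('x', np.mean)  # np.mean maps to dask's .mean()
    print(subset.shape, means.dframe())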