From 6ff6276ced72bf5a75820d5b3397b64a7bea8a55 Mon Sep 17 00:00:00 2001
From: Jon Mease
Date: Sat, 23 Feb 2019 11:27:30 -0500
Subject: [PATCH] fix memory explosion when computing extents with dask

---
 datashader/dask.py            |   9 ++-
 datashader/glyphs.py          | 109 +++++++++++++++++++---------------
 datashader/tests/test_dask.py |  37 ++++++++++++
 3 files changed, 104 insertions(+), 51 deletions(-)

diff --git a/datashader/dask.py b/datashader/dask.py
index 2d29e4e0b..b57cbdc9f 100644
--- a/datashader/dask.py
+++ b/datashader/dask.py
@@ -31,8 +31,13 @@ def dask_pipeline(df, schema, canvas, glyph, summary):
 
 
 def shape_bounds_st_and_axis(df, canvas, glyph):
-    x_range = canvas.x_range or glyph.compute_x_bounds_dask(df)
-    y_range = canvas.y_range or glyph.compute_y_bounds_dask(df)
+    if not canvas.x_range or not canvas.y_range:
+        x_extents, y_extents = glyph.compute_bounds_dask(df)
+    else:
+        x_extents, y_extents = None, None
+
+    x_range = canvas.x_range or x_extents
+    y_range = canvas.y_range or y_extents
     x_min, x_max, y_min, y_max = bounds = compute(*(x_range + y_range))
     x_range, y_range = (x_min, x_max), (y_min, y_max)
 
diff --git a/datashader/glyphs.py b/datashader/glyphs.py
index a02c5b365..f725c6c84 100644
--- a/datashader/glyphs.py
+++ b/datashader/glyphs.py
@@ -84,23 +84,20 @@ def maybe_expand_bounds(bounds):
         return minval, maxval
 
     @memoize
-    def compute_x_bounds_dask(self, df):
-        """Like ``PointLike._compute_x_bounds``, but memoized because
-        ``df`` is immutable/hashable (a Dask dataframe).
-        """
-        xs = df[self.x].values
-        minval, maxval = np.nanmin(xs), np.nanmax(xs)
-        return self.maybe_expand_bounds((minval, maxval))
+    def compute_bounds_dask(self, ddf):
 
-    @memoize
-    def compute_y_bounds_dask(self, df):
-        """Like ``PointLike._compute_y_bounds``, but memoized because
-        ``df`` is immutable/hashable (a Dask dataframe).
- """ - ys = df[self.y].values - minval, maxval = np.nanmin(ys), np.nanmax(ys) - return self.maybe_expand_bounds((minval, maxval)) + r = ddf.map_partitions(lambda df: np.array([[ + np.nanmin(df[self.x].values), + np.nanmax(df[self.x].values), + np.nanmin(df[self.y].values), + np.nanmax(df[self.y].values)]] + )).compute() + + x_extents = np.nanmin(r[:, 0]), np.nanmax(r[:, 1]) + y_extents = np.nanmin(r[:, 2]), np.nanmax(r[:, 3]) + return (self.maybe_expand_bounds(x_extents), + self.maybe_expand_bounds(y_extents)) class _PolygonLike(_PointLike): @@ -255,18 +252,20 @@ def compute_y_bounds(self, df): return self.maybe_expand_bounds((min(mins), max(maxes))) @memoize - def compute_x_bounds_dask(self, df): - bounds_list = [self._compute_x_bounds(df[x].values.compute()) - for x in self.x] - mins, maxes = zip(*bounds_list) - return self.maybe_expand_bounds((min(mins), max(maxes))) + def compute_bounds_dask(self, ddf): - @memoize - def compute_y_bounds_dask(self, df): - bounds_list = [self._compute_y_bounds(df[y].values.compute()) - for y in self.y] - mins, maxes = zip(*bounds_list) - return self.maybe_expand_bounds((min(mins), max(maxes))) + r = ddf.map_partitions(lambda df: np.array([[ + np.nanmin([np.nanmin(df[c].values) for c in self.x]), + np.nanmax([np.nanmax(df[c].values) for c in self.x]), + np.nanmin([np.nanmin(df[c].values) for c in self.y]), + np.nanmax([np.nanmax(df[c].values) for c in self.y])]] + )).compute() + + x_extents = np.nanmin(r[:, 0]), np.nanmax(r[:, 1]) + y_extents = np.nanmin(r[:, 2]), np.nanmax(r[:, 3]) + + return (self.maybe_expand_bounds(x_extents), + self.maybe_expand_bounds(y_extents)) @memoize def _build_extend(self, x_mapper, y_mapper, info, append): @@ -344,28 +343,20 @@ def compute_y_bounds(self, df): return self.maybe_expand_bounds((min(mins), max(maxes))) @memoize - def compute_x_bounds_dask(self, df): - """Like ``PointLike.compute_x_bounds``, but memoized because - ``df`` is immutable/hashable (a Dask dataframe). - """ - x_mins = [np.nanmin(df[xlabel].values) for xlabel in self.x] - x_maxes = [np.nanmax(df[xlabel].values) for xlabel in self.x] + def compute_bounds_dask(self, ddf): - minval, maxval = np.nanmin(x_mins), np.nanmax(x_maxes) + r = ddf.map_partitions(lambda df: np.array([[ + np.nanmin([np.nanmin(df[c].values) for c in self.x]), + np.nanmax([np.nanmax(df[c].values) for c in self.x]), + np.nanmin([np.nanmin(df[c].values) for c in self.y]), + np.nanmax([np.nanmax(df[c].values) for c in self.y])]] + )).compute() - return self.maybe_expand_bounds((minval, maxval)) + x_extents = np.nanmin(r[:, 0]), np.nanmax(r[:, 1]) + y_extents = np.nanmin(r[:, 2]), np.nanmax(r[:, 3]) - @memoize - def compute_y_bounds_dask(self, df): - """Like ``PointLike.compute_x_bounds``, but memoized because - ``df`` is immutable/hashable (a Dask dataframe). 
- """ - y_mins = [np.nanmin(df[ylabel].values) for ylabel in self.y] - y_maxes = [np.nanmax(df[ylabel].values) for ylabel in self.y] - - minval, maxval = np.nanmin(y_mins), np.nanmax(y_maxes) - - return self.maybe_expand_bounds((minval, maxval)) + return (self.maybe_expand_bounds(x_extents), + self.maybe_expand_bounds(y_extents)) @memoize def _build_extend(self, x_mapper, y_mapper, info, append): @@ -406,8 +397,18 @@ def compute_x_bounds(self, *args): x_max = np.nanmax(self.x) return self.maybe_expand_bounds((x_min, x_max)) - def compute_x_bounds_dask(self, df): - return self.compute_x_bounds() + @memoize + def compute_bounds_dask(self, ddf): + + r = ddf.map_partitions(lambda df: np.array([[ + np.nanmin([np.nanmin(df[c].values) for c in self.y]), + np.nanmax([np.nanmax(df[c].values) for c in self.y])]] + )).compute() + + y_extents = np.nanmin(r[:, 0]), np.nanmax(r[:, 1]) + + return (self.compute_x_bounds(), + self.maybe_expand_bounds(y_extents)) @memoize def _build_extend(self, x_mapper, y_mapper, info, append): @@ -448,8 +449,18 @@ def compute_y_bounds(self, *args): y_max = np.nanmax(self.y) return self.maybe_expand_bounds((y_min, y_max)) - def compute_y_bounds_dask(self, df): - return self.compute_y_bounds() + @memoize + def compute_bounds_dask(self, ddf): + + r = ddf.map_partitions(lambda df: np.array([[ + np.nanmin([np.nanmin(df[c].values) for c in self.x]), + np.nanmax([np.nanmax(df[c].values) for c in self.x])]] + )).compute() + + x_extents = np.nanmin(r[:, 0]), np.nanmax(r[:, 1]) + + return (self.maybe_expand_bounds(x_extents), + self.compute_y_bounds()) @memoize def _build_extend(self, x_mapper, y_mapper, info, append): diff --git a/datashader/tests/test_dask.py b/datashader/tests/test_dask.py index e740ae633..a7e3cfb38 100644 --- a/datashader/tests/test_dask.py +++ b/datashader/tests/test_dask.py @@ -420,6 +420,43 @@ def test_line_autorange(df, x, y, ax): assert_eq(agg, out) +def test_line_x_constant_autorange(): + # axis1 y constant + df = pd.DataFrame({ + 'y0': [0, 0, 0], + 'y1': [-4, 0, 4], + 'y2': [0, 0, 0], + }) + + x = np.array([-4, 0, 4]) + y = ['y0', 'y1', 'y2'] + ax = 1 + + axis = ds.core.LinearAxis() + lincoords = axis.compute_index( + axis.compute_scale_and_translate((-4., 4.), 9), 9) + + ddf = dd.from_pandas(df, npartitions=2) + + cvs = ds.Canvas(plot_width=9, plot_height=9) + + agg = cvs.line(ddf, x, y, ds.count(), axis=ax) + + sol = np.array([[0, 0, 0, 0, 1, 0, 0, 0, 0], + [0, 0, 0, 1, 0, 1, 0, 0, 0], + [0, 0, 1, 0, 0, 0, 1, 0, 0], + [0, 1, 0, 0, 0, 0, 0, 1, 0], + [3, 1, 1, 1, 1, 1, 1, 1, 3], + [0, 1, 0, 0, 0, 0, 0, 1, 0], + [0, 0, 1, 0, 0, 0, 1, 0, 0], + [0, 0, 0, 1, 0, 1, 0, 0, 0], + [0, 0, 0, 0, 1, 0, 0, 0, 0]], dtype='i4') + + out = xr.DataArray(sol, coords=[lincoords, lincoords], + dims=['y', 'x']) + assert_eq(agg, out) + + def test_log_axis_line(): axis = ds.core.LogAxis() logcoords = axis.compute_index(axis.compute_scale_and_translate((1, 10), 2), 2)