fix memory explosion when computing extents with dask
jonmmease authored and jbednar committed Feb 23, 2019
1 parent 26894b9 commit 6ff6276
Showing 3 changed files with 104 additions and 51 deletions.
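In broad strokes: the removed `compute_x_bounds_dask`/`compute_y_bounds_dask` methods reduced each column only after materializing it on the client (e.g. `df[x].values.compute()`), whereas the new `compute_bounds_dask` reduces every partition to a single row of extrema and gathers only that tiny array. A minimal standalone sketch of the before/after pattern (the dataframe, column names, and sizes are illustrative assumptions, not part of the commit):

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    # Illustrative data; any dask dataframe with numeric columns would do.
    ddf = dd.from_pandas(
        pd.DataFrame({'x': np.arange(1_000_000, dtype='f8'),
                      'y': np.arange(1_000_000, dtype='f8')}),
        npartitions=8)

    # Old pattern: materialize the whole column on the client, then reduce.
    xs = ddf['x'].values.compute()              # O(n_rows) client memory
    old_x_bounds = np.nanmin(xs), np.nanmax(xs)

    # New pattern (this commit): reduce inside each partition first, so only
    # an (npartitions, 4) array of extrema ever reaches the client.
    r = ddf.map_partitions(lambda df: np.array([[
        np.nanmin(df['x'].values), np.nanmax(df['x'].values),
        np.nanmin(df['y'].values), np.nanmax(df['y'].values)]])).compute()
    x_extents = np.nanmin(r[:, 0]), np.nanmax(r[:, 1])
    y_extents = np.nanmin(r[:, 2]), np.nanmax(r[:, 3])

    assert old_x_bounds == x_extents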
9 changes: 7 additions & 2 deletions datashader/dask.py
@@ -31,8 +31,13 @@ def dask_pipeline(df, schema, canvas, glyph, summary):
 
 
 def shape_bounds_st_and_axis(df, canvas, glyph):
-    x_range = canvas.x_range or glyph.compute_x_bounds_dask(df)
-    y_range = canvas.y_range or glyph.compute_y_bounds_dask(df)
+    if not canvas.x_range or not canvas.y_range:
+        x_extents, y_extents = glyph.compute_bounds_dask(df)
+    else:
+        x_extents, y_extents = None, None
+
+    x_range = canvas.x_range or x_extents
+    y_range = canvas.y_range or y_extents
     x_min, x_max, y_min, y_max = bounds = compute(*(x_range + y_range))
     x_range, y_range = (x_min, x_max), (y_min, y_max)
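One subtlety in the kept context line: `compute(*(x_range + y_range))` still works after this change because `dask.compute` passes non-dask values through unchanged, so it does not matter whether the ranges are plain tuples supplied via the canvas or the freshly computed extents. A small illustration (the values are made up):

    from dask import compute

    # dask.compute returns non-dask arguments as-is, so plain floats from
    # canvas.x_range/canvas.y_range flow through unchanged.
    x_range, y_range = (0.0, 3.0), (-1.0, 2.0)
    print(compute(*(x_range + y_range)))  # (0.0, 3.0, -1.0, 2.0)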
109 changes: 60 additions & 49 deletions datashader/glyphs.py
@@ -84,23 +84,20 @@ def maybe_expand_bounds(bounds):
         return minval, maxval
 
     @memoize
-    def compute_x_bounds_dask(self, df):
-        """Like ``PointLike._compute_x_bounds``, but memoized because
-        ``df`` is immutable/hashable (a Dask dataframe).
-        """
-        xs = df[self.x].values
-        minval, maxval = np.nanmin(xs), np.nanmax(xs)
-        return self.maybe_expand_bounds((minval, maxval))
-
-    @memoize
-    def compute_y_bounds_dask(self, df):
-        """Like ``PointLike._compute_y_bounds``, but memoized because
-        ``df`` is immutable/hashable (a Dask dataframe).
-        """
-        ys = df[self.y].values
-        minval, maxval = np.nanmin(ys), np.nanmax(ys)
-        return self.maybe_expand_bounds((minval, maxval))
+    def compute_bounds_dask(self, ddf):
+        r = ddf.map_partitions(lambda df: np.array([[
+            np.nanmin(df[self.x].values),
+            np.nanmax(df[self.x].values),
+            np.nanmin(df[self.y].values),
+            np.nanmax(df[self.y].values)]]
+        )).compute()
+
+        x_extents = np.nanmin(r[:, 0]), np.nanmax(r[:, 1])
+        y_extents = np.nanmin(r[:, 2]), np.nanmax(r[:, 3])
+
+        return (self.maybe_expand_bounds(x_extents),
+                self.maybe_expand_bounds(y_extents))
 
 
 class _PolygonLike(_PointLike):
@@ -255,18 +252,20 @@ def compute_y_bounds(self, df):
         return self.maybe_expand_bounds((min(mins), max(maxes)))
 
     @memoize
-    def compute_x_bounds_dask(self, df):
-        bounds_list = [self._compute_x_bounds(df[x].values.compute())
-                       for x in self.x]
-        mins, maxes = zip(*bounds_list)
-        return self.maybe_expand_bounds((min(mins), max(maxes)))
-
-    @memoize
-    def compute_y_bounds_dask(self, df):
-        bounds_list = [self._compute_y_bounds(df[y].values.compute())
-                       for y in self.y]
-        mins, maxes = zip(*bounds_list)
-        return self.maybe_expand_bounds((min(mins), max(maxes)))
+    def compute_bounds_dask(self, ddf):
+        r = ddf.map_partitions(lambda df: np.array([[
+            np.nanmin([np.nanmin(df[c].values) for c in self.x]),
+            np.nanmax([np.nanmax(df[c].values) for c in self.x]),
+            np.nanmin([np.nanmin(df[c].values) for c in self.y]),
+            np.nanmax([np.nanmax(df[c].values) for c in self.y])]]
+        )).compute()
+
+        x_extents = np.nanmin(r[:, 0]), np.nanmax(r[:, 1])
+        y_extents = np.nanmin(r[:, 2]), np.nanmax(r[:, 3])
+
+        return (self.maybe_expand_bounds(x_extents),
+                self.maybe_expand_bounds(y_extents))
 
     @memoize
     def _build_extend(self, x_mapper, y_mapper, info, append):
@@ -344,28 +343,20 @@ def compute_y_bounds(self, df):
         return self.maybe_expand_bounds((min(mins), max(maxes)))
 
     @memoize
-    def compute_x_bounds_dask(self, df):
-        """Like ``PointLike.compute_x_bounds``, but memoized because
-        ``df`` is immutable/hashable (a Dask dataframe).
-        """
-        x_mins = [np.nanmin(df[xlabel].values) for xlabel in self.x]
-        x_maxes = [np.nanmax(df[xlabel].values) for xlabel in self.x]
-
-        minval, maxval = np.nanmin(x_mins), np.nanmax(x_maxes)
-
-        return self.maybe_expand_bounds((minval, maxval))
-
-    @memoize
-    def compute_y_bounds_dask(self, df):
-        """Like ``PointLike.compute_x_bounds``, but memoized because
-        ``df`` is immutable/hashable (a Dask dataframe).
-        """
-        y_mins = [np.nanmin(df[ylabel].values) for ylabel in self.y]
-        y_maxes = [np.nanmax(df[ylabel].values) for ylabel in self.y]
-
-        minval, maxval = np.nanmin(y_mins), np.nanmax(y_maxes)
-
-        return self.maybe_expand_bounds((minval, maxval))
+    def compute_bounds_dask(self, ddf):
+        r = ddf.map_partitions(lambda df: np.array([[
+            np.nanmin([np.nanmin(df[c].values) for c in self.x]),
+            np.nanmax([np.nanmax(df[c].values) for c in self.x]),
+            np.nanmin([np.nanmin(df[c].values) for c in self.y]),
+            np.nanmax([np.nanmax(df[c].values) for c in self.y])]]
+        )).compute()
+
+        x_extents = np.nanmin(r[:, 0]), np.nanmax(r[:, 1])
+        y_extents = np.nanmin(r[:, 2]), np.nanmax(r[:, 3])
+
+        return (self.maybe_expand_bounds(x_extents),
+                self.maybe_expand_bounds(y_extents))
 
     @memoize
     def _build_extend(self, x_mapper, y_mapper, info, append):
@@ -406,8 +397,18 @@ def compute_x_bounds(self, *args):
         x_max = np.nanmax(self.x)
         return self.maybe_expand_bounds((x_min, x_max))
 
-    def compute_x_bounds_dask(self, df):
-        return self.compute_x_bounds()
+    @memoize
+    def compute_bounds_dask(self, ddf):
+        r = ddf.map_partitions(lambda df: np.array([[
+            np.nanmin([np.nanmin(df[c].values) for c in self.y]),
+            np.nanmax([np.nanmax(df[c].values) for c in self.y])]]
+        )).compute()
+
+        y_extents = np.nanmin(r[:, 0]), np.nanmax(r[:, 1])
+
+        return (self.compute_x_bounds(),
+                self.maybe_expand_bounds(y_extents))
 
     @memoize
     def _build_extend(self, x_mapper, y_mapper, info, append):
@@ -448,8 +449,18 @@ def compute_y_bounds(self, *args):
         y_max = np.nanmax(self.y)
         return self.maybe_expand_bounds((y_min, y_max))
 
-    def compute_y_bounds_dask(self, df):
-        return self.compute_y_bounds()
+    @memoize
+    def compute_bounds_dask(self, ddf):
+        r = ddf.map_partitions(lambda df: np.array([[
+            np.nanmin([np.nanmin(df[c].values) for c in self.x]),
+            np.nanmax([np.nanmax(df[c].values) for c in self.x])]]
+        )).compute()
+
+        x_extents = np.nanmin(r[:, 0]), np.nanmax(r[:, 1])
+
+        return (self.maybe_expand_bounds(x_extents),
+                self.compute_y_bounds())
 
     @memoize
     def _build_extend(self, x_mapper, y_mapper, info, append):
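A property the new per-glyph implementations all rely on, worth noting: the two-level `nanmin`/`nanmax` reduction stays NaN-safe end to end, since a partition consisting entirely of NaNs contributes a NaN row (with a RuntimeWarning from numpy) that the outer reduction across partitions then ignores. A tiny check of that assumption:

    import numpy as np

    # Simulated per-partition [min, max] rows: one all-NaN partition, one normal.
    per_partition = np.array([[np.nan, np.nan],
                              [0.5, 3.0]])
    # The outer nanmin/nanmax across partitions ignores the NaN row.
    x_extents = np.nanmin(per_partition[:, 0]), np.nanmax(per_partition[:, 1])
    assert x_extents == (0.5, 3.0)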
37 changes: 37 additions & 0 deletions datashader/tests/test_dask.py
@@ -420,6 +420,43 @@ def test_line_autorange(df, x, y, ax):
     assert_eq(agg, out)
 
 
+def test_line_x_constant_autorange():
+    # axis1 y constant
+    df = pd.DataFrame({
+        'y0': [0, 0, 0],
+        'y1': [-4, 0, 4],
+        'y2': [0, 0, 0],
+    })
+
+    x = np.array([-4, 0, 4])
+    y = ['y0', 'y1', 'y2']
+    ax = 1
+
+    axis = ds.core.LinearAxis()
+    lincoords = axis.compute_index(
+        axis.compute_scale_and_translate((-4., 4.), 9), 9)
+
+    ddf = dd.from_pandas(df, npartitions=2)
+
+    cvs = ds.Canvas(plot_width=9, plot_height=9)
+
+    agg = cvs.line(ddf, x, y, ds.count(), axis=ax)
+
+    sol = np.array([[0, 0, 0, 0, 1, 0, 0, 0, 0],
+                    [0, 0, 0, 1, 0, 1, 0, 0, 0],
+                    [0, 0, 1, 0, 0, 0, 1, 0, 0],
+                    [0, 1, 0, 0, 0, 0, 0, 1, 0],
+                    [3, 1, 1, 1, 1, 1, 1, 1, 3],
+                    [0, 1, 0, 0, 0, 0, 0, 1, 0],
+                    [0, 0, 1, 0, 0, 0, 1, 0, 0],
+                    [0, 0, 0, 1, 0, 1, 0, 0, 0],
+                    [0, 0, 0, 0, 1, 0, 0, 0, 0]], dtype='i4')
+
+    out = xr.DataArray(sol, coords=[lincoords, lincoords],
+                       dims=['y', 'x'])
+    assert_eq(agg, out)
+
+
 def test_log_axis_line():
     axis = ds.core.LogAxis()
     logcoords = axis.compute_index(axis.compute_scale_and_translate((1, 10), 2), 2)
