Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize AreaDefinition.get_proj_coords when requesting dask arrays #401

Merged
merged 5 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 90 additions & 29 deletions pyresample/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1115,11 +1115,26 @@ def _extract_lons_lats(self, lonslats):
return lons, lats


def invproj(data_x, data_y, proj_dict):
def _invproj(data_x, data_y, proj_dict):
"""Perform inverse projection."""
# XXX: does pyproj copy arrays? What can we do so it doesn't?
target_proj = Proj(proj_dict)
return np.dstack(target_proj(data_x, data_y, inverse=True))
lon, lat = target_proj(data_x, data_y, inverse=True)
return np.stack([lon.astype(data_x.dtype), lat.astype(data_y.dtype)])


def _generate_2d_coords(pixel_size_x, pixel_size_y, pixel_upper_left_x, pixel_upper_left_y,
chunks, dtype, block_info=None):
start_y_idx = block_info[None]["array-location"][1][0]
end_y_idx = block_info[None]["array-location"][1][1]
start_x_idx = block_info[None]["array-location"][2][0]
end_x_idx = block_info[None]["array-location"][2][1]
dtype = block_info[None]["dtype"]
x = np.arange(start_x_idx, end_x_idx, dtype=dtype) * pixel_size_x + pixel_upper_left_x
y = np.arange(start_y_idx, end_y_idx, dtype=dtype) * -pixel_size_y + pixel_upper_left_y
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be refactored so that it doesn't duplicate the code in _get_proj_vectors?

x_2d, y_2d = np.meshgrid(x, y)
res = np.stack([x_2d, y_2d])
return res


class _ProjectionDefinition(BaseDefinition):
Expand Down Expand Up @@ -2040,15 +2055,10 @@ def get_lonlat(self, row, col):
@staticmethod
def _do_rotation(xspan, yspan, rot_deg=0):
"""Apply a rotation factor to a matrix of points."""
if hasattr(xspan, 'chunks'):
# we were given dask arrays, use dask functions
import dask.array as numpy
else:
numpy = np
rot_rad = numpy.radians(rot_deg)
rot_mat = numpy.array([[np.cos(rot_rad), np.sin(rot_rad)], [-np.sin(rot_rad), np.cos(rot_rad)]])
x, y = numpy.meshgrid(xspan, yspan)
return numpy.einsum('ji, mni -> jmn', rot_mat, numpy.dstack([x, y]))
rot_rad = np.radians(rot_deg)
rot_mat = np.array([[np.cos(rot_rad), np.sin(rot_rad)], [-np.sin(rot_rad), np.cos(rot_rad)]])
x, y = np.meshgrid(xspan, yspan)
return np.einsum('ji, mni -> jmn', rot_mat, np.dstack([x, y]))

def get_proj_vectors_dask(self, chunks=None, dtype=None):
"""Get projection vectors."""
Expand All @@ -2058,17 +2068,21 @@ def get_proj_vectors_dask(self, chunks=None, dtype=None):
chunks = CHUNK_SIZE # FUTURE: Use a global config object instead
return self.get_proj_vectors(dtype=dtype, chunks=chunks)

def _get_proj_vectors(self, dtype=None, check_rotation=True, chunks=None):
"""Get 1D projection coordinates."""
x_kwargs = {}
y_kwargs = {}

@staticmethod
def _chunks_to_yx_chunks(chunks):
if chunks is not None and not isinstance(chunks, int):
y_chunks = chunks[0]
x_chunks = chunks[1]
else:
y_chunks = x_chunks = chunks
return y_chunks, x_chunks

def _get_proj_vectors(self, dtype=None, check_rotation=True, chunks=None):
"""Get 1D projection coordinates."""
x_kwargs = {}
y_kwargs = {}

y_chunks, x_chunks = self._chunks_to_yx_chunks(chunks)
if x_chunks is not None or y_chunks is not None:
# use dask functions instead of numpy
from dask.array import arange
Expand Down Expand Up @@ -2139,24 +2153,67 @@ def get_proj_coords(self, data_slice=None, dtype=None, chunks=None):
Removed 'cache' keyword argument and add 'chunks' for creating
dask arrays.
"""
if self.rotation != 0 and chunks is not None:
raise ValueError("'rotation' is not supported with dask operations.")
if dtype is None:
dtype = self.dtype
y_slice, x_slice = self._get_yx_data_slice(data_slice)
if chunks is not None:
target_x, target_y = self._proj_coords_dask(chunks, dtype)
if y_slice is not None:
target_x = target_x[y_slice, x_slice]
target_y = target_y[y_slice, x_slice]
return target_x, target_y

target_x, target_y = self._get_proj_vectors(dtype=dtype, check_rotation=False, chunks=chunks)
if data_slice is not None and isinstance(data_slice, slice):
target_y = target_y[data_slice]
elif data_slice is not None:
target_y = target_y[data_slice[0]]
target_x = target_x[data_slice[1]]
if y_slice is not None:
target_y = target_y[y_slice]
if x_slice is not None:
target_x = target_x[x_slice]

if self.rotation != 0:
res = self._do_rotation(target_x, target_y, self.rotation)
target_x, target_y = res[0, :, :], res[1, :, :]
elif chunks is not None:
import dask.array as da
target_x, target_y = da.meshgrid(target_x, target_y)
else:
target_x, target_y = np.meshgrid(target_x, target_y)

return target_x, target_y

@staticmethod
def _get_yx_data_slice(data_slice):
if data_slice is not None and isinstance(data_slice, slice):
return data_slice, slice(None, None, None)
elif data_slice is not None:
return data_slice[0], data_slice[1]
return None, None

def _proj_coords_dask(self, chunks, dtype):
"""Generate 2D x and y coordinate arrays.

This is a separate function because it allows dask to optimize and
separate the individual 2D chunks of coordinates. Using the basic
numpy form of these calculations produces an unnecessary
relationship between the "arange" 1D projection vectors and every
2D coordinate chunk. This makes it difficult for dask to schedule
2D chunks in an optimal way.

"""
y_chunks, x_chunks = self._chunks_to_yx_chunks(chunks)
norm_y_chunks, norm_x_chunks = da.core.normalize_chunks((y_chunks, x_chunks), self.shape, dtype=dtype)
# We must provide `chunks` and `dtype` as passed arguments to ensure
# the returned array has a unique dask name
# See: https://github.com/dask/dask/issues/8450
res = da.map_blocks(_generate_2d_coords,
self.pixel_size_x, self.pixel_size_y,
self.pixel_upper_left[0], self.pixel_upper_left[1],
chunks, dtype,
chunks=((2,), norm_y_chunks, norm_x_chunks),
meta=np.array((), dtype=dtype),
dtype=dtype,
)
target_x, target_y = res[0], res[1]
return target_x, target_y

@property
def projection_x_coords(self):
"""Return projection X coordinates."""
Expand Down Expand Up @@ -2207,7 +2264,8 @@ def get_lonlats(self, nprocs=None, data_slice=None, cache=False, dtype=None, chu
data_slice : slice object, optional
Calculate only coordinates for specified slice
cache : bool, optional
Store result the result. Requires data_slice to be None
Store the result internally for later reuse. Requires data_slice
to be None.
dtype : numpy.dtype, optional
Data type of the returned arrays
chunks: int or tuple, optional
Expand Down Expand Up @@ -2245,10 +2303,13 @@ def get_lonlats(self, nprocs=None, data_slice=None, cache=False, dtype=None, chu
if hasattr(target_x, 'chunks'):
# we are using dask arrays, map blocks to th
from dask.array import map_blocks
res = map_blocks(invproj, target_x, target_y,
chunks=(target_x.chunks[0], target_x.chunks[1], 2),
new_axis=[2], proj_dict=self.crs_wkt).astype(dtype)
return res[:, :, 0], res[:, :, 1]
res = map_blocks(_invproj, target_x, target_y,
chunks=(2,) + target_x.chunks,
meta=np.array((), dtype=target_x.dtype),
dtype=target_x.dtype,
new_axis=[0], proj_dict=self.crs_wkt)
lons, lats = res[0], res[1]
return lons, lats

if nprocs > 1:
target_proj = Proj_MP(self.crs)
Expand Down
23 changes: 15 additions & 8 deletions pyresample/test/test_geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,11 @@ def test_get_proj_coords_dask(self):
area_def = get_area_def(area_id, area_name, proj_id, proj_dict, x_size, y_size, area_extent)

xcoord, ycoord = area_def.get_proj_coords(chunks=4096)
# make sure different chunk size provides a different dask name
xcoord2, ycoord2 = area_def.get_proj_coords(chunks=2048)
assert xcoord2.name != xcoord.name
assert ycoord2.name != ycoord.name

xcoord = xcoord.compute()
ycoord = ycoord.compute()
self.assertTrue(np.allclose(xcoord[0, :],
Expand Down Expand Up @@ -2048,19 +2053,21 @@ def test_get_lonlats(self):
np.testing.assert_allclose(lats[464:, :], lats1)

# check that get_lonlats with chunks definition doesn't cause errors and output arrays are equal
# too many chunks
_check_final_area_lon_lat_with_chunks(final_area, lons, lats, chunks=((200, 264, 464), (5570,)))
# too few chunks
_check_final_area_lon_lat_with_chunks(final_area, lons, lats, chunks=((464,), (5568,)))
# right amount of chunks, same shape
_check_final_area_lon_lat_with_chunks(final_area, lons, lats, chunks=((464, 464), (5568,)))
with pytest.raises(ValueError):
# too many chunks
_check_final_area_lon_lat_with_chunks(final_area, lons, lats, chunks=((200, 264, 464), (5570,)))
# right amount of chunks, different shape
_check_final_area_lon_lat_with_chunks(final_area, lons, lats, chunks=((464, 470), (5568,)))
# only one set of chunks in a tuple
_check_final_area_lon_lat_with_chunks(final_area, lons, lats, chunks=(464, 5568))
# only one chunk value
_check_final_area_lon_lat_with_chunks(final_area, lons, lats, chunks=464)

# only one set of chunks in a tuple
_check_final_area_lon_lat_with_chunks(final_area, lons, lats, chunks=(464, 5568))
# too few chunks
_check_final_area_lon_lat_with_chunks(final_area, lons, lats, chunks=((464,), (5568,)))
# right amount of chunks, same shape
_check_final_area_lon_lat_with_chunks(final_area, lons, lats, chunks=((464, 464), (5568,)))

def test_combine_area_extents(self):
"""Test combination of area extents."""
area1 = MagicMock()
Expand Down