From a6e9d55b0ce778cc7751034ab14e287a02a772cc Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Mon, 7 Oct 2024 17:58:50 -0700 Subject: [PATCH 1/4] Initial commit Signed-off-by: Balaji Veeramani --- .../planner/exchange/sort_task_spec.py | 30 +++++++------------ python/ray/data/tests/test_sort.py | 14 +++++++++ 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/python/ray/data/_internal/planner/exchange/sort_task_spec.py b/python/ray/data/_internal/planner/exchange/sort_task_spec.py index bcb4b704aefb..7bace6815c92 100644 --- a/python/ray/data/_internal/planner/exchange/sort_task_spec.py +++ b/python/ray/data/_internal/planner/exchange/sort_task_spec.py @@ -174,34 +174,26 @@ def sample_boundaries( # TODO(zhilong): Update sort sample bar before finished. samples = sample_bar.fetch_until_complete(sample_results) del sample_results - samples = [s for s in samples if len(s) > 0] + samples: List[Block] = [s for s in samples if len(s) > 0] # The dataset is empty if len(samples) == 0: return [None] * (num_reducers - 1) + + # Convert samples to a sorted list[tuple[...]] where each tuple represents a + # sample. builder = DelegatingBlockBuilder() for sample in samples: builder.add_block(sample) samples = builder.build() + samples = BlockAccessor.for_block(samples).to_numpy(columns=columns) + samples = sorted(zip(*samples.values())) - sample_dict = BlockAccessor.for_block(samples).to_numpy(columns=columns) - # Compute sorted indices of the samples. In np.lexsort last key is the - # primary key hence have to reverse the order. - indices = np.lexsort(list(reversed(list(sample_dict.values())))) - # Sort each column by indices, and calculate q-ths quantile items. - # Ignore the 1st item as it's not required for the boundary - for k, v in sample_dict.items(): - sorted_v = v[indices] - sample_dict[k] = list( - np.quantile( - sorted_v, np.linspace(0, 1, num_reducers), interpolation="nearest" - )[1:] - ) - # Return the list of boundaries as tuples - # of a form (col1_value, col2_value, ...) - return [ - tuple(sample_dict[k][i] for k in sample_dict) - for i in range(num_reducers - 1) + # Each boundary corresponds to a quantile of the data. + quantile_indices = [ + int(q * (len(samples) - 1)) for q in np.linspace(0, 1, num_reducers + 1) ] + # Exclude the first and last quantiles because they're 0 and 1. + return [samples[i] for i in quantile_indices[1:-1]] def _sample_block(block: Block, n_samples: int, sort_key: SortKey) -> Block: diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index 2246723553ab..9032e1005d86 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -51,6 +51,20 @@ def test_sort_with_specified_boundaries(ray_start_regular, descending, boundarie assert np.all(block["id"] == expected_block) +def test_sort_multiple_keys_produces_equally_sized_blocks(ray_start_regular_shared): + ds = ray.data.from_items( + [{"a": i, "b": j} for i in range(2) for j in range(5)], override_num_blocks=5 + ) + + ds_sorted = ds.sort(["a", "b"]) + + num_rows_per_block = [ + bundle.num_rows() for bundle in ds_sorted.iter_internal_ref_bundles() + ] + assert len(num_rows_per_block) == 5, len(num_rows_per_block) + assert max(num_rows_per_block) == 3, num_rows_per_block + + def test_sort_simple(ray_start_regular, use_push_based_shuffle): num_items = 100 parallelism = 4 From 0fd2b10ed57347b2374c2bcde7ad2218ebc42b1c Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Mon, 7 Oct 2024 18:08:31 -0700 Subject: [PATCH 2/4] Add comments Signed-off-by: Balaji Veeramani --- python/ray/data/_internal/planner/exchange/sort_task_spec.py | 2 ++ python/ray/data/tests/test_sort.py | 1 + 2 files changed, 3 insertions(+) diff --git a/python/ray/data/_internal/planner/exchange/sort_task_spec.py b/python/ray/data/_internal/planner/exchange/sort_task_spec.py index 7bace6815c92..9b407b5c77f2 100644 --- a/python/ray/data/_internal/planner/exchange/sort_task_spec.py +++ b/python/ray/data/_internal/planner/exchange/sort_task_spec.py @@ -181,6 +181,8 @@ def sample_boundaries( # Convert samples to a sorted list[tuple[...]] where each tuple represents a # sample. + # TODO: Once we deprecate pandas blocks, we can avoid this conversion and + # directly sort the samples. builder = DelegatingBlockBuilder() for sample in samples: builder.add_block(sample) diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index 9032e1005d86..9b6fca5edca7 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -52,6 +52,7 @@ def test_sort_with_specified_boundaries(ray_start_regular, descending, boundarie def test_sort_multiple_keys_produces_equally_sized_blocks(ray_start_regular_shared): + # Test for https://github.com/ray-project/ray/issues/45303. ds = ray.data.from_items( [{"a": i, "b": j} for i in range(2) for j in range(5)], override_num_blocks=5 ) From f3154c60a6a94fdd4e714b31d4d18392937e89f3 Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Mon, 7 Oct 2024 19:26:16 -0700 Subject: [PATCH 3/4] Add comments Signed-off-by: Balaji Veeramani --- python/ray/data/tests/test_sort.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index 9b6fca5edca7..d3a55c626d2d 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -62,8 +62,13 @@ def test_sort_multiple_keys_produces_equally_sized_blocks(ray_start_regular_shar num_rows_per_block = [ bundle.num_rows() for bundle in ds_sorted.iter_internal_ref_bundles() ] + # Number of output blocks should be equal to the number of input blocks. assert len(num_rows_per_block) == 5, len(num_rows_per_block) - assert max(num_rows_per_block) == 3, num_rows_per_block + # Ideally we should have 10 rows / 5 blocks = 2 rows per block, but to make this + # test less fragile we allow for a small deviation. + assert all( + 1 <= num_rows <= 3 for num_rows in num_rows_per_block + ), num_rows_per_block def test_sort_simple(ray_start_regular, use_push_based_shuffle): From 00861bb25e8a4583df2907f9fec549a6c1ada5be Mon Sep 17 00:00:00 2001 From: Balaji Veeramani Date: Wed, 9 Oct 2024 09:13:06 -0700 Subject: [PATCH 4/4] Adderss review comments Signed-off-by: Balaji Veeramani --- .../_internal/planner/exchange/sort_task_spec.py | 12 +++++++----- python/ray/data/tests/test_sort.py | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/python/ray/data/_internal/planner/exchange/sort_task_spec.py b/python/ray/data/_internal/planner/exchange/sort_task_spec.py index 9b407b5c77f2..edeea0639464 100644 --- a/python/ray/data/_internal/planner/exchange/sort_task_spec.py +++ b/python/ray/data/_internal/planner/exchange/sort_task_spec.py @@ -186,16 +186,18 @@ def sample_boundaries( builder = DelegatingBlockBuilder() for sample in samples: builder.add_block(sample) - samples = builder.build() - samples = BlockAccessor.for_block(samples).to_numpy(columns=columns) - samples = sorted(zip(*samples.values())) + samples_table = builder.build() + samples_dict = BlockAccessor.for_block(samples_table).to_numpy(columns=columns) + # This zip does the transposition from list of column values to list of tuples. + samples_list = sorted(zip(*samples_dict.values())) # Each boundary corresponds to a quantile of the data. quantile_indices = [ - int(q * (len(samples) - 1)) for q in np.linspace(0, 1, num_reducers + 1) + int(q * (len(samples_list) - 1)) + for q in np.linspace(0, 1, num_reducers + 1) ] # Exclude the first and last quantiles because they're 0 and 1. - return [samples[i] for i in quantile_indices[1:-1]] + return [samples_list[i] for i in quantile_indices[1:-1]] def _sample_block(block: Block, n_samples: int, sort_key: SortKey) -> Block: diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index d3a55c626d2d..982cf99409d4 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -51,7 +51,7 @@ def test_sort_with_specified_boundaries(ray_start_regular, descending, boundarie assert np.all(block["id"] == expected_block) -def test_sort_multiple_keys_produces_equally_sized_blocks(ray_start_regular_shared): +def test_sort_multiple_keys_produces_equally_sized_blocks(ray_start_regular): # Test for https://github.com/ray-project/ray/issues/45303. ds = ray.data.from_items( [{"a": i, "b": j} for i in range(2) for j in range(5)], override_num_blocks=5