[Datasets] Improved naming of Ray Data map tasks #32585

Merged Mar 3, 2023 (28 commits)
Changes from 21 commits

Commits
3bb31a5
initial jam
Feb 7, 2023
b82b028
Merge branch 'master' into improved-data-dash-names
Feb 14, 2023
5756e47
setup path for improved task names
Feb 16, 2023
9467992
clean up
Feb 16, 2023
9f07bc8
add read fn name to LazyBlockList instead of read_datasource
Feb 16, 2023
9b49e12
format
Feb 16, 2023
b977c4c
update class detection logic
Feb 16, 2023
16295f8
Merge branch 'master' into improved-data-dash-names
Feb 17, 2023
48a67d9
update tests
Feb 17, 2023
4dc1ea2
clean up
Feb 17, 2023
003c521
format
Feb 17, 2023
7770385
update union naming scheme
Feb 17, 2023
92aa93c
remove unnecessary options
Feb 21, 2023
501372d
Merge branch 'master' into improved-data-dash-names
Feb 21, 2023
05adb73
Merge branch 'master' into improved-data-dash-names
Feb 23, 2023
5b9e2b9
optimize tests
Feb 24, 2023
2e89515
Merge branch 'master' into improved-data-dash-names
Feb 24, 2023
20200a8
rewrite Datasource get_name method
Feb 24, 2023
66f3668
Merge branch 'master' into improved-data-dash-names
Feb 24, 2023
58bb391
Merge branch 'master' into improved-data-dash-names
Feb 27, 2023
cf2de1f
update tests
Feb 27, 2023
2f5845c
straggler tests
Feb 27, 2023
b1b3fbe
Merge branch 'master' into improved-data-dash-names
Feb 28, 2023
01eac28
Merge branch 'master' into improved-data-dash-names
Feb 28, 2023
69c6d1c
update datasource naming
Feb 28, 2023
40fd9ea
update manual datasource names
Mar 1, 2023
6f70ef8
Merge branch 'master' into improved-data-dash-names
Mar 1, 2023
d2df035
Merge branch 'master' into improved-data-dash-names
Mar 1, 2023
@@ -93,7 +93,7 @@ def _start_actor(self):
"""Start a new actor and add it to the actor pool as a pending actor."""
assert self._cls is not None
ctx = DatasetContext.get_current()
actor = self._cls.remote(ctx)
actor = self._cls.remote(ctx, src_fn_name=self.name)
self._actor_pool.add_pending_actor(actor, actor.get_location.remote())

def _add_bundled_input(self, bundle: RefBundle):
@@ -122,7 +122,7 @@ def _dispatch_tasks(self):
bundle = self._bundle_queue.popleft()
input_blocks = [block for block, _ in bundle.blocks]
ctx = TaskContext(task_idx=self._next_task_idx)
ref = actor.submit.options(num_returns="dynamic").remote(
ref = actor.submit.options(num_returns="dynamic", name=self.name).remote(
Contributor:
@rkooo567 - I thought you mentioned there's a problem with setting the actor name here, right?

Contributor Author:
If we look at the dashboard screenshot above, I believe this isn't actually setting the name of the actor -- we can still see _MapWorker being used in the Active Actors by Name chart. I think this part sets the name of the submitted task instead (e.g. fn_actors in the example above).

Contributor:
Also, actor names should be unique across the cluster, so let's avoid setting the name for now.

Contributor:
Yep, this is just setting the name of the actor task, so this should be fine, right?

self._transform_fn_ref, ctx, *input_blocks
)
self._next_task_idx += 1
@@ -284,8 +284,9 @@ def _apply_default_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any
class _MapWorker:
"""An actor worker for MapOperator."""

def __init__(self, ctx: DatasetContext):
def __init__(self, ctx: DatasetContext, src_fn_name: str):
DatasetContext._set_current(ctx)
self.src_fn_name: str = src_fn_name

def get_location(self) -> NodeIdStr:
return ray.get_runtime_context().get_node_id()
@@ -298,6 +299,9 @@ def submit(
) -> Iterator[Union[Block, List[BlockMetadata]]]:
yield from _map_task(fn, ctx, *blocks)

def __repr__(self):
return f"MapWorker({self.src_fn_name})"
Contributor:
I actually feel the MapWorker naming might be confusing for users. Shall we rename it to something more specific, like MapBatchesActor? @clarkzinzow

Contributor:
Agreed, but I think the current MapWorker(MapBatches(fn_name)) is pretty good!

Contributor Author:
Once #32922 is implemented, we should be able to send this actor name to the dashboard.



# TODO(Clark): Promote this to a public config once we deprecate the legacy compute
# strategies.
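For context, here is a minimal, self-contained sketch (not part of this PR's diff) of how the two naming hooks above interact: the actor task gets the operator name via .options(name=...), while the actor itself only exposes the source function name through __repr__. The class and function names below are illustrative, not Ray Data internals.

import ray

# Sketch only: mirrors the PR's _MapWorker naming, with illustrative names.
@ray.remote
class MapWorkerSketch:
    def __init__(self, src_fn_name: str):
        self.src_fn_name = src_fn_name

    def submit(self, block):
        # Stand-in for the real transform; doubles every element.
        return [x * 2 for x in block]

    def __repr__(self):
        # Same shape as the PR's __repr__: MapWorker(<source fn name>)
        return f"MapWorker({self.src_fn_name})"

actor = MapWorkerSketch.remote("MapBatches(fn)")
# Name the actor *task*, not the actor, matching the discussion above.
ref = actor.submit.options(name="MapBatches(fn)").remote([1, 2, 3])
print(ray.get(ref))  # [2, 4, 6]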
@@ -52,7 +52,7 @@ def _add_bundled_input(self, bundle: RefBundle):
map_task = cached_remote_fn(_map_task, num_returns="dynamic")
input_blocks = [block for block, _ in bundle.blocks]
ctx = TaskContext(task_idx=self._next_task_idx)
ref = map_task.options(**self._ray_remote_args).remote(
ref = map_task.options(**self._ray_remote_args, name=self.name).remote(
self._transform_fn_ref, ctx, *input_blocks
)
self._next_task_idx += 1
5 changes: 5 additions & 0 deletions python/ray/data/_internal/lazy_block_list.py
@@ -34,6 +34,7 @@ class LazyBlockList(BlockList):
def __init__(
self,
tasks: List[ReadTask],
read_stage_name: Optional[str] = None,
block_partition_refs: Optional[List[ObjectRef[MaybeBlockPartition]]] = None,
block_partition_meta_refs: Optional[List[ObjectRef[BlockMetadata]]] = None,
cached_metadata: Optional[List[BlockPartitionMetadata]] = None,
@@ -46,6 +47,8 @@ def __init__(

Args:
tasks: The read tasks that will produce the blocks of this lazy block list.
read_stage_name: An optional name for the read stage, derived from the
underlying Datasource
block_partition_refs: An optional list of already submitted read task
futures (i.e. block partition refs). This should be the same length as
the tasks argument.
@@ -60,6 +63,7 @@ def __init__(
stats. If not provided, a new UUID will be created.
"""
self._tasks = tasks
self._read_stage_name = read_stage_name
self._num_blocks = len(self._tasks)
if stats_uuid is None:
stats_uuid = uuid.uuid4()
@@ -129,6 +133,7 @@ def stats(self) -> DatasetStats:
def copy(self) -> "LazyBlockList":
return LazyBlockList(
self._tasks.copy(),
read_stage_name=self._read_stage_name,
block_partition_refs=self._block_partition_refs.copy(),
block_partition_meta_refs=self._block_partition_meta_refs.copy(),
cached_metadata=self._cached_metadata,
4 changes: 3 additions & 1 deletion python/ray/data/_internal/plan.py
@@ -1147,7 +1147,9 @@ def block_fn(
for block in read_fn():
yield block

name = "read"
name = in_blocks._read_stage_name or "Read"
if isinstance(name, list):
name = "->".join(name)

# Fuse downstream randomize stage with the read stage if possible. This is needed
# when .window() is called right after read->randomize, since it forces execution.
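A small illustration of the naming fallback above, assuming (as the isinstance check suggests) that _read_stage_name can be either a single string or a list of per-read names. This is a standalone sketch, not code from the PR:

from typing import List, Optional, Union

def read_stage_display_name(read_stage_name: Optional[Union[str, List[str]]]) -> str:
    # Fall back to the generic "Read" when the datasource supplies no name.
    name = read_stage_name or "Read"
    # If several read names were collected, chain them with "->".
    if isinstance(name, list):
        name = "->".join(name)
    return name

print(read_stage_display_name(None))                        # Read
print(read_stage_display_name("ReadRange"))                 # ReadRange
print(read_stage_display_name(["ReadCSV", "ReadParquet"]))  # ReadCSV->ReadParquet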
12 changes: 12 additions & 0 deletions python/ray/data/dataset.py
@@ -1565,12 +1565,24 @@ def union(self, *other: List["Dataset[T]"]) -> "Dataset[T]":
tasks: List[ReadTask] = []
block_partition_refs: List[ObjectRef[BlockPartition]] = []
block_partition_meta_refs: List[ObjectRef[BlockMetadata]] = []

# Gather read task names from input blocks of unioned Datasets,
# and concat them before passing to resulting LazyBlockList
read_task_names = []
self_read_name = self._plan._in_blocks._read_stage_name or "Read"
read_task_names.append(self_read_name)
other_read_names = [
o._plan._in_blocks._read_stage_name or "Read" for o in other
]
read_task_names.extend(other_read_names)

for bl in bls:
tasks.extend(bl._tasks)
block_partition_refs.extend(bl._block_partition_refs)
block_partition_meta_refs.extend(bl._block_partition_meta_refs)
blocklist = LazyBlockList(
tasks,
f"Union({','.join(read_task_names)})",
block_partition_refs,
block_partition_meta_refs,
owned_by_consumer=owned_by_consumer,
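As a rough, standalone illustration of the union naming above, assuming one extra dataset read via ReadParquet and another via ReadRange (illustrative values; the real names come from each dataset's _read_stage_name):

# Illustrative inputs; in the PR these come from the unioned Datasets.
self_read_name = "ReadCSV"
other_read_names = ["ReadParquet", "ReadRange"]

read_task_names = [self_read_name, *other_read_names]
union_stage_name = f"Union({','.join(read_task_names)})"
print(union_stage_name)  # Union(ReadCSV,ReadParquet,ReadRange)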
17 changes: 17 additions & 0 deletions python/ray/data/datasource/datasource.py
@@ -118,6 +118,16 @@ def on_write_failed(
"""
pass

def get_name(self) -> str:
"""Return a human-readable name for this datasource.
This will be used as the names of the read tasks.
"""
name = type(self).__name__
datasource_suffix = "Datasource"
if name.endswith(datasource_suffix):
name = name[: -len(datasource_suffix)]
return f"Read{name}"


@PublicAPI
class Reader(Generic[T]):
@@ -388,6 +398,13 @@ class RandomIntRowDatasource(Datasource[ArrowRow]):
{'c_0': 4983608804013926748, 'c_1': 1160140066899844087}
"""

def get_name(self) -> str:
"""Return a human-readable name for this datasource.
This will be used as the names of the read tasks.
Note: overrides the base `Datasource` method.
"""
return "ReadRandomInt"

def create_reader(
self,
n: int,
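To show the suffix-stripping rule in the base get_name above, here is a standalone sketch with hypothetical datasource classes (class names are illustrative only, not real Ray Data classes):

class _GetNameSketch:
    # Same rule as the base Datasource.get_name shown above.
    def get_name(self) -> str:
        name = type(self).__name__
        suffix = "Datasource"
        if name.endswith(suffix):
            name = name[: -len(suffix)]
        return f"Read{name}"

class RangeDatasource(_GetNameSketch):
    """Hypothetical subclass; the suffix is stripped."""

class CustomSource(_GetNameSketch):
    """Hypothetical subclass without the suffix; the name is kept as-is."""

print(RangeDatasource().get_name())  # ReadRange
print(CustomSource().get_name())     # ReadCustomSource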
7 changes: 7 additions & 0 deletions python/ray/data/datasource/parquet_base_datasource.py
@@ -21,6 +21,13 @@ class ParquetBaseDatasource(FileBasedDatasource):

_FILE_EXTENSION = "parquet"

def get_name(self):
"""Return a human-readable name for this datasource.
This will be used as the names of the read tasks.
Note: overrides the base `Datasource` method.
"""
return "ReadParquetBulk"

def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args):
import pyarrow.parquet as pq

6 changes: 5 additions & 1 deletion python/ray/data/read_api.py
@@ -343,8 +343,12 @@ def read_datasource(
"dataset blocks."
)

read_stage_name = datasource.get_name()
block_list = LazyBlockList(
read_tasks, ray_remote_args=ray_remote_args, owned_by_consumer=False
read_tasks,
read_stage_name=read_stage_name,
ray_remote_args=ray_remote_args,
owned_by_consumer=False,
)

# TODO(chengsu): avoid calling Reader.get_read_tasks() twice after removing
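End to end, the datasource name now flows from get_name through LazyBlockList into the optimized stage names, which is what the test updates below assert. A hedged usage example (the exact stats output depends on stage fusion and the Ray version):

import ray

ds = ray.data.range(100).map_batches(lambda batch: batch)
ds = ds.fully_executed()
# With this change the fused stage should read "ReadRange->MapBatches(<lambda>)"
# rather than the old generic "read->MapBatches(<lambda>)".
print(ds.stats())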
28 changes: 14 additions & 14 deletions python/ray/data/tests/test_optimize.py
@@ -308,7 +308,7 @@ def test_stage_linking(ray_start_regular_shared):
ds = ds.fully_executed()
_assert_has_stages(ds._plan._stages_before_snapshot, ["map"])
assert len(ds._plan._stages_after_snapshot) == 0
_assert_has_stages(ds._plan._last_optimized_stages, ["read->map"])
_assert_has_stages(ds._plan._last_optimized_stages, ["ReadRange->map"])


def test_optimize_reorder(ray_start_regular_shared):
@@ -326,7 +326,7 @@ def test_optimize_reorder(ray_start_regular_shared):
expect_stages(
ds,
2,
["read->MapBatches(dummy_map)", "randomize_block_order"],
["ReadRange->MapBatches(dummy_map)", "randomize_block_order"],
)

ds2 = (
@@ -339,7 +339,7 @@ def test_optimize_reorder(ray_start_regular_shared):
expect_stages(
ds2,
3,
["read->randomize_block_order", "repartition", "MapBatches(dummy_map)"],
["ReadRange->randomize_block_order", "repartition", "MapBatches(dummy_map)"],
)


@@ -352,7 +352,7 @@ def test_window_randomize_fusion(ray_start_regular_shared):
pipe = ray.data.range(100).randomize_block_order().window().map_batches(dummy_map)
pipe.take()
stats = pipe.stats()
assert "read->randomize_block_order->MapBatches(dummy_map)" in stats, stats
assert "ReadRange->randomize_block_order->MapBatches(dummy_map)" in stats, stats


def test_write_fusion(ray_start_regular_shared, tmp_path):
@@ -365,7 +365,7 @@ def test_write_fusion(ray_start_regular_shared, tmp_path):
ds = ray.data.range(100).map_batches(lambda x: x)
ds.write_csv(path)
stats = ds._write_ds.stats()
assert "read->MapBatches(<lambda>)->write" in stats, stats
assert "ReadRange->MapBatches(<lambda>)->write" in stats, stats

ds = (
ray.data.range(100)
@@ -375,7 +375,7 @@ )
)
ds.write_csv(path)
stats = ds._write_ds.stats()
assert "read->MapBatches(<lambda>)" in stats, stats
assert "ReadRange->MapBatches(<lambda>)" in stats, stats
assert "random_shuffle" in stats, stats
assert "MapBatches(<lambda>)->write" in stats, stats

@@ -388,7 +388,7 @@ def test_write_doesnt_reorder_randomize_block(ray_start_regular_shared, tmp_path

# The randomize_block_order will switch order with the following map_batches,
# but not the tailing write operator.
assert "read->MapBatches(<lambda>)" in stats, stats
assert "ReadRange->MapBatches(<lambda>)" in stats, stats
assert "randomize_block_order" in stats, stats
assert "write" in stats, stats

@@ -412,7 +412,7 @@ def build_pipe():
build_pipe(),
1,
[
"read->MapBatches(dummy_map)->MapBatches(dummy_map)->random_shuffle_map",
"ReadRange->MapBatches(dummy_map)->MapBatches(dummy_map)->random_shuffle_map", # noqa: E501
"random_shuffle_reduce",
],
)
@@ -487,7 +487,7 @@ def test_optimize_equivalent_remote_args(ray_start_regular_shared):
pipe,
1,
[
"read->MapBatches(dummy_map)->MapBatches(dummy_map)",
"ReadRange->MapBatches(dummy_map)->MapBatches(dummy_map)",
],
)

@@ -502,7 +502,7 @@ def test_optimize_equivalent_remote_args(ray_start_regular_shared):
pipe,
1,
[
"read->MapBatches(dummy_map)->random_shuffle_map",
"ReadRange->MapBatches(dummy_map)->random_shuffle_map",
"random_shuffle_reduce",
],
)
@@ -527,7 +527,7 @@ def test_optimize_incompatible_stages(shutdown_only):
pipe,
2,
[
"read->MapBatches(dummy_map)",
"ReadRange->MapBatches(dummy_map)",
"MapBatches(dummy_map)->random_shuffle_map",
"random_shuffle_reduce",
],
@@ -542,7 +542,7 @@ def test_optimize_incompatible_stages(shutdown_only):
pipe,
3,
[
"read->MapBatches(dummy_map)",
"ReadRange->MapBatches(dummy_map)",
"MapBatches(dummy_map)",
"random_shuffle_map",
"random_shuffle_reduce",
@@ -612,7 +612,7 @@ def __call__(self, x):
pipe,
1,
[
"read->MapBatches(CallableFn)->MapBatches(CallableFn)",
"ReadParquetBulk->MapBatches(CallableFn)->MapBatches(CallableFn)",
],
)

@@ -648,7 +648,7 @@ def __call__(self, x):
pipe,
1,
[
"read->MapBatches(<lambda>)->MapBatches(CallableFn)",
"ReadParquetBulk->MapBatches(<lambda>)->MapBatches(CallableFn)",
],
)
