
[Data] Improve warning message when read task is large #46942

Merged (3 commits) on Aug 3, 2024
@@ -64,7 +64,7 @@ def aggregate_output_metadata(self) -> BlockMetadata:
            return BlockMetadata(None, None, None, None, None)

        # `get_read_tasks` isn't guaranteed to return exactly one read task.
-        metadata = [read_task.get_metadata() for read_task in read_tasks]
+        metadata = [read_task.metadata for read_task in read_tasks]

        if all(meta.num_rows is not None for meta in metadata):
            num_rows = sum(meta.num_rows for meta in metadata)
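For readers skimming the hunk above: the aggregation rule is all-or-nothing, so a total row count is reported only when every read task's metadata carries one. Below is a minimal standalone sketch of that pattern; aggregate_num_rows is a hypothetical helper, not the Ray source, and only the num_rows field matters here (the positional BlockMetadata arguments mirror those used elsewhere in this diff).

from typing import List, Optional

from ray.data.block import BlockMetadata


def aggregate_num_rows(metadata: List[BlockMetadata]) -> Optional[int]:
    # A single unknown row count makes the aggregate unknown.
    if all(meta.num_rows is not None for meta in metadata):
        return sum(meta.num_rows for meta in metadata)
    return None


known = [BlockMetadata(2, None, None, None, None), BlockMetadata(3, None, None, None, None)]
partial = known + [BlockMetadata(None, None, None, None, None)]
print(aggregate_num_rows(known))    # 5
print(aggregate_num_rows(partial))  # None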
28 changes: 15 additions & 13 deletions python/ray/data/_internal/planner/plan_read_op.py
@@ -1,4 +1,5 @@
import logging
+import warnings
from typing import Iterable, List

import ray
@@ -14,12 +15,14 @@
    MapTransformer,
    MapTransformFn,
)
+from ray.data._internal.execution.util import memory_string
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.util import _warn_on_high_parallelism, call_with_retry
-from ray.data.block import Block
+from ray.data.block import Block, BlockMetadata
from ray.data.datasource.datasource import ReadTask
+from ray.util.debug import log_once

-TASK_SIZE_WARN_THRESHOLD_BYTES = 100000
+TASK_SIZE_WARN_THRESHOLD_BYTES = 1024 * 1024  # 1 MiB

# Transient errors that can occur during longer reads. Trigger retry when these occur.
READ_FILE_RETRY_ON_ERRORS = ["AWS Error NETWORK_CONNECTION", "AWS Error ACCESS_DENIED"]
@@ -29,24 +32,23 @@
logger = logging.getLogger(__name__)


-def cleaned_metadata(read_task: ReadTask):
-    block_meta = read_task.get_metadata()
+def cleaned_metadata(read_task: ReadTask) -> BlockMetadata:
    task_size = len(cloudpickle.dumps(read_task))
-    if (
-        block_meta.size_bytes is not None
-        and task_size > block_meta.size_bytes
-        and task_size > TASK_SIZE_WARN_THRESHOLD_BYTES
+    if task_size > TASK_SIZE_WARN_THRESHOLD_BYTES and log_once(
+        f"large_read_task_{read_task.read_fn.__name__}"
    ):
-        logger.warning(
-            f"The read task size ({task_size} bytes) is larger "
-            "than the reported output size of the task "
-            f"({block_meta.size_bytes} bytes). This may be a size "
-            "reporting bug in the datasource being read from."
+        warnings.warn(
+            "The serialized size of your read function named "
+            f"'{read_task.read_fn.__name__}' is {memory_string(task_size)}. This size is "
+            "relatively large. As a result, Ray might excessively "
+            "spill objects during execution. To fix this issue, avoid accessing "
+            f"`self` or other large objects in '{read_task.read_fn.__name__}'."
        )

    # Defensively compute the size of the block as the max of the size reported
    # by the datasource and the actual read task size. This is to guard against
    # issues with bad metadata reporting.
+    block_meta = read_task.metadata
    if block_meta.size_bytes is None or task_size > block_meta.size_bytes:
        block_meta.size_bytes = task_size

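To make the new warning concrete: the size being measured is the cloudpickle-serialized read task, which balloons when the read function closes over large objects. A small self-contained sketch of that effect, assuming the standalone cloudpickle package behaves like Ray's vendored copy here (memory_string is taken to be Ray's internal human-readable byte formatter):

import cloudpickle
import numpy as np


def make_capturing_read_fn():
    large_object = np.zeros(1024 * 1024, dtype=np.uint8)  # 1 MiB

    def read_fn():
        # Referencing `large_object` pulls the whole array into the
        # pickled closure, inflating the serialized task size.
        yield large_object.sum()

    return read_fn


def plain_read_fn():
    yield 0


print(len(cloudpickle.dumps(make_capturing_read_fn())))  # over 1 MiB, would warn
print(len(cloudpickle.dumps(plain_read_fn)))             # a few hundred bytes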
9 changes: 7 additions & 2 deletions python/ray/data/datasource/datasource.py
@@ -129,7 +129,7 @@ class ReadTask(Callable[[], Iterable[Block]]):

    Read tasks are generated by :meth:`~ray.data.Datasource.get_read_tasks`,
    and return a list of ``ray.data.Block`` when called. Initial metadata about the read
-    operation can be retrieved via ``get_metadata()`` prior to executing the
+    operation can be retrieved via the ``metadata`` attribute prior to executing the
    read. Final metadata is returned after the read along with the blocks.

    Ray will execute read tasks in remote functions to parallelize execution.
@@ -149,9 +149,14 @@ def __init__(self, read_fn: Callable[[], Iterable[Block]], metadata: BlockMetadata
        self._metadata = metadata
        self._read_fn = read_fn

-    def get_metadata(self) -> BlockMetadata:
+    @property
+    def metadata(self) -> BlockMetadata:

[Member Author] Changing this from a method to a property for consistency with the newly-added read_fn property.

        return self._metadata

+    @property
+    def read_fn(self) -> Callable[[], Iterable[Block]]:
+        return self._read_fn

    def __call__(self) -> Iterable[Block]:
        result = self._read_fn()
        if not hasattr(result, "__iter__"):
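A short usage sketch of the surface after this change: metadata is read as an attribute rather than called, and the read function is reachable through the new read_fn property (the positional BlockMetadata arguments mirror those used in this diff's tests):

import pandas as pd

from ray.data.block import BlockMetadata
from ray.data.datasource.datasource import ReadTask


def read_fn():
    yield pd.DataFrame({"column": [0]})


task = ReadTask(read_fn, BlockMetadata(1, None, None, None, None))

print(task.metadata.num_rows)  # 1, attribute access with no call
print(task.read_fn.__name__)   # "read_fn", used in the warning's log_once key
blocks = list(task())          # executing the task yields the blocks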
2 changes: 1 addition & 1 deletion python/ray/data/read_api.py
@@ -418,7 +418,7 @@ def read_datasource(
    import uuid

    stats = DatasetStats(
-        metadata={"Read": [read_task.get_metadata() for read_task in read_tasks]},
+        metadata={"Read": [read_task.metadata for read_task in read_tasks]},
        parent=None,
        needs_stats_actor=True,
        stats_uuid=uuid.uuid4(),
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_delta_sharing.py
@@ -92,7 +92,7 @@ def test_get_read_tasks(self, mock_setup_delta_sharing_connections):
        self.assertTrue(all(isinstance(task, ReadTask) for task in read_tasks))

        for task in read_tasks:
-            metadata = task.get_metadata()
+            metadata = task.metadata
            self.assertIsInstance(metadata, BlockMetadata)
            self.assertEqual(len(metadata.input_files), 1)
            self.assertTrue(metadata.input_files[0]["url"] in ["file1", "file2"])
21 changes: 21 additions & 0 deletions python/ray/data/tests/test_execution_optimizer.py
@@ -57,7 +57,10 @@
from ray.data._internal.planner.exchange.sort_task_spec import SortKey
from ray.data._internal.planner.planner import Planner
from ray.data._internal.stats import DatasetStats
+from ray.data.block import BlockMetadata
from ray.data.context import DataContext
+from ray.data.datasource import Datasource
+from ray.data.datasource.datasource import ReadTask
from ray.data.tests.conftest import * # noqa
from ray.data.tests.test_util import get_parquet_read_logical_op
from ray.data.tests.util import column_udf, extract_values, named_values
@@ -110,6 +113,24 @@ def test_read_operator(ray_start_regular_shared):
)


+def test_read_operator_emits_warning_for_large_read_tasks():
+    class StubDatasource(Datasource):
+        def estimate_inmemory_data_size(self) -> Optional[int]:
+            return None
+
+        def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
+            large_object = np.zeros((128, 1024, 1024), dtype=np.uint8)  # 128 MiB
+
+            def read_fn():
+                large_object  # referenced so it gets pickled into the closure
+                yield pd.DataFrame({"column": [0]})
+
+            return [ReadTask(read_fn, BlockMetadata(1, None, None, None, None))]
+
+    with pytest.warns(UserWarning):
+        ray.data.read_datasource(StubDatasource()).materialize()


def test_split_blocks_operator(ray_start_regular_shared):
planner = Planner()
op = get_parquet_read_logical_op(parallelism=10)
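As a closing usage note, the remedy the warning suggests is to keep captured state small. A hedged sketch under that advice; SmallTaskDatasource and its parquet paths are hypothetical, not part of this PR:

from typing import Iterator, List, Optional

import pandas as pd

from ray.data.block import Block, BlockMetadata
from ray.data.datasource import Datasource
from ray.data.datasource.datasource import ReadTask


class SmallTaskDatasource(Datasource):
    def __init__(self, paths: List[str]):
        self._paths = paths  # small strings, cheap to pickle

    def estimate_inmemory_data_size(self) -> Optional[int]:
        return None

    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        tasks = []
        for path in self._paths:
            # Capture only `path`, never `self`, so the pickled task stays
            # well under TASK_SIZE_WARN_THRESHOLD_BYTES.
            def read_fn(path=path) -> Iterator[Block]:
                yield pd.read_parquet(path)  # load the data inside the task

            tasks.append(
                ReadTask(read_fn, BlockMetadata(None, None, None, None, None))
            )
        return tasks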