[data] [3/3] [no_early_kickoff] Async iter batches e2e #33620

Merged 91 commits from amogkam:async-iter-batches-2 into ray-project:master on Mar 27, 2023.
Commits
All 91 commits are by amogkam.

6f47dd5  update (Mar 22, 2023)
da336f3  fix (Mar 22, 2023)
98c7918  fix (Mar 22, 2023)
f1e6810  Merge branch 'master' of github.com:ray-project/ray into async-iter-b… (Mar 22, 2023)
18d66b6  newline (Mar 22, 2023)
02d3f7e  fix (Mar 22, 2023)
241d58f  wip (Mar 22, 2023)
3cafc47  wip (Mar 22, 2023)
da9d702  update (Mar 22, 2023)
5a07fd2  more (Mar 22, 2023)
1beee62  wip (Mar 22, 2023)
f526159  Merge branch 'master' of github.com:ray-project/ray into async-iter-b… (Mar 22, 2023)
7cd338d  update (Mar 22, 2023)
b74da64  wip (Mar 22, 2023)
c23bbd0  add (Mar 22, 2023)
2dbbe9c  wip (Mar 23, 2023)
855f9fe  update tests (Mar 23, 2023)
9e429a4  Merge branch 'async-iter-batches-2' of github.com:amogkam/ray into as… (Mar 23, 2023)
4b4851f  wip (Mar 23, 2023)
dcfdb06  comments (Mar 23, 2023)
36dce4a  Merge branch 'master' of github.com:ray-project/ray into async-iter-b… (Mar 23, 2023)
5549fb4  lint (Mar 23, 2023)
c414c52  fix (Mar 23, 2023)
44e63c4  integration (Mar 23, 2023)
8bed11e  stats (Mar 23, 2023)
1133881  legacy stats (Mar 23, 2023)
0a1885f  update (Mar 23, 2023)
34213f2  add iterator (Mar 23, 2023)
1b7f1b9  update (Mar 23, 2023)
8fee289  update (Mar 23, 2023)
d484344  release test (Mar 23, 2023)
fb99ec2  update (Mar 23, 2023)
e6e429a  lock (Mar 23, 2023)
5522174  fix (Mar 23, 2023)
fab714c  update (Mar 24, 2023)
ccc00c2  update (Mar 24, 2023)
64638a5  update (Mar 24, 2023)
e6cdb06  address comments (Mar 24, 2023)
0feeb2d  syntax (Mar 24, 2023)
b14d1c7  Merge branch 'async-iter-batches-2' of github.com:amogkam/ray into as… (Mar 24, 2023)
00a5455  update interfaces (Mar 24, 2023)
5a309df  wip (Mar 24, 2023)
717f3ca  fix (Mar 24, 2023)
1c6dd85  merge conflicts (Mar 24, 2023)
d4141f8  Merge branch 'async-iter-batches-2' of github.com:amogkam/ray into as… (Mar 24, 2023)
ebaa8eb  fix (Mar 24, 2023)
bcdc1db  comment (Mar 24, 2023)
f918fb1  wip (Mar 24, 2023)
1b8c2a6  fix (Mar 24, 2023)
e8638be  Merge branch 'async-iter-batches-2' of github.com:amogkam/ray into as… (Mar 24, 2023)
d4df6ae  update (Mar 24, 2023)
97653c5  Merge branch 'master' of github.com:ray-project/ray into async-iter-b… (Mar 24, 2023)
0145b2d  fix (Mar 24, 2023)
58d73ec  address comments (Mar 24, 2023)
989f3a7  Merge branch 'async-iter-batches-2' of github.com:amogkam/ray into as… (Mar 24, 2023)
620d52e  update (Mar 24, 2023)
3b8494c  update (Mar 24, 2023)
9d111af  lint (Mar 24, 2023)
497eb82  fix (Mar 25, 2023)
9db2a77  Merge branch 'async-iter-batches-2' of github.com:amogkam/ray into as… (Mar 25, 2023)
ddb9460  lock stats (Mar 25, 2023)
5d4587a  fix (Mar 25, 2023)
f39a0ef  Merge branch 'async-iter-batches-2' of github.com:amogkam/ray into as… (Mar 25, 2023)
6313be7  remove lock (Mar 25, 2023)
171aa14  fix stats (Mar 25, 2023)
1c7dfe9  fix (Mar 25, 2023)
deb7b97  fix (Mar 25, 2023)
d64072f  Merge branch 'async-iter-batches-2' of github.com:amogkam/ray into as… (Mar 25, 2023)
d47dae1  fix (Mar 25, 2023)
771d7c9  ci fixes (Mar 25, 2023)
3c752ac  more fixes (Mar 25, 2023)
4046d29  Merge branch 'master' of github.com:ray-project/ray into async-iter-b… (Mar 25, 2023)
5869107  fix (Mar 25, 2023)
e65f73f  fix stats (Mar 25, 2023)
f84b9eb  more fix (Mar 25, 2023)
738b225  change back trace deallocation (Mar 25, 2023)
06bbe2b  fix (Mar 25, 2023)
c5da601  empty (Mar 25, 2023)
fa3d13b  address comments (Mar 25, 2023)
9ab19dc  default to 1 (Mar 25, 2023)
52d4767  default to 1 (Mar 25, 2023)
ad2dbb2  fixes (Mar 25, 2023)
72dc3fb  fix test_stats (Mar 25, 2023)
444541c  Merge branch 'master' of github.com:ray-project/ray into async-iter-b… (Mar 27, 2023)
33195a0  update (Mar 27, 2023)
e3c79ba  update (Mar 27, 2023)
2888f96  fix iter_rows (Mar 27, 2023)
5f65e43  comment (Mar 27, 2023)
d2958bd  zero division (Mar 27, 2023)
8308ee6  fix (Mar 27, 2023)
829311d  add init file (Mar 27, 2023)
doc/source/ray-air/doc_code/air_ingest.py (2 changes: 1 addition & 1 deletion)

@@ -27,7 +27,7 @@
     datasets={"train": dataset},
     preprocessor=preprocessor,
     num_epochs=1,  # Stop after this number of epochs is read.
-    prefetch_blocks=1,  # Number of blocks to prefetch when reading data.
+    prefetch_batches=1,  # Number of batches to prefetch when reading data.
     batch_size=None,  # Use whole blocks as batches.
 )
 trainer.fit()
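The rename above is the user-facing change in this PR: read-ahead is now configured in units of batches rather than blocks. A minimal usage sketch of the renamed parameter (the toy dataset and batch size are illustrative assumptions, not taken from the diff):

import ray

# Illustrative toy dataset; any Ray dataset is iterated the same way.
ds = ray.data.range(10_000)

# prefetch_batches formatted batches are prepared ahead of the one being
# consumed, overlapping batch preparation with the consumer's work.
for batch in ds.iter_batches(batch_size=1024, prefetch_batches=1):
    pass  # consume the batch, e.g. run a training step on it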
python/ray/air/util/check_ingest.py (23 changes: 15 additions & 8 deletions)

@@ -29,7 +29,7 @@ class DummyTrainer(DataParallelTrainer):
         scaling_config: Configuration for how to scale training. This is the same
             as for :class:`~ray.train.base_trainer.BaseTrainer`.
         num_epochs: How many times to iterate through the datasets.
-        prefetch_blocks: The number of blocks to prefetch ahead of the
+        prefetch_batches: The number of batches to prefetch ahead of the
             current block during the scan. This is the same as
             :meth:`~ray.data.dataset.Dataset.iter_batches`
         time_preprocessing_separately: Whether to time the preprocessing separately
@@ -44,16 +44,18 @@ def __init__(
         *args,
         scaling_config: Optional[ScalingConfig] = None,
         num_epochs: int = 1,
-        prefetch_blocks: int = 1,
+        prefetch_batches: int = 1,
         batch_size: Optional[int] = 4096,
         time_preprocessing_separately: bool = False,
+        # Deprecated.
+        prefetch_blocks: int = 0,
         **kwargs,
     ):
         if not scaling_config:
             scaling_config = ScalingConfig(num_workers=1)
         super().__init__(
             train_loop_per_worker=DummyTrainer.make_train_loop(
-                num_epochs, prefetch_blocks, batch_size
+                num_epochs, prefetch_batches, prefetch_blocks, batch_size
             ),
             *args,
             scaling_config=scaling_config,
@@ -81,7 +83,10 @@ def preprocess_datasets(self):

     @staticmethod
     def make_train_loop(
-        num_epochs: int, prefetch_blocks: int, batch_size: Optional[int]
+        num_epochs: int,
+        prefetch_batches: int,
+        prefetch_blocks: int,
+        batch_size: Optional[int],
     ):
         """Make a debug train loop that runs for the given amount of epochs."""

@@ -99,7 +104,9 @@ def train_loop_per_worker():
                 epochs_read += 1
                 batch_start = time.perf_counter()
                 for batch in data_shard.iter_batches(
-                    prefetch_blocks=prefetch_blocks, batch_size=batch_size
+                    prefetch_batches=prefetch_batches,
+                    prefetch_blocks=prefetch_blocks,
+                    batch_size=batch_size,
                 ):
                     batch_delay = time.perf_counter() - batch_start
                     batch_delays.append(batch_delay)
@@ -189,11 +196,11 @@ def make_local_dataset_iterator(
         "--num-epochs", "-e", type=int, default=1, help="Number of epochs to read."
     )
     parser.add_argument(
-        "--prefetch-blocks",
+        "--prefetch-batches",
         "-b",
         type=int,
         default=1,
-        help="Number of blocks to prefetch when reading data.",
+        help="Number of batches to prefetch when reading data.",
     )

     args = parser.parse_args()
@@ -215,7 +222,7 @@
         datasets={"train": dataset},
         preprocessor=preprocessor,
         num_epochs=args.num_epochs,
-        prefetch_blocks=args.prefetch_blocks,
+        prefetch_batches=args.prefetch_batches,
         dataset_config={"train": DatasetConfig()},
         batch_size=None,
     )
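Note the backward-compatibility shape of this change: the trainer now accepts both the new prefetch_batches argument and the old prefetch_blocks one, with the latter kept only as a deprecated passthrough. A generic sketch of that pattern (the helper name and warning text are illustrative, not from this PR):

import warnings

def resolve_prefetch_args(prefetch_batches: int = 1, prefetch_blocks: int = 0) -> int:
    """Map the deprecated block-based knob onto the batch-based one,
    warning callers who still use the old name."""
    if prefetch_blocks > 0:
        warnings.warn(
            "`prefetch_blocks` is deprecated; use `prefetch_batches` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
    return prefetch_batches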
python/ray/data/_internal/block_batching/block_batching.py (33 changes: 13 additions & 20 deletions)

@@ -11,10 +11,10 @@
     format_batches,
     collate,
     extract_data_from_batch,
-    make_async_gen,
     WaitBlockPrefetcher,
     ActorBlockPrefetcher,
 )
+from ray.data._internal.memory_tracing import trace_deallocation
 from ray.data._internal.stats import DatasetPipelineStats, DatasetStats
 from ray.data.block import Block, DataBatch
 from ray.data.context import DatasetContext
@@ -45,7 +45,6 @@ def batch_block_refs(
     shuffle_buffer_min_size: Optional[int] = None,
     shuffle_seed: Optional[int] = None,
     ensure_copy: bool = False,
-    prefetch_batches: int = 0,
 ) -> Iterator[DataBatch]:
     """Create formatted batches of data from 1 or more block object references.

@@ -79,17 +78,12 @@
         shuffle_seed: The seed to use for the local random shuffle.
         ensure_copy: Whether batches are always copied from the underlying base
             blocks (not zero-copy views).
-        prefetch_batches: The number of batches to fetch ahead of the current batch to
-            process. If set to greater than 0, a separate thread will be used to fetch
-            the specified amount of formatted batches from blocks. This improves
-            performance for non-CPU bound UDFs, allowing batch fetching compute and
-            formatting to be overlapped with the UDF. Defaults to 0 (no prefetching
-            enabled).

     Returns:
         An iterator over record batches.
     """
-
+    if stats:
+        stats._legacy_iter_batches = True
     context = DatasetContext.get_current()

     if (
@@ -107,11 +101,10 @@
         _prefetch_blocks(
             block_ref_iter=block_refs,
             prefetcher=prefetcher,
-            stats=stats,
             num_blocks_to_prefetch=prefetch_blocks,
+            eager_free=eager_free,
         ),
         stats=stats,
-        eager_free=eager_free,
     )

     yield from batch_blocks(
@@ -124,7 +117,6 @@
         shuffle_buffer_min_size=shuffle_buffer_min_size,
         shuffle_seed=shuffle_seed,
         ensure_copy=ensure_copy,
-        prefetch_batches=prefetch_batches,
     )


@@ -139,7 +131,6 @@ def batch_blocks(
     shuffle_buffer_min_size: Optional[int] = None,
     shuffle_seed: Optional[int] = None,
     ensure_copy: bool = False,
-    prefetch_batches: int = 0,
 ) -> Iterator[DataBatch]:
     """Create formatted batches of data from 1 or more blocks.

@@ -164,17 +155,12 @@ def _iterator_fn(base_iterator: Iterator[Block]) -> Iterator[DataBatch]:
         )

         if collate_fn is not None:
-            batch_iter = collate(batch_iter, collate_fn=collate_fn)
+            batch_iter = collate(batch_iter, collate_fn=collate_fn, stats=stats)

         batch_iter = extract_data_from_batch(batch_iter)
         yield from batch_iter

-    if prefetch_batches > 0:
-        batch_iter = make_async_gen(
-            blocks, fn=_iterator_fn, num_workers=prefetch_batches
-        )
-    else:
-        batch_iter = _iterator_fn(blocks)
+    batch_iter = _iterator_fn(blocks)

     for formatted_batch in batch_iter:
         user_timer = stats.iter_user_s.timer() if stats else nullcontext()
@@ -186,6 +172,7 @@ def _prefetch_blocks(
     block_ref_iter: Iterator[ObjectRef[Block]],
     prefetcher: BlockPrefetcher,
     num_blocks_to_prefetch: int,
+    eager_free: bool = False,
     stats: Optional[Union[DatasetStats, DatasetPipelineStats]] = None,
 ) -> Iterator[ObjectRef[Block]]:
     """Given an iterable of Block Object References, returns an iterator
@@ -201,6 +188,9 @@
     if num_blocks_to_prefetch == 0:
         for block_ref in block_ref_iter:
             yield block_ref
+            trace_deallocation(
+                block_ref, "block_batching._prefetch_blocks", free=eager_free
+            )

     window_size = num_blocks_to_prefetch
     # Create the initial set of blocks to prefetch.
@@ -219,3 +209,6 @@
         except StopIteration:
             pass
         yield block_ref
+        trace_deallocation(
+            block_ref, "block_batching._prefetch_blocks", free=eager_free
+        )
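The _prefetch_blocks changes above are easier to read with the underlying pattern in mind: yield items from an iterator while keeping a sliding window of upcoming items prefetched, then run a release hook on each item right after it is yielded (here, trace_deallocation with free=eager_free). A simplified, self-contained sketch of that pattern, with illustrative names and no Ray types:

from collections import deque
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")

def prefetch_window(
    items: Iterator[T],
    window_size: int,
    prefetch: Callable[[List[T]], None],
    on_consumed: Callable[[T], None],
) -> Iterator[T]:
    """Yield items while keeping up to window_size upcoming items prefetched.

    prefetch() is told which items are coming next so their fetches can be
    started early; on_consumed() runs right after an item is yielded,
    mirroring the eager-release hook this PR adds to _prefetch_blocks.
    """
    if window_size == 0:
        for item in items:
            yield item
            on_consumed(item)
        return

    window: deque = deque()
    # Build the initial prefetch window.
    for item in items:
        window.append(item)
        if len(window) >= window_size:
            break
    prefetch(list(window))

    while window:
        current = window.popleft()
        try:
            # Top the window back up and kick off the next prefetch.
            window.append(next(items))
            prefetch(list(window))
        except StopIteration:
            pass
        yield current
        on_consumed(current)

In the real code, the prefetch role is played by the WaitBlockPrefetcher or ActorBlockPrefetcher, and the consumed hook is the trace_deallocation call, which can eagerly free the block's object-store memory when eager_free is set.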