Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[data] [streaming] Add streaming_split() API #32991

Merged
merged 91 commits into from
Mar 10, 2023

Conversation

ericl
Copy link
Contributor

@ericl ericl commented Mar 3, 2023

Why are these changes needed?

TODO:

  • Implement backpressure
  • Implement split locality
  • Implement output locality
  • Performance testing
  • Unit tests
import numpy as np
import ray
import time

ray.data.context.DatasetContext.get_current().use_streaming_executor = True
ray.data.context.DatasetContext.get_current().execution_options.resource_limits.object_store_memory = 40e9

NUM_CONSUMERS = 10
DATA_GB = 1000

@ray.remote(scheduling_strategy="SPREAD", num_cpus=1)
class Consumer:
    def consume(self, it):
        total = 0
        for block in it.iter_batches(batch_format="pyarrow"):
            total += len(block)
        print("consume done", total)

    def get_location(self):
        return ray.get_runtime_context().get_node_id()

consumers = [Consumer.remote() for c in range(NUM_CONSUMERS)]
hints = [ray.get(c.get_location.remote()) for c in consumers]

def preprocess(x):
    return x

shards = ray.data.range_tensor(DATA_GB * 7000, shape=(80, 80, 3), parallelism=2 * DATA_GB) \
    .map_batches(preprocess) \
    .streaming_split(NUM_CONSUMERS, equal=True, locality_hints=hints)

start = time.time()
ray.get([consumers[i].consume.remote(shards[i]) for i in range(NUM_CONSUMERS)])

time.sleep(1)
print("Throughput", DATA_GB / (time.time() - start), "GiB/s")

Output on 5-node cluster:

Throughput 13.839650118615584 GiB/s

Output on 50 nodes, 50 consumers:

Throughput 58.442277295083365 GiB/s

ericl added 30 commits February 17, 2023 13:11
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
@ericl ericl changed the title [WIP] Add stream_split() API [data] [streaming] Add streaming_split() API Mar 8, 2023
python/ray/data/_internal/stream_split_dataset_iterator.py Outdated Show resolved Hide resolved
python/ray/data/_internal/stream_split_dataset_iterator.py Outdated Show resolved Hide resolved
def gen_blocks() -> Iterator[ObjectRef[Block]]:
future = self._coord_actor.get.remote(self._output_split_idx)
while True:
block = ray.get(future)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I missed that future is ObjectRef[ObjectRef[Block]].

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added explicit types here.

ericl added 3 commits March 8, 2023 16:55
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
@ericl ericl added the tests-ok The tagger certifies test failures are unrelated and assumes personal liability. label Mar 9, 2023
Copy link
Contributor

@jianoaix jianoaix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look exciting to get this!

ericl added 3 commits March 9, 2023 16:23
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Copy link
Contributor Author

@ericl ericl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed!

@ericl ericl removed the tests-ok The tagger certifies test failures are unrelated and assumes personal liability. label Mar 10, 2023
@ericl ericl merged commit e1c61e0 into ray-project:master Mar 10, 2023
ProjectsByJackHe pushed a commit to ProjectsByJackHe/ray that referenced this pull request Mar 21, 2023
Signed-off-by: Jack He <jackhe2345@gmail.com>
edoakes pushed a commit to edoakes/ray that referenced this pull request Mar 22, 2023
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
peytondmurray pushed a commit to peytondmurray/ray that referenced this pull request Mar 22, 2023
elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023
Signed-off-by: elliottower <elliot@elliottower.com>
ProjectsByJackHe pushed a commit to ProjectsByJackHe/ray that referenced this pull request May 4, 2023
Signed-off-by: Jack He <jackhe2345@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants