Create an Arrow Coder for Beam that allows us to create Ray Datasets #17

Open
pabloem opened this issue Jun 8, 2022 · 3 comments

@pabloem
Collaborator

pabloem commented Jun 8, 2022

This coder is the first step to allow us to create Ray Datasets based on Beam PCollections.

fyi @TheNeuralBit

@TheNeuralBit

Thanks, I think it's worth tracking this in a Beam issue as well. Could you provide some references for Ray Datasets that would inform how an Arrow encoded PCollection can integrate with it?

@pabloem
Collaborator Author

pabloem commented Jun 10, 2022

There's some very superficial stuff I wrote here: https://docs.google.com/document/d/1DcuKhCPnZezIvu9vFMsM4BRdBv0kgAWewOJqRbS42GI/edit#

Specifically, I would say read_datasource may be a good point to look at - it spins up several Ray Tasks that read individual blocks. Each block is usually stored as a block of Arrow records.

I suppose an integration we could have is something like:

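import apache_beam as beam
import ray

class RayPCollection(beam.PCollection):

  def to_dataset(self):
    # Hypothetical: get_pcoll and block_list do not exist in Beam today.
    pipeline_result = self.pipeline.run()
    pcoll = pipeline_result.get_pcoll(self)
    # BlockList is Ray Datasets' internal container of block references.
    return ray.data.Dataset(BlockList(pcoll.block_list))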

or something like that

@pabloem
Collaborator Author

pabloem commented Jan 24, 2023

Beam has a few utilities to convert between Beam and Arrow schemas (see here).

A first step would be to write an ArrowRecordBatchCoder, which can be constructed with a Beam Schema or an Arrow Schema, and where each individual element is an Arrow RecordBatch.

And then we can write a simple PTransform that converts a Beam PCollection of rows with a schema into a Beam PCollection where each element is an Arrow RecordBatch (and vice versa).

Then it becomes easier to pass this to Ray's Datasets (and also into Beam from Datasets).


A second step could be to encode Beam Rows as batches of Arrow Records, but we can think about that once we do the first step.
