
Add WriteParquetBatched #23030

Merged (8 commits into apache:master, Nov 1, 2022)

Conversation

@peridotml (Contributor) commented Sep 5, 2022

This is a start to addressing #22813. @TheNeuralBit I put up a draft. Let me know if this would still be worth adding to the SDK or if there is already a way to do this.

This is an example of the pipeline:

with TestPipeline() as p:
  _ = (
      p
      | RunInference(...)
      | BatchToArrowTable()  # turns a dict of vectors into a pa.Table, e.g.
                             # {'id': [1, 2, 3], 'score': [0.9, 0.1, 0.8]} -> pa.Table
      | WriteToParquetBatched(
          path, self.SCHEMA, num_shards=1, shard_name_template=''))
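(BatchToArrowTable is referenced in the example but not defined in this PR; a minimal sketch of what such a DoFn could look like, assuming each batch arrives as a dict of equal-length column lists:)

import apache_beam as beam
import pyarrow as pa


class BatchToArrowTable(beam.DoFn):
  """Hypothetical DoFn: converts a dict of column vectors into a pa.Table."""
  def process(self, batch):
    # pa.Table.from_pydict maps {'id': [1, 2, 3], ...} to a columnar table
    # with one column per key.
    yield pa.Table.from_pydict(batch)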

Motivation

  • No need to un-batch into a series of Python dicts, which hurts performance.
  • Models naturally return batches, so this transform fits them directly.
  • I haven't added it yet, but the schema can be inferred from the first batch (that could possibly break something, but it worked with Dataflow).

Open Questions

  1. Controlling row_group_size would need to be handled upstream.
  2. There is a lot of duplicate code; there may be some smart ways to refactor it a bit (let me know).

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

@peridotml (Contributor Author)

@TheNeuralBit here is an initial draft. Let me know what you think and I will work on it more!

@peridotml marked this pull request as draft September 5, 2022 02:27
codecov bot commented Sep 5, 2022

Codecov Report

Merging #23030 (315a02e) into master (31561e2) will decrease coverage by 0.52%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master   #23030      +/-   ##
==========================================
- Coverage   73.71%   73.18%   -0.53%     
==========================================
  Files         714      729      +15     
  Lines       95240    96457    +1217     
==========================================
+ Hits        70203    70590     +387     
- Misses      23740    24570     +830     
  Partials     1297     1297              
Flag     Coverage Δ
python   82.60% <100.00%> (-0.94%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
sdks/python/apache_beam/io/parquetio.py 95.71% <100.00%> (+0.55%) ⬆️
sdks/python/apache_beam/io/gcp/bigquery_tools.py 73.34% <0.00%> (-12.39%) ⬇️
sdks/python/apache_beam/typehints/__init__.py 71.42% <0.00%> (-6.35%) ⬇️
...s/interactive/dataproc/dataproc_cluster_manager.py 71.72% <0.00%> (-5.70%) ⬇️
sdks/python/apache_beam/typehints/schemas.py 88.95% <0.00%> (-4.90%) ⬇️
...python/apache_beam/io/gcp/bigquery_schema_tools.py 79.24% <0.00%> (-4.85%) ⬇️
.../python/apache_beam/testing/test_stream_service.py 88.09% <0.00%> (-4.77%) ⬇️
...dks/python/apache_beam/metrics/monitoring_infos.py 92.50% <0.00%> (-4.50%) ⬇️
sdks/python/apache_beam/internal/pickler.py 92.00% <0.00%> (-3.46%) ⬇️
...ks/python/apache_beam/runners/worker/statecache.py 92.93% <0.00%> (-3.22%) ⬇️
... and 88 more


@TheNeuralBit (Member) left a comment


Thanks @peridotml! I do think this is worth adding. There will probably always be times when users want to have more control and will need this.

        self._schema,
        compression=self._codec,
        use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps,
        use_compliant_nested_type=self._use_compliant_nested_type)
@TheNeuralBit (Member):

Could you try to re-use this for the element-wise Parquet sink? I think we could structure this similar to ReadFromParquet and ReadFromParquetBatched. They both use the same underlying source that produces batches, but the former adds an additional transform to explode the batches into individual dictionaries:

def expand(self, pvalue):
  return pvalue | Read(self._source) | ParDo(_ArrowTableToRowDictionaries())
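(On the write side the same structure could be mirrored: batch row dictionaries into pa.Tables, then delegate to the shared batched sink. A sketch, assuming the _RowDictionariesToArrowTable DoFn and the constructor parameters this PR introduces:)

def expand(self, pcoll):
  # Mirror of the read side: convert row dictionaries into pa.Tables,
  # then hand them to the same sink the batched transform uses.
  return (
      pcoll
      | ParDo(
          _RowDictionariesToArrowTable(
              self._schema, self._row_group_buffer_size,
              self._record_batch_size))
      | Write(self._sink))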

@peridotml (Contributor Author):

@TheNeuralBit Brilliant idea! I will implement it this weekend.

@peridotml (Contributor Author) left a comment

@TheNeuralBit I gave it a shot. I imagine you were thinking of reusing the batch sink; no worries if you want me to reuse the original sink and unnest the pyarrow tables :).

I left a ton of questions / comments. I hope that is alright!

self._flush_buffer()
if self._record_batches_byte_size > 0:
  table = self._create_table()
  yield window.GlobalWindows.windowed_value_at_end_of_window(table)
@peridotml (Contributor Author):

It seems like we need to be careful with windows when using buffers inside of DoFns.

I don't actually know if this works; I looked at BatchElements and copied part of it. If there is a working strategy, I assume you will let me know. I am not experienced with streaming.
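(For reference, the BatchElements pattern being copied here: a DoFn that buffers in process and flushes the remainder in finish_bundle, which must yield WindowedValues since finish_bundle output has no associated input element. A minimal sketch, assuming global windowing:)

import apache_beam as beam
from apache_beam.transforms import window


class _BufferingDoFn(beam.DoFn):
  """Illustrative only: batches elements, flushing leftovers per bundle."""
  def __init__(self, max_batch_size=1000):
    self._max_batch_size = max_batch_size

  def start_bundle(self):
    self._batch = []

  def process(self, element):
    self._batch.append(element)
    if len(self._batch) >= self._max_batch_size:
      yield self._batch
      self._batch = []

  def finish_bundle(self):
    if self._batch:
      # Place the leftover batch at the end of the global window,
      # as BatchElements does.
      yield window.GlobalWindows.windowed_value_at_end_of_window(self._batch)
      self._batch = []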

@TheNeuralBit (Member):

I think this is ok for now, at least it shouldn't be a regression from the current behavior.

rb = pa.RecordBatch.from_arrays(arrays, schema=self._schema)
self._record_batches.append(rb)
size = 0
for x in arrays:
@peridotml (Contributor Author):

Both pa.RecordBatch and pa.Table have the attribute nbytes. It could be simpler and more performant to just use that attribute. I couldn't find documentation confirming whether it was available before pyarrow 1.0.
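(For illustration, the attribute in question; nbytes is available on both pa.RecordBatch and pa.Table in recent pyarrow releases, and whether pre-1.0 versions had it is the open question above:)

import pyarrow as pa

rb = pa.RecordBatch.from_arrays(
    [pa.array([1, 2, 3]), pa.array([0.9, 0.1, 0.8])],
    names=['id', 'score'])

# A single attribute lookup instead of manually summing buffer sizes.
print(rb.nbytes)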

@TheNeuralBit (Member):

Note we test with all supported major versions of pyarrow in CI, so you don't need to worry about unintentionally breaking compatibility with an old version.

Also, if support for pyarrow 0.x or 1.x is problematic we could go ahead and drop them. They are 2 years old at this point.

@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
yield row


class _RowDictionariesToArrowTable(DoFn):
@peridotml (Contributor Author):

I used pa.Table for consistency with reading batched parquet files, but I wanted to bring up that pa.RecordBatch would also work.

I don't have strong preferences either way; I could see using both.
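(Either type carries the same columnar data, and converting between them is cheap; a small sketch of the round trip:)

import pyarrow as pa

rb = pa.RecordBatch.from_arrays(
    [pa.array([1, 2]), pa.array([0.9, 0.1])], names=['id', 'score'])

# Wrap a RecordBatch in a single-batch Table (no data copy)...
table = pa.Table.from_batches([rb])

# ...and split a Table back into its RecordBatches.
batches = table.to_batches()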

if len(self._buffer[0]) >= self._buffer_size:
  self._flush_buffer()

if self._record_batches_byte_size >= self._row_group_buffer_size:
@peridotml (Contributor Author):

If we switched to using nbytes then this could be moved into the sink if that makes more sense.

@peridotml (Contributor Author):

There is also [rb|pa_table].get_total_buffer_size(), which would save some lines of code and is actually a lot faster, although the difference is pretty negligible.

Test results

  • 900 rows x 10 columns: 830 ns vs 9.94 µs
  • 900 rows x 100 columns: 9.8 µs vs 100 µs

Unfortunately, the counts don't line up perfectly, and it already is pretty fast. Just wanted to flag this as an option!
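(A minimal timeit sketch of the kind of comparison reported above; numbers will vary by machine, and as I understand it the two values can legitimately differ, since nbytes only counts the bytes actually referenced while get_total_buffer_size sums whole, possibly shared, buffers:)

import timeit

import pyarrow as pa

# 900 rows x 10 columns, mirroring the first test case above.
table = pa.Table.from_pydict(
    {'col%d' % i: list(range(900)) for i in range(10)})

print(timeit.timeit(lambda: table.nbytes, number=1000))
print(timeit.timeit(lambda: table.get_total_buffer_size(), number=1000))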

@TheNeuralBit (Member):

Hm, the counts don't line up perfectly? How different are they? If they are different I would suspect the number provided by pyarrow is the correct one.

I think this is worth digging into further, but I'd recommend filing an issue and doing it in a separate PR. That way this one is mostly a no-op moving code around.

# reorder the data in columnar format.
for i, n in enumerate(self._schema.names):
  self._buffer[i].append(value[n])
writer.write_table(value)
@peridotml (Contributor Author):

I have used this before, where I infer the schema from the first batch that comes through. This simplified my model inference scripts and avoided loading TensorFlow saved models to get the output signature and converting it to pyarrow types.

It worked locally and on Dataflow. Thought I would throw it out there.
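(A sketch of that schema-inference idea; this is not part of the PR and the helper name is hypothetical, assuming dict batches like the example at the top:)

import pyarrow as pa


def infer_schema(first_batch):
  """Hypothetical helper: derive a pa.Schema from the first dict batch."""
  return pa.Table.from_pydict(first_batch).schema


print(infer_schema({'id': [1, 2, 3], 'score': [0.9, 0.1, 0.8]}))
# id: int64
# score: double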

# reorder the data in columnar format.
for i, n in enumerate(self._schema.names):
  self._buffer[i].append(value[n])
writer.write_table(value)
@TheNeuralBit (Member):

nit: maybe change value to table: pa.Table to make the expectation explicit
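(I.e., something along these lines; a sketch, with the surrounding sink method abbreviated:)

def write_record(self, writer, table: pa.Table):
  # The annotation makes explicit that this sink expects whole
  # pa.Tables, not row dictionaries.
  writer.write_table(table)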

@TheNeuralBit (Member) left a comment

This LGTM overall, thank you!

My only comment is the table: pa.Table suggestion, assuming CI finishes ok

@peridotml marked this pull request as ready for review October 29, 2022 03:13
@github-actions bot:
Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @AnandInguva for label python.
R: @ahmedabu98 for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@peridotml changed the title from Draft: WriteParquetBatched to Add WriteParquetBatched Oct 30, 2022
@esadler-hbo commented Oct 31, 2022

@TheNeuralBit ...I had some time to spend on this. I responded to your code review and added better doc strings. Oh whoops, I am commenting from my work account. I am also peridotml!

@TheNeuralBit merged commit ef7c0c9 into apache:master Nov 1, 2022

@TheNeuralBit (Member):

Thank you!

ruslan-ikhsan pushed a commit to akvelon/beam that referenced this pull request Nov 11, 2022
* initial draft

* reuse parquet sink for writing rows and pa.Tables

* lint

* fix imports

* cr - rename value to table

* add doc string and example test

* fix doc strings

* specify doctest group to separate tests

Co-authored-by: Evan Sadler <easadler@gmail.com>