[BEAM-12071] Don't re-use WriteToPandasSink instances across windows #14374

Merged 2 commits on Mar 31, 2021

TheNeuralBit
Member

When given a concrete FileSink, WriteToFiles will re-use the same sink across windows:

return lambda x: input_sink

sink = self.sink_fn(destination)

As a result, all data, aside from one partition, is written to the file for a single window. The fix is to pass in a function that creates a new sink on each call instead.
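The failure mode can be reproduced without Beam. The following is a minimal sketch, assuming that writers for several windows are open at the same time; `ListSink`, `as_sink_fn`, and `write_windows` are hypothetical stand-ins written for illustration, not Beam APIs, and `as_sink_fn` only imitates the `lambda x: input_sink` wrapping quoted above.

```python
class ListSink:
    """Hypothetical stateful sink: remembers the last 'file' it was opened on."""

    def open(self, fh):
        self.fh = fh

    def write(self, record):
        self.fh.append(record)


def as_sink_fn(sink):
    """Imitates the wrapping described above (an assumption, not Beam's code)."""
    if isinstance(sink, ListSink):
        # A concrete instance: every destination gets the same object back.
        return lambda destination: sink
    # Already a factory: call it once per destination.
    return sink


def write_windows(sink_fn, windows):
    """Open one writer per window first, then write each window's records."""
    files = {window: [] for window in windows}
    sinks = {}
    for window in windows:
        sinks[window] = sink_fn(window)
        sinks[window].open(files[window])
    for window, records in windows.items():
        for record in records:
            sinks[window].write(record)
    return files


windows = {"w1": ["a", "b"], "w2": ["c"]}

# Shared instance: the second open() overwrites the first, so w1 stays empty.
shared = write_windows(as_sink_fn(ListSink()), windows)

# Factory: each window gets its own sink and therefore its own file.
fresh = write_windows(as_sink_fn(lambda _: ListSink()), windows)

print(shared)
print(fresh)
```

In the shared case the last window opened receives every record, which matches the symptom described above; with a factory each window's file holds only its own records.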

@@ -521,7 +521,7 @@ def expand(self, pcoll):
     return pcoll | fileio.WriteToFiles(
         path=dir,
         file_naming=fileio.default_file_naming(name),
-        sink=_WriteToPandasFileSink(
+        sink=lambda _: _WriteToPandasFileSink(
Member

Based on the ease of making this mistake, is the WriteToFiles API a hazard to other contributors and users?

Member Author

Yeah, I think so. I'm not sure how to address it though. Should we detect and raise when this mode is used with non-global windows? I'll file a Jira for this.

codecov bot commented Mar 30, 2021

Codecov Report

Merging #14374 (b88ebc8) into master (a16bbf7) will decrease coverage by 0.01%.
The diff coverage is n/a.

@@            Coverage Diff             @@
##           master   #14374      +/-   ##
==========================================
- Coverage   83.42%   83.40%   -0.02%     
==========================================
  Files         469      469              
  Lines       58770    58770              
==========================================
- Hits        49027    49019       -8     
- Misses       9743     9751       +8     
Impacted Files Coverage Δ
.../build/srcs/sdks/python/apache_beam/io/aws/s3io.py
...che_beam/examples/flink/flink_streaming_impulse.py
...rcs/sdks/python/apache_beam/testing/test_stream.py
...amples/snippets/transforms/aggregation/distinct.py
...srcs/sdks/python/apache_beam/transforms/display.py
..._beam/testing/benchmarks/nexmark/queries/query9.py
...eam/runners/portability/fn_api_runner/fn_runner.py
...dks/python/apache_beam/examples/wordcount_xlang.py
...s/sdks/python/apache_beam/internal/gcp/__init__.py
...ers/dataflow/internal/clients/dataflow/__init__.py
... and 928 more


@TheNeuralBit
Member Author

Run PythonDocker PreCommit

Member

kennknowles left a comment

LGTM, just adding nitpicks where chances are I don't have the context for your decision-making.

for path in glob.glob(pattern):
  with open(path) as fin:
    # TODO(Py3): yield from
    for line in fin:
      yield line.rstrip('\n')
  if delete:
Member

Member Author

Thanks, good to know pytest has this. We've also used https://docs.python.org/3/library/tempfile.html elsewhere.

In this case I wanted to specifically delete files as we read them so I could confirm there are no other files (L344).
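The read-and-delete approach can be sketched as follows. This is an illustrative, self-contained version that assumes the truncated `if delete:` branch removes the file just read; the file names and contents are made up for the example and are not taken from the actual test.

```python
import glob
import os
import tempfile


def read_all_lines(pattern, delete=False):
    """Yield each line of every file matching pattern.

    When delete is True, each file is removed after it has been read
    (an assumption about the branch elided in the snippet above).
    """
    for path in glob.glob(pattern):
        with open(path) as fin:
            for line in fin:
                yield line.rstrip('\n')
        if delete:
            os.remove(path)


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical output files standing in for the files written by the sink.
    for i, text in enumerate(["a\nb\n", "c\n"]):
        with open(os.path.join(tmp, f"out-{i}.csv"), "w") as fout:
            fout.write(text)

    # Exhaust the generator so every matching file is read and deleted.
    lines = sorted(read_all_lines(os.path.join(tmp, "out-*.csv"), delete=True))

    # Anything still in the directory is a file the pattern did not cover.
    leftover = os.listdir(tmp)

print(lines, leftover)
```

Deleting as the files are consumed turns "no unexpected files were written" into a simple check that the directory is empty afterwards.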

TheNeuralBit merged commit 911771d into apache:master Mar 31, 2021
TheNeuralBit added a commit to TheNeuralBit/beam that referenced this pull request Mar 31, 2021
…pache#14374)

* Add (failing) windowed write test

* Dont re-use pandas sink instances across windows