-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-314] Add zip compression support in TextIO #400
Conversation
R: @dhalperi |
@@ -404,6 +406,37 @@ public void testCompressedRead() throws Exception { | |||
|
|||
@Test | |||
@Category(NeedsRunner.class) | |||
public void testZipCompressedRead() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add a test with an empty (but valid) zip file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, let me do it.
Rebased and updated. I have to figure out the expectException issue in the test. |
if (zip.getNextEntry() != null) { | ||
return Channels.newChannel(zip); | ||
} | ||
throw new IllegalArgumentException("ZIP file doesn't contain any entry"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will the behavior be today on a multi-entry zip? Will it silently produce bad data? Fail in some way?
Please comment, and then also add a test.
Rebased and updated. |
p.run(); | ||
|
||
// test with auto-detect ZIP based on extension. | ||
p = TestPipeline.create(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please make this a separate unit test -- that way they can pass or fail independently :)
Hi JB, This is looking pretty good! But I have some questions about the tests. Specifically, since we mostly test empty files it seems tough to validate that the decompressor does exactly what we expect. I've added some suggestions for improvements. Thanks, |
Updated. I added explanations on each test.
|
PCollection<String> output = p.apply(read); | ||
|
||
PAssert.that(output).empty(); | ||
p.run(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd really like to find a way that this case either concats all the files or throws an exception. The current behavior is effectively silent data loss, the worst possible case!
One way to handle this could be a utility InputStream class that wraps ZipInputStream
and under-the-hood concats all the different entry streams. This is probably the base case.
A second possibility is that once the input stream hits EOF, we check for a next entry and only then throw an exception. But this is less desirable as we don't fail until pretty late.
Can you look into it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like your idea of wrapping the stream. Let me figure it out.
I updated the PR to use another approach: I'm using directly the |
|
||
/** | ||
* Read a ZIP compressed file with multiple entries. Only the first entry is actually read. | ||
* We expect an empty PCollection. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update the javadoc to be correct.
Thanks JB -- looking great.
Other than that, I'd love to see less code reuse in the tests. This is considered a strong recommendation, but I won't withhold and LGTM on that basis ;). |
My bad about the javadoc. I will fix that. I'm also fixing the multi-entries test. Then I will refactore a bit to use a share method for zip file creation ;) |
PR rebased and updated based on Dan's comments. |
String tmpFileName = tmpFile.getPath(); | ||
|
||
ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile)); | ||
PrintStream writer = new PrintStream(out); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use
PrintStream writer = new PrintStream(out, true /* auto-flush on write */);
this way we can be sure that the PrintStream
itself does not buffer any data. See javadoc,
Looking great! Only trivial changes left. |
Rebased and updated according to Dan's comments. |
There are tests failure in the DirectRunner due to the latest changes and rebase. I will fix that. |
Hi JB! One small request:
I checked out your PR and tried to get the tests to pass -- but I was never able to make both the single-file and multi-file tests pass. I think that right now the ZipInputStream does not automatically concat all file contents without more effort. Might need to make a new class that wraps the ZipInputStream and manually calls |
Hi Dan, Thanks for the update and ok for the rebase (sorry about that). I'm checking and fixing the issue. |
Working on a ZipInputStream wrapper (which call getNextEntry() behind the hood). Update soon. |
…l entries in a ZIP
@dhalperi It should be OK now. |
currentEntry = getNextEntry(); | ||
} | ||
|
||
public int read(byte[] b, int off, int len) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are a lot of variants of read
in an InputStream
. Is it obvious that this is the only variant you need to override? (May well be, I just don't know).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's the only read using in the IOChannel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do think you should override all applicable functions from InputStream -- implementation of IOChannel might change and/or this stream might be used in a different way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the only read that has to be implemented, all the others are just wrappers for this read method.
int read() is the only one that you may want to handle specially since its very inefficient if its not implemented but its really inefficient for people to use so having an efficient implementation is useful.
JB this is awesome. One small fix to get the multiple-eof-in-a-row case fixed and let's merge it. A big accomplishment! |
Great feedback guys ! Much appreciated. Fixing the multi-entries. Thanks ! |
…fixed the read() methods
Fixed the |
LGTM. Jenkins flaked on network errors, but Travis is fine. Merging. |
…pache#400) * Add Additional Exists check to FileIOChannelFactory#create This ensures that if the folder did not exist when first checked, but did by the time mkdirs was executed (and thus mkdirs returned false) the create will not fail. * Dynamically choose number of shards in the InProcessPipelineRunner Add a Write Override Factory that limits the number of shards if unspecified. This ensures that we will not write an output file per-key due to bundling. Do so by obtaining a count of the elements and obtaining the number of shards based on the number of outputs.
…channel. This closes apache#400
PiperOrigin-RevId: 453710308
…dev pin (apache#400) Expand pins on library dependencies in preparation for these dependencies taking a new major version. See googleapis/google-cloud-python#10566.
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull request
mvn clean verify
. (Even better, enableTravis-CI on your fork and ensure the whole test matrix passes).
<Jira issue #>
in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.
Add zip compression support in TextIO.