From 1826a0e4775bbeddf82109a85e1b4cc36624beb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Tue, 31 May 2016 14:23:27 +0200 Subject: [PATCH] [BEAM-314] Add zip compression support in TextIO --- .../apache/beam/sdk/io/CompressedSource.java | 20 +++ .../java/org/apache/beam/sdk/io/TextIO.java | 11 +- .../org/apache/beam/sdk/io/TextIOTest.java | 166 ++++++++++++++++++ 3 files changed, 196 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index 8bccf5f59d3f9..2b622138b46ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -37,6 +37,7 @@ import java.nio.channels.ReadableByteChannel; import java.util.NoSuchElementException; import java.util.zip.GZIPInputStream; +import java.util.zip.ZipInputStream; import javax.annotation.concurrent.GuardedBy; @@ -149,6 +150,25 @@ public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channe return Channels.newChannel( new BZip2CompressorInputStream(Channels.newInputStream(channel))); } + }, + + /** + * Reads a byte channel assuming it is compressed with zip. + * + * Warning: only zip files that have exactly one entry (aka are ZIPs of a single file) are + * supported. + */ + ZIP { + @Override + public boolean matches(String fileName) { + return fileName.toLowerCase().endsWith(".zip"); + } + + public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) + throws IOException { + ZipInputStream zip = new ZipInputStream(Channels.newInputStream(channel)); + return Channels.newChannel(zip); + } }; /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 13cb45e2099a3..1dbdb8f1c5ca1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -329,6 +329,11 @@ public PCollection apply(PInput input) { CompressedSource.from(new TextSource(filepattern, coder)) .withDecompression(CompressedSource.CompressionMode.GZIP)); break; + case ZIP: + read = org.apache.beam.sdk.io.Read.from( + CompressedSource.from(new TextSource(filepattern, coder)) + .withDecompression(CompressedSource.CompressionMode.ZIP)); + break; default: throw new IllegalArgumentException("Unknown compression mode: " + compressionType); } @@ -722,7 +727,11 @@ public static enum CompressionType { /** * BZipped. */ - BZIP2(".bz2"); + BZIP2(".bz2"), + /** + * Zipped. + */ + ZIP(".zip"); private String filenameSuffix; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 724a1135f1538..762d1ceb9c7ac 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -72,6 +72,8 @@ import java.util.Arrays; import java.util.List; import java.util.zip.GZIPOutputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; /** * Tests for TextIO Read and Write transforms. @@ -410,6 +412,170 @@ public void testCompressedRead() throws Exception { p.run(); } + /** + * Read a zip compressed file. The user provides the ZIP compression type. + * We expect a PCollection with the lines. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedRead() throws Exception { + String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; + File tmpFile = tmpFolder.newFile(); + String filename = tmpFile.getPath(); + + List expected = new ArrayList<>(); + + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile)); + + out.putNextEntry(new ZipEntry("test.txt")); + + PrintStream writer = new PrintStream(out); + for (String line : lines) { + writer.println(line); + expected.add(line); + } + writer.close(); + out.close(); + + Pipeline p = TestPipeline.create(); + + TextIO.Read.Bound read = + TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP); + PCollection output = p.apply(read); + + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + + /** + * Read a zip compressed file. The ZIP compression type is auto-detected based on the + * file extension. We expect a PCollection with the lines. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedAutoDetected() throws Exception { + String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; + File tmpFile = tmpFolder.newFile("testZipCompressedAutoDetected.zip"); + String filename = tmpFile.getPath(); + + List expected = new ArrayList<>(); + + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile)); + + out.putNextEntry(new ZipEntry("test.txt")); + + PrintStream writer = new PrintStream(out); + for (String line : lines) { + writer.println(line); + expected.add(line); + } + writer.close(); + out.close(); + + // test with auto-detect ZIP based on extension. + Pipeline p = TestPipeline.create(); + + TextIO.Read.Bound read = TextIO.Read.from(filename); + PCollection output = p.apply(read); + + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + + /** + * Read a ZIP compressed empty file. We expect an exception. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedReadWithEmptyFile() throws Exception { + File tmpFile = tmpFolder.newFile(); + String filename = tmpFile.getPath(); + + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile)); + + out.close(); + + Pipeline p = TestPipeline.create(); + + PCollection output = p.apply(TextIO.Read.from(filename).withCompressionType + (CompressionType.ZIP)); + PAssert.that(output).empty(); + + p.run(); + } + + /** + * Read a ZIP compressed file containing an unique empty entry. We expect an empty PCollection. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedReadWithEmptyEntry() throws Exception { + File tmpFile = tmpFolder.newFile(); + String filename = tmpFile.getPath(); + + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile)); + out.putNextEntry(new ZipEntry("test.txt")); + out.closeEntry(); + + out.close(); + + Pipeline p = TestPipeline.create(); + + PCollection output = p.apply(TextIO.Read.from(filename).withCompressionType + (CompressionType.ZIP)); + PAssert.that(output).empty(); + + p.run(); + } + + /** + * Read a ZIP compressed file with multiple entries. Only the first entry is actually read. + * We expect an empty PCollection. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedReadWithMultiEntriesFile() throws Exception { + File tmpFile = tmpFolder.newFile("testZipCompressedAutoDetected.zip"); + String filename = tmpFile.getPath(); + + List expected = new ArrayList<>(); + + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile)); + + PrintStream writer = new PrintStream(out); + + out.putNextEntry(new ZipEntry("first.txt")); + for (String line : new String[]{ "first", "second", "three" }) { + writer.println(line); + expected.add(line); + } + out.closeEntry(); + out.putNextEntry(new ZipEntry("second.txt")); + for (String line : new String[]{ "four", "five", "six" }) { + writer.println(line); + expected.add(line); + } + out.closeEntry(); + out.putNextEntry(new ZipEntry("third.txt")); + for (String line : new String[]{ "seven", "eight", "nine" }) { + writer.println(line); + expected.add(line); + } + out.closeEntry(); + writer.close(); + + out.close(); + + Pipeline p = TestPipeline.create(); + + TextIO.Read.Bound read = + TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP); + PCollection output = p.apply(read); + + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + @Test @Category(NeedsRunner.class) public void testGZIPReadWhenUncompressed() throws Exception {