From 49b484725afe378f2f042a4da003dbf25debc198 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Tue, 31 May 2016 14:23:27 +0200 Subject: [PATCH] [BEAM-314] Add zip compression support in TextIO --- .../apache/beam/sdk/io/CompressedSource.java | 64 +++++++ .../java/org/apache/beam/sdk/io/TextIO.java | 11 +- .../org/apache/beam/sdk/io/TextIOTest.java | 173 ++++++++++++++++++ 3 files changed, 247 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index 75bfc8f9447b2..48e0b7a678e65 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -30,6 +30,7 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.PushbackInputStream; import java.io.Serializable; import java.nio.ByteBuffer; @@ -37,6 +38,8 @@ import java.nio.channels.ReadableByteChannel; import java.util.NoSuchElementException; import java.util.zip.GZIPInputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; import javax.annotation.concurrent.GuardedBy; @@ -149,8 +152,69 @@ public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channe return Channels.newChannel( new BZip2CompressorInputStream(Channels.newInputStream(channel))); } + }, + + /** + * Reads a byte channel assuming it is compressed with zip. + * If the zip file contains multiple entries, files in the zip are concatenated all together. + */ + ZIP { + @Override + public boolean matches(String fileName) { + return fileName.toLowerCase().endsWith(".zip"); + } + + public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel) + throws IOException { + FullZipInputStream zip = new FullZipInputStream(Channels.newInputStream(channel)); + return Channels.newChannel(zip); + } }; + /** + * Extend of {@link ZipInputStream} to automatically read all entries in the zip. + */ + private static class FullZipInputStream extends InputStream { + + private ZipInputStream zipInputStream; + private ZipEntry currentEntry; + + public FullZipInputStream(InputStream is) throws IOException { + super(); + zipInputStream = new ZipInputStream(is); + currentEntry = zipInputStream.getNextEntry(); + } + + @Override + public int read() throws IOException { + int result = zipInputStream.read(); + while (result == -1) { + currentEntry = zipInputStream.getNextEntry(); + if (currentEntry == null) { + return -1; + } else { + result = zipInputStream.read(); + } + } + return result; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int result = zipInputStream.read(b, off, len); + while (result == -1) { + currentEntry = zipInputStream.getNextEntry(); + if (currentEntry == null) { + return -1; + } else { + result = zipInputStream.read(b, off, len); + } + } + return result; + } + + } + /** * Returns {@code true} if the given file name implies that the contents are compressed * according to the compression embodied by this factory. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index dc50a8c58d552..a7e5e296756f0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -328,6 +328,11 @@ public PCollection apply(PInput input) { CompressedSource.from(new TextSource(filepattern, coder)) .withDecompression(CompressedSource.CompressionMode.GZIP)); break; + case ZIP: + read = org.apache.beam.sdk.io.Read.from( + CompressedSource.from(new TextSource(filepattern, coder)) + .withDecompression(CompressedSource.CompressionMode.ZIP)); + break; default: throw new IllegalArgumentException("Unknown compression mode: " + compressionType); } @@ -721,7 +726,11 @@ public static enum CompressionType { /** * BZipped. */ - BZIP2(".bz2"); + BZIP2(".bz2"), + /** + * Zipped. + */ + ZIP(".zip"); private String filenameSuffix; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 9a762d17dcfc8..c3a50849020f9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -81,6 +81,10 @@ import java.util.Arrays; import java.util.List; import java.util.zip.GZIPOutputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import javax.annotation.Nullable; /** * Tests for TextIO Read and Write transforms. @@ -419,6 +423,175 @@ public void testCompressedRead() throws Exception { p.run(); } + /** + * Create a zip file with the given lines. + * + * @param expected A list of expected lines, populated in the zip file. + * @param filename Optionally zip file name (can be null). + * @param fieldsEntries Fields to write in zip entries. + * @return The zip filename. + * @throws Exception In case of a failure during zip file creation. + */ + private String createZipFile(List expected, @Nullable String filename, String[] + ... + fieldsEntries) + throws Exception { + File tmpFile; + if (filename != null) { + tmpFile = tmpFolder.newFile(filename); + } else { + tmpFile = tmpFolder.newFile(); + } + String tmpFileName = tmpFile.getPath(); + + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile)); + PrintStream writer = new PrintStream(out, true /* auto-flush on write */); + + int index = 0; + for (String[] entry : fieldsEntries) { + out.putNextEntry(new ZipEntry(Integer.toString(index))); + for (String field : entry) { + writer.println(field); + expected.add(field); + } + out.closeEntry(); + index++; + } + + writer.close(); + out.close(); + + return tmpFileName; + } + + /** + * Read a zip compressed file. The user provides the ZIP compression type. + * We expect a PCollection with the lines. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedRead() throws Exception { + String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; + List expected = new ArrayList<>(); + + String filename = createZipFile(expected, null, lines); + + Pipeline p = TestPipeline.create(); + + TextIO.Read.Bound read = + TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP); + PCollection output = p.apply(read); + + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + + /** + * Read a zip compressed file. The ZIP compression type is auto-detected based on the + * file extension. We expect a PCollection with the lines. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedAutoDetected() throws Exception { + String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"}; + List expected = new ArrayList<>(); + + String filename = createZipFile(expected, "testZipCompressedAutoDetected.zip", lines); + + // test with auto-detect ZIP based on extension. + Pipeline p = TestPipeline.create(); + + TextIO.Read.Bound read = TextIO.Read.from(filename); + PCollection output = p.apply(read); + + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + + /** + * Read a ZIP compressed empty file. We expect an empty PCollection. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedReadWithEmptyFile() throws Exception { + String filename = createZipFile(new ArrayList(), null); + + Pipeline p = TestPipeline.create(); + + PCollection output = p.apply(TextIO.Read.from(filename).withCompressionType + (CompressionType.ZIP)); + PAssert.that(output).empty(); + + p.run(); + } + + /** + * Read a ZIP compressed file containing an unique empty entry. We expect an empty PCollection. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedReadWithEmptyEntry() throws Exception { + String filename = createZipFile(new ArrayList(), null, new String[]{ }); + + Pipeline p = TestPipeline.create(); + + PCollection output = p.apply(TextIO.Read.from(filename).withCompressionType + (CompressionType.ZIP)); + PAssert.that(output).empty(); + + p.run(); + } + + /** + * Read a ZIP compressed file with multiple entries. We expect a PCollection containing + * lines from all entries. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedReadWithMultiEntriesFile() throws Exception { + String[] entry0 = new String[]{ "first", "second", "three" }; + String[] entry1 = new String[]{ "four", "five", "six" }; + String[] entry2 = new String[]{ "seven", "eight", "nine" }; + + List expected = new ArrayList<>(); + + String filename = createZipFile(expected, null, entry0, entry1, entry2); + + Pipeline p = TestPipeline.create(); + + TextIO.Read.Bound read = + TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP); + PCollection output = p.apply(read); + + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + + /** + * Read a ZIP compressed file containing data, multiple empty entries, and then more data. We + * expect just the data back. + */ + @Test + @Category(NeedsRunner.class) + public void testZipCompressedReadWithComplexEmptyAndPresentEntries() throws Exception { + String filename = createZipFile( + new ArrayList(), + null, + new String[] {"cat"}, + new String[] {}, + new String[] {}, + new String[] {"dog"}); + List expected = ImmutableList.of("cat", "dog"); + + Pipeline p = TestPipeline.create(); + + PCollection output = + p.apply(TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP)); + PAssert.that(output).containsInAnyOrder(expected); + + p.run(); + } + @Test @Category(NeedsRunner.class) public void testGZIPReadWhenUncompressed() throws Exception {