Skip to content

Commit

Permalink
[BEAM-314] Add zip compression support in TextIO
Browse files Browse the repository at this point in the history
  • Loading branch information
jbonofre committed Jun 13, 2016
1 parent e1b305e commit 1826a0e
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.nio.channels.ReadableByteChannel;
import java.util.NoSuchElementException;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipInputStream;

import javax.annotation.concurrent.GuardedBy;

Expand Down Expand Up @@ -149,6 +150,25 @@ public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channe
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
}
},

/**
 * Reads a byte channel assuming it is compressed with zip.
 *
 * <p>The decompressed contents of the archive's entries are exposed as one continuous stream,
 * in the order the entries appear in the archive. An archive without entries, or whose entries
 * are all empty, decompresses to an empty stream.
 */
ZIP {
  @Override
  public boolean matches(String fileName) {
    // Locale.ROOT keeps the suffix check independent of the JVM's default locale (for example,
    // under a Turkish locale "FILE.ZIP" would otherwise lower-case to a name with a dotless i).
    return fileName.toLowerCase(java.util.Locale.ROOT).endsWith(".zip");
  }

  /**
   * Wraps {@code channel} so that reads return the decompressed bytes of the zip entries.
   *
   * @param channel the channel positioned at the start of the zip archive
   * @return a channel yielding the decompressed entry contents
   * @throws IOException declared for the interface; opening the wrapper itself performs no read
   */
  @Override
  public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
      throws IOException {
    ZipInputStream zip = new ZipInputStream(Channels.newInputStream(channel)) {
      /** Set once {@code getNextEntry()} has reported that the archive has no more entries. */
      private boolean exhausted;

      @Override
      public int read(byte[] buffer, int offset, int length) throws IOException {
        // A ZipInputStream reports end-of-stream until getNextEntry() has opened an entry, and
        // again at the end of each entry. Advance to the next entry instead of surfacing that
        // end-of-stream to the caller, until the archive itself is exhausted.
        int count = super.read(buffer, offset, length);
        while (count == -1 && !exhausted) {
          if (getNextEntry() == null) {
            exhausted = true;
          } else {
            count = super.read(buffer, offset, length);
          }
        }
        return count;
      }
    };
    return Channels.newChannel(zip);
  }
};

/**
Expand Down
11 changes: 10 additions & 1 deletion sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ public PCollection<T> apply(PInput input) {
CompressedSource.from(new TextSource<T>(filepattern, coder))
.withDecompression(CompressedSource.CompressionMode.GZIP));
break;
case ZIP:
read = org.apache.beam.sdk.io.Read.from(
CompressedSource.from(new TextSource<T>(filepattern, coder))
.withDecompression(CompressedSource.CompressionMode.ZIP));
break;
default:
throw new IllegalArgumentException("Unknown compression mode: " + compressionType);
}
Expand Down Expand Up @@ -722,7 +727,11 @@ public static enum CompressionType {
/**
* BZipped.
*/
BZIP2(".bz2");
BZIP2(".bz2"),
/**
* Zipped.
*/
ZIP(".zip");

private String filenameSuffix;

Expand Down
166 changes: 166 additions & 0 deletions sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
* Tests for TextIO Read and Write transforms.
Expand Down Expand Up @@ -410,6 +412,170 @@ public void testCompressedRead() throws Exception {
p.run();
}

/**
 * Read a zip compressed file. The user provides the ZIP compression type explicitly (the
 * temporary file is created without a chosen name, so nothing is derived from its extension).
 * We expect a PCollection with the lines.
 */
@Test
@Category(NeedsRunner.class)
public void testZipCompressedRead() throws Exception {
  String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
  File tmpFile = tmpFolder.newFile();
  String filename = tmpFile.getPath();

  // Every line written to the archive is expected back.
  List<String> expected = Arrays.asList(lines);

  // try-with-resources closes both streams even if writing the archive fails part-way.
  try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));
      PrintStream writer = new PrintStream(out)) {
    out.putNextEntry(new ZipEntry("test.txt"));
    for (String line : lines) {
      writer.println(line);
    }
  }

  Pipeline p = TestPipeline.create();

  TextIO.Read.Bound<String> read =
      TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP);
  PCollection<String> output = p.apply(read);

  PAssert.that(output).containsInAnyOrder(expected);
  p.run();
}

/**
 * Read a zip compressed file without naming a compression type on the read transform; the
 * file is given a {@code .zip} extension so the ZIP mode can be picked from the file name.
 * We expect a PCollection with the lines.
 */
@Test
@Category(NeedsRunner.class)
public void testZipCompressedAutoDetected() throws Exception {
  List<String> expected = Arrays.asList("Irritable eagle", "Optimistic jay", "Fanciful hawk");

  File archive = tmpFolder.newFile("testZipCompressedAutoDetected.zip");

  // Build a single-entry archive holding one expected line per text line.
  ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(archive));
  zipOut.putNextEntry(new ZipEntry("test.txt"));
  PrintStream printer = new PrintStream(zipOut);
  for (String text : expected) {
    printer.println(text);
  }
  printer.close();
  zipOut.close();

  // No withCompressionType(...) here: the read is left to detect ZIP from the extension.
  Pipeline p = TestPipeline.create();
  PCollection<String> output = p.apply(TextIO.Read.from(archive.getPath()));

  PAssert.that(output).containsInAnyOrder(expected);
  p.run();
}

/**
 * Read a ZIP archive that contains no entries at all. We expect an empty PCollection (the
 * test asserts emptiness; it does not expect an exception).
 */
@Test
@Category(NeedsRunner.class)
public void testZipCompressedReadWithEmptyFile() throws Exception {
  File tmpFile = tmpFolder.newFile();
  String filename = tmpFile.getPath();

  // Opening and immediately closing the stream writes an archive without any entry.
  ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));
  out.close();

  Pipeline p = TestPipeline.create();

  PCollection<String> output =
      p.apply(TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP));
  PAssert.that(output).empty();

  p.run();
}

/**
 * Read a ZIP compressed file whose only entry is empty. We expect an empty PCollection.
 */
@Test
@Category(NeedsRunner.class)
public void testZipCompressedReadWithEmptyEntry() throws Exception {
  File archive = tmpFolder.newFile();

  // Open an entry and close it straight away so that it carries no content.
  ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream(archive));
  zipOut.putNextEntry(new ZipEntry("test.txt"));
  zipOut.closeEntry();
  zipOut.close();

  Pipeline p = TestPipeline.create();

  TextIO.Read.Bound<String> read =
      TextIO.Read.from(archive.getPath()).withCompressionType(CompressionType.ZIP);
  PCollection<String> output = p.apply(read);

  PAssert.that(output).empty();
  p.run();
}

/**
 * Read a ZIP compressed file with multiple entries. The assertion expects the lines written
 * to every entry to be present in the resulting PCollection.
 */
@Test
@Category(NeedsRunner.class)
public void testZipCompressedReadWithMultiEntriesFile() throws Exception {
  String[] entryNames = {"first.txt", "second.txt", "third.txt"};
  String[][] entryLines = {
    {"first", "second", "three"},
    {"four", "five", "six"},
    {"seven", "eight", "nine"}
  };

  // Named after this test rather than reusing another test's file name.
  File tmpFile = tmpFolder.newFile("testZipCompressedReadWithMultiEntriesFile.zip");
  String filename = tmpFile.getPath();

  List<String> expected = new ArrayList<>();

  // try-with-resources closes both streams even if writing the archive fails part-way.
  try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));
      PrintStream writer = new PrintStream(out)) {
    for (int i = 0; i < entryNames.length; i++) {
      out.putNextEntry(new ZipEntry(entryNames[i]));
      for (String line : entryLines[i]) {
        writer.println(line);
        expected.add(line);
      }
      out.closeEntry();
    }
  }

  Pipeline p = TestPipeline.create();

  TextIO.Read.Bound<String> read =
      TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP);
  PCollection<String> output = p.apply(read);

  PAssert.that(output).containsInAnyOrder(expected);
  p.run();
}

@Test
@Category(NeedsRunner.class)
public void testGZIPReadWhenUncompressed() throws Exception {
Expand Down

0 comments on commit 1826a0e

Please sign in to comment.