Skip to content

Commit

Permalink
[BEAM-314] Add zip compression support in TextIO
Browse files Browse the repository at this point in the history
  • Loading branch information
jbonofre committed Jun 6, 2016
1 parent c2146b9 commit 4737893
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.nio.channels.ReadableByteChannel;
import java.util.NoSuchElementException;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipInputStream;

import javax.annotation.concurrent.GuardedBy;

Expand Down Expand Up @@ -149,6 +150,26 @@ public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channe
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
}
},

/**
* Reads a byte channel assuming it is compressed with zip.
*/
ZIP {
@Override
public boolean matches(String fileName) {
return fileName.toLowerCase().endsWith(".zip");
}

public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
throws IOException {
ZipInputStream zip = new ZipInputStream(Channels.newInputStream(channel));
// TODO deal with multi-entries zip
if (zip.getNextEntry() != null) {
return Channels.newChannel(zip);
}
throw new IllegalArgumentException("ZIP file doesn't contain any entry");
}
};

/**
Expand Down
11 changes: 10 additions & 1 deletion sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ public PCollection<T> apply(PInput input) {
CompressedSource.from(new TextSource<T>(filepattern, coder))
.withDecompression(CompressedSource.CompressionMode.GZIP));
break;
case ZIP:
read = org.apache.beam.sdk.io.Read.from(
CompressedSource.from(new TextSource<T>(filepattern, coder))
.withDecompression(CompressedSource.CompressionMode.ZIP));
break;
default:
throw new IllegalArgumentException("Unknown compression mode: " + compressionType);
}
Expand Down Expand Up @@ -722,7 +727,11 @@ public static enum CompressionType {
/**
* BZipped.
*/
BZIP2(".bz2");
BZIP2(".bz2"),
/**
* Zipped.
*/
ZIP(".zip");

private String filenameSuffix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

import com.google.common.collect.ImmutableList;

import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -72,6 +73,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
* Tests for TextIO Read and Write transforms.
Expand Down Expand Up @@ -402,6 +405,63 @@ public void testCompressedRead() throws Exception {
p.run();
}

@Test
@Category(NeedsRunner.class)
public void testZipCompressedRead() throws Exception {
String[] lines = {"Irritable eagle", "Optimistic jay", "Fanciful hawk"};
File tmpFile = tmpFolder.newFile();
String filename = tmpFile.getPath();

List<String> expected = new ArrayList<>();

ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));

out.putNextEntry(new ZipEntry("test.txt"));

PrintStream writer = new PrintStream(out);
for (String line : lines) {
writer.println(line);
expected.add(line);
}
writer.close();
out.close();

Pipeline p = TestPipeline.create();

TextIO.Read.Bound<String> read =
TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP);
PCollection<String> output = p.apply(read);

PAssert.that(output).containsInAnyOrder(expected);
p.run();

// test with auto-detect ZIP based on extension.
p = TestPipeline.create();

read = TextIO.Read.from(filename);
output = p.apply(read);

PAssert.that(output).containsInAnyOrder(expected);
p.run();
}

@Test
@Ignore
public void testZipCompressedReadWithEmptyFile() throws Exception {
File tmpFile = tmpFolder.newFile();
String filename = tmpFile.getPath();

ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));

out.close();

Pipeline p = TestPipeline.create();

expectedException.expect(IllegalArgumentException.class);
p.apply(TextIO.Read.from(filename).withCompressionType(CompressionType.ZIP));

}

@Test
@Category(NeedsRunner.class)
public void testGZIPReadWhenUncompressed() throws Exception {
Expand Down

0 comments on commit 4737893

Please sign in to comment.