Skip to content

Commit

Permalink
Closes #400
Browse files Browse the repository at this point in the history
  • Loading branch information
dhalperi committed Jun 23, 2016
2 parents 748b0c8 + 49b4847 commit f2d2ce5
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Locale;
import java.util.NoSuchElementException;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import javax.annotation.concurrent.GuardedBy;

Expand Down Expand Up @@ -149,8 +152,69 @@ public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channe
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
}
},

/**
* Reads a byte channel assuming it is compressed with zip.
* If the zip file contains multiple entries, the contents of all entries are concatenated
* into a single stream.
*/
ZIP {
/**
 * Returns {@code true} if {@code fileName} ends with {@code .zip}, ignoring case.
 *
 * @param fileName the file name to test
 */
@Override
public boolean matches(String fileName) {
  // Lower-case with Locale.ROOT so the result does not depend on the JVM's default locale
  // (e.g. under a Turkish locale "ZIP" would lower-case to "zıp" and never match).
  return fileName.toLowerCase(Locale.ROOT).endsWith(".zip");
}

/**
 * Wraps {@code channel} in a channel that yields the decompressed bytes of the zip archive,
 * reading its entries one after another through a {@link FullZipInputStream}.
 *
 * @param channel the channel carrying the zip-compressed bytes
 * @return a channel over the decompressed contents
 * @throws IOException if reading the first zip entry fails
 */
@Override
public ReadableByteChannel createDecompressingChannel(ReadableByteChannel channel)
    throws IOException {
  return Channels.newChannel(new FullZipInputStream(Channels.newInputStream(channel)));
}
};

/**
 * An {@link InputStream} that wraps a {@link ZipInputStream} and exposes the contents of every
 * entry in the archive as one continuous stream.
 *
 * <p>When the current entry is exhausted, reading moves on to the next entry; end-of-stream
 * ({@code -1}) is reported only once {@link ZipInputStream#getNextEntry()} returns {@code null}.
 */
private static class FullZipInputStream extends InputStream {

  private final ZipInputStream zipInputStream;
  private ZipEntry currentEntry;

  /**
   * Opens {@code is} as a zip archive and positions the stream at its first entry.
   *
   * @param is the stream carrying the zip-compressed bytes
   * @throws IOException if the first entry cannot be read
   */
  public FullZipInputStream(InputStream is) throws IOException {
    super();
    zipInputStream = new ZipInputStream(is);
    currentEntry = zipInputStream.getNextEntry();
  }

  /**
   * Reads the next byte, skipping over entry boundaries (and empty entries) as needed.
   *
   * @return the byte read, or {@code -1} once all entries are exhausted
   */
  @Override
  public int read() throws IOException {
    int result = zipInputStream.read();
    // -1 only means "end of the current entry"; keep advancing until data or no more entries.
    while (result == -1) {
      currentEntry = zipInputStream.getNextEntry();
      if (currentEntry == null) {
        return -1;
      }
      result = zipInputStream.read();
    }
    return result;
  }

  /**
   * Reads up to {@code len} bytes into {@code b}, skipping over entry boundaries (and empty
   * entries) as needed. A single call never returns bytes from more than one entry.
   *
   * @return the number of bytes read, or {@code -1} once all entries are exhausted
   */
  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    int result = zipInputStream.read(b, off, len);
    // -1 only means "end of the current entry"; keep advancing until data or no more entries.
    while (result == -1) {
      currentEntry = zipInputStream.getNextEntry();
      if (currentEntry == null) {
        return -1;
      }
      result = zipInputStream.read(b, off, len);
    }
    return result;
  }

  /**
   * Closes the wrapped zip stream. Without this override the inherited no-op
   * {@link InputStream#close()} would leave the wrapped stream open.
   */
  @Override
  public void close() throws IOException {
    zipInputStream.close();
  }
}

/**
* Returns {@code true} if the given file name implies that the contents are compressed
* according to the compression embodied by this factory.
Expand Down
11 changes: 10 additions & 1 deletion sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ public PCollection<T> apply(PInput input) {
CompressedSource.from(new TextSource<T>(filepattern, coder))
.withDecompression(CompressedSource.CompressionMode.GZIP));
break;
case ZIP:
read = org.apache.beam.sdk.io.Read.from(
CompressedSource.from(new TextSource<T>(filepattern, coder))
.withDecompression(CompressedSource.CompressionMode.ZIP));
break;
default:
throw new IllegalArgumentException("Unknown compression mode: " + compressionType);
}
Expand Down Expand Up @@ -721,7 +726,11 @@ public static enum CompressionType {
/**
* BZipped.
*/
BZIP2(".bz2");
BZIP2(".bz2"),
/**
* Zipped.
*/
ZIP(".zip");

private String filenameSuffix;

Expand Down
173 changes: 173 additions & 0 deletions sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import javax.annotation.Nullable;

/**
* Tests for TextIO Read and Write transforms.
Expand Down Expand Up @@ -419,6 +423,175 @@ public void testCompressedRead() throws Exception {
p.run();
}

/**
 * Creates a zip file holding one zip entry per element of {@code fieldsEntries}. Entries are
 * named after their index ("0", "1", ...) and contain their fields one per line.
 *
 * @param expected Output list: every field written to the zip file is appended to it.
 * @param filename Optional zip file name (can be null, in which case
 *     {@code tmpFolder.newFile()} chooses the name).
 * @param fieldsEntries Fields to write in zip entries, one array per entry.
 * @return The zip filename.
 * @throws Exception In case of a failure during zip file creation.
 */
private String createZipFile(
    List<String> expected, @Nullable String filename, String[]... fieldsEntries)
    throws Exception {
  File tmpFile = (filename != null) ? tmpFolder.newFile(filename) : tmpFolder.newFile();

  // try-with-resources closes both streams even if writing an entry fails; the explicit
  // charset keeps the file contents independent of the platform default encoding.
  try (ZipOutputStream out = new ZipOutputStream(new FileOutputStream(tmpFile));
      PrintStream writer = new PrintStream(out, true /* auto-flush on write */, "UTF-8")) {
    int index = 0;
    for (String[] entry : fieldsEntries) {
      out.putNextEntry(new ZipEntry(Integer.toString(index)));
      for (String field : entry) {
        writer.println(field);
        expected.add(field);
      }
      out.closeEntry();
      index++;
    }
  }

  return tmpFile.getPath();
}

/**
 * Reads a zip file with the compression type explicitly set to ZIP and checks that the
 * resulting PCollection holds the lines that were written to the archive.
 */
@Test
@Category(NeedsRunner.class)
public void testZipCompressedRead() throws Exception {
  List<String> expectedLines = new ArrayList<>();
  String zipPath =
      createZipFile(
          expectedLines,
          null,
          new String[] {"Irritable eagle", "Optimistic jay", "Fanciful hawk"});

  Pipeline pipeline = TestPipeline.create();
  PCollection<String> actualLines =
      pipeline.apply(TextIO.Read.from(zipPath).withCompressionType(CompressionType.ZIP));

  PAssert.that(actualLines).containsInAnyOrder(expectedLines);
  pipeline.run();
}

/**
 * Reads a zip file without specifying a compression type, relying on the {@code .zip}
 * extension for auto-detection, and checks that the PCollection holds the written lines.
 */
@Test
@Category(NeedsRunner.class)
public void testZipCompressedAutoDetected() throws Exception {
  List<String> expectedLines = new ArrayList<>();
  String zipPath =
      createZipFile(
          expectedLines,
          "testZipCompressedAutoDetected.zip",
          new String[] {"Irritable eagle", "Optimistic jay", "Fanciful hawk"});

  // No explicit compression type: ZIP must be inferred from the file extension.
  Pipeline pipeline = TestPipeline.create();
  PCollection<String> actualLines = pipeline.apply(TextIO.Read.from(zipPath));

  PAssert.that(actualLines).containsInAnyOrder(expectedLines);
  pipeline.run();
}

/**
 * Reads a ZIP archive that contains no entries at all and checks that the resulting
 * PCollection is empty.
 */
@Test
@Category(NeedsRunner.class)
public void testZipCompressedReadWithEmptyFile() throws Exception {
  List<String> ignoredLines = new ArrayList<String>();
  String zipPath = createZipFile(ignoredLines, null);

  Pipeline pipeline = TestPipeline.create();

  TextIO.Read.Bound<String> read =
      TextIO.Read.from(zipPath).withCompressionType(CompressionType.ZIP);
  PCollection<String> actualLines = pipeline.apply(read);
  PAssert.that(actualLines).empty();

  pipeline.run();
}

/**
 * Reads a ZIP archive whose only entry is empty and checks that the resulting PCollection
 * is empty.
 */
@Test
@Category(NeedsRunner.class)
public void testZipCompressedReadWithEmptyEntry() throws Exception {
  String[] emptyEntry = new String[] {};
  String zipPath = createZipFile(new ArrayList<String>(), null, emptyEntry);

  Pipeline pipeline = TestPipeline.create();

  TextIO.Read.Bound<String> read =
      TextIO.Read.from(zipPath).withCompressionType(CompressionType.ZIP);
  PCollection<String> actualLines = pipeline.apply(read);
  PAssert.that(actualLines).empty();

  pipeline.run();
}

/**
 * Reads a ZIP archive made of several entries and checks that the resulting PCollection
 * holds the lines of every entry.
 */
@Test
@Category(NeedsRunner.class)
public void testZipCompressedReadWithMultiEntriesFile() throws Exception {
  String[][] entries = {
    {"first", "second", "three"},
    {"four", "five", "six"},
    {"seven", "eight", "nine"}
  };

  List<String> expectedLines = new ArrayList<>();
  String zipPath = createZipFile(expectedLines, null, entries);

  Pipeline pipeline = TestPipeline.create();
  PCollection<String> actualLines =
      pipeline.apply(TextIO.Read.from(zipPath).withCompressionType(CompressionType.ZIP));

  PAssert.that(actualLines).containsInAnyOrder(expectedLines);
  pipeline.run();
}

/**
 * Reads a ZIP archive laid out as: one entry with data, two empty entries, then another
 * entry with data. Only the data lines are expected in the resulting PCollection.
 */
@Test
@Category(NeedsRunner.class)
public void testZipCompressedReadWithComplexEmptyAndPresentEntries() throws Exception {
  String[] catEntry = new String[] {"cat"};
  String[] emptyEntry = new String[] {};
  String[] dogEntry = new String[] {"dog"};

  String zipPath =
      createZipFile(
          new ArrayList<String>(), null, catEntry, emptyEntry, new String[] {}, dogEntry);
  List<String> expectedLines = ImmutableList.of("cat", "dog");

  Pipeline pipeline = TestPipeline.create();

  TextIO.Read.Bound<String> read =
      TextIO.Read.from(zipPath).withCompressionType(CompressionType.ZIP);
  PCollection<String> actualLines = pipeline.apply(read);
  PAssert.that(actualLines).containsInAnyOrder(expectedLines);

  pipeline.run();
}

@Test
@Category(NeedsRunner.class)
public void testGZIPReadWhenUncompressed() throws Exception {
Expand Down

0 comments on commit f2d2ce5

Please sign in to comment.