Skip to content

Commit

Permalink
Close channels
Browse files Browse the repository at this point in the history
Plus a little bit of cleanup.
  • Loading branch information
jean-philippe-martin committed May 6, 2016
1 parent 03dab3e commit c1b6397
Showing 1 changed file with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.io.BaseEncoding;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -49,8 +50,16 @@ public class ParallelCountBytes {

/**
* WorkUnit holds a buffer and the instructions for what to put in it.
*
* <p>Use it like this:
* <ol>
* <li> call()
* <li> the data is now in buf, you can access it directly
* <li> if need more, call resetForIndex(...) and go back to the top.
* <li> else, call close()
* </ol>
*/
private class WorkUnit implements Callable<WorkUnit> {
private static class WorkUnit implements Callable<WorkUnit>, Closeable {
public final ByteBuffer buf;
final SeekableByteChannel chan;
final int blockSize;
Expand All @@ -70,7 +79,7 @@ public WorkUnit call() throws IOException {
return this;
}
chan.position(pos);
// read until buffer is full, or EOF
// read until buffer it is full, or EOF
while (chan.read(buf) > 0) {};
return this;
}
Expand All @@ -80,16 +89,16 @@ public WorkUnit resetForIndex(int blockIndex) {
buf.flip();
return this;
}

public void close() throws IOException {
chan.close();
}
}

/**
* See the class documentation.
*/
public static void main(String[] args) throws IOException {
new ParallelCountBytes().start(args);
}

public void start(String[] args) throws IOException {
public static void main(String[] args) throws Exception {
if (args.length == 0 || args[0].equals("--help")) {
help();
return;
Expand All @@ -100,16 +109,15 @@ public void start(String[] args) throws IOException {
}

/**
* Print the length of the indicated file.
* Print the length and MD5 of the indicated file.
*
* <p>This uses the normal Java NIO Api, so it can take advantage of any installed
* NIO Filesystem provider without any extra effort.
*/
private void countFile(String fname) throws IOException{
private static void countFile(String fname) throws Exception {
// large buffers pay off
final int bufSize = 50 * 1024 * 1024;
Queue<Future<WorkUnit>> work = new ArrayDeque<>();
try {
Path path = Paths.get(new URI(fname));
long size = Files.size(path);
System.out.println(fname + ": " + size + " bytes.");
Expand All @@ -125,14 +133,15 @@ private void countFile(String fname) throws IOException{
for (blockIndex = 0; blockIndex < nThreads; blockIndex++) {
work.add(exec.submit(new WorkUnit(Files.newByteChannel(path), bufSize, blockIndex)));
}
while (true) {
while (!work.isEmpty()) {
WorkUnit full = work.remove().get();
md.update(full.buf.array(), 0, full.buf.position());
total += full.buf.position();
if (full.buf.hasRemaining()) {
break;
full.close();
} else {
work.add(exec.submit(full.resetForIndex(blockIndex++)));
}
work.add(exec.submit(full.resetForIndex(blockIndex++)));
}
exec.shutdown();

Expand All @@ -141,12 +150,9 @@ private void countFile(String fname) throws IOException{
String hex = String.valueOf(BaseEncoding.base16().encode(md.digest()));
System.out.println("The MD5 is: 0x" + hex);
if (total != size) {
System.out.println("Wait, this doesn't match! We saw " + total + " bytes, " +
"yet the file size is listed at " + size + " bytes.");
System.out.println("Wait, this doesn't match! We saw " + total + " bytes, "
+ "yet the file size is listed at " + size + " bytes.");
}
} catch (Exception ex) {
System.out.println(fname + ": " + ex.toString());
}
}

private static void help() {
Expand Down

0 comments on commit c1b6397

Please sign in to comment.