Skip to content

Commit

Permalink
Minor fix to cleanup logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 28, 2015
1 parent b5cc35b commit 03f35a4
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
final K key = record._1();
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}

for (BlockObjectWriter writer : partitionWriters) {
writer.commitAndClose();
}
}

@Override
Expand All @@ -133,9 +137,7 @@ public long[] writePartitionedFile(
// We were passed an empty iterator
return lengths;
}
for (BlockObjectWriter writer : partitionWriters) {
writer.commitAndClose();
}

final FileOutputStream out = new FileOutputStream(outputFile, true);
final long writeStartTime = System.nanoTime();
boolean threwException = true;
Expand All @@ -158,6 +160,7 @@ public long[] writePartitionedFile(
Closeables.close(out, threwException);
writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return lengths;
}

Expand Down

0 comments on commit 03f35a4

Please sign in to comment.