Add more tests + defensive programming to DiskBlockObjectWriter.
JoshRosen committed May 28, 2015
1 parent 16564eb commit 8b8fb9e
Showing 2 changed files with 87 additions and 11 deletions.
@@ -168,21 +168,20 @@ private[spark] class DiskBlockObjectWriter(
       objOut.flush()
       bs.flush()
       close()
-      finalPosition = file.length()
-      // In certain compression codecs, more bytes are written after close() is called
-      writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
     }
+    finalPosition = file.length()
+    // In certain compression codecs, more bytes are written after close() is called
+    writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
+    commitAndCloseHasBeenCalled = true
   }

   // Discard current writes. We do this by flushing the outstanding writes and then
   // truncating the file to its initial position.
   override def revertPartialWritesAndClose() {
     try {
-      writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
-      writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
-
       if (initialized) {
+        writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
+        writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
         objOut.flush()
         bs.flush()
         close()
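The rest of revertPartialWritesAndClose() is collapsed out of the diff above, but its leading comment describes the approach: flush the outstanding writes, then truncate the file back to its initial position. A minimal sketch of such a truncation step, assuming a java.io.FileOutputStream opened in append mode (the commit's actual code may differ in details such as error handling):

// Hypothetical sketch of the truncation described in the comment above:
// reopen the file and shrink its channel back to the writer's starting offset.
val truncateStream = new FileOutputStream(file, true)
try {
  truncateStream.getChannel.truncate(initialPosition)
} finally {
  truncateStream.close()
}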
@@ -18,16 +18,27 @@ package org.apache.spark.storage

 import java.io.File

-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfterEach, FunSuite}

 import org.apache.spark.SparkConf
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.util.Utils

-class BlockObjectWriterSuite extends FunSuite {
+class BlockObjectWriterSuite extends FunSuite with BeforeAndAfterEach {
+
+  var tempDir: File = _
+
+  override def beforeEach(): Unit = {
+    tempDir = Utils.createTempDir()
+  }
+
+  override def afterEach(): Unit = {
+    Utils.deleteRecursively(tempDir)
+  }

   test("verify write metrics") {
-    val file = new File(Utils.createTempDir(), "somefile")
+    val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
     val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
       new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
@@ -49,7 +60,7 @@ class BlockObjectWriterSuite extends FunSuite {
   }

   test("verify write metrics on revert") {
-    val file = new File(Utils.createTempDir(), "somefile")
+    val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
     val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
       new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
@@ -72,7 +83,7 @@ class BlockObjectWriterSuite extends FunSuite {
   }

   test("Reopening a closed block writer") {
-    val file = new File(Utils.createTempDir(), "somefile")
+    val file = new File(tempDir, "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
     val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
       new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
@@ -83,4 +94,70 @@ class BlockObjectWriterSuite extends FunSuite {
       writer.open()
     }
   }
+
+  test("calling revertPartialWritesAndClose() on a closed block writer should have no effect") {
+    val file = new File(tempDir, "somefile")
+    val writeMetrics = new ShuffleWriteMetrics()
+    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
+      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    for (i <- 1 to 1000) {
+      writer.write(i, i)
+    }
+    writer.commitAndClose()
+    val bytesWritten = writeMetrics.shuffleBytesWritten
+    assert(writeMetrics.shuffleRecordsWritten === 1000)
+    writer.revertPartialWritesAndClose()
+    assert(writeMetrics.shuffleRecordsWritten === 1000)
+    assert(writeMetrics.shuffleBytesWritten === bytesWritten)
+  }
+
+  test("commitAndClose() should be idempotent") {
+    val file = new File(tempDir, "somefile")
+    val writeMetrics = new ShuffleWriteMetrics()
+    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
+      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    for (i <- 1 to 1000) {
+      writer.write(i, i)
+    }
+    writer.commitAndClose()
+    val bytesWritten = writeMetrics.shuffleBytesWritten
+    val writeTime = writeMetrics.shuffleWriteTime
+    assert(writeMetrics.shuffleRecordsWritten === 1000)
+    writer.commitAndClose()
+    assert(writeMetrics.shuffleRecordsWritten === 1000)
+    assert(writeMetrics.shuffleBytesWritten === bytesWritten)
+    assert(writeMetrics.shuffleWriteTime === writeTime)
+  }
+
+  test("revertPartialWritesAndClose() should be idempotent") {
+    val file = new File(tempDir, "somefile")
+    val writeMetrics = new ShuffleWriteMetrics()
+    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
+      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    for (i <- 1 to 1000) {
+      writer.write(i, i)
+    }
+    writer.revertPartialWritesAndClose()
+    val bytesWritten = writeMetrics.shuffleBytesWritten
+    val writeTime = writeMetrics.shuffleWriteTime
+    assert(writeMetrics.shuffleRecordsWritten === 0)
+    writer.revertPartialWritesAndClose()
+    assert(writeMetrics.shuffleRecordsWritten === 0)
+    assert(writeMetrics.shuffleBytesWritten === bytesWritten)
+    assert(writeMetrics.shuffleWriteTime === writeTime)
+  }
+
+  test("fileSegment() can only be called after commitAndClose() has been called") {
+    val file = new File(tempDir, "somefile")
+    val writeMetrics = new ShuffleWriteMetrics()
+    val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
+      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
+    for (i <- 1 to 1000) {
+      writer.write(i, i)
+    }
+    intercept[IllegalStateException] {
+      writer.fileSegment()
+    }
+    writer.close()
+  }
 }
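The new commitAndCloseHasBeenCalled flag set at the end of commitAndClose() backs the defensive check exercised by the last test, which expects fileSegment() to throw IllegalStateException before a commit. A minimal sketch of such a guard, assuming the flag is consulted in fileSegment() (the exact exception message is an assumption):

// Hypothetical sketch: refuse to produce a FileSegment until the writer has been
// committed, since finalPosition is only known after commitAndClose().
override def fileSegment(): FileSegment = {
  if (!commitAndCloseHasBeenCalled) {
    throw new IllegalStateException(
      "fileSegment() is only valid after commitAndClose() has been called")
  }
  new FileSegment(file, initialPosition, finalPosition - initialPosition)
}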
