Skip to content

Commit

Permalink
Some minor cleanup after SPARK-4550.
Browse files Browse the repository at this point in the history
  • Loading branch information
sryza committed May 5, 2015
1 parent f32e69e commit dee3d85
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) extends Ou
def write(key: Any, value: Any)

/**
* Notify the writer that a record worth of bytes has been written with writeBytes.
* Notify the writer that a record worth of bytes has been written with OutputStream#write.
*/
def recordWritten()

Expand Down Expand Up @@ -215,12 +215,7 @@ private[spark] class DiskBlockObjectWriter(

objOut.writeKey(key)
objOut.writeValue(value)
numRecordsWritten += 1
writeMetrics.incShuffleRecordsWritten(1)

if (numRecordsWritten % 32 == 0) {
updateBytesWritten()
}
recordWritten()
}

override def write(b: Int): Unit = throw new UnsupportedOperationException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
if (keyStart < 0) {
throw new Exception(s"Can't grow buffer beyond ${1 << 31} bytes")
}
kvSerializationStream.writeObject[Any](key)
kvSerializationStream.writeKey[Any](key)
kvSerializationStream.flush()
val valueStart = kvBuffer.size
kvSerializationStream.writeObject[Any](value)
kvSerializationStream.writeValue[Any](value)
kvSerializationStream.flush()
val valueEnd = kvBuffer.size

Expand Down

0 comments on commit dee3d85

Please sign in to comment.