Skip to content

Commit

Permalink
Some minor cleanup after SPARK-4550.
Browse files Browse the repository at this point in the history
JoshRosen this PR addresses the comments you left on apache#4450 after it got merged.

Author: Sandy Ryza <sandy@cloudera.com>

Closes apache#5916 from sryza/sandy-spark-4550-cleanup and squashes the following commits:

dee3d85 [Sandy Ryza] Some minor cleanup after SPARK-4550.
  • Loading branch information
sryza authored and JoshRosen committed May 6, 2015
1 parent c688e3c commit 0092abb
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) extends Ou
def write(key: Any, value: Any)

/**
* Notify the writer that a record worth of bytes has been written with writeBytes.
* Notify the writer that a record worth of bytes has been written with OutputStream#write.
*/
def recordWritten()

Expand Down Expand Up @@ -215,12 +215,7 @@ private[spark] class DiskBlockObjectWriter(

objOut.writeKey(key)
objOut.writeValue(value)
numRecordsWritten += 1
writeMetrics.incShuffleRecordsWritten(1)

if (numRecordsWritten % 32 == 0) {
updateBytesWritten()
}
recordWritten()
}

override def write(b: Int): Unit = throw new UnsupportedOperationException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ private[spark] class PartitionedSerializedPairBuffer[K, V](
if (keyStart < 0) {
throw new Exception(s"Can't grow buffer beyond ${1 << 31} bytes")
}
kvSerializationStream.writeObject[Any](key)
kvSerializationStream.writeKey[Any](key)
kvSerializationStream.flush()
val valueStart = kvBuffer.size
kvSerializationStream.writeObject[Any](value)
kvSerializationStream.writeValue[Any](value)
kvSerializationStream.flush()
val valueEnd = kvBuffer.size

Expand Down

0 comments on commit 0092abb

Please sign in to comment.