Skip to content

Commit

Permalink
Merge pull request alteryx#157 from rxin/kryo
Browse files Browse the repository at this point in the history
3 Kryo related changes.

1. Call Kryo setReferences before calling user specified Kryo registrator. This is done so the user specified registrator can override the default setting.

2. Register more internal classes (MapStatus, BlockManagerId).

3. Slightly refactored the internal class registration to allocate less memory.
  • Loading branch information
mateiz committed Nov 10, 2013
2 parents 3efc019 + c845611 commit 58d4f6c
Showing 1 changed file with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}

import org.apache.spark.{SerializableWritable, Logging}
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.storage.{GetBlock,GotBlock, PutBlock, StorageLevel, TestBlockId}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._

/**
* A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
*/
class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
private val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024

private val bufferSize = {
System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
}

def newKryoOutput() = new KryoOutput(bufferSize)

Expand All @@ -42,21 +46,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
val kryo = instantiator.newKryo()
val classLoader = Thread.currentThread.getContextClassLoader

val blockId = TestBlockId("1")
// Register some commonly used classes
val toRegister: Seq[AnyRef] = Seq(
ByteBuffer.allocate(1),
StorageLevel.MEMORY_ONLY,
PutBlock(blockId, ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
GotBlock(blockId, ByteBuffer.allocate(1)),
GetBlock(blockId),
1 to 10,
1 until 10,
1L to 10L,
1L until 10L
)

for (obj <- toRegister) kryo.register(obj.getClass)
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
// Do this before we invoke the user registrator so the user registrator can override this.
kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)

for (cls <- KryoSerializer.toRegister) kryo.register(cls)

// Allow sending SerializableWritable
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
Expand All @@ -78,10 +72,6 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
new AllScalaRegistrar().apply(kryo)

kryo.setClassLoader(classLoader)

// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)

kryo
}

Expand Down Expand Up @@ -165,3 +155,21 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
trait KryoRegistrator {
def registerClasses(kryo: Kryo)
}

private[serializer] object KryoSerializer {
// Commonly used classes.
private val toRegister: Seq[Class[_]] = Seq(
ByteBuffer.allocate(1).getClass,
classOf[StorageLevel],
classOf[PutBlock],
classOf[GotBlock],
classOf[GetBlock],
classOf[MapStatus],
classOf[BlockManagerId],
classOf[Array[Byte]],
(1 to 10).getClass,
(1 until 10).getClass,
(1L to 10L).getClass,
(1L until 10L).getClass
)
}

0 comments on commit 58d4f6c

Please sign in to comment.