From f2d068d674dcb4de753d331576055afa3f26e3d8 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Wed, 11 Sep 2013 10:26:53 -0700 Subject: [PATCH] Register injections with Storm side. --- .../scala/com/twitter/summingbird/storm/StormEnv.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala index c6b9be3fe..a0887544c 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala @@ -19,6 +19,7 @@ package com.twitter.summingbird.storm import backtype.storm.Config import com.twitter.scalding.Args import com.twitter.summingbird.{ Env, Unzip2, Producer } +import com.twitter.summingbird.kryo.KryoRegistrationHelper import com.twitter.summingbird.scalding.Scalding /** @@ -38,11 +39,17 @@ case class StormEnv(override val jobName: String, override val args: Args) // of the environment and defining the builder). val ajob = abstractJob + val codecPairs = Seq(builder.keyCodecPair, builder.valueCodecPair) + val eventCodecPairs = builder.eventCodecPairs + val classSuffix = jobName.split("\\.").last Storm.remote(classSuffix, builder.opts) .withConfigUpdater { config => val c = ConfigBijection.invert(config) - ConfigBijection(ajob.transformConfig(c)) + val transformed = ConfigBijection(ajob.transformConfig(c)) + KryoRegistrationHelper.registerInjections(transformed, eventCodecPairs) + KryoRegistrationHelper.registerInjectionDefaults(transformed, codecPairs) + transformed }.run(builder.node.asInstanceOf[Producer[Storm, _]]) } }