Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #198 from twitter/feature/fix_storm_ser
Browse files Browse the repository at this point in the history
Register Injections in StormEnv
  • Loading branch information
johnynek committed Sep 11, 2013
2 parents 3efd30c + f2d068d commit 8d4d4da
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.twitter.summingbird.storm
import backtype.storm.Config
import com.twitter.scalding.Args
import com.twitter.summingbird.{ Env, Unzip2, Producer }
import com.twitter.summingbird.kryo.KryoRegistrationHelper
import com.twitter.summingbird.scalding.Scalding

/**
Expand All @@ -38,11 +39,17 @@ case class StormEnv(override val jobName: String, override val args: Args)
// of the environment and defining the builder).
val ajob = abstractJob

val codecPairs = Seq(builder.keyCodecPair, builder.valueCodecPair)
val eventCodecPairs = builder.eventCodecPairs

val classSuffix = jobName.split("\\.").last
Storm.remote(classSuffix, builder.opts)
.withConfigUpdater { config =>
val c = ConfigBijection.invert(config)
ConfigBijection(ajob.transformConfig(c))
val transformed = ConfigBijection(ajob.transformConfig(c))
KryoRegistrationHelper.registerInjections(transformed, eventCodecPairs)
KryoRegistrationHelper.registerInjectionDefaults(transformed, codecPairs)
transformed
}.run(builder.node.asInstanceOf[Producer[Storm, _]])
}
}

0 comments on commit 8d4d4da

Please sign in to comment.