From 5ffc48d26f336fa24c0d8d4ea4f23f7ef41ccf72 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Fri, 25 Oct 2013 16:17:35 -0700 Subject: [PATCH 01/12] Compiles first pass --- .../twitter/summingbird/storm/StormEnv.scala | 12 ++- .../summingbird/SummingbirdConfig.scala | 102 ++++++++++++++++++ .../summingbird/storm/ConfigBijection.scala | 38 ------- .../summingbird/storm/StormConfig.scala | 34 ++++++ .../summingbird/storm/StormPlatform.scala | 33 +++--- 5 files changed, 163 insertions(+), 56 deletions(-) create mode 100644 summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala delete mode 100644 summingbird-storm/src/main/scala/com/twitter/summingbird/storm/ConfigBijection.scala create mode 100644 summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormConfig.scala diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala index 6492c75b6..d44da18fe 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala @@ -25,6 +25,7 @@ import com.twitter.summingbird.{ Env, Unzip2, Producer, TailProducer } import com.twitter.summingbird.batch.{ BatchID, Timestamp } import com.twitter.summingbird.scalding.Scalding import scala.collection.JavaConverters._ +import com.twitter.summingbird.{MutableStringConfig, StormConfig} /** * Storm-specific extension to Env. 
StormEnv handles storm-specific configuration @@ -48,10 +49,11 @@ case class StormEnv(override val jobName: String, override val args: Args) .getOrElse(jobName.split("\\.").last) Storm.remote(builder.opts) - .withConfigUpdater { config => - val c = ConfigBijection.invert(config) - val transformed = ConfigBijection(ajob.transformConfig(c)) - val kryoConfig = new JavaMapConfig(transformed) + .withConfigUpdater { c => + val transformed = ajob.transformConfig(c.toMap) + val kryoConfig = new com.twitter.chill.config.Config with MutableStringConfig { + val summingbirdConfig = c.updated(transformed) + } ConfInst.setSerialized( kryoConfig, classOf[ScalaKryoInstantiator], @@ -67,7 +69,7 @@ case class StormEnv(override val jobName: String, override val args: Args) } }) ) - transformed + kryoConfig.unwrap }.run( builder.node.name(builder.id).asInstanceOf[TailProducer[Storm, _]], classSuffix diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala new file mode 100644 index 000000000..b171067b3 --- /dev/null +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala @@ -0,0 +1,102 @@ +/* + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +package com.twitter.summingbird + +trait SummingbirdConfig { self => + def get(key: String): Option[AnyRef] + def put(key: String, v: AnyRef): SummingbirdConfig + final def +(kv: (String, AnyRef)) = put(kv._1, kv._2) + final def -(k: String) = remove(k) + def remove(key: String): SummingbirdConfig + def keys: Iterable[String] + def updates: Map[String, AnyRef] + def removes: Set[String] + def toMap: Map[String, AnyRef] = new Map[String, AnyRef] { + def get(k: String) = self.get(k) + def +[B1 >: AnyRef](kv: (String, B1)) = self.put(kv._1, kv._2.asInstanceOf[AnyRef]).toMap + def -(k: String) = self.-(k).toMap + def iterator = self.keys.iterator.map(k => (k, self.get(k).get)) + } + def updated(newMap: Map[String, AnyRef]): SummingbirdConfig = { + val removedKeys: Set[String] = keys.toSet -- newMap.keys + val changedOrAddedKeys = newMap.flatMap{ case (k, v) => + val oldVal = get(k) + if(oldVal != Some(v)) { + Some((k, v)) + } else None + } + val newWithoutRemoved = removedKeys.foldLeft(self)(_.remove(_)) + changedOrAddedKeys.foldLeft(newWithoutRemoved) {_ + _} + } +} + +trait MutableStringConfig { + protected def summingbirdConfig: SummingbirdConfig + private var config = summingbirdConfig + def get(key: String) = { + config.get(key) match { + case Some(s) => s.toString + case None => null + } + } + def set(key: String, value: String) { + config = config.put(key, value) + } + def unwrap = config +} + +trait ReadableMap { + def get(key: String): Option[AnyRef] + def keys: Set[String] +} + + +object WrappingConfig { + def apply(backingConfig: ReadableMap) = new WrappingConfig( + backingConfig, + Map[String, AnyRef](), + Set[String]()) +} + +case class WrappingConfig(private val backingConfig: ReadableMap, + updates: Map[String, AnyRef], + removes: Set[String]) extends SummingbirdConfig { + + def get(key: String) = { + updates.get(key) match { + case s@Some(_) => s + case None => + if(removes.contains(key)) + None + else + backingConfig.get(key) + } + } + + def 
put(key: String, v: AnyRef): WrappingConfig = { + assert(v != null) + this.copy(updates = (updates + (key -> v)), removes = (removes - key)) + } + + def remove(key: String): WrappingConfig = { + this.copy(updates = (updates - key), removes = (removes + key)) + } + + def keys: Iterable[String] = { + ((backingConfig.keys ++ updates.keys) -- removes) + } +} \ No newline at end of file diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/ConfigBijection.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/ConfigBijection.scala deleted file mode 100644 index a6d926a9d..000000000 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/ConfigBijection.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* -Copyright 2013 Twitter, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package com.twitter.summingbird.storm - -import backtype.storm.Config -import com.twitter.bijection.Bijection -import scala.collection.JavaConverters._ - -/** - * Converts a scala map to and from a storm Config instance. 
- * - * @author Oscar Boykin - * @author Sam Ritchie - * @author Ashu Singhal - */ - -object ConfigBijection extends Bijection[Map[String,AnyRef], Config] { - override def apply(config: Map[String,AnyRef]) = { - val stormConf = new Config - config foreach { case (k,v) => stormConf.put(k,v) } - stormConf - } - override def invert(config: Config) = config.asScala.toMap -} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormConfig.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormConfig.scala new file mode 100644 index 000000000..466410074 --- /dev/null +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormConfig.scala @@ -0,0 +1,34 @@ +/* + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +package com.twitter.summingbird + +import backtype.storm.{Config => BacktypeStormConfig} +import scala.collection.JavaConverters._ + +object StormConfig { + def apply(backingConfig: BacktypeStormConfig) = WrappingConfig(new WrappedBacktypeStormConfig(backingConfig)) +} + +class WrappedBacktypeStormConfig(backingConfig: BacktypeStormConfig) extends ReadableMap { + def get(key: String): Option[AnyRef] = + if(backingConfig.containsKey(key)) + Some(backingConfig.get(key)) + else + None + + val keys: Set[String] = backingConfig.keySet.asScala.toSet +} diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index d72a76908..915383055 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -17,7 +17,7 @@ package com.twitter.summingbird.storm import Constants._ -import backtype.storm.{Config, LocalCluster, StormSubmitter} +import backtype.storm.{Config => BacktypeStormConfig, LocalCluster, StormSubmitter} import backtype.storm.generated.StormTopology import backtype.storm.topology.{BoltDeclarer, TopologyBuilder} import backtype.storm.tuple.Fields @@ -90,7 +90,7 @@ object Storm { implicit def spoutAsSource[T](spout: Spout[T])(implicit timeOf: TimeExtractor[T]): Producer[Storm, T] = source(spout, None)(timeOf) } -abstract class Storm(options: Map[String, Options], updateConf: Config => Config) extends Platform[Storm] { +abstract class Storm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig) extends Platform[Storm] { @transient private val logger = LoggerFactory.getLogger(classOf[Storm]) type Source[+T] = StormSource[T] @@ -232,7 +232,7 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config * Base storm config instances used by the Storm platform. 
*/ def baseConfig = { - val config = new Config + val config = new BacktypeStormConfig config.setFallBackOnJavaSerialization(false) config.setKryoFactory(classOf[com.twitter.chill.storm.BlizzardKryoFactory]) config.setMaxSpoutPending(1000) @@ -244,11 +244,18 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config classOf[ScalaKryoInstantiator], new ScalaKryoInstantiator() ) - transformConfig(config) + val state = transformConfig(StormConfig(config)) + + logger.debug("User config changes:") + logger.debug("Removes: {}", state.removes) + logger.debug("Updates: {}", state.updates) + + state.removes.foreach(config.remove(_)) + state.updates.foreach(kv => config.put(kv._1, kv._2)) + config } - def transformConfig(base: Config): Config = updateConf(base) - def withConfigUpdater(fn: Config => Config): Storm + def withConfigUpdater(fn: SummingbirdConfig => SummingbirdConfig): Storm def plan[T](tail: TailProducer[Storm, T]): StormTopology = { implicit val topologyBuilder = new TopologyBuilder @@ -269,10 +276,10 @@ abstract class Storm(options: Map[String, Options], updateConf: Config => Config def run(topology: StormTopology, jobName: String): Unit } -class RemoteStorm(options: Map[String, Options], updateConf: Config => Config) extends Storm(options, updateConf) { +class RemoteStorm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig) extends Storm(options, transformConfig) { - override def withConfigUpdater(fn: Config => Config) = - new RemoteStorm(options, updateConf.andThen(fn)) + override def withConfigUpdater(fn: SummingbirdConfig => SummingbirdConfig) = + new RemoteStorm(options, transformConfig.andThen(fn)) override def run(topology: StormTopology, jobName: String): Unit = { val topologyName = "summingbird_" + jobName @@ -280,12 +287,12 @@ class RemoteStorm(options: Map[String, Options], updateConf: Config => Config) e } } -class LocalStorm(options: Map[String, Options], updateConf: Config => Config) - 
extends Storm(options, updateConf) { +class LocalStorm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig) + extends Storm(options, transformConfig) { lazy val localCluster = new LocalCluster - override def withConfigUpdater(fn: Config => Config) = - new LocalStorm(options, updateConf.andThen(fn)) + override def withConfigUpdater(fn: SummingbirdConfig => SummingbirdConfig) = + new LocalStorm(options, transformConfig.andThen(fn)) override def run(topology: StormTopology, jobName: String): Unit = { val topologyName = "summingbird_" + jobName From babbcf0c3324dafbecdb6a4e9a5835eb3a8d6f6c Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 28 Oct 2013 10:56:14 -0700 Subject: [PATCH 02/12] Pull the registrar handling code out into a separate module and try clean it up nicely --- project/Build.scala | 11 +++++ .../twitter/summingbird/storm/StormEnv.scala | 22 +-------- .../summingbird/chill/SBChillRegistrar.scala | 47 +++++++++++++++++++ .../summingbird/storm/StormPlatform.scala | 46 +++++++++--------- 4 files changed, 85 insertions(+), 41 deletions(-) create mode 100644 summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala diff --git a/project/Build.scala b/project/Build.scala index f7917d03b..70e99b65a 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -109,6 +109,7 @@ object SummingbirdBuild extends Build { summingbirdStorm, summingbirdScalding, summingbirdBuilder, + summingbirdChill, summingbirdExample ) @@ -149,6 +150,15 @@ object SummingbirdBuild extends Build { ) ) + lazy val summingbirdChill = module("chill").settings( + libraryDependencies ++= Seq( + "com.twitter" %% "chill" % chillVersion + ) + ).dependsOn( + summingbirdCore, + summingbirdBatch + ) + lazy val summingbirdClient = module("client").settings( libraryDependencies ++= Seq( "com.twitter" %% "algebird-core" % algebirdVersion, @@ -194,6 +204,7 @@ object SummingbirdBuild extends Build { ).dependsOn( summingbirdCore % 
"test->test;compile->compile", summingbirdOnline, + summingbirdChill, summingbirdBatch ) diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala index d44da18fe..47ab295cb 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala @@ -49,27 +49,9 @@ case class StormEnv(override val jobName: String, override val args: Args) .getOrElse(jobName.split("\\.").last) Storm.remote(builder.opts) + .withRegistrars(ajob.registrars ++ builder.registrar.getRegistrars.asScala) .withConfigUpdater { c => - val transformed = ajob.transformConfig(c.toMap) - val kryoConfig = new com.twitter.chill.config.Config with MutableStringConfig { - val summingbirdConfig = c.updated(transformed) - } - ConfInst.setSerialized( - kryoConfig, - classOf[ScalaKryoInstantiator], - new ScalaKryoInstantiator() - .withRegistrar(builder.registrar) - .withRegistrar(new IterableRegistrar(ajob.registrars)) - .withRegistrar(new IKryoRegistrar { - def apply(k: Kryo) { - List(classOf[BatchID], classOf[Timestamp]) - .foreach { cls => - if(!k.alreadyRegistered(cls)) k.register(cls) - } - } - }) - ) - kryoConfig.unwrap + c.updated(ajob.transformConfig(c.toMap)) }.run( builder.node.name(builder.id).asInstanceOf[TailProducer[Storm, _]], classSuffix diff --git a/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala b/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala new file mode 100644 index 000000000..dfe1dc5eb --- /dev/null +++ b/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala @@ -0,0 +1,47 @@ +/* +Copyright 2013 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.twitter.summingbird.chill +import com.twitter.summingbird.{MutableStringConfig, SummingbirdConfig} +import com.twitter.chill.{ScalaKryoInstantiator, IKryoRegistrar, Kryo, toRich} +import com.twitter.chill.java.IterableRegistrar +import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst } +import com.twitter.summingbird.batch.{BatchID, Timestamp} + +object SBChillRegistrar { + def apply(cfg: SummingbirdConfig, iterableRegistrars: List[IKryoRegistrar]): SummingbirdConfig = { + val kryoConfig = new com.twitter.chill.config.Config with MutableStringConfig { + val summingbirdConfig = cfg + } + + ConfInst.setSerialized( + kryoConfig, + classOf[ScalaKryoInstantiator], + new ScalaKryoInstantiator() + .withRegistrar(new IterableRegistrar(iterableRegistrars)) + .withRegistrar(new IKryoRegistrar { + def apply(k: Kryo) { + List(classOf[BatchID], classOf[Timestamp]) + .foreach { cls => + if(!k.alreadyRegistered(cls)) k.register(cls) + } + } + }) + ) + kryoConfig.unwrap + } +} + diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index 915383055..a82433568 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -24,11 +24,11 @@ import backtype.storm.tuple.Fields import backtype.storm.tuple.Tuple import com.twitter.algebird.Monoid -import com.twitter.chill.ScalaKryoInstantiator -import 
com.twitter.chill.config.{ ConfiguredInstantiator, JavaMapConfig } +import com.twitter.chill.IKryoRegistrar import com.twitter.storehaus.algebra.MergeableStore import com.twitter.storehaus.algebra.MergeableStore.enrich import com.twitter.summingbird._ +import com.twitter.summingbird.chill._ import com.twitter.summingbird.batch.{BatchID, Batcher} import com.twitter.summingbird.storm.option.{AnchorTuples, IncludeSuccessHandler} import com.twitter.summingbird.util.CacheSize @@ -71,10 +71,10 @@ case class SpoutSource[+T](spout: Spout[(Long, T)], parallelism: Option[option.S object Storm { def local(options: Map[String, Options] = Map.empty): LocalStorm = - new LocalStorm(options, identity) + new LocalStorm(options, identity, List()) def remote(options: Map[String, Options] = Map.empty): RemoteStorm = - new RemoteStorm(options, identity) + new RemoteStorm(options, identity, List()) def store[K, V](store: => MergeableStore[(K, BatchID), V])(implicit batcher: Batcher): MergeableStoreSupplier[K, V] = MergeableStoreSupplier.from(store) @@ -90,7 +90,7 @@ object Storm { implicit def spoutAsSource[T](spout: Spout[T])(implicit timeOf: TimeExtractor[T]): Producer[Storm, T] = source(spout, None)(timeOf) } -abstract class Storm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig) extends Platform[Storm] { +abstract class Storm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig, passedRegistrars: List[IKryoRegistrar]) extends Platform[Storm] { @transient private val logger = LoggerFactory.getLogger(classOf[Storm]) type Source[+T] = StormSource[T] @@ -238,23 +238,23 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird config.setMaxSpoutPending(1000) config.setNumAckers(12) config.setNumWorkers(12) - val kryoConfig = new JavaMapConfig(config) - ConfiguredInstantiator.setSerialized( - kryoConfig, - classOf[ScalaKryoInstantiator], - new ScalaKryoInstantiator() - ) - val state = 
transformConfig(StormConfig(config)) + + val stormConfig = SBChillRegistrar(StormConfig(config), passedRegistrars) + + + val transformedConfig = transformConfig(stormConfig) logger.debug("User config changes:") - logger.debug("Removes: {}", state.removes) - logger.debug("Updates: {}", state.updates) + logger.debug("Removes: {}", transformedConfig.removes) + logger.debug("Updates: {}", transformedConfig.updates) - state.removes.foreach(config.remove(_)) - state.updates.foreach(kv => config.put(kv._1, kv._2)) + transformedConfig.removes.foreach(config.remove(_)) + transformedConfig.updates.foreach(kv => config.put(kv._1, kv._2)) config } + def withRegistrars(registrars: List[IKryoRegistrar]): Storm + def withConfigUpdater(fn: SummingbirdConfig => SummingbirdConfig): Storm def plan[T](tail: TailProducer[Storm, T]): StormTopology = { @@ -276,26 +276,30 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird def run(topology: StormTopology, jobName: String): Unit } -class RemoteStorm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig) extends Storm(options, transformConfig) { +class RemoteStorm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig, passedRegistrars: List[IKryoRegistrar]) extends Storm(options, transformConfig, passedRegistrars) { override def withConfigUpdater(fn: SummingbirdConfig => SummingbirdConfig) = - new RemoteStorm(options, transformConfig.andThen(fn)) + new RemoteStorm(options, transformConfig.andThen(fn), passedRegistrars) override def run(topology: StormTopology, jobName: String): Unit = { val topologyName = "summingbird_" + jobName StormSubmitter.submitTopology(topologyName, baseConfig, topology) } + + override def withRegistrars(registrars: List[IKryoRegistrar]) = new RemoteStorm(options, transformConfig, passedRegistrars ++ registrars) } -class LocalStorm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig) - 
extends Storm(options, transformConfig) { +class LocalStorm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig, passedRegistrars: List[IKryoRegistrar]) + extends Storm(options, transformConfig, passedRegistrars) { lazy val localCluster = new LocalCluster override def withConfigUpdater(fn: SummingbirdConfig => SummingbirdConfig) = - new LocalStorm(options, transformConfig.andThen(fn)) + new LocalStorm(options, transformConfig.andThen(fn), passedRegistrars) override def run(topology: StormTopology, jobName: String): Unit = { val topologyName = "summingbird_" + jobName localCluster.submitTopology(topologyName, baseConfig, topology) } + + override def withRegistrars(registrars: List[IKryoRegistrar]) = new LocalStorm(options, transformConfig, passedRegistrars ++ registrars) } From b658573b4ccc3607aa07ade669e9416c7e195858 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 28 Oct 2013 12:15:01 -0700 Subject: [PATCH 03/12] Simplify the scalding env, make scalding env/platform configuration closer to storm --- project/Build.scala | 1 + .../summingbird/scalding/ScaldingEnv.scala | 76 +++++++------------ .../twitter/summingbird/storm/StormEnv.scala | 2 +- .../summingbird/scalding/ScaldingConfig.scala | 35 +++++++++ .../scalding/ScaldingPlatform.scala | 39 +++++++++- .../summingbird/storm/StormConfig.scala | 3 +- .../summingbird/storm/StormPlatform.scala | 12 ++- 7 files changed, 111 insertions(+), 57 deletions(-) create mode 100644 summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingConfig.scala diff --git a/project/Build.scala b/project/Build.scala index 70e99b65a..61517aed3 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -225,6 +225,7 @@ object SummingbirdBuild extends Build { ) ).dependsOn( summingbirdCore % "test->test;compile->compile", + summingbirdChill, summingbirdBatch ) diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala 
b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala index 1fa081bdf..2c0aa3974 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala @@ -27,6 +27,7 @@ import com.twitter.summingbird.{ Env, Unzip2, Summer, Producer, TailProducer, Ab import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp } import com.twitter.summingbird.builder.{ SourceBuilder, Reducers, CompletedBuilder } import com.twitter.summingbird.storm.Storm +import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.util.ToolRunner @@ -45,7 +46,7 @@ import ConfigBijection._ // the intermediate key-values for using a store as a service // in another job. // Prefer using .write in the -core API. -case class StoreIntermediateData[K, V](sink: ScaldingSink[(K,V)]) extends java.io.Serializable +case class StoreIntermediateData(sink: ScaldingSink[(Any, Any)]) extends java.io.Serializable // TODO (https://github.com/twitter/summingbird/issues/69): Add // documentation later describing command-line args. start-time, @@ -87,67 +88,36 @@ case class ScaldingEnv(override val jobName: String, inargs: Array[String]) // Summingbird job. def reducers : Int = args.getOrElse("reducers","20").toInt - def run { - // Calling abstractJob's constructor and binding it to a variable - // forces any side effects caused by that constructor (building up - // of the environment and defining the builder). - val ajob = abstractJob - val scaldingBuilder = builder.asInstanceOf[CompletedBuilder[Scalding, _, _]] - val updater = { conf : Configuration => - // Parse the Hadoop args. - // TODO rethink this: https://github.com/twitter/summingbird/issues/136 - // I think we shouldn't pretend Configuration is immutable. 
- new GenericOptionsParser(conf, inargs) - - val jConf: JMap[String,AnyRef] = new JHashMap(fromJavaMap.invert(conf)) - val kryoConfig = new JavaMapConfig(jConf) - ConfInst.setSerialized( - kryoConfig, - classOf[ScalaKryoInstantiator], - new ScalaKryoInstantiator() - .withRegistrar(builder.registrar) - .withRegistrar(new IterableRegistrar(ajob.registrars)) - .withRegistrar(new IKryoRegistrar { - def apply(k: Kryo) { - List(classOf[BatchID], classOf[Timestamp]) - .foreach { cls => - if(!k.alreadyRegistered(cls)) k.register(cls) - } - } - }) - ) - fromMap(ajob.transformConfig(jConf.as[Map[String, AnyRef]])) - } - val jobName = - args.optional("name") - .getOrElse(ajob.getClass.getName) - run(jobName, scaldingBuilder, updater) - } - // Used to insert a write just before the store so the store // can be used as a Service - private def addDeltaWrite[K,V](snode: Summer[Scalding, K, V], - sink: ScaldingSink[(K,V)]): Summer[Scalding, K, V] = { + private def addDeltaWrite(snode: Summer[Scalding, Any, Any], + sink: ScaldingSink[(Any, Any)]): Summer[Scalding, Any, Any] = { val Summer(prod, store, monoid) = snode Summer(prod.write(sink), store, monoid) } - def run[K,V](name: String, - scaldingBuilder: CompletedBuilder[Scalding, K, V], - confud: Configuration => Configuration) { + def run { + // Calling abstractJob's constructor and binding it to a variable + // forces any side effects caused by that constructor (building up + // of the environment and defining the builder). 
+ val ajob = abstractJob + val scaldingBuilder = builder.asInstanceOf[CompletedBuilder[Scalding, Any, Any]] + val name = args.optional("name").getOrElse(ajob.getClass.getName) + // Perform config transformations before Hadoop job submission val opts = SourceBuilder.adjust( scaldingBuilder.opts, scaldingBuilder.id)(_.set(Reducers(reducers))) + // Support for the old setting based writing - val toRun: TailProducer[Scalding, (K, (Option[V], V))] = + val toRun: TailProducer[Scalding, (Any, (Option[Any], Any))] = (for { opt <- opts.get(scaldingBuilder.id) - stid <- opt.get[StoreIntermediateData[K,V]] + stid <- opt.get[StoreIntermediateData] } yield addDeltaWrite(scaldingBuilder.node, stid.sink)) .getOrElse(scaldingBuilder.node) .name(scaldingBuilder.id) - def getStatePath[K1,V1](ss: ScaldingStore[K1, V1]): Option[String] = + def getStatePath(ss: ScaldingStore[_, _]): Option[String] = ss match { case store: VersionedBatchStore[_, _, _, _] => Some(store.rootPath) case initstore: InitialBatchedStore[_, _] => getStatePath(initstore.proxy) @@ -157,12 +127,22 @@ case class ScaldingEnv(override val jobName: String, inargs: Array[String]) val statePath = getStatePath(scaldingBuilder.node.store).getOrElse { sys.error("You must use a VersionedBatchStore with the old Summingbird API!") } - val conf = confud(new Configuration) + + val conf = new Configuration + // Add the generic options + new GenericOptionsParser(conf, inargs) + // VersionedState needs this implicit val batcher = scaldingBuilder.batcher val state = VersionedState(HDFSMetadata(conf, statePath), startDate, batches) + try { - new Scalding(name, opts).run(state, Hdfs(true, conf), toRun) + Scalding(name, opts) + .withRegistrars(ajob.registrars ++ builder.registrar.getRegistrars.asScala) + .withConfigUpdater{ c => + c.updated(ajob.transformConfig(c.toMap)) + } + .run(state, Hdfs(true, conf), toRun) } catch { case f@FlowPlanException(errs) => diff --git 
a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala index 47ab295cb..38865f02d 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala @@ -25,7 +25,7 @@ import com.twitter.summingbird.{ Env, Unzip2, Producer, TailProducer } import com.twitter.summingbird.batch.{ BatchID, Timestamp } import com.twitter.summingbird.scalding.Scalding import scala.collection.JavaConverters._ -import com.twitter.summingbird.{MutableStringConfig, StormConfig} +import com.twitter.summingbird.MutableStringConfig /** * Storm-specific extension to Env. StormEnv handles storm-specific configuration diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingConfig.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingConfig.scala new file mode 100644 index 000000000..b709692a5 --- /dev/null +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingConfig.scala @@ -0,0 +1,35 @@ +/* + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +package com.twitter.summingbird.scalding + +import scala.collection.JavaConverters._ +import org.apache.hadoop.conf.Configuration +import com.twitter.summingbird.{WrappingConfig, ReadableMap} + +object ScaldingConfig { + def apply(backingConfig: Configuration) = WrappingConfig(new WrappedHadoopConfig(backingConfig)) +} + +class WrappedHadoopConfig(backingConfig: Configuration) extends ReadableMap { + def get(key: String): Option[AnyRef] = + if(keys.contains(key)) + Some(backingConfig.get(key)) + else + None + + val keys: Set[String] = backingConfig.iterator.asScala.map(_.getKey).toSet +} diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala index 0a2f7266f..d2e1e68dc 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala @@ -25,6 +25,8 @@ import com.twitter.scalding.{ Tool => STool, Source => SSource, TimePathedSource import com.twitter.summingbird._ import com.twitter.summingbird.scalding.option.{ FlatMapShards, Reducers } import com.twitter.summingbird.batch._ +import com.twitter.chill.IKryoRegistrar +import com.twitter.summingbird.chill._ import com.twitter.summingbird.option._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -43,6 +45,11 @@ import org.slf4j.LoggerFactory object Scalding { @transient private val logger = LoggerFactory.getLogger(classOf[Scalding]) + + def apply(jobName: String, options: Map[String, Options] = Map.empty) = { + new Scalding(jobName, options, identity, List()) + } + implicit val dateRangeInjection: Injection[DateRange, Interval[Time]] = Injection.build { (dr: DateRange) => { val DateRange(l, u) = dr @@ -494,7 +501,10 @@ case class WriteStepsDot(filename: String) class Scalding( jobName: String, - @transient 
options: Map[String, Options] = Map.empty) + @transient options: Map[String, Options], + @transient transformConfig: SummingbirdConfig => SummingbirdConfig, + @transient passedRegistrars: List[IKryoRegistrar] + ) extends Platform[Scalding] { type Source[T] = PipeFactory[T] @@ -512,6 +522,28 @@ class Scalding( classOf[com.twitter.chill.hadoop.KryoSerialization] ) + def withRegistrars(newRegs: List[IKryoRegistrar]) = + new Scalding(jobName, options, transformConfig, newRegs ++ passedRegistrars) + + def withConfigUpdater(fn: SummingbirdConfig => SummingbirdConfig) = + new Scalding(jobName, options, transformConfig.andThen(fn), passedRegistrars) + + def updateConfig(conf: Configuration) { + val scaldingConfig = SBChillRegistrar(ScaldingConfig(conf), passedRegistrars) + Scalding.logger.debug("Serialization config changes:") + Scalding.logger.debug("Removes: {}", scaldingConfig.removes) + Scalding.logger.debug("Updates: {}", scaldingConfig.updates) + + val transformedConfig = transformConfig(scaldingConfig) + + Scalding.logger.debug("User+Serialization config changes:") + Scalding.logger.debug("Removes: {}", transformedConfig.removes) + Scalding.logger.debug("Updates: {}", transformedConfig.updates) + + transformedConfig.removes.foreach(conf.set(_, null)) + transformedConfig.updates.foreach(kv => conf.set(kv._1, kv._2.toString)) + } + private def setIoSerializations(m: Mode): Unit = m match { case Hdfs(_, conf) => @@ -536,14 +568,15 @@ class Scalding( } def run(state: WaitingState[Interval[Timestamp]], - mode: Mode, + mode: HadoopMode, pf: TailProducer[Scalding, Any]): WaitingState[Interval[Timestamp]] = run(state, mode, plan(pf)) def run(state: WaitingState[Interval[Timestamp]], - mode: Mode, + mode: HadoopMode, pf: PipeFactory[Any]): WaitingState[Interval[Timestamp]] = { + updateConfig(mode.jobConf) setIoSerializations(mode) val prepareState = state.begin diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormConfig.scala 
b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormConfig.scala index 466410074..f141982e5 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormConfig.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormConfig.scala @@ -14,10 +14,11 @@ limitations under the License. */ -package com.twitter.summingbird +package com.twitter.summingbird.storm import backtype.storm.{Config => BacktypeStormConfig} import scala.collection.JavaConverters._ +import com.twitter.summingbird.{WrappingConfig, ReadableMap} object StormConfig { def apply(backingConfig: BacktypeStormConfig) = WrappingConfig(new WrappedBacktypeStormConfig(backingConfig)) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index a82433568..a7ed9c9a5 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -240,11 +240,13 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird config.setNumWorkers(12) val stormConfig = SBChillRegistrar(StormConfig(config), passedRegistrars) - + logger.debug("Serialization config changes:") + logger.debug("Removes: {}", stormConfig.removes) + logger.debug("Updates: {}", stormConfig.updates) val transformedConfig = transformConfig(stormConfig) - logger.debug("User config changes:") + logger.debug("User+Serialization config changes:") logger.debug("Removes: {}", transformedConfig.removes) logger.debug("Updates: {}", transformedConfig.updates) @@ -286,7 +288,8 @@ class RemoteStorm(options: Map[String, Options], transformConfig: SummingbirdCon StormSubmitter.submitTopology(topologyName, baseConfig, topology) } - override def withRegistrars(registrars: List[IKryoRegistrar]) = new RemoteStorm(options, transformConfig, 
passedRegistrars ++ registrars) + override def withRegistrars(registrars: List[IKryoRegistrar]) = + new RemoteStorm(options, transformConfig, passedRegistrars ++ registrars) } class LocalStorm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig, passedRegistrars: List[IKryoRegistrar]) @@ -301,5 +304,6 @@ class LocalStorm(options: Map[String, Options], transformConfig: SummingbirdConf localCluster.submitTopology(topologyName, baseConfig, topology) } - override def withRegistrars(registrars: List[IKryoRegistrar]) = new LocalStorm(options, transformConfig, passedRegistrars ++ registrars) + override def withRegistrars(registrars: List[IKryoRegistrar]) = + new LocalStorm(options, transformConfig, passedRegistrars ++ registrars) } From e5d30608d0c2bba9fbbaa619884cb3bc0985e125 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 28 Oct 2013 12:27:26 -0700 Subject: [PATCH 04/12] Add some asserts, fix val not being valid, needs to be def --- .../scala/com/twitter/summingbird/chill/SBChillRegistrar.scala | 2 +- .../main/scala/com/twitter/summingbird/SummingbirdConfig.scala | 3 +++ .../scala/com/twitter/summingbird/storm/StormPlatform.scala | 3 ++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala b/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala index dfe1dc5eb..93ea58342 100644 --- a/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala +++ b/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala @@ -24,7 +24,7 @@ import com.twitter.summingbird.batch.{BatchID, Timestamp} object SBChillRegistrar { def apply(cfg: SummingbirdConfig, iterableRegistrars: List[IKryoRegistrar]): SummingbirdConfig = { val kryoConfig = new com.twitter.chill.config.Config with MutableStringConfig { - val summingbirdConfig = cfg + def summingbirdConfig = cfg } 
ConfInst.setSerialized( diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala index b171067b3..5a3e83e61 100644 --- a/summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala @@ -48,12 +48,15 @@ trait MutableStringConfig { protected def summingbirdConfig: SummingbirdConfig private var config = summingbirdConfig def get(key: String) = { + assert(config != null) config.get(key) match { case Some(s) => s.toString case None => null } } + def set(key: String, value: String) { + assert(config != null) config = config.put(key, value) } def unwrap = config diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index a7ed9c9a5..b26dd3674 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -239,7 +239,8 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird config.setNumAckers(12) config.setNumWorkers(12) - val stormConfig = SBChillRegistrar(StormConfig(config), passedRegistrars) + val initialStormConfig = StormConfig(config) + val stormConfig = SBChillRegistrar(initialStormConfig, passedRegistrars) logger.debug("Serialization config changes:") logger.debug("Removes: {}", stormConfig.removes) logger.debug("Updates: {}", stormConfig.updates) From 6bcefec250f6af86ed3efe7681701f8148709379 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 28 Oct 2013 15:50:11 -0700 Subject: [PATCH 05/12] Bump scalding, and move back to having a test mode too :s janky. 
--- project/Build.scala | 2 +- .../scalding/ScaldingPlatform.scala | 22 ++++++++++--------- .../summingbird/scalding/ScaldingLaws.scala | 14 ++++++------ 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 61517aed3..0757ffcfc 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -116,7 +116,7 @@ object SummingbirdBuild extends Build { val dfsDatastoresVersion = "1.3.4" val bijectionVersion = "0.5.4" val algebirdVersion = "0.3.0" - val scaldingVersion = "0.9.0rc1" + val scaldingVersion = "0.9.0rc4" val storehausVersion = "0.6.0" val utilVersion = "6.3.8" val chillVersion = "0.3.3" diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala index d2e1e68dc..7c7861e0f 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/ScaldingPlatform.scala @@ -544,12 +544,8 @@ class Scalding( transformedConfig.updates.foreach(kv => conf.set(kv._1, kv._2.toString)) } - private def setIoSerializations(m: Mode): Unit = - m match { - case Hdfs(_, conf) => - conf.set("io.serializations", ioSerializations.map { _.getName }.mkString(",")) - case _ => () - } + private def setIoSerializations(c: Configuration): Unit = + c.set("io.serializations", ioSerializations.map { _.getName }.mkString(",")) // This is a side-effect-free computation that is called by run def toFlow(timeSpan: Interval[Time], mode: Mode, pf: PipeFactory[_]): Try[(Interval[Time], Flow[_])] = { @@ -568,16 +564,22 @@ class Scalding( } def run(state: WaitingState[Interval[Timestamp]], - mode: HadoopMode, + mode: Mode, pf: TailProducer[Scalding, Any]): WaitingState[Interval[Timestamp]] = run(state, mode, plan(pf)) def run(state: WaitingState[Interval[Timestamp]], - mode: HadoopMode, + mode: Mode, 
pf: PipeFactory[Any]): WaitingState[Interval[Timestamp]] = { - updateConfig(mode.jobConf) - setIoSerializations(mode) + mode match { + case Hdfs(_, conf) => + updateConfig(conf) + setIoSerializations(conf) + case _ => + } + + val prepareState = state.begin val timeSpan = prepareState.requested.mapNonDecreasing(_.milliSinceEpoch) diff --git a/summingbird-scalding/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala b/summingbird-scalding/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala index 1885b7759..87be8bc4a 100644 --- a/summingbird-scalding/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala +++ b/summingbird-scalding/src/test/scala/com/twitter/summingbird/scalding/ScaldingLaws.scala @@ -322,7 +322,7 @@ object ScaldingLaws extends Specification { val summer = TestGraphs.singleStepJob[Scalding,(Long,Int),Int,Int](source, testStore)(t => fn(t._2)) - val scald = new Scalding("scalaCheckJob") + val scald = Scalding("scalaCheckJob") val intr = batchedCover(batcher, 0L, original.size.toLong) val ws = new LoopState(intr) val mode: Mode = TestMode(t => (testStore.sourceToBuffer ++ buffer).get(t)) @@ -350,7 +350,7 @@ object ScaldingLaws extends Specification { fnA(t._2), fnB) val intr = batchedCover(batcher, 0L, original.size.toLong) - val scald = new Scalding("scalaCheckJob") + val scald = Scalding("scalaCheckJob") val ws = new LoopState(intr) val mode: Mode = TestMode(t => (testStore.sourceToBuffer ++ buffer).get(t)) @@ -378,7 +378,7 @@ object ScaldingLaws extends Specification { val tail = TestGraphs.multipleSummerJob[Scalding, (Long, Int), Int, Int, Int, Int, Int](source, testStoreA, testStoreB)({t => fnA(t._2)}, fnB, fnC) - val scald = new Scalding("scalaCheckMultipleSumJob") + val scald = Scalding("scalaCheckMultipleSumJob") val intr = batchedCover(batcher, 0L, original.size.toLong) val ws = new LoopState(intr) val mode: Mode = TestMode(t => (testStoreA.sourceToBuffer ++ testStoreB.sourceToBuffer ++ buffer).get(t)) @@ 
-431,7 +431,7 @@ object ScaldingLaws extends Specification { TestGraphs.leftJoinJob[Scalding,(Long, Int),Int,Int,Int,Int](source, testService, testStore) { tup => prejoinMap(tup._2) }(postJoin) val intr = batchedCover(batcher, 0L, original.size.toLong) - val scald = new Scalding("scalaCheckleftJoinJob") + val scald = Scalding("scalaCheckleftJoinJob") val ws = new LoopState(intr) val mode: Mode = TestMode(s => (testStore.sourceToBuffer ++ buffer ++ testService.sourceToBuffer).get(s)) @@ -459,7 +459,7 @@ object ScaldingLaws extends Specification { testSink, testStore)(t => fn1(t._2))(t => fn2(t._2)) - val scald = new Scalding("scalding-diamond-Job") + val scald = Scalding("scalding-diamond-Job") val intr = batchedCover(batcher, 0L, original.size.toLong) val ws = new LoopState(intr) val mode: Mode = TestMode(s => (testStore.sourceToBuffer ++ buffer).get(s)) @@ -493,7 +493,7 @@ object ScaldingLaws extends Specification { val summer = TestGraphs .twoSumByKey[Scalding,Int,Int,Int](source.map(_._2), testStoreA, keyExpand, testStoreB) - val scald = new Scalding("scalding-diamond-Job") + val scald = Scalding("scalding-diamond-Job") val intr = batchedCover(batcher, 0L, original.size.toLong) val ws = new LoopState(intr) val mode: Mode = TestMode((testStoreA.sourceToBuffer ++ testStoreB.sourceToBuffer ++ buffer).get(_)) @@ -552,7 +552,7 @@ class ScaldingSerializationSpecs extends Specification { val mode = HadoopTest(new Configuration, {case x: ScaldingSource => buffer.get(x)}) val intr = Interval.leftClosedRightOpen(0L, inWithTime.size.toLong) - val scald = new Scalding("scalaCheckJob") + val scald = Scalding("scalaCheckJob") (try { scald.toFlow(intr, mode, scald.plan(summer)); true } catch { case t: Throwable => println(toTry(t)); false }) must beTrue From 83635170e63a5af898191190d8d829fc22579ab3 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 28 Oct 2013 16:26:37 -0700 Subject: [PATCH 06/12] Roll back the scalding upgrade so don't need to wait for sonatype to push 
to maven --- project/Build.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Build.scala b/project/Build.scala index 0757ffcfc..61517aed3 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -116,7 +116,7 @@ object SummingbirdBuild extends Build { val dfsDatastoresVersion = "1.3.4" val bijectionVersion = "0.5.4" val algebirdVersion = "0.3.0" - val scaldingVersion = "0.9.0rc4" + val scaldingVersion = "0.9.0rc1" val storehausVersion = "0.6.0" val utilVersion = "6.3.8" val chillVersion = "0.3.3" From feb14a99cb09067eec6b55a5fbfe638acddf33b6 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 29 Oct 2013 18:02:18 -0700 Subject: [PATCH 07/12] Add the dot graphs into the configs --- .../summingbird/storm/StormPlatform.scala | 38 ++++++++++++------- .../twitter/summingbird/storm/StormLaws.scala | 16 ++++---- .../summingbird/storm/TopologyTests.scala | 20 +++------- 3 files changed, 37 insertions(+), 37 deletions(-) diff --git a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala index e66a10ccb..93cb66ad8 100644 --- a/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala +++ b/summingbird-storm/src/main/scala/com/twitter/summingbird/storm/StormPlatform.scala @@ -23,11 +23,13 @@ import backtype.storm.topology.{BoltDeclarer, TopologyBuilder} import backtype.storm.tuple.Fields import backtype.storm.tuple.Tuple +import com.twitter.bijection.{Base64String, Injection} import com.twitter.algebird.Monoid import com.twitter.chill.IKryoRegistrar import com.twitter.storehaus.algebra.MergeableStore import com.twitter.storehaus.algebra.MergeableStore.enrich import com.twitter.summingbird._ +import com.twitter.summingbird.viz.VizGraph import com.twitter.summingbird.chill._ import com.twitter.summingbird.batch.{BatchID, Batcher} import com.twitter.summingbird.storm.option.{AnchorTuples, 
IncludeSuccessHandler} @@ -90,6 +92,8 @@ object Storm { implicit def spoutAsSource[T](spout: Spout[T])(implicit timeOf: TimeExtractor[T]): Producer[Storm, T] = source(spout, None)(timeOf) } +case class PlannedTopology(config: BacktypeStormConfig, topology: StormTopology) + abstract class Storm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig, passedRegistrars: List[IKryoRegistrar]) extends Platform[Storm] { @transient private val logger = LoggerFactory.getLogger(classOf[Storm]) @@ -97,7 +101,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird type Store[-K, V] = StormStore[K, V] type Sink[-T] = () => (T => Future[Unit]) type Service[-K, +V] = StormService[K, V] - type Plan[T] = StormTopology + type Plan[T] = PlannedTopology private type Prod[T] = Producer[Storm, T] @@ -235,7 +239,8 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird /** * Base storm config instances used by the Storm platform. 
*/ - def baseConfig = { + + def genConfig(dag: Dag[Storm]) = { val config = new BacktypeStormConfig config.setFallBackOnJavaSerialization(false) config.setKryoFactory(classOf[com.twitter.chill.storm.BlizzardKryoFactory]) @@ -249,9 +254,14 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird logger.debug("Removes: {}", stormConfig.removes) logger.debug("Updates: {}", stormConfig.updates) - val transformedConfig = transformConfig(stormConfig) - logger.debug("User+Serialization config changes:") + val inj = Injection.connect[String, Array[Byte], Base64String] + logger.debug("Adding serialized copy of graphs") + val withViz = stormConfig.put("producer_dot_graph_base64", inj.apply(VizGraph(dag.tail)).str) + .put("planned_dot_graph_base64", inj.apply(VizGraph(dag)).str) + val transformedConfig = transformConfig(withViz) + + logger.debug("Config diff to be applied:") logger.debug("Removes: {}", transformedConfig.removes) logger.debug("Updates: {}", transformedConfig.updates) @@ -264,11 +274,11 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird def withConfigUpdater(fn: SummingbirdConfig => SummingbirdConfig): Storm - def plan[T](tail: TailProducer[Storm, T]): StormTopology = { + def plan[T](tail: TailProducer[Storm, T]): PlannedTopology = { + val stormDag = OnlinePlan(tail) implicit val topologyBuilder = new TopologyBuilder - implicit val config = baseConfig + implicit val config = genConfig(stormDag) - val stormDag = OnlinePlan(tail) stormDag.nodes.foreach { node => node match { @@ -277,10 +287,10 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird case _: SourceNode[_] => scheduleSpout(stormDag, node) } } - topologyBuilder.createTopology + PlannedTopology(config, topologyBuilder.createTopology) } - def run(summer: TailProducer[Storm, _], jobName: String): Unit = run(plan(summer), jobName) - def run(topology: StormTopology, jobName: String): Unit + def run(tail: 
TailProducer[Storm, _], jobName: String): Unit = run(plan(tail), jobName) + def run(plannedTopology: PlannedTopology, jobName: String): Unit } class RemoteStorm(options: Map[String, Options], transformConfig: SummingbirdConfig => SummingbirdConfig, passedRegistrars: List[IKryoRegistrar]) extends Storm(options, transformConfig, passedRegistrars) { @@ -288,9 +298,9 @@ class RemoteStorm(options: Map[String, Options], transformConfig: SummingbirdCon override def withConfigUpdater(fn: SummingbirdConfig => SummingbirdConfig) = new RemoteStorm(options, transformConfig.andThen(fn), passedRegistrars) - override def run(topology: StormTopology, jobName: String): Unit = { + override def run(plannedTopology: PlannedTopology, jobName: String): Unit = { val topologyName = "summingbird_" + jobName - StormSubmitter.submitTopology(topologyName, baseConfig, topology) + StormSubmitter.submitTopology(topologyName, plannedTopology.config, plannedTopology.topology) } override def withRegistrars(registrars: List[IKryoRegistrar]) = @@ -304,9 +314,9 @@ class LocalStorm(options: Map[String, Options], transformConfig: SummingbirdConf override def withConfigUpdater(fn: SummingbirdConfig => SummingbirdConfig) = new LocalStorm(options, transformConfig.andThen(fn), passedRegistrars) - override def run(topology: StormTopology, jobName: String): Unit = { + override def run(plannedTopology: PlannedTopology, jobName: String): Unit = { val topologyName = "summingbird_" + jobName - localCluster.submitTopology(topologyName, baseConfig, topology) + localCluster.submitTopology(topologyName, plannedTopology.config, plannedTopology.topology) } override def withRegistrars(registrars: List[IKryoRegistrar]) = diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala index 751a4e56a..85899f099 100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala +++ 
b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala @@ -16,7 +16,7 @@ package com.twitter.summingbird.storm -import backtype.storm.{ LocalCluster, Testing } +import backtype.storm.{ Config => BacktypeStormConfig, LocalCluster, Testing } import backtype.storm.generated.StormTopology import backtype.storm.testing.{ CompleteTopologyParam, MockedSources } import com.twitter.algebird.{MapAlgebra, Semigroup} @@ -77,21 +77,21 @@ object TrueGlobalState { */ object StormRunner { - private val completeTopologyParam = { + private def completeTopologyParam(conf: BacktypeStormConfig) = { val ret = new CompleteTopologyParam() ret.setMockedSources(new MockedSources) - ret.setStormConf(Storm.local().baseConfig) + ret.setStormConf(conf) ret.setCleanupState(false) ret } - private def tryRun(topo: StormTopology): Unit = { + private def tryRun(plannedTopology: PlannedTopology): Unit = { //Before running the external Command val oldSecManager = System.getSecurityManager() System.setSecurityManager(new MySecurityManager()); try { val cluster = new LocalCluster() - Testing.completeTopology(cluster, topo, completeTopologyParam) + Testing.completeTopology(cluster, plannedTopology.topology, completeTopologyParam(plannedTopology.config)) // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667 Thread.sleep(1000) cluster.shutdown @@ -100,14 +100,14 @@ object StormRunner { } } - def run(topo: StormTopology) { + def run(plannedTopology: PlannedTopology) { this.synchronized { try { - tryRun(topo) + tryRun(plannedTopology) } catch { case _: Throwable => Thread.sleep(3000) - tryRun(topo) + tryRun(plannedTopology) } } } diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala index 2ed05788c..182a341c0 100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala +++ 
b/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala @@ -16,9 +16,7 @@ package com.twitter.summingbird.storm -import backtype.storm.{ LocalCluster, Testing } import backtype.storm.generated.StormTopology -import backtype.storm.testing.{ CompleteTopologyParam, MockedSources } import com.twitter.algebird.{MapAlgebra, Semigroup} import com.twitter.storehaus.{ ReadableStore, JMapStore } import com.twitter.storehaus.algebra.MergeableStore @@ -105,14 +103,6 @@ object TopologyTests extends Specification { val storm = Storm.local() - val completeTopologyParam = { - val ret = new CompleteTopologyParam() - ret.setMockedSources(new MockedSources) - ret.setStormConf(storm.baseConfig) - ret.setCleanupState(false) - ret - } - def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get /** @@ -130,7 +120,7 @@ object TopologyTests extends Specification { MergeableStoreSupplier(() => testingStore(id), Batcher.unit) ) - storm.plan(job) + storm.plan(job).topology } "Number of bolts should be as expected" in { @@ -159,7 +149,7 @@ object TopologyTests extends Specification { val opts = Map(nodeName -> Options().set(FlatMapParallelism(50))) val storm = Storm.local(opts) - val stormTopo = storm.plan(p) + val stormTopo = storm.plan(p).topology // Source producer val bolts = stormTopo.get_bolts @@ -180,7 +170,7 @@ object TopologyTests extends Specification { nodeName -> Options().set(FlatMapParallelism(50))) val storm = Storm.local(opts) - val stormTopo = storm.plan(p) + val stormTopo = storm.plan(p).topology // Source producer val bolts = stormTopo.get_bolts @@ -200,7 +190,7 @@ object TopologyTests extends Specification { val opts = Map(otherNodeName -> Options().set(SpoutParallelism(30)), nodeName -> Options().set(FlatMapParallelism(50))) val storm = Storm.local(opts) - val stormTopo = storm.plan(p) + val stormTopo = storm.plan(p).topology // Source producer val bolts = stormTopo.get_bolts @@ -218,7 +208,7 @@ object TopologyTests extends Specification 
{ val opts = Map(nodeName -> Options().set(FlatMapParallelism(50)).set(SpoutParallelism(30))) val storm = Storm.local(opts) - val stormTopo = storm.plan(p) + val stormTopo = storm.plan(p).topology // Source producer val bolts = stormTopo.get_bolts val spouts = stormTopo.get_spouts From ac8e2a51f8fb02a63480ee9e30f23bced5724cca Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 31 Oct 2013 09:16:01 -0700 Subject: [PATCH 08/12] Add types back onto StoreIntermediateData --- .../scala/com/twitter/summingbird/scalding/ScaldingEnv.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala index 2c0aa3974..f45186f7a 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala @@ -46,7 +46,7 @@ import ConfigBijection._ // the intermediate key-values for using a store as a service // in another job. // Prefer using .write in the -core API. -case class StoreIntermediateData(sink: ScaldingSink[(Any, Any)]) extends java.io.Serializable +case class StoreIntermediateData[K, V](sink: ScaldingSink[(K,V)]) extends java.io.Serializable // TODO (https://github.com/twitter/summingbird/issues/69): Add // documentation later describing command-line args. 
start-time, @@ -112,7 +112,7 @@ case class ScaldingEnv(override val jobName: String, inargs: Array[String]) val toRun: TailProducer[Scalding, (Any, (Option[Any], Any))] = (for { opt <- opts.get(scaldingBuilder.id) - stid <- opt.get[StoreIntermediateData] + stid <- opt.get[StoreIntermediateData[Any, Any]] } yield addDeltaWrite(scaldingBuilder.node, stid.sink)) .getOrElse(scaldingBuilder.node) .name(scaldingBuilder.id) From 693349ad13a583578e253498287dbb3fa0bf773a Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 31 Oct 2013 09:25:43 -0700 Subject: [PATCH 09/12] Cleanup/minify the imports in the ScaldingEnv/StormEnv --- .../twitter/summingbird/scalding/ScaldingEnv.scala | 14 +++----------- .../com/twitter/summingbird/storm/StormEnv.scala | 9 +-------- 2 files changed, 4 insertions(+), 19 deletions(-) diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala index f45186f7a..2db8115ec 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/scalding/ScaldingEnv.scala @@ -16,25 +16,17 @@ limitations under the License. 
package com.twitter.summingbird.scalding -import com.twitter.bijection.Conversion.asMethod -import com.twitter.chill.ScalaKryoInstantiator -import com.twitter.chill.java.IterableRegistrar -import com.twitter.chill.{Kryo, IKryoRegistrar, toRich } -import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst, JavaMapConfig } -import com.twitter.scalding.{ Tool => STool, _ } +import com.twitter.scalding.{Args, Hdfs, RichDate, DateParser} import com.twitter.summingbird.scalding.store.HDFSMetadata -import com.twitter.summingbird.{ Env, Unzip2, Summer, Producer, TailProducer, AbstractJob } +import com.twitter.summingbird.{ Env, Summer, TailProducer, AbstractJob } import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp } import com.twitter.summingbird.builder.{ SourceBuilder, Reducers, CompletedBuilder } -import com.twitter.summingbird.storm.Storm import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.util.ToolRunner import org.apache.hadoop.util.GenericOptionsParser -import java.util.{ HashMap => JHashMap, Map => JMap, TimeZone } +import java.util.TimeZone -import ConfigBijection._ /** * @author Oscar Boykin diff --git a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala index 38865f02d..0e267a95a 100644 --- a/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala +++ b/summingbird-builder/src/main/scala/com/twitter/summingbird/storm/StormEnv.scala @@ -16,16 +16,9 @@ limitations under the License. 
package com.twitter.summingbird.storm -import backtype.storm.Config import com.twitter.scalding.Args -import com.twitter.chill.{ScalaKryoInstantiator, Kryo, toRich, IKryoRegistrar} -import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst, JavaMapConfig } -import com.twitter.chill.java.IterableRegistrar -import com.twitter.summingbird.{ Env, Unzip2, Producer, TailProducer } -import com.twitter.summingbird.batch.{ BatchID, Timestamp } -import com.twitter.summingbird.scalding.Scalding +import com.twitter.summingbird.{ Env, TailProducer } import scala.collection.JavaConverters._ -import com.twitter.summingbird.MutableStringConfig /** * Storm-specific extension to Env. StormEnv handles storm-specific configuration From 401c675395d20e09a12f150190c255ee643ba57b Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 31 Oct 2013 09:35:06 -0700 Subject: [PATCH 10/12] Split and pretty up the registering of classes in the registrar --- .../summingbird/chill/SBChillRegistrar.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala b/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala index 93ea58342..69dba2786 100644 --- a/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala +++ b/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala @@ -22,6 +22,16 @@ import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst } import com.twitter.summingbird.batch.{BatchID, Timestamp} object SBChillRegistrar { + implicit def funcToIKryoReg(f: Function[Kryo, Unit]): IKryoRegistrar = new IKryoRegistrar { + def apply(k: Kryo) = f(k) + } + def kryoRegClass(clazz: Class[_]*) = + {k: Kryo => + clazz + .filter(k.alreadyRegistered(_)) + .foreach(k.register(_)) + } + def apply(cfg: SummingbirdConfig, iterableRegistrars: List[IKryoRegistrar]): SummingbirdConfig = 
{ val kryoConfig = new com.twitter.chill.config.Config with MutableStringConfig { def summingbirdConfig = cfg @@ -32,14 +42,7 @@ object SBChillRegistrar { classOf[ScalaKryoInstantiator], new ScalaKryoInstantiator() .withRegistrar(new IterableRegistrar(iterableRegistrars)) - .withRegistrar(new IKryoRegistrar { - def apply(k: Kryo) { - List(classOf[BatchID], classOf[Timestamp]) - .foreach { cls => - if(!k.alreadyRegistered(cls)) k.register(cls) - } - } - }) + .withRegistrar(kryoRegClass(classOf[BatchID], classOf[Timestamp])) ) kryoConfig.unwrap } From 35c720f50f99eb4758db7ce91bf422999ad5feec Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 31 Oct 2013 09:42:12 -0700 Subject: [PATCH 11/12] Add a doc string --- .../scala/com/twitter/summingbird/SummingbirdConfig.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala index 5a3e83e61..9f6a14fa2 100644 --- a/summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/SummingbirdConfig.scala @@ -62,6 +62,10 @@ trait MutableStringConfig { def unwrap = config } +/* + * The ReadableMap is the trait that must be implemented on the actual underlying config for the WrappingConfig. + * That is, one of these should exist for a Hadoop Configuration, a Storm Configuration, etc. 
+ */ trait ReadableMap { def get(key: String): Option[AnyRef] def keys: Set[String] From 5b5d4e135e0f14bb5b8164ff7166cd4888a3684c Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 4 Nov 2013 10:09:37 -0800 Subject: [PATCH 12/12] Use the already existing implicit for functions in chill --- .../com/twitter/summingbird/chill/SBChillRegistrar.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala b/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala index 69dba2786..bfe5b95ab 100644 --- a/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala +++ b/summingbird-chill/src/main/scala/com/twitter/summingbird/chill/SBChillRegistrar.scala @@ -18,13 +18,11 @@ package com.twitter.summingbird.chill import com.twitter.summingbird.{MutableStringConfig, SummingbirdConfig} import com.twitter.chill.{ScalaKryoInstantiator, IKryoRegistrar, Kryo, toRich} import com.twitter.chill.java.IterableRegistrar +import com.twitter.chill._ import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst } import com.twitter.summingbird.batch.{BatchID, Timestamp} object SBChillRegistrar { - implicit def funcToIKryoReg(f: Function[Kryo, Unit]): IKryoRegistrar = new IKryoRegistrar { - def apply(k: Kryo) = f(k) - } def kryoRegClass(clazz: Class[_]*) = {k: Kryo => clazz