Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #339 from twitter/feature/SummingbirdConfig
Browse files Browse the repository at this point in the history
Feature/summingbird config
  • Loading branch information
johnynek committed Nov 4, 2013
2 parents 15f4909 + 5b5d4e1 commit 0f0a078
Show file tree
Hide file tree
Showing 13 changed files with 397 additions and 196 deletions.
12 changes: 12 additions & 0 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ object SummingbirdBuild extends Build {
summingbirdStorm,
summingbirdScalding,
summingbirdBuilder,
summingbirdChill,
summingbirdExample
)

Expand Down Expand Up @@ -155,6 +156,15 @@ object SummingbirdBuild extends Build {
)
)

// The "chill" module: Kryo/chill serialization support shared by the
// platform modules. It builds on the core and batch modules.
lazy val summingbirdChill =
  module("chill")
    .settings(
      libraryDependencies += "com.twitter" %% "chill" % chillVersion
    )
    .dependsOn(summingbirdCore, summingbirdBatch)

lazy val summingbirdClient = module("client").settings(
libraryDependencies ++= Seq(
"com.twitter" %% "algebird-core" % algebirdVersion,
Expand Down Expand Up @@ -200,6 +210,7 @@ object SummingbirdBuild extends Build {
).dependsOn(
summingbirdCore % "test->test;compile->compile",
summingbirdOnline,
summingbirdChill,
summingbirdBatch
)

Expand All @@ -220,6 +231,7 @@ object SummingbirdBuild extends Build {
)
).dependsOn(
summingbirdCore % "test->test;compile->compile",
summingbirdChill,
summingbirdBatch
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,17 @@ limitations under the License.

package com.twitter.summingbird.scalding

import com.twitter.bijection.Conversion.asMethod
import com.twitter.chill.ScalaKryoInstantiator
import com.twitter.chill.java.IterableRegistrar
import com.twitter.chill.{Kryo, IKryoRegistrar, toRich }
import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst, JavaMapConfig }
import com.twitter.scalding.{ Tool => STool, _ }
import com.twitter.scalding.{Args, Hdfs, RichDate, DateParser}
import com.twitter.summingbird.scalding.store.HDFSMetadata
import com.twitter.summingbird.{ Env, Unzip2, Summer, Producer, TailProducer, AbstractJob }
import com.twitter.summingbird.{ Env, Summer, TailProducer, AbstractJob }
import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp }
import com.twitter.summingbird.builder.{ SourceBuilder, Reducers, CompletedBuilder }
import com.twitter.summingbird.storm.Storm
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.ToolRunner
import org.apache.hadoop.util.GenericOptionsParser
import java.util.{ HashMap => JHashMap, Map => JMap, TimeZone }
import java.util.TimeZone

import ConfigBijection._

/**
* @author Oscar Boykin
Expand Down Expand Up @@ -87,67 +80,36 @@ case class ScaldingEnv(override val jobName: String, inargs: Array[String])
// Summingbird job.
def reducers : Int = args.getOrElse("reducers","20").toInt

def run {
// Calling abstractJob's constructor and binding it to a variable
// forces any side effects caused by that constructor (building up
// of the environment and defining the builder).
val ajob = abstractJob
val scaldingBuilder = builder.asInstanceOf[CompletedBuilder[Scalding, _, _]]
val updater = { conf : Configuration =>
// Parse the Hadoop args.
// TODO rethink this: https://github.com/twitter/summingbird/issues/136
// I think we shouldn't pretend Configuration is immutable.
new GenericOptionsParser(conf, inargs)

val jConf: JMap[String,AnyRef] = new JHashMap(fromJavaMap.invert(conf))
val kryoConfig = new JavaMapConfig(jConf)
ConfInst.setSerialized(
kryoConfig,
classOf[ScalaKryoInstantiator],
new ScalaKryoInstantiator()
.withRegistrar(builder.registrar)
.withRegistrar(new IterableRegistrar(ajob.registrars))
.withRegistrar(new IKryoRegistrar {
def apply(k: Kryo) {
List(classOf[BatchID], classOf[Timestamp])
.foreach { cls =>
if(!k.alreadyRegistered(cls)) k.register(cls)
}
}
})
)
fromMap(ajob.transformConfig(jConf.as[Map[String, AnyRef]]))
}
val jobName =
args.optional("name")
.getOrElse(ajob.getClass.getName)
run(jobName, scaldingBuilder, updater)
}

// Used to insert a write just before the store so the store
// can be used as a Service
private def addDeltaWrite[K,V](snode: Summer[Scalding, K, V],
sink: ScaldingSink[(K,V)]): Summer[Scalding, K, V] = {
private def addDeltaWrite(snode: Summer[Scalding, Any, Any],
sink: ScaldingSink[(Any, Any)]): Summer[Scalding, Any, Any] = {
val Summer(prod, store, monoid) = snode
Summer(prod.write(sink), store, monoid)
}

def run[K,V](name: String,
scaldingBuilder: CompletedBuilder[Scalding, K, V],
confud: Configuration => Configuration) {
def run {
// Calling abstractJob's constructor and binding it to a variable
// forces any side effects caused by that constructor (building up
// of the environment and defining the builder).
val ajob = abstractJob
val scaldingBuilder = builder.asInstanceOf[CompletedBuilder[Scalding, Any, Any]]
val name = args.optional("name").getOrElse(ajob.getClass.getName)

// Perform config transformations before Hadoop job submission
val opts = SourceBuilder.adjust(
scaldingBuilder.opts, scaldingBuilder.id)(_.set(Reducers(reducers)))

// Support for the old setting based writing
val toRun: TailProducer[Scalding, (K, (Option[V], V))] =
val toRun: TailProducer[Scalding, (Any, (Option[Any], Any))] =
(for {
opt <- opts.get(scaldingBuilder.id)
stid <- opt.get[StoreIntermediateData[K,V]]
stid <- opt.get[StoreIntermediateData[Any, Any]]
} yield addDeltaWrite(scaldingBuilder.node, stid.sink))
.getOrElse(scaldingBuilder.node)
.name(scaldingBuilder.id)

def getStatePath[K1,V1](ss: ScaldingStore[K1, V1]): Option[String] =
def getStatePath(ss: ScaldingStore[_, _]): Option[String] =
ss match {
case store: VersionedBatchStore[_, _, _, _] => Some(store.rootPath)
case initstore: InitialBatchedStore[_, _] => getStatePath(initstore.proxy)
Expand All @@ -157,12 +119,22 @@ case class ScaldingEnv(override val jobName: String, inargs: Array[String])
val statePath = getStatePath(scaldingBuilder.node.store).getOrElse {
sys.error("You must use a VersionedBatchStore with the old Summingbird API!")
}
val conf = confud(new Configuration)

val conf = new Configuration
// Add the generic options
new GenericOptionsParser(conf, inargs)

// VersionedState needs this
implicit val batcher = scaldingBuilder.batcher
val state = VersionedState(HDFSMetadata(conf, statePath), startDate, batches)

try {
new Scalding(name, opts).run(state, Hdfs(true, conf), toRun)
Scalding(name, opts)
.withRegistrars(ajob.registrars ++ builder.registrar.getRegistrars.asScala)
.withConfigUpdater{ c =>
c.updated(ajob.transformConfig(c.toMap))
}
.run(state, Hdfs(true, conf), toRun)
}
catch {
case f@FlowPlanException(errs) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,8 @@ limitations under the License.

package com.twitter.summingbird.storm

import backtype.storm.Config
import com.twitter.scalding.Args
import com.twitter.chill.{ScalaKryoInstantiator, Kryo, toRich, IKryoRegistrar}
import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst, JavaMapConfig }
import com.twitter.chill.java.IterableRegistrar
import com.twitter.summingbird.{ Env, Unzip2, Producer, TailProducer }
import com.twitter.summingbird.batch.{ BatchID, Timestamp }
import com.twitter.summingbird.scalding.Scalding
import com.twitter.summingbird.{ Env, TailProducer }
import scala.collection.JavaConverters._

/**
Expand All @@ -48,26 +42,9 @@ case class StormEnv(override val jobName: String, override val args: Args)
.getOrElse(jobName.split("\\.").last)

Storm.remote(builder.opts)
.withConfigUpdater { config =>
val c = ConfigBijection.invert(config)
val transformed = ConfigBijection(ajob.transformConfig(c))
val kryoConfig = new JavaMapConfig(transformed)
ConfInst.setSerialized(
kryoConfig,
classOf[ScalaKryoInstantiator],
new ScalaKryoInstantiator()
.withRegistrar(builder.registrar)
.withRegistrar(new IterableRegistrar(ajob.registrars))
.withRegistrar(new IKryoRegistrar {
def apply(k: Kryo) {
List(classOf[BatchID], classOf[Timestamp])
.foreach { cls =>
if(!k.alreadyRegistered(cls)) k.register(cls)
}
}
})
)
transformed
.withRegistrars(ajob.registrars ++ builder.registrar.getRegistrars.asScala)
.withConfigUpdater { c =>
c.updated(ajob.transformConfig(c.toMap))
}.run(
builder.node.name(builder.id).asInstanceOf[TailProducer[Storm, _]],
classSuffix
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2013 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird.chill
import com.twitter.summingbird.{MutableStringConfig, SummingbirdConfig}
import com.twitter.chill.{ScalaKryoInstantiator, IKryoRegistrar, Kryo, toRich}
import com.twitter.chill.java.IterableRegistrar
import com.twitter.chill._
import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst }
import com.twitter.summingbird.batch.{BatchID, Timestamp}

/**
 * Helpers for installing Summingbird's Kryo (chill) serialization setup
 * into a SummingbirdConfig.
 */
object SBChillRegistrar {
  /**
   * Builds a function that registers each of the given classes with a Kryo
   * instance, skipping every class that instance has already registered.
   *
   * @param clazz the classes that should end up registered
   * @return a function that performs the registrations on the Kryo it is given
   */
  def kryoRegClass(clazz: Class[_]*) =
    { k: Kryo =>
      clazz
        // Register only what is missing. The previous `filter` was inverted:
        // it re-registered classes that were already known and never
        // registered the ones that were not.
        .filterNot(k.alreadyRegistered(_))
        .foreach(k.register(_))
    }

  /**
   * Returns a config derived from `cfg` after ConfiguredInstantiator.setSerialized
   * has been applied with a ScalaKryoInstantiator carrying the supplied
   * registrars plus registrations for BatchID and Timestamp.
   *
   * @param cfg the starting configuration; it is wrapped rather than modified
   * @param iterableRegistrars registrars to attach to the instantiator
   * @return the configuration held by the wrapper after setSerialized has run
   */
  def apply(cfg: SummingbirdConfig, iterableRegistrars: List[IKryoRegistrar]): SummingbirdConfig = {
    // Adapter that lets chill talk to a SummingbirdConfig: each `set` call
    // replaces the wrapped config with the result of `put`.
    val kryoConfig = new com.twitter.chill.config.Config with MutableStringConfig {
      def summingbirdConfig = cfg
    }

    ConfInst.setSerialized(
      kryoConfig,
      classOf[ScalaKryoInstantiator],
      new ScalaKryoInstantiator()
        .withRegistrar(new IterableRegistrar(iterableRegistrars))
        .withRegistrar(kryoRegClass(classOf[BatchID], classOf[Timestamp]))
    )
    // Hand back whatever the wrapper accumulated.
    kryoConfig.unwrap
  }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
Copyright 2013 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird

/**
 * A key/value configuration whose `put` and `remove` return a
 * SummingbirdConfig instead of returning Unit, and which reports the
 * `updates` and `removes` it carries.
 */
trait SummingbirdConfig { self =>
  /** Looks up the value stored under `key`, if any. */
  def get(key: String): Option[AnyRef]

  /** Returns a config in which `key` maps to `v`. */
  def put(key: String, v: AnyRef): SummingbirdConfig

  /** Operator alias for `put`. */
  final def +(kv: (String, AnyRef)): SummingbirdConfig = put(kv._1, kv._2)

  /** Operator alias for `remove`. */
  final def -(k: String): SummingbirdConfig = remove(k)

  /** Returns a config in which `key` is absent. */
  def remove(key: String): SummingbirdConfig

  /** The keys currently visible through this config. */
  def keys: Iterable[String]

  /** Entries reported by the implementation as added or overwritten. */
  def updates: Map[String, AnyRef]

  /** Keys reported by the implementation as removed. */
  def removes: Set[String]

  /**
   * A Map view over this config. Lookups and iteration delegate to `get`
   * and `keys`; `+` and `-` go through `put` / `remove` and re-wrap.
   */
  def toMap: Map[String, AnyRef] = new Map[String, AnyRef] {
    def get(k: String) = self.get(k)

    def +[B1 >: AnyRef](kv: (String, B1)) = {
      val (key, value) = kv
      self.put(key, value.asInstanceOf[AnyRef]).toMap
    }

    def -(k: String) = self.remove(k).toMap

    def iterator =
      for (k <- self.keys.iterator) yield (k, self.get(k).get)
  }

  /**
   * Produces a config whose contents match `newMap`: keys missing from
   * `newMap` are removed, and entries that are new or whose value differs
   * are put. Entries that already hold the same value are left alone.
   */
  def updated(newMap: Map[String, AnyRef]): SummingbirdConfig = {
    // Keys present now that the new map no longer mentions.
    val removedKeys: Set[String] = keys.toSet -- newMap.keys
    // Entries that are new, or whose stored value differs.
    val changedOrAddedKeys = newMap.collect {
      case (k, v) if get(k) != Some(v) => (k, v)
    }
    val pruned = removedKeys.foldLeft(self) { (acc, k) => acc.remove(k) }
    changedOrAddedKeys.foldLeft(pruned) { (acc, kv) => acc + kv }
  }
}

/**
 * Mutable, String-valued facade over a SummingbirdConfig. The wrapped
 * config is held in a private var that `set` replaces with the result
 * of `put`.
 */
trait MutableStringConfig {
  /** Supplies the initial configuration; read once when this trait initializes. */
  protected def summingbirdConfig: SummingbirdConfig

  private var config = summingbirdConfig

  /**
   * Returns the string form of the value stored under `key`, or null when
   * the key is absent.
   */
  def get(key: String): String = {
    // NOTE(review): presumably guards against summingbirdConfig not being
    // initialized yet when this trait's body ran - confirm.
    assert(config != null)
    config.get(key).map(_.toString).orNull
  }

  /** Stores `value` under `key` by swapping in the config returned by `put`. */
  def set(key: String, value: String): Unit = {
    assert(config != null)
    config = config.put(key, value)
  }

  /** The configuration currently held by this wrapper. */
  def unwrap = config
}

/*
 * ReadableMap is the trait that must be implemented for the actual underlying
 * config that a WrappingConfig sits on top of. That is, one implementation
 * should exist for a Hadoop Configuration, one for a Storm configuration, etc.
 *
 * It exposes only reads: a lookup by key and the set of keys.
 */
trait ReadableMap {
// The value stored under `key` in the underlying config, or None if absent.
def get(key: String): Option[AnyRef]
// The keys available in the underlying config.
def keys: Set[String]
}


object WrappingConfig {
  /**
   * Wraps `backingConfig` in a WrappingConfig that starts with no updates
   * and no removes.
   */
  def apply(backingConfig: ReadableMap) =
    new WrappingConfig(
      backingConfig,
      updates = Map.empty[String, AnyRef],
      removes = Set.empty[String])
}

/**
 * A SummingbirdConfig layered over a ReadableMap. Changes are kept in
 * `updates` and `removes`; `put` and `remove` return modified copies and
 * never call anything on the backing config other than `get` and `keys`.
 */
case class WrappingConfig(private val backingConfig: ReadableMap,
  updates: Map[String, AnyRef],
  removes: Set[String]) extends SummingbirdConfig {

  /**
   * Resolves `key`: an entry in `updates` wins, then a key listed in
   * `removes` is reported absent, otherwise the backing config is consulted.
   */
  def get(key: String): Option[AnyRef] =
    updates.get(key).orElse {
      if (removes.contains(key)) None else backingConfig.get(key)
    }

  /** Returns a copy with `key` set to `v` and no longer marked removed. */
  def put(key: String, v: AnyRef): WrappingConfig = {
    assert(v != null)
    val nextUpdates = updates + (key -> v)
    val nextRemoves = removes - key
    copy(updates = nextUpdates, removes = nextRemoves)
  }

  /** Returns a copy with any update for `key` dropped and `key` marked removed. */
  def remove(key: String): WrappingConfig = {
    val nextUpdates = updates - key
    val nextRemoves = removes + key
    copy(updates = nextUpdates, removes = nextRemoves)
  }

  /** Backing keys plus updated keys, minus the keys marked removed. */
  def keys: Iterable[String] = {
    val candidates = backingConfig.keys ++ updates.keys
    candidates -- removes
  }
}
Loading

0 comments on commit 0f0a078

Please sign in to comment.