Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Feature/summingbird config #339

Merged
merged 14 commits into from
Nov 4, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ object SummingbirdBuild extends Build {
summingbirdStorm,
summingbirdScalding,
summingbirdBuilder,
summingbirdChill,
summingbirdExample
)

Expand Down Expand Up @@ -155,6 +156,15 @@ object SummingbirdBuild extends Build {
)
)

lazy val summingbirdChill = module("chill").settings(
libraryDependencies ++= Seq(
"com.twitter" %% "chill" % chillVersion
)
).dependsOn(
summingbirdCore,
summingbirdBatch
)

lazy val summingbirdClient = module("client").settings(
libraryDependencies ++= Seq(
"com.twitter" %% "algebird-core" % algebirdVersion,
Expand Down Expand Up @@ -200,6 +210,7 @@ object SummingbirdBuild extends Build {
).dependsOn(
summingbirdCore % "test->test;compile->compile",
summingbirdOnline,
summingbirdChill,
summingbirdBatch
)

Expand All @@ -220,6 +231,7 @@ object SummingbirdBuild extends Build {
)
).dependsOn(
summingbirdCore % "test->test;compile->compile",
summingbirdChill,
summingbirdBatch
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,17 @@ limitations under the License.

package com.twitter.summingbird.scalding

import com.twitter.bijection.Conversion.asMethod
import com.twitter.chill.ScalaKryoInstantiator
import com.twitter.chill.java.IterableRegistrar
import com.twitter.chill.{Kryo, IKryoRegistrar, toRich }
import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst, JavaMapConfig }
import com.twitter.scalding.{ Tool => STool, _ }
import com.twitter.scalding.{Args, Hdfs, RichDate, DateParser}
import com.twitter.summingbird.scalding.store.HDFSMetadata
import com.twitter.summingbird.{ Env, Unzip2, Summer, Producer, TailProducer, AbstractJob }
import com.twitter.summingbird.{ Env, Summer, TailProducer, AbstractJob }
import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp }
import com.twitter.summingbird.builder.{ SourceBuilder, Reducers, CompletedBuilder }
import com.twitter.summingbird.storm.Storm
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.ToolRunner
import org.apache.hadoop.util.GenericOptionsParser
import java.util.{ HashMap => JHashMap, Map => JMap, TimeZone }
import java.util.TimeZone

import ConfigBijection._

/**
* @author Oscar Boykin
Expand Down Expand Up @@ -87,67 +80,36 @@ case class ScaldingEnv(override val jobName: String, inargs: Array[String])
// Summingbird job.
def reducers : Int = args.getOrElse("reducers","20").toInt

def run {
// Calling abstractJob's constructor and binding it to a variable
// forces any side effects caused by that constructor (building up
// of the environment and defining the builder).
val ajob = abstractJob
val scaldingBuilder = builder.asInstanceOf[CompletedBuilder[Scalding, _, _]]
val updater = { conf : Configuration =>
// Parse the Hadoop args.
// TODO rethink this: https://github.com/twitter/summingbird/issues/136
// I think we shouldn't pretend Configuration is immutable.
new GenericOptionsParser(conf, inargs)

val jConf: JMap[String,AnyRef] = new JHashMap(fromJavaMap.invert(conf))
val kryoConfig = new JavaMapConfig(jConf)
ConfInst.setSerialized(
kryoConfig,
classOf[ScalaKryoInstantiator],
new ScalaKryoInstantiator()
.withRegistrar(builder.registrar)
.withRegistrar(new IterableRegistrar(ajob.registrars))
.withRegistrar(new IKryoRegistrar {
def apply(k: Kryo) {
List(classOf[BatchID], classOf[Timestamp])
.foreach { cls =>
if(!k.alreadyRegistered(cls)) k.register(cls)
}
}
})
)
fromMap(ajob.transformConfig(jConf.as[Map[String, AnyRef]]))
}
val jobName =
args.optional("name")
.getOrElse(ajob.getClass.getName)
run(jobName, scaldingBuilder, updater)
}

// Used to insert a write just before the store so the store
// can be used as a Service
private def addDeltaWrite[K,V](snode: Summer[Scalding, K, V],
sink: ScaldingSink[(K,V)]): Summer[Scalding, K, V] = {
private def addDeltaWrite(snode: Summer[Scalding, Any, Any],
sink: ScaldingSink[(Any, Any)]): Summer[Scalding, Any, Any] = {
val Summer(prod, store, monoid) = snode
Summer(prod.write(sink), store, monoid)
}

def run[K,V](name: String,
scaldingBuilder: CompletedBuilder[Scalding, K, V],
confud: Configuration => Configuration) {
def run {
// Calling abstractJob's constructor and binding it to a variable
// forces any side effects caused by that constructor (building up
// of the environment and defining the builder).
val ajob = abstractJob
val scaldingBuilder = builder.asInstanceOf[CompletedBuilder[Scalding, Any, Any]]
val name = args.optional("name").getOrElse(ajob.getClass.getName)

// Perform config transformations before Hadoop job submission
val opts = SourceBuilder.adjust(
scaldingBuilder.opts, scaldingBuilder.id)(_.set(Reducers(reducers)))

// Support for the old setting based writing
val toRun: TailProducer[Scalding, (K, (Option[V], V))] =
val toRun: TailProducer[Scalding, (Any, (Option[Any], Any))] =
(for {
opt <- opts.get(scaldingBuilder.id)
stid <- opt.get[StoreIntermediateData[K,V]]
stid <- opt.get[StoreIntermediateData[Any, Any]]
} yield addDeltaWrite(scaldingBuilder.node, stid.sink))
.getOrElse(scaldingBuilder.node)
.name(scaldingBuilder.id)

def getStatePath[K1,V1](ss: ScaldingStore[K1, V1]): Option[String] =
def getStatePath(ss: ScaldingStore[_, _]): Option[String] =
ss match {
case store: VersionedBatchStore[_, _, _, _] => Some(store.rootPath)
case initstore: InitialBatchedStore[_, _] => getStatePath(initstore.proxy)
Expand All @@ -157,12 +119,22 @@ case class ScaldingEnv(override val jobName: String, inargs: Array[String])
val statePath = getStatePath(scaldingBuilder.node.store).getOrElse {
sys.error("You must use a VersionedBatchStore with the old Summingbird API!")
}
val conf = confud(new Configuration)

val conf = new Configuration
// Add the generic options
new GenericOptionsParser(conf, inargs)

// VersionedState needs this
implicit val batcher = scaldingBuilder.batcher
val state = VersionedState(HDFSMetadata(conf, statePath), startDate, batches)

try {
new Scalding(name, opts).run(state, Hdfs(true, conf), toRun)
Scalding(name, opts)
.withRegistrars(ajob.registrars ++ builder.registrar.getRegistrars.asScala)
.withConfigUpdater{ c =>
c.updated(ajob.transformConfig(c.toMap))
}
.run(state, Hdfs(true, conf), toRun)
}
catch {
case f@FlowPlanException(errs) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,8 @@ limitations under the License.

package com.twitter.summingbird.storm

import backtype.storm.Config
import com.twitter.scalding.Args
import com.twitter.chill.{ScalaKryoInstantiator, Kryo, toRich, IKryoRegistrar}
import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst, JavaMapConfig }
import com.twitter.chill.java.IterableRegistrar
import com.twitter.summingbird.{ Env, Unzip2, Producer, TailProducer }
import com.twitter.summingbird.batch.{ BatchID, Timestamp }
import com.twitter.summingbird.scalding.Scalding
import com.twitter.summingbird.{ Env, TailProducer }
import scala.collection.JavaConverters._

/**
Expand All @@ -48,26 +42,9 @@ case class StormEnv(override val jobName: String, override val args: Args)
.getOrElse(jobName.split("\\.").last)

Storm.remote(builder.opts)
.withConfigUpdater { config =>
val c = ConfigBijection.invert(config)
val transformed = ConfigBijection(ajob.transformConfig(c))
val kryoConfig = new JavaMapConfig(transformed)
ConfInst.setSerialized(
kryoConfig,
classOf[ScalaKryoInstantiator],
new ScalaKryoInstantiator()
.withRegistrar(builder.registrar)
.withRegistrar(new IterableRegistrar(ajob.registrars))
.withRegistrar(new IKryoRegistrar {
def apply(k: Kryo) {
List(classOf[BatchID], classOf[Timestamp])
.foreach { cls =>
if(!k.alreadyRegistered(cls)) k.register(cls)
}
}
})
)
transformed
.withRegistrars(ajob.registrars ++ builder.registrar.getRegistrars.asScala)
.withConfigUpdater { c =>
c.updated(ajob.transformConfig(c.toMap))
}.run(
builder.node.name(builder.id).asInstanceOf[TailProducer[Storm, _]],
classSuffix
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2013 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird.chill
import com.twitter.summingbird.{MutableStringConfig, SummingbirdConfig}
import com.twitter.chill.{ScalaKryoInstantiator, IKryoRegistrar, Kryo, toRich}
import com.twitter.chill.java.IterableRegistrar
import com.twitter.chill._
import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst }
import com.twitter.summingbird.batch.{BatchID, Timestamp}

object SBChillRegistrar {
def kryoRegClass(clazz: Class[_]*) =
{k: Kryo =>
clazz
.filter(k.alreadyRegistered(_))
.foreach(k.register(_))
}

def apply(cfg: SummingbirdConfig, iterableRegistrars: List[IKryoRegistrar]): SummingbirdConfig = {
val kryoConfig = new com.twitter.chill.config.Config with MutableStringConfig {
def summingbirdConfig = cfg
}

ConfInst.setSerialized(
kryoConfig,
classOf[ScalaKryoInstantiator],
new ScalaKryoInstantiator()
.withRegistrar(new IterableRegistrar(iterableRegistrars))
.withRegistrar(kryoRegClass(classOf[BatchID], classOf[Timestamp]))
)
kryoConfig.unwrap
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
Copyright 2013 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird

trait SummingbirdConfig { self =>
def get(key: String): Option[AnyRef]
def put(key: String, v: AnyRef): SummingbirdConfig
final def +(kv: (String, AnyRef)) = put(kv._1, kv._2)
final def -(k: String) = remove(k)
def remove(key: String): SummingbirdConfig
def keys: Iterable[String]
def updates: Map[String, AnyRef]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this extend MapLike, and be an actual Map itself?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oscar didn't want this to be a map, something about a bunch of functionality it doesn't need/shouldn't have. Its not a million miles away from having an immutable map interface though I agree

def removes: Set[String]
def toMap: Map[String, AnyRef] = new Map[String, AnyRef] {
def get(k: String) = self.get(k)
def +[B1 >: AnyRef](kv: (String, B1)) = self.put(kv._1, kv._2.asInstanceOf[AnyRef]).toMap
def -(k: String) = self.-(k).toMap
def iterator = self.keys.iterator.map(k => (k, self.get(k).get))
}
def updated(newMap: Map[String, AnyRef]): SummingbirdConfig = {
val removedKeys: Set[String] = keys.toSet -- newMap.keys
val changedOrAddedKeys = newMap.flatMap{ case (k, v) =>
val oldVal = get(k)
if(oldVal != Some(v)) {
Some((k, v))
} else None
}
val newWithoutRemoved = removedKeys.foldLeft(self)(_.remove(_))
changedOrAddedKeys.foldLeft(newWithoutRemoved) {_ + _}
}
}

trait MutableStringConfig {
protected def summingbirdConfig: SummingbirdConfig
private var config = summingbirdConfig
def get(key: String) = {
assert(config != null)
config.get(key) match {
case Some(s) => s.toString
case None => null
}
}

def set(key: String, value: String) {
assert(config != null)
config = config.put(key, value)
}
def unwrap = config
}

/*
* The ReadableMap is the trait that must be implemented on the actual underlying config for the WrappingConfig.
* That is one of these should exist for an Hadoop Configuration, Storm Configuration, etc..
*/
trait ReadableMap {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's immutable, why call this out directly? docstring? Also, should SummingbirdConfig extend at least this trait (if not Scala's map trait)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This trait is purely for its downstream configs. Type signatures for keys are slightly different. The SummingbirdConfig is what something like the WrappingConfig implements, the wrapping config though needs access to a ReadableMap like object (Wraps an Hadoop config/storm config) to provide the underlying store. I'll add a doc string

def get(key: String): Option[AnyRef]
def keys: Set[String]
}


object WrappingConfig {
def apply(backingConfig: ReadableMap) = new WrappingConfig(
backingConfig,
Map[String, AnyRef](),
Set[String]())
}

case class WrappingConfig(private val backingConfig: ReadableMap,
updates: Map[String, AnyRef],
removes: Set[String]) extends SummingbirdConfig {

def get(key: String) = {
updates.get(key) match {
case s@Some(_) => s
case None =>
if(removes.contains(key))
None
else
backingConfig.get(key)
}
}

def put(key: String, v: AnyRef): WrappingConfig = {
assert(v != null)
this.copy(updates = (updates + (key -> v)), removes = (removes - key))
}

def remove(key: String): WrappingConfig = {
this.copy(updates = (updates - key), removes = (removes + key))
}

def keys: Iterable[String] = {
((backingConfig.keys ++ updates.keys) -- removes)
}
}
Loading