-
Notifications
You must be signed in to change notification settings - Fork 266
Feature/summingbird config #339
Changes from 9 commits
5ffc48d
babbcf0
b658573
e5d3060
6bcefec
8363517
7af0546
feb14a9
dd0f767
ac8e2a5
693349a
401c675
35c720f
5b5d4e1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ import com.twitter.summingbird.{ Env, Unzip2, Producer, TailProducer } | |
import com.twitter.summingbird.batch.{ BatchID, Timestamp } | ||
import com.twitter.summingbird.scalding.Scalding | ||
import scala.collection.JavaConverters._ | ||
import com.twitter.summingbird.MutableStringConfig | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nope, dropped, it and removed others that weren't being used either |
||
|
||
/** | ||
* Storm-specific extension to Env. StormEnv handles storm-specific configuration | ||
|
@@ -48,26 +49,9 @@ case class StormEnv(override val jobName: String, override val args: Args) | |
.getOrElse(jobName.split("\\.").last) | ||
|
||
Storm.remote(builder.opts) | ||
.withConfigUpdater { config => | ||
val c = ConfigBijection.invert(config) | ||
val transformed = ConfigBijection(ajob.transformConfig(c)) | ||
val kryoConfig = new JavaMapConfig(transformed) | ||
ConfInst.setSerialized( | ||
kryoConfig, | ||
classOf[ScalaKryoInstantiator], | ||
new ScalaKryoInstantiator() | ||
.withRegistrar(builder.registrar) | ||
.withRegistrar(new IterableRegistrar(ajob.registrars)) | ||
.withRegistrar(new IKryoRegistrar { | ||
def apply(k: Kryo) { | ||
List(classOf[BatchID], classOf[Timestamp]) | ||
.foreach { cls => | ||
if(!k.alreadyRegistered(cls)) k.register(cls) | ||
} | ||
} | ||
}) | ||
) | ||
transformed | ||
.withRegistrars(ajob.registrars ++ builder.registrar.getRegistrars.asScala) | ||
.withConfigUpdater { c => | ||
c.updated(ajob.transformConfig(c.toMap)) | ||
}.run( | ||
builder.node.name(builder.id).asInstanceOf[TailProducer[Storm, _]], | ||
classSuffix | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
Copyright 2013 Twitter, Inc. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package com.twitter.summingbird.chill | ||
import com.twitter.summingbird.{MutableStringConfig, SummingbirdConfig} | ||
import com.twitter.chill.{ScalaKryoInstantiator, IKryoRegistrar, Kryo, toRich} | ||
import com.twitter.chill.java.IterableRegistrar | ||
import com.twitter.chill.config.{ ConfiguredInstantiator => ConfInst } | ||
import com.twitter.summingbird.batch.{BatchID, Timestamp} | ||
|
||
object SBChillRegistrar { | ||
def apply(cfg: SummingbirdConfig, iterableRegistrars: List[IKryoRegistrar]): SummingbirdConfig = { | ||
val kryoConfig = new com.twitter.chill.config.Config with MutableStringConfig { | ||
def summingbirdConfig = cfg | ||
} | ||
|
||
ConfInst.setSerialized( | ||
kryoConfig, | ||
classOf[ScalaKryoInstantiator], | ||
new ScalaKryoInstantiator() | ||
.withRegistrar(new IterableRegistrar(iterableRegistrars)) | ||
.withRegistrar(new IKryoRegistrar { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It'd be nice to have an implicit conversion from a function -> IKryoRegistrar, OR just create a new arity of withRegistrar that takes the func. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reworked this with an implicit and a varargs func to make it a bit prettier/more generic |
||
def apply(k: Kryo) { | ||
List(classOf[BatchID], classOf[Timestamp]) | ||
.foreach { cls => | ||
if(!k.alreadyRegistered(cls)) k.register(cls) | ||
} | ||
} | ||
}) | ||
) | ||
kryoConfig.unwrap | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* | ||
Copyright 2013 Twitter, Inc. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package com.twitter.summingbird | ||
|
||
trait SummingbirdConfig { self => | ||
def get(key: String): Option[AnyRef] | ||
def put(key: String, v: AnyRef): SummingbirdConfig | ||
final def +(kv: (String, AnyRef)) = put(kv._1, kv._2) | ||
final def -(k: String) = remove(k) | ||
def remove(key: String): SummingbirdConfig | ||
def keys: Iterable[String] | ||
def updates: Map[String, AnyRef] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this extend MapLike, and be an actual Map itself? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oscar didn't want this to be a map, something about a bunch of functionality it doesn't need/shouldn't have. Its not a million miles away from having an immutable map interface though I agree |
||
def removes: Set[String] | ||
def toMap: Map[String, AnyRef] = new Map[String, AnyRef] { | ||
def get(k: String) = self.get(k) | ||
def +[B1 >: AnyRef](kv: (String, B1)) = self.put(kv._1, kv._2.asInstanceOf[AnyRef]).toMap | ||
def -(k: String) = self.-(k).toMap | ||
def iterator = self.keys.iterator.map(k => (k, self.get(k).get)) | ||
} | ||
def updated(newMap: Map[String, AnyRef]): SummingbirdConfig = { | ||
val removedKeys: Set[String] = keys.toSet -- newMap.keys | ||
val changedOrAddedKeys = newMap.flatMap{ case (k, v) => | ||
val oldVal = get(k) | ||
if(oldVal != Some(v)) { | ||
Some((k, v)) | ||
} else None | ||
} | ||
val newWithoutRemoved = removedKeys.foldLeft(self)(_.remove(_)) | ||
changedOrAddedKeys.foldLeft(newWithoutRemoved) {_ + _} | ||
} | ||
} | ||
|
||
trait MutableStringConfig { | ||
protected def summingbirdConfig: SummingbirdConfig | ||
private var config = summingbirdConfig | ||
def get(key: String) = { | ||
assert(config != null) | ||
config.get(key) match { | ||
case Some(s) => s.toString | ||
case None => null | ||
} | ||
} | ||
|
||
def set(key: String, value: String) { | ||
assert(config != null) | ||
config = config.put(key, value) | ||
} | ||
def unwrap = config | ||
} | ||
|
||
trait ReadableMap { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it's immutable, why call this out directly? docstring? Also, should SummingbirdConfig extend at least this trait (if not Scala's map trait)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This trait is purely for its downstream configs. Type signatures for keys are slightly different. The SummingbirdConfig is what something like the WrappingConfig implements, the wrapping config though needs access to a ReadableMap like object (Wraps an Hadoop config/storm config) to provide the underlying store. I'll add a doc string |
||
def get(key: String): Option[AnyRef] | ||
def keys: Set[String] | ||
} | ||
|
||
|
||
object WrappingConfig { | ||
def apply(backingConfig: ReadableMap) = new WrappingConfig( | ||
backingConfig, | ||
Map[String, AnyRef](), | ||
Set[String]()) | ||
} | ||
|
||
case class WrappingConfig(private val backingConfig: ReadableMap, | ||
updates: Map[String, AnyRef], | ||
removes: Set[String]) extends SummingbirdConfig { | ||
|
||
def get(key: String) = { | ||
updates.get(key) match { | ||
case s@Some(_) => s | ||
case None => | ||
if(removes.contains(key)) | ||
None | ||
else | ||
backingConfig.get(key) | ||
} | ||
} | ||
|
||
def put(key: String, v: AnyRef): WrappingConfig = { | ||
assert(v != null) | ||
this.copy(updates = (updates + (key -> v)), removes = (removes - key)) | ||
} | ||
|
||
def remove(key: String): WrappingConfig = { | ||
this.copy(updates = (updates - key), removes = (removes + key)) | ||
} | ||
|
||
def keys: Iterable[String] = { | ||
((backingConfig.keys ++ updates.keys) -- removes) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
Copyright 2013 Twitter, Inc. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package com.twitter.summingbird.scalding | ||
|
||
import scala.collection.JavaConverters._ | ||
import org.apache.hadoop.conf.Configuration | ||
import com.twitter.summingbird.{WrappingConfig, ReadableMap} | ||
|
||
object ScaldingConfig { | ||
def apply(backingConfig: Configuration) = WrappingConfig(new WrappedHadoopConfig(backingConfig)) | ||
} | ||
|
||
class WrappedHadoopConfig(backingConfig: Configuration) extends ReadableMap { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice, good to wrangle configuration into something that makes more sense. |
||
def get(key: String): Option[AnyRef] = | ||
if(keys.contains(key)) | ||
Some(backingConfig.get(key)) | ||
else | ||
None | ||
|
||
val keys: Set[String] = backingConfig.iterator.asScala.map(_.getKey).toSet | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, I know it gets the graph walking to compile, but this makes me nervous... Doesn't this stop the
StoreIntermediateData
call into the builder API from type checking? Do we have an example job that still works with the Any everywhere, or proof that you can't pass in the wrong sink?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could only ever enforce on one line, specifying the type params on the constructor, and then the sink not matching those. -- though that appears to be how its used internally. Will commit change that brings them back. (Also hadn't realized this was in the external API...) Would have just broken users code