Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Support for creating a store-service for Storm platform #563

Merged
merged 9 commits into from
Dec 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ object SummingbirdBuild extends Build {
)
).dependsOn(
summingbirdCore % "test->test;compile->compile",
summingbirdBatch
summingbirdBatch,
summingbirdClient
)

lazy val summingbirdStorm = module("storm").settings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.

package com.twitter.summingbird.store

import com.twitter.algebird.{ Monoid, Semigroup }
import com.twitter.algebird.Semigroup
import com.twitter.algebird.util.UtilAlgebras._
import com.twitter.bijection.Pivot
import com.twitter.storehaus.{ FutureCollector, FutureOps, ReadableStore }
Expand All @@ -32,28 +32,28 @@ import com.twitter.util.Future
*/

object ClientStore {
def apply[K, V](onlineStore: ReadableStore[(K, BatchID), V], batchesToKeep: Int)(implicit batcher: Batcher, monoid: Monoid[V]): ClientStore[K, V] =
def apply[K, V](onlineStore: ReadableStore[(K, BatchID), V], batchesToKeep: Int)(implicit batcher: Batcher, semigroup: Semigroup[V]): ClientStore[K, V] =
apply(ReadableStore.empty, onlineStore, batchesToKeep)

// If no online store exists, supply an empty store and instruct the
// client to keep a single batch.
def apply[K, V](offlineStore: ReadableStore[K, (BatchID, V)])(implicit batcher: Batcher, monoid: Monoid[V]): ClientStore[K, V] =
def apply[K, V](offlineStore: ReadableStore[K, (BatchID, V)])(implicit batcher: Batcher, semigroup: Semigroup[V]): ClientStore[K, V] =
apply(offlineStore, ReadableStore.empty, 1)

def defaultOnlineKeyFilter[K] = (k: K) => true

def apply[K, V](
offlineStore: ReadableStore[K, (BatchID, V)],
onlineStore: ReadableStore[(K, BatchID), V],
batchesToKeep: Int)(implicit batcher: Batcher, monoid: Monoid[V]): ClientStore[K, V] =
batchesToKeep: Int)(implicit batcher: Batcher, semigroup: Semigroup[V]): ClientStore[K, V] =
new ClientStore[K, V](offlineStore, onlineStore,
batcher, batchesToKeep, defaultOnlineKeyFilter[K], FutureCollector.bestEffort)

def apply[K, V](
offlineStore: ReadableStore[K, (BatchID, V)],
onlineStore: ReadableStore[(K, BatchID), V],
batchesToKeep: Int,
onlineKeyFilter: K => Boolean)(implicit batcher: Batcher, monoid: Monoid[V]): ClientStore[K, V] =
onlineKeyFilter: K => Boolean)(implicit batcher: Batcher, semigroup: Semigroup[V]): ClientStore[K, V] =
new ClientStore[K, V](offlineStore, onlineStore,
batcher, batchesToKeep, onlineKeyFilter, FutureCollector.bestEffort)

Expand All @@ -62,7 +62,7 @@ object ClientStore {
onlineStore: ReadableStore[(K, BatchID), V],
batchesToKeep: Int,
onlineKeyFilter: K => Boolean,
collector: FutureCollector[(K, Iterable[BatchID])])(implicit batcher: Batcher, monoid: Monoid[V]): ClientStore[K, V] =
collector: FutureCollector[(K, Iterable[BatchID])])(implicit batcher: Batcher, semigroup: Semigroup[V]): ClientStore[K, V] =
new ClientStore[K, V](offlineStore, onlineStore, batcher, batchesToKeep, onlineKeyFilter, collector)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2013 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird.online

import com.twitter.summingbird.batch.{ Batcher, BatchID }
import com.twitter.summingbird.store.ClientStore
import com.twitter.storehaus.ReadableStore
import com.twitter.storehaus.algebra.MergeableStore

trait CombinedServiceStoreFactory[-K, V] extends MergeableStoreFactory[(K, BatchID), V] with OnlineServiceFactory[K, V]

object CombinedServiceStoreFactory {

def apply[K, V](onlineStore: MergeableStore[(K, BatchID), V], batchesToKeep: Int)(implicit b: Batcher) = {

val clientStore = ClientStore[K, V](onlineStore, batchesToKeep)(b, onlineStore.semigroup)

new CombinedServiceStoreFactory[K, V] {
def mergeableStore = () => onlineStore
def mergeableBatcher = b
def serviceStore = () => clientStore
}
}

def apply[K, V](offlineStore: ReadableStore[K, (BatchID, V)], onlineStore: MergeableStore[(K, BatchID), V], batchesToKeep: Int)(implicit b: Batcher) = {

val clientStore = ClientStore[K, V](offlineStore, onlineStore, batchesToKeep)(b, onlineStore.semigroup)

new CombinedServiceStoreFactory[K, V] {
def mergeableStore = () => onlineStore
def mergeableBatcher = b
def serviceStore = () => clientStore
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ object FlatMapOperation {
storeSupplier: OnlineServiceFactory[K, JoinedV]): FlatMapOperation[T, (K, (V, Option[JoinedV]))] =
new FlatMapOperation[T, (K, (V, Option[JoinedV]))] {
lazy val fm = fmSupplier
lazy val store = storeSupplier.create
lazy val store = storeSupplier.serviceStore()
override def apply(t: T) =
fm.apply(t).flatMap { trav: TraversableOnce[(K, V)] =>
val resultList = trav.toSeq // Can't go through this twice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,23 @@ import com.twitter.summingbird.batch.{ Batcher, BatchID }
*/
object MergeableStoreFactory {

def apply[K, V](store: () => Mergeable[K, V], batcher: Batcher) = {
new MergeableStoreFactory[K, V] {
def mergeableStore = store
def mergeableBatcher = batcher
}
}

def from[K, V](store: => Mergeable[(K, BatchID), V])(implicit batcher: Batcher): MergeableStoreFactory[(K, BatchID), V] =
MergeableStoreFactory({ () => store }, batcher)
apply({ () => store }, batcher)

def fromOnlineOnly[K, V](store: => MergeableStore[K, V]): MergeableStoreFactory[(K, BatchID), V] = {
implicit val batcher = Batcher.unit
from(store.convert { k: (K, BatchID) => k._1 })
}
}

case class MergeableStoreFactory[-K, V](
store: () => Mergeable[K, V],
batcher: Batcher)
trait MergeableStoreFactory[-K, V] {
def mergeableStore: () => Mergeable[K, V]
def mergeableBatcher: Batcher
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ object MergeableStoreFactoryAlgebra {
def wrapOnlineFactory[K, V](supplier: MergeableStoreFactory[K, V]): MergeableStoreFactory[K, (Timestamp, V)] =
{
val mergeable: () => Mergeable[K, (Timestamp, V)] =
() => { new WrappedTSInMergeable(supplier.store()) }
() => { new WrappedTSInMergeable(supplier.mergeableStore()) }

MergeableStoreFactory[K, (Timestamp, V)](mergeable, supplier.batcher)
MergeableStoreFactory[K, (Timestamp, V)](mergeable, supplier.mergeableBatcher)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ import com.twitter.storehaus.ReadableStore
* The Function1 here is to allow cleaner diasy chaining of operations via andThen.
*/
trait OnlineServiceFactory[-K, +V] extends java.io.Serializable {
def create: ReadableStore[K, V]
def serviceStore: () => ReadableStore[K, V]
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,4 @@ import com.twitter.storehaus.ReadableStore
* This is our default supplied instance of the OnlineServiceFactory.
* This is a class for wrapping ReadableStore constructors into our wrapping type.
*/

case class ReadableServiceFactory[-K, +V](
store: () => ReadableStore[K, V]) extends OnlineServiceFactory[K, V] {
def create = store()
}
case class ReadableServiceFactory[-K, +V](override val serviceStore: () => ReadableStore[K, V]) extends OnlineServiceFactory[K, V]
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class Summer[Key, Value: Semigroup, Event, S, D, RC](

override def init(runtimeContext: RC) {
super.init(runtimeContext)
storePromise.setValue(storeBox.get.store())
storePromise.setValue(storeBox.get.mergeableStore())
store.toString // Do the lazy evaluation now so we can connect before tuples arrive.

successHandlerOpt = if (includeSuccessHandler.get) Some(successHandlerBox.get) else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
val summerProducer = summer.members.collect { case s: Summer[_, _, _] => s }.head.asInstanceOf[Summer[Storm, K, V]]
// When emitting tuples between the Final Flat Map and the summer we encode the timestamp in the value
// The monoid we use in aggregation is timestamp max.
val batcher = summerProducer.store.batcher
val batcher = summerProducer.store.mergeableBatcher
implicit val valueMonoid: Semigroup[V] = summerProducer.semigroup

// Query to get the summer paralellism of the summer down stream of us we are emitting to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ object Storm {

def service[K, V](serv: => ReadableStore[K, V]): ReadableServiceFactory[K, V] = ReadableServiceFactory(() => serv)

/**
* Returns a store that is also a service, i.e. is a ReadableStore[K, V] and a Mergeable[(K, BatchID), V]
* The values used for the service are from the online store only.
* Uses ClientStore internally to create ReadableStore[K, V]
*/
def storeServiceOnlineOnly[K, V](store: => MergeableStore[(K, BatchID), V], batchesToKeep: Int)(implicit batcher: Batcher): CombinedServiceStoreFactory[K, V] =
CombinedServiceStoreFactory(store, batchesToKeep)(batcher)

/**
* Returns a store that is also a service, i.e. is a ReadableStore[K, V] and a Mergeable[(K, BatchID), V]
* The values used for the service are from the online *and* offline stores.
* Uses ClientStore internally to combine the offline and online stores to create ReadableStore[K, V]
*/
def storeService[K, V](offlineStore: ReadableStore[K, (BatchID, V)], onlineStore: => MergeableStore[(K, BatchID), V], batchesToKeep: Int)(implicit batcher: Batcher): CombinedServiceStoreFactory[K, V] =
CombinedServiceStoreFactory(offlineStore, onlineStore, batchesToKeep)(batcher)

def toStormSource[T](spout: Spout[T],
defaultSourcePar: Option[Int] = None)(implicit timeOf: TimeExtractor[T]): StormSource[T] =
SpoutSource(spout.map(t => (Timestamp(timeOf(t)), t)), defaultSourcePar.map(SourceParallelism(_)))
Expand Down Expand Up @@ -182,7 +198,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
private def scheduleSummerBolt[K, V](jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = {
val summer: Summer[Storm, K, V] = node.members.collect { case c: Summer[Storm, K, V] => c }.head
implicit val semigroup = summer.semigroup
implicit val batcher = summer.store.batcher
implicit val batcher = summer.store.mergeableBatcher
val nodeName = stormDag.getNodeName(node)

type ExecutorKeyType = (K, BatchID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.twitter.summingbird.storm

import com.twitter.util.Future
import com.twitter.storehaus.{ Store, WritableStore }
import com.twitter.storehaus.{ Store, ReadableStore, WritableStore }

import com.twitter.summingbird.online._

Expand All @@ -40,5 +40,5 @@ class WritableStoreSink[K, V](writable: => WritableStore[K, V]) extends StormSin
class StormBuffer[K, V](supplier: => Store[K, V]) extends StormSink[(K, V)] with OnlineServiceFactory[K, V] {
private lazy val constructed = supplier // only construct it once
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why aren't we doing this trick on the other store factories? Is it because this is possibly called twice, once with toFn the other with serviceStore?

Can we add a comment here while we are working on updating this set of items?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure on the rationale behind having this lazy construct here and not in other places. AFAICT StormBuffer is not used anywhere either?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it was just defensive. :/ Let's leave it for the time being.

def toFn = { (kv: (K, V)) => constructed.put((kv._1, Some(kv._2))) }
def create = constructed
def serviceStore: () => ReadableStore[K, V] = () => constructed
}