This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Support for creating a store-service for Storm platform #563

Merged
merged 9 commits into from
Dec 19, 2014
Changes from 6 commits
@@ -0,0 +1,42 @@
/*
Copyright 2013 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.summingbird.online

import com.twitter.summingbird.batch.{ Batcher, BatchID }
import com.twitter.storehaus.ReadableStore
import com.twitter.storehaus.algebra.Mergeable

trait CombinedServiceStoreFactory[-K, V] extends MergeableStoreFactory[(K, BatchID), V] with OnlineServiceFactory[K, V]

object CombinedServiceStoreFactory {

def apply[K, V](mStore: () => Mergeable[(K, BatchID), V], b: Batcher) = {
new CombinedServiceStoreFactory[K, V] {
def mergeableStore = mStore
def mergeableBatcher = b
def serviceStore = () => ReadableStore.empty
Collaborator

Is this a sane constructor to have? Wouldn't a left join against it always come back None?

Contributor Author

hm, yea, we should only have the constructor with both stores defined

}
}

def apply[K, V](mStore: () => Mergeable[(K, BatchID), V], b: Batcher, sStore: () => ReadableStore[K, V]) = {
Collaborator

I think we want ReadableStore[(K, BatchID), V], which lets you read the value at a certain BatchID. Then, if you have a MergeableStore, it is both a Mergeable and a ReadableStore. Also, when we do the lookup, we should look at the timestamp of the data and ask for the right batch.

You can always ignore the BatchID on the read side if you want.

We kind of messed this up in the past with summingbird and I've been trying to fix it (see #547).

Collaborator

Also, you could use lazy (by-name) parameters here and build the functions inside the method. This can be easier for users than writing the functions themselves:

def apply[K, V](mStore: => Mergeable[(K, BatchID), V], b: Batcher, sStore: => ReadableStore[K, V]) = ...
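
To illustrate the by-name suggestion, here is a minimal sketch. It does not use the summingbird or storehaus types; `Factory`, `fromFunction`, and `fromByName` are hypothetical names introduced only for this example.

```scala
object ByNameSketch {
  // Hypothetical stand-in for a store factory: it only holds a thunk.
  final class Factory[A](val create: () => A)

  // Function-based constructor: callers must write `() => ...` themselves.
  def fromFunction[A](f: () => A): Factory[A] = new Factory(f)

  // By-name constructor: the argument is not evaluated at the call site.
  // It is wrapped in a thunk and evaluated each time `create()` is called.
  def fromByName[A](a: => A): Factory[A] = new Factory(() => a)
}
```

With `fromByName`, a caller can pass the store expression directly, and nothing is constructed until the thunk is applied. Note that each `create()` call re-evaluates the expression.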

Contributor Author

I have a storeService function in the Storm platform that takes the lazy parameters and turns them into functions: https://github.com/twitter/summingbird/pull/563/files#diff-36ff64139544fb94baad353028959a1eR83. I was assuming users would call that function, the same way they use Storm.store and Storm.service.

For lookups, wouldn't we always want the latest value for a specific key? I agree that it would be nice to pass just one MergeableStore to remove the possibility of getting two different stores. ClientMergeableStore would work in this case.

Collaborator

I don't think we want the latest value for the key; we want the latest value that is before the timestamp of the incoming tuple. Consider that queues get delayed and that we have batch boundaries. You don't want some part of the topology (a low-traffic queue, for instance) to get ahead and update a service so that the backed-up queue ends up looking into the future.

It is not a huge source of errors, but I think it is more consistent to pass a BatchID in the key when doing a service read. Stores that don't care can use composeKeyMapping to discard the BatchID and just look up the key. But if we bake into the API the fact that the BatchID is not available at lookup time, there is no way for a user to build a more precise store.

What do you think?
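
A rough sketch of the "ignore the BatchID on the read side" idea follows. `SimpleReadable` and this `BatchID` are hypothetical stand-ins, not the storehaus or summingbird types, and the `composeKeyMapping` defined here only mimics the shape of the idea; the real storehaus signature may differ.

```scala
object DropBatchIdSketch {
  // Hypothetical stand-in for summingbird's BatchID.
  final case class BatchID(id: Long)

  // Hypothetical, synchronous stand-in for a readable store.
  trait SimpleReadable[K, V] { self =>
    def get(k: K): Option[V]

    // Adapt the key type by mapping incoming keys before the lookup.
    def composeKeyMapping[K1](fn: K1 => K): SimpleReadable[K1, V] =
      new SimpleReadable[K1, V] {
        def get(k1: K1): Option[V] = self.get(fn(k1))
      }
  }

  // A store that only knows plain keys.
  val plain: SimpleReadable[String, Long] = new SimpleReadable[String, Long] {
    private val data = Map("a" -> 1L, "b" -> 2L)
    def get(k: String): Option[Long] = data.get(k)
  }

  // A batch-keyed view over the same store that discards the BatchID.
  val batched: SimpleReadable[(String, BatchID), Long] =
    plain.composeKeyMapping[(String, BatchID)](_._1)
}
```

A store that does track batches could use the BatchID in the key instead of dropping it, which is the extra precision the batch-keyed API would allow.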

Contributor Author

Well, if we constrain the lookup to use the BatchID, we only bound the error on the value to that batch, since we don't have the actual timestamp in the store. So it's more correct, but still not exactly correct, unless we use very small batches (which we normally don't, and which probably wouldn't make sense anyway). Also, this would require significant changes in the online left join: right now we don't have access to the Batcher there, but we would need it to convert the timestamp to a BatchID. Do you think this is a common enough use case for us to change this behavior?
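
The granularity argument can be made concrete with a small sketch. `FixedBatcher` below is a hypothetical fixed-duration batcher written for this example, not summingbird's `Batcher`; it assumes batches are aligned to the epoch.

```scala
object BatchLookupSketch {
  // Hypothetical stand-in for summingbird's BatchID.
  final case class BatchID(id: Long)

  // Hypothetical batcher: fixed-length batches aligned to the epoch.
  final case class FixedBatcher(durationMillis: Long) {
    // The batch that contains the given timestamp.
    def batchOf(timestampMillis: Long): BatchID =
      BatchID(Math.floorDiv(timestampMillis, durationMillis))

    // First timestamp (inclusive) covered by the batch.
    def earliestTimeOf(b: BatchID): Long = b.id * durationMillis
  }

  val hourly: FixedBatcher = FixedBatcher(60L * 60L * 1000L)
}
```

Under these assumptions, two tuples almost an hour apart can map to the same BatchID, so a batch-keyed lookup narrows the read to one batch but cannot distinguish timestamps inside it.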

Collaborator

Good point that we don't have the batcher at the service lookup. This inconsistency bothers me, but I guess fixing it is bigger than this. Added: #568

Collaborator

I guess here is where I want it to be easier to use. Can't we add a method that deals with the ClientStore business here? It is confusing to wire up, and I'd rather see it captured in code that works for most cases than in a wiki doc that explains the boilerplate to write.

Contributor Author

Yea, good point. I will try to wire the ClientStore in here to simplify it for the users

Contributor Author

We still need to distinguish between a service that uses online-only results (ClientStore(online)) and the one that combines online and offline (ClientStore(online, offline)). So, in user code, this would be:

val mergeableStore: MergeableStore[(Long, BatchID), Long] = Memcache.getMemcacheStore[(Long, BatchID), Long](Config.online)
val storeService: CombinedServiceStoreFactory[Long, Long] = Storm.storeService(mergeableStore)(Config.batcher)

or

val onlineStore: MergeableStore[(Long, BatchID), Long] = Memcache.getMemcacheStore[(Long, BatchID), Long](Config.online)
val offlineStore: BatchedStore[Long, Long] = VersionedStoreProvider.getVersionedStore[Long, Long](Config.offline)
val storeService: CombinedServiceStoreFactory[Long, Long] = Storm.storeService(onlineStore, offlineStore)(Config.batcher)

And we would have to document this so it is clear when to use what. Does this sound reasonable?

new CombinedServiceStoreFactory[K, V] {
def mergeableStore = mStore
def mergeableBatcher = b
def serviceStore = sStore
}
}
}
@@ -115,7 +115,7 @@ object FlatMapOperation {
storeSupplier: OnlineServiceFactory[K, JoinedV]): FlatMapOperation[T, (K, (V, Option[JoinedV]))] =
new FlatMapOperation[T, (K, (V, Option[JoinedV]))] {
lazy val fm = fmSupplier
- lazy val store = storeSupplier.create
+ lazy val store = storeSupplier.serviceStore
Collaborator

I'd put the type on this. I suspect each reference you're making to store here actually creates a different store, which isn't what we want.

(Is store a ReadableStore or a () => ReadableStore?) Possibly just putting storeSupplier.serviceStore() here instead is all that's needed.

Contributor Author

it's a ()=>ReadableStore, yes, we need the () at the end, thanks
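
The difference between holding the factory function and holding the store it produces can be sketched as follows. `Store`, `countingFactory`, `PerCall`, and `Shared` are hypothetical names for this example only; they are not part of summingbird.

```scala
object StoreFactorySketch {
  // Hypothetical store; the lookup result is irrelevant for this sketch.
  final class Store { def get(k: String): Option[Int] = None }

  // Returns a factory plus a way to read how many stores it has built.
  def countingFactory(): (() => Store, () => Int) = {
    var built = 0
    val make = () => { built += 1; new Store }
    (make, () => built)
  }

  // Keeps the function itself: every `store()` builds another Store.
  final class PerCall(factory: () => Store) {
    lazy val store: () => Store = factory
    def lookupTwice(): Seq[Option[Int]] =
      Seq(store().get("a"), store().get("b"))
  }

  // Applies the function once: all lookups share a single Store.
  final class Shared(factory: () => Store) {
    lazy val store: Store = factory()
    def lookupTwice(): Seq[Option[Int]] =
      Seq(store.get("a"), store.get("b"))
  }
}
```

Writing the explicit type on the `lazy val` makes it visible at a glance which of the two shapes is in use.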

override def apply(t: T) =
fm.apply(t).flatMap { trav: TraversableOnce[(K, V)] =>
val resultList = trav.toSeq // Can't go through this twice
@@ -125,15 +125,15 @@ object FlatMapOperation {
Future.value(Map.empty)
else {
// Do the lookup
- val mres: Map[K, Future[Option[JoinedV]]] = store.multiGet(keySet)
+ val mres: Map[K, Future[Option[JoinedV]]] = store().multiGet(keySet)
val resultFutures = resultList.map { case (k, v) => mres(k).map { k -> (v, _) } }.toIndexedSeq
Future.collect(resultFutures)
}
}

override def close {
fm.close
- Await.result(store.close)
+ Await.result(store().close)
}
}

@@ -26,15 +26,23 @@ import com.twitter.summingbird.batch.{ Batcher, BatchID }
*/
object MergeableStoreFactory {

+ def apply[K, V](store: () => Mergeable[K, V], batcher: Batcher) = {
+ new MergeableStoreFactory[K, V] {
+ def mergeableStore = store
+ def mergeableBatcher = batcher
+ }
+ }

def from[K, V](store: => Mergeable[(K, BatchID), V])(implicit batcher: Batcher): MergeableStoreFactory[(K, BatchID), V] =
- MergeableStoreFactory({ () => store }, batcher)
+ apply({ () => store }, batcher)

def fromOnlineOnly[K, V](store: => MergeableStore[K, V]): MergeableStoreFactory[(K, BatchID), V] = {
implicit val batcher = Batcher.unit
from(store.convert { k: (K, BatchID) => k._1 })
}
}

- case class MergeableStoreFactory[-K, V](
- store: () => Mergeable[K, V],
- batcher: Batcher)
+ trait MergeableStoreFactory[-K, V] {
+ def mergeableStore: () => Mergeable[K, V]
+ def mergeableBatcher: Batcher
+ }
@@ -55,8 +55,8 @@ object MergeableStoreFactoryAlgebra {
def wrapOnlineFactory[K, V](supplier: MergeableStoreFactory[K, V]): MergeableStoreFactory[K, (Timestamp, V)] =
{
val mergeable: () => Mergeable[K, (Timestamp, V)] =
- () => { new WrappedTSInMergeable(supplier.store()) }
+ () => { new WrappedTSInMergeable(supplier.mergeableStore()) }

- MergeableStoreFactory[K, (Timestamp, V)](mergeable, supplier.batcher)
+ MergeableStoreFactory[K, (Timestamp, V)](mergeable, supplier.mergeableBatcher)
}
}
@@ -25,5 +25,5 @@ import com.twitter.storehaus.ReadableStore
* The Function1 here is to allow cleaner daisy chaining of operations via andThen.
*/
trait OnlineServiceFactory[-K, +V] extends java.io.Serializable {
- def create: ReadableStore[K, V]
+ def serviceStore: () => ReadableStore[K, V]
}
@@ -25,5 +25,5 @@ import com.twitter.storehaus.ReadableStore

case class ReadableServiceFactory[-K, +V](
store: () => ReadableStore[K, V]) extends OnlineServiceFactory[K, V] {
Collaborator

I think this can just be:

case class ReadableServiceFactory[-K, +V](override val serviceStore: () => ReadableStore[K, V]) extends OnlineServiceFactory[K, V]

Collaborator

no love for my simplification. :) ?

Contributor Author

didn't see this note :) will add the simplification

- def create = store()
+ def serviceStore = store
Collaborator

Can you write a return type on this? It is ambiguous (are we applying the function or not?).

Also, all public methods should have types.

}
@@ -85,7 +85,7 @@ class Summer[Key, Value: Semigroup, Event, S, D, RC](

override def init(runtimeContext: RC) {
super.init(runtimeContext)
- storePromise.setValue(storeBox.get.store())
+ storePromise.setValue(storeBox.get.mergeableStore())
store.toString // Do the lazy evaluation now so we can connect before tuples arrive.

successHandlerOpt = if (includeSuccessHandler.get) Some(successHandlerBox.get) else None
@@ -98,7 +98,7 @@ case class FlatMapBoltProvider(storm: Storm, jobID: JobId, stormDag: Dag[Storm],
val summerProducer = summer.members.collect { case s: Summer[_, _, _] => s }.head.asInstanceOf[Summer[Storm, K, V]]
// When emitting tuples between the Final Flat Map and the summer we encode the timestamp in the value
// The monoid we use in aggregation is timestamp max.
- val batcher = summerProducer.store.batcher
+ val batcher = summerProducer.store.mergeableBatcher
implicit val valueMonoid: Semigroup[V] = summerProducer.semigroup

// Query to get the summer parallelism of the summer downstream of us that we are emitting to
@@ -80,6 +80,9 @@ object Storm {

def service[K, V](serv: => ReadableStore[K, V]): ReadableServiceFactory[K, V] = ReadableServiceFactory(() => serv)

+ def storeService[K, V](mStore: => Mergeable[(K, BatchID), V], sStore: => ReadableStore[K, V])(implicit batcher: Batcher): CombinedServiceStoreFactory[K, V] =
Collaborator

How do you use this? What do you put in for sStore?

Don't we need the ClientMergeable for this? If so, shouldn't we just take either that or the inputs it needs (like the offline store)?

Contributor Author

Right now, I have it set up with ClientStore like so:

val mergeableStore: MergeableStore[(Long, BatchID), Long] = Memcache.getMemcacheStore[(Long, BatchID), Long](Config.online)
val clientStore: ClientStore[Long, Long] = ClientStore(mergeableStore, Config.batchesToKeep)(Config.batcher, Config.monoid)
val storeService: CombinedServiceStoreFactory[Long, Long] = Storm.storeService(mergeableStore, clientStore)(Config.batcher)

In this example we only use the online store, but the user can pass an offline store to it too. This is not great, since the two stores (mStore and sStore) can be different (unless we somehow check that ClientStore wraps the same mergeable store), so we would have to be explicit about that.

ClientMergeable extends Mergeable[(K, BatchID), V] and ReadableStore[(K, BatchID), V], right? So we would still need to handle the BatchID somehow in the left join.

Collaborator

  1. ClientMergeable could use composeKeyMapping to drop the batch and get a ReadableStore. But I think the ClientMergeable is really only needed when sumByKey output is consumed, not for the join-on-store case.
  2. Can you add that function you have there to object ClientStore? Something like:
def storeAndService[K, V](m: MergeableStore[(K, BatchID), V], batchesToKeep: Int)(implicit b: Batcher): CombinedServiceStoreFactory[K, V] =
// you can get the semigroup/monoid from the MergeableStore, right?

Collaborator

actually, that would make clientstore depend on summingbird-storm. Can we put that function in the object Storm?

Contributor Author

Are you talking about the function:

 def storeService[K, V](mStore: => Mergeable[(K, BatchID), V], sStore: => ReadableStore[K, V])(implicit batcher: Batcher): CombinedServiceStoreFactory[K, V]

? It is already in object Storm

+ CombinedServiceStoreFactory(() => mStore, batcher, () => sStore)

def toStormSource[T](spout: Spout[T],
defaultSourcePar: Option[Int] = None)(implicit timeOf: TimeExtractor[T]): StormSource[T] =
SpoutSource(spout.map(t => (Timestamp(timeOf(t)), t)), defaultSourcePar.map(SourceParallelism(_)))
@@ -182,7 +185,7 @@ abstract class Storm(options: Map[String, Options], transformConfig: Summingbird
private def scheduleSummerBolt[K, V](jobID: JobId, stormDag: Dag[Storm], node: StormNode)(implicit topologyBuilder: TopologyBuilder) = {
val summer: Summer[Storm, K, V] = node.members.collect { case c: Summer[Storm, K, V] => c }.head
implicit val semigroup = summer.semigroup
- implicit val batcher = summer.store.batcher
+ implicit val batcher = summer.store.mergeableBatcher
val nodeName = stormDag.getNodeName(node)

type ExecutorKeyType = (K, BatchID)
@@ -40,5 +40,5 @@ class WritableStoreSink[K, V](writable: => WritableStore[K, V]) extends StormSin
class StormBuffer[K, V](supplier: => Store[K, V]) extends StormSink[(K, V)] with OnlineServiceFactory[K, V] {
private lazy val constructed = supplier // only construct it once
Collaborator

why aren't we doing this trick on the other store factories? Is it because this is possibly called twice, once with toFn the other with serviceStore?

Can we add a comment here while we are working on updating this set of items?

Contributor Author

I'm not sure on the rationale behind having this lazy construct here and not in other places. AFAICT StormBuffer is not used anywhere either?

Collaborator

I guess it was just defensive. :/ Let's leave it for the time being.
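
For reference, the effect of the `private lazy val constructed = supplier` pattern discussed in this thread can be sketched in isolation. `Buffer`, `viaSink`, and `viaService` are hypothetical names standing in for the StormBuffer shape; this is not the actual class.

```scala
object ConstructOnceSketch {
  // Hypothetical analogue of a sink that is also a service factory.
  final class Buffer[S](supplier: => S) {
    // The by-name supplier is evaluated at most once, on first access.
    private lazy val constructed: S = supplier

    // Both access paths observe the same cached instance.
    def viaSink: S = constructed
    def viaService: () => S = () => constructed

    // Without the lazy val, each reference re-evaluates the supplier.
    def uncached: S = supplier
  }
}
```

If the same object is reached both through a sink function and through a service function, caching is what guarantees they talk to one underlying store.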

def toFn = { (kv: (K, V)) => constructed.put((kv._1, Some(kv._2))) }
- def create = constructed
+ def serviceStore = () => constructed
Collaborator

return type please.

}