First draft of pure-scalding memory backend #1697
Conversation
cc @ianoc
```scala
def loop(): R = {
  val init = ref.get
  val (next, res) = fn(init)
  if (ref.compareAndSet(init, next)) res
```
Just to clarify: this is like STM, where you only apply the change if the conditions that held when `fn` was called still hold? Could this loop forever if another thread consistently gets scheduled between the read and the write?
Yes. But I don't think that can happen in practice: we only mutate the box when jobs finish, and only a finite number of jobs run at a time, so it is exponentially unlikely that a thread spins forever.
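For readers following along, here is a minimal, self-contained sketch of the compare-and-set retry pattern under discussion. The `update` signature and the `else loop()` branch are assumptions inferred from the quoted lines and the `@annotation.tailrec` annotation shown later in this thread:

```scala
import java.util.concurrent.atomic.AtomicReference

// Sketch of the lock-free box discussed above. `update` recomputes `fn`
// against the latest value and retries until compareAndSet succeeds.
class AtomicBox[T](initValue: T) {
  private[this] val ref = new AtomicReference[T](initValue)

  def update[R](fn: T => (T, R)): R = {
    @annotation.tailrec
    def loop(): R = {
      val init = ref.get
      val (next, res) = fn(init)
      if (ref.compareAndSet(init, next)) res
      else loop() // another thread won the race; retry against the new value
    }
    loop()
  }

  def get(): T = ref.get
}
```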
```scala
def openForRead(config: Config, tap: Tap[_, _, _]): TupleEntryIterator = ???
def fileExists(filename: String): Boolean = ???
def newFlowConnector(props: Config): FlowConnector = ???
```
Are these just not implemented yet? Is there an exception we could throw instead of leaving them undefined, or is this the standard in Scala?
These cannot be implemented because this platform does not know about cascading. Ultimately I'd like to remove these methods from `Mode` and move them down to a class like `CascadingBackedMode`, but I want to do that with your help, since you likely have code somewhere that assumes `Mode` has these methods.
@johnynek could you add a comment or a specific exception with this explanation?
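As a sketch of what that could look like (the message wording is illustrative, not the PR's actual text):

```scala
// Sketch: fail with an exception that carries the explanation, instead of
// a bare `???`. The message wording here is illustrative.
def openForRead(config: Config, tap: Tap[_, _, _]): TupleEntryIterator =
  throw new UnsupportedOperationException(
    "MemoryMode does not use cascading, so openForRead cannot be implemented " +
      "here; it should eventually move to a cascading-backed Mode.")
```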
```scala
  def get(): T = ref.get
}

final class MemoryMode private (srcs: Map[TypedSource[_], Iterable[_]], sinks: Map[TypedSink[_], AtomicBox[Option[Iterable[_]]]]) extends Mode {
```
Will this `Map` work if we read from and write to the same path at different points during the flow?
Yes, because the sinks and sources are kept separate. On this platform you can't make a source out of anything except something that is already a list.
```scala
def result(implicit cec: ConcurrentExecutionContext): Future[ArrayBuffer[(K, V2)]] =
  input.result.map { kvs =>
    val m = MMap[K, ArrayList[V1]]()
```
Can we rename this for clarity, to something like `valuesGroupedByKey`?
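For illustration, a sketch of the grouping step with that rename, wrapped in a standalone method (the loop body is an assumption, not the PR's code):

```scala
import java.util.ArrayList
import scala.collection.mutable.{ Map => MMap }

// Sketch: group (K, V) pairs by key, with a descriptive name for the
// accumulator as suggested above.
def groupByKey[K, V](kvs: TraversableOnce[(K, V)]): MMap[K, ArrayList[V]] = {
  val valuesGroupedByKey = MMap[K, ArrayList[V]]()
  kvs.foreach { case (k, v) =>
    valuesGroupedByKey.getOrElseUpdate(k, new ArrayList[V]()).add(v)
  }
  valuesGroupedByKey
}
```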
```scala
object MemoryPlanner {

  sealed trait Op[+O] {
    def result(implicit cec: ConcurrentExecutionContext): Future[ArrayBuffer[_ <: O]]
```
Would this new mode be used only for testing? I think a simpler synchronous implementation without parallelism would suffice for that.
I want to make this production grade as we move forward, and I don't think that will be very hard. So I don't want to back out of the design of using `Future`s, since a better system will have more explicit parallelism.
Fair enough. I think `Future` might not be the best tool for parallelism given its high execution overhead, but it's fine to use `Future` for now and revisit the decision if necessary.
```scala
case class Concat[O](left: Op[O], right: Op[O]) extends Op[O] {
  def result(implicit cec: ConcurrentExecutionContext) = {
    // start both futures in parallel
```
Why not inline these? They will run in parallel regardless.
That's a stale comment from when I used a `for { ... }` comprehension, which is not parallel unless the futures are started first. Now that this uses `zip`, it doesn't matter.
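For anyone unfamiliar with the subtlety: in a for-comprehension the second future is not created until the first completes, whereas `zip` (or binding the futures to vals first) lets them run concurrently. A sketch with illustrative names:

```scala
import scala.concurrent.{ ExecutionContext, Future }

// Sequential: `fb` is only evaluated inside fa's flatMap, i.e. after fa
// completes, so the two computations do not overlap.
def sequential[A, B](fa: => Future[A], fb: => Future[B])(implicit ec: ExecutionContext): Future[(A, B)] =
  for { a <- fa; b <- fb } yield (a, b)

// Parallel: both futures are started before being combined with zip.
def parallel[A, B](fa: => Future[A], fb: => Future[B])(implicit ec: ExecutionContext): Future[(A, B)] = {
  val ra = fa
  val rb = fb
  ra.zip(rb)
}
```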
```scala
sealed trait Op[+O] {
  def result(implicit cec: ConcurrentExecutionContext): Future[ArrayBuffer[_ <: O]]

  def flatMap[O1](fn: O => TraversableOnce[O1]): Op[O1] =
```
I find the naming here confusing. `flatMap` indicates a monad, but the function doesn't return a monad instance. Also, it's strange that the implementation creates a `Map` for a `flatMap`. Maybe the method could be called `mapValues` and the implementation built on top of `mapAll`?
I hear you on `flatMap`. I can rename it `concatMap`, but we used `flatMap` in scalding to make it a bit easier for Scala newcomers (it is like `flatMap` on `List`, in that `List` accepts a similarly broad return type). `TypedPipe[T]` is definitely not a monad (it is Applicative!). But I can change the name here.
thanks!
```scala
  def flatMap[O1](fn: O => TraversableOnce[O1]): Op[O1] =
    Op.Map(this, fn)

  def mapAll[O1 >: O, O2](fn: IndexedSeq[O1] => ArrayBuffer[O2]): Op[O2] =
```
wdyt about `transform` instead of `mapAll`?
sure.
```scala
case f@MapValues(_, _) =>
  def go[K, V, U](node: MapValues[K, V, U]) = {
    // don't capture node, which is a TypedPipe, which we avoid serializing
```
unnecessary?
```scala
case Mapped(input, fn) =>
  val (m1, op) = plan(m, input)
  (m1, op.flatMap { t => fn(t) :: Nil })
```
It might be worth adding a `map` method to avoid creating one list for each element.
+1.
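A sketch of such a `map` on `Op`, built on the `mapAll`/`transform` combinator quoted earlier (the exact implementation is an assumption):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch: transform the whole buffer in one pass instead of allocating a
// one-element list per record via `fn(t) :: Nil`. This would live on Op[+O].
def map[O2](fn: O => O2): Op[O2] =
  mapAll[O, O2] { elems =>
    val out = new ArrayBuffer[O2](elems.size)
    elems.foreach { e => out += fn(e) }
    out
  }
```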
```scala
(m, Op.Source({ cec =>
  mem.readSource(src) match {
    case Some(iter) => Future.successful(iter)
    case None => Future.failed(new Exception(s"Source: $src not wired"))
```
wdyt about explaining how the user can fix the error in the exception message?
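For example, something along these lines (the wiring instructions in the message are an assumption about the `MemoryMode` API):

```scala
// Sketch: make the failure actionable by telling the user how to wire
// the source. The exact fix described in the message is illustrative.
case None =>
  Future.failed(new Exception(
    s"Source: $src not wired. Wire an in-memory Iterable to this source " +
      "on the MemoryMode before running the Execution."))
```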
```scala
      go(imr)
  }

  case class State(
```
Could it be `private[this]`?
yes.
```scala
    forced: Map[TypedPipe[_], Future[TypedPipe[_]]]
  )

  val state = new AtomicBox[State](State(0, MemoryPlanner.Memo.empty, Map.empty))
```
Could it be `private[this]`?
yes.
Will send an update addressing these comments. Thank you for taking the time to look.
Really cool stuff. It would be nice to add more scenarios to the memory test, but we can follow up on that in a future PR.
```scala
def newWriter(): Writer =
  new MemoryWriter(this)

def openForRead(config: Config, tap: Tap[_, _, _]): TupleEntryIterator = ???
```
Throw an exception instead? This will evaluate to a `NotImplementedError`, which might lead the caller/user to think it's an implementation bug.
```scala
@annotation.tailrec
def loop(): R = {
  val init = ref.get
  val (next, res) = fn(init)
```
Do we need to worry about `fn` being computationally expensive? We could call it once and cache the value in the enclosing method if that's the case.
If I understood the purpose of this class correctly, caching isn't possible: `fn` depends on the current value and must be recomputed on each retry (the value will be different each time). @johnynek, maybe renaming `loop` to `retry` would be clearer?
```scala
sealed trait Op[+O] {
  def result(implicit cec: ConcurrentExecutionContext): Future[ArrayBuffer[_ <: O]]

  def flatMap[O1](fn: O => TraversableOnce[O1]): Op[O1] =
```
Could we name the generic type something other than `O1`?
```scala
  def flatMap[O1](fn: O => TraversableOnce[O1]): Op[O1] =
    Op.Map(this, fn)

  def mapAll[O1 >: O, O2](fn: IndexedSeq[O1] => ArrayBuffer[O2]): Op[O2] =
```
Something other than `O1`, `O2` here as well?
```scala
  }
  sum(slk)

case tp@TrappedPipe(_, _, _) => ???
```
Throw a not-yet-implemented exception?
`???` means not implemented.
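For reference, `???` comes from `scala.Predef`, defined essentially as:

```scala
// Evaluating `???` throws, so any code path reaching these cases
// fails fast at runtime rather than returning a bogus value.
def ??? : Nothing = throw new NotImplementedError
```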
```scala
case WithDescriptionTypedPipe(pipe, description, dedup) =>
  plan(m, pipe)

case WithOnComplete(pipe, fn) => ???
```
here too?
same.
```scala
 * with multi-way or we need to keep
 * the decomposed series of joins
 */
???
```
Should this be a TODO?
Okay, sorry for the delay. Can you all take another look? As you know, this is a long line of changes and I am trying to keep each one somewhat digestible (shooting for ~400 lines of diff). This one is slightly longer, so I'm hoping we can address any outstanding issues in a follow-up. I'd love to get the optimizations in place so we can think about releasing scalding 0.18 with this change to typed pipe. In fact, this memory platform is just a proof of concept that you can run without cascading. We can polish it and make it as nice as we like, but the main purpose is to have a realistic example proving that the API basically works, without getting into the weeds of Spark or Flink.
Looks good to me. It seems the CI build has been hitting the 50-minute timeout on the hadoop tests (noticed that on #1700 as well). We'll need to either bump the timeout or break out the tests in that suite.
LGTM
This follows up the thread of work leading to #1682
This gives an in-memory backend without using cascading (which for the basic tests is MUCH faster).
This is not a production-quality backend yet.
The main point is to exercise the execution API without cascading in the loop. I think this proof of concept shows that a Spark backend would not be very hard at this point, and the memory backend should be a guide for anyone looking to build one.
I think we should merge this despite it not being complete because the PR is already dense enough. I'd like to improve the quality of the test coverage and support all the cases in later PRs.
r? @fwbrasil @piyushnarang