Cache the zipped up write executions #1415
@@ -291,8 +294,23 @@ object Execution {
*/
def finished(): Unit = messageQueue.put(Stop)

def getOrElseInsert[T](ex: Execution[T],
res: => Future[(T, ExecutionCounters)])(implicit ec: ConcurrentExecutionContext): Future[(T, ExecutionCounters)] = {
def getOrLock(write: ToWrite)(implicit ec: ConcurrentExecutionContext): Either[Promise[ExecutionCounters], Future[ExecutionCounters]] = {
Do we need the implicit `ec`?
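To make the question concrete, here is a minimal sketch of a lock-or-wait lookup. It is not the PR's `EvalCache`: `LockingCache`, `K`, and `V` are hypothetical stand-ins (think `ToWrite` and `ExecutionCounters`), and the backing map is assumed to be a plain `ConcurrentHashMap`. In this version, at least, no callbacks are registered, so no execution context is required.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.{Future, Promise}

// Hypothetical stand-in for the PR's EvalCache; K plays the role of ToWrite
// and V the role of ExecutionCounters.
final class LockingCache[K, V] {
  private val cache = new ConcurrentHashMap[K, Future[V]]()

  // Left(promise): the caller won the race and must complete the promise.
  // Right(future): another caller already owns this key, so just wait on it.
  def getOrLock(key: K): Either[Promise[V], Future[V]] = {
    val promise = Promise[V]()
    // putIfAbsent returns null when no mapping existed before this call
    cache.putIfAbsent(key, promise.future) match {
      case null     => Left(promise)
      case existing => Right(existing)
    }
  }
}
```

The first caller for a key gets the `Left` and becomes responsible for the write; every later caller gets a `Right` pointing at the same future.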
val cacheLookup: List[(ToWrite, Either[Promise[ExecutionCounters], Future[ExecutionCounters]])] = (head :: tail).map{ tw => (tw, cache.getOrLock(tw)) }
val (weDoOperation, someoneElseDoesOperation) = cacheLookup.partition(_._2.isLeft)

val localFlowDefCountersFuture: Future[ExecutionCounters] = if (!weDoOperation.isEmpty) {
if(weDo.nonEmpty) {
Actually, what about:
weDoOperation match {
  case all @ (h :: tail) =>
    // no need to call head and tail
  case Nil => Future.successful(...)
}
futCounters
} else Future.successful(ExecutionCounters.empty)

Future.sequence(someoneElseDoesOperation.map(_._2.right.get)).zip(localFlowDefCountersFuture).map {
We should use `failFastZip`, and probably our own `sequence` that does the same:
def failFastSequence[T](t: Iterable[Future[T]])(implicit cec: ConcurrentExecutionContext): Future[List[T]] = {
  t.foldLeft(Future.successful(Nil: List[T])) { (f, i) =>
    failFastZip(f, i).map { case (tail, h) => h :: tail }
  }
  .map(_.reverse)
}
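For readers without the codebase at hand, a self-contained sketch of the idea follows. The `failFastZip` here is a simplified, hypothetical version written only for illustration, not the one in `Execution`; it uses the standard `scala.concurrent` API and a plain `ExecutionContext`. The point is that a failure on either side completes the result immediately, whereas the standard `zip`, as far as I know, does not see a failure on the right until the left has completed.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

object FailFast {
  // Simplified, hypothetical fail-fast zip: the first failure from either
  // side wins; otherwise the ordinary zip supplies the pair.
  def failFastZip[T, U](ft: Future[T], fu: Future[U])(implicit ec: ExecutionContext): Future[(T, U)] = {
    val p = Promise[(T, U)]()
    ft.failed.foreach(e => p.tryFailure(e))
    fu.failed.foreach(e => p.tryFailure(e))
    ft.zip(fu).onComplete(r => p.tryComplete(r))
    p.future
  }

  // Same fold as in the comment above: accumulate in reverse, then flip.
  def failFastSequence[T](fs: Iterable[Future[T]])(implicit ec: ExecutionContext): Future[List[T]] =
    fs.foldLeft(Future.successful(List.empty[T])) { (acc, f) =>
      failFastZip(acc, f).map { case (tail, h) => h :: tail }
    }.map(_.reverse)
}
```

With `List(neverCompletes, alreadyFailed)` as input, `failFastSequence` fails as soon as the second future does, rather than waiting on the first.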
// Anything not already ran we run as part of a single flow def, using their combined counters for the others
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = {
val cacheLookup: List[(ToWrite, Either[Promise[ExecutionCounters], Future[ExecutionCounters]])] = (head :: tail).map{ tw => (tw, cache.getOrLock(tw)) }
val (weDoOperation, someoneElseDoesOperation) = cacheLookup.partition(_._2.isLeft)
Sorry for the geek-out, but can we get the types right here:
`val (weDoOperation, someoneElseDoesOperation): (List[(ToWrite, Promise[...])], List[(ToWrite, Future[..])])` without a cast?
I think something like:
def go[A, B, C](it: List[(A, Either[B, C])]): (List[(A, B)], List[(A, C)]) = it match {
  case (a, Left(b)) :: tail =>
    val (l, r) = go(tail)
    ((a, b) :: l, r)
  case (a, Right(c)) :: tail =>
    val (l, r) = go(tail)
    (l, (a, c) :: r)
  case Nil => (Nil, Nil)
}
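As a sketch of how such a helper could look without explicit recursion, here is a `foldRight` variant with a small usage example. The name `unwrapListEither` is borrowed from the updated diff; the signature, the enclosing `EitherSplit` object, and the sample data are assumptions made for illustration.

```scala
object EitherSplit {
  // Splits the list in one pass, keeping the key attached on each side,
  // so callers never need .left.get or .right.get afterwards.
  def unwrapListEither[A, B, C](it: List[(A, Either[B, C])]): (List[(A, B)], List[(A, C)]) =
    it.foldRight((List.empty[(A, B)], List.empty[(A, C)])) {
      case ((a, Left(b)), (l, r))  => ((a, b) :: l, r)
      case ((a, Right(c)), (l, r)) => (l, (a, c) :: r)
    }
}

object EitherSplitExample {
  // Hypothetical data: Left marks "we do it", Right marks "someone else does".
  val lookups: List[(String, Either[Int, String])] =
    List(("w1", Left(1)), ("w2", Right("x")), ("w3", Left(2)))

  val (ours, theirs) = EitherSplit.unwrapListEither(lookups)
}
```

Both result lists keep the input order, and their element types are precise, which is what removes the need for the unsafe projections.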
Which cast are you seeing? Or do you mean without doing the unwraps lower down for right and left?
Updated with the `go` function; I think that's what you wanted it used for?
Just the `.left.get`.
val summedCounters: ExecutionCounters = Monoid.sum(fdCounters :: lCounters)
(fn(conf, mode), summedCounters)
}
}
You could override `def map` and get better composition, I think.
Does it avoid anything to inline `map` instead of having the `MappedExecution`? If it's in the presentation function we won't cache it anyway. (Indeed, if the map function is expensive we would want to cache it, as we do for `MappedExecution`s, right?)
`a.map(fn).zip(b)` would not get this optimization now, but if you inline `map` it would.
Right, there is a trade-off depending on how expensive the map function is, since right now you could execute prefixes twice. No big deal either way, if you ask me. Maybe keep it simpler for now and just stick with what you have.
def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = {
val cacheLookup: List[(ToWrite, Either[Promise[ExecutionCounters], Future[ExecutionCounters]])] = (head :: tail).map{ tw => (tw, cache.getOrLock(tw)) }
val (weDoOperation, someoneElseDoesOperation) = unwrapListEither(cacheLookup)
Is it worth checking whether the others have already failed, something like:
val otherResult = failFastSequence(someoneElseDoesOperation.map(_._2))
otherResult.value match {
  case Some(Failure(err)) => Future.failed(err)
  case _ =>
    // not complete yet, or successful:
    // schedule the write, and do the zip on otherResult.
}
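A self-contained sketch of that check, using only the standard `scala.concurrent` API, might look like the following. `EarlyFail`, `zipUnlessAlreadyFailed`, and the by-name `ours` parameter are hypothetical names; the by-name parameter stands for "schedule the write" so that nothing is started when the other side has already failed.

```scala
import scala.concurrent.Future
import scala.util.Failure

object EarlyFail {
  // `others` is the combined future of writes owned by other executions.
  // `ours` is by-name: it is only evaluated (and the write only scheduled)
  // when `others` has not already failed at the time of the check.
  def zipUnlessAlreadyFailed[A, B](others: Future[A])(ours: => Future[B]): Future[(A, B)] =
    others.value match {
      case Some(Failure(err)) => Future.failed(err) // already failed: skip our work
      case _                  => others.zip(ours)   // pending or successful: go ahead
    }
}
```

This is only a snapshot check: a failure that arrives after the `value` inspection is still handled by the zip, just not before our own write has been scheduled.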
That's a good idea; we can just fail fast the whole process.
👍 Nice! This is much better: compose when possible, and still guarantee each write is done at most once. So, the lesson here: if you use
Merged this into the other PR for any more comments/review before going to master.