-
Notifications
You must be signed in to change notification settings - Fork 707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move toIterableExecution and forceToDiskExecution into Execution #1682
Conversation
@ianoc any time for this one? |
ping @benpence |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I understand what's going on in each localized scope, but am less familiar with the whole picture here.
Overall, it looks fine. Left a few questions/comments.
cache.writer match { | ||
case ar: AsyncFlowDefRunner => | ||
ar.validateAndRun(conf, mode)(result).map { m => ((), Map(m)) } | ||
case other => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not also make this part of the interface like in #start()
instead of matching? Do we anticipate handling writers differently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we can easily do this differently with the current interface -- in that FlowDefExecution is tied to cascading. this match would eventually go away with a more cross platform interface I would think. Since we would want to eventually get references to FlowDef out of this file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what Ian said. The goal here is to make hopefully all of the typed API not depend on cascading and just make the cascading backend one implementation. I think no one uses this function anyway (creating an Execution from a FlowDef) and I want to just remove it, but in the mean time, I want to not dig us in any deeper.
If no one at Twitter is using it, I would suggest we remove the function, but keeping it around for 1 version deprecated may work okay too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do have a few #fromFn
uses in our code, specifically in a couple places where we have FlowDef => Unit
functions to accommodate both Job
& Execution
. Deprecation is definitely a good idea though.
val summedCounters = (fdCounters :: lCounters).reduce(_ ++ _) | ||
(fn(conf, mode), summedCounters) | ||
} | ||
val bothFutures = failFastZip(otherResult, localFlowDefCountersFuture) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this localFlowDefCountersFuture
is a Future
for the stats from finishing the cascading flow?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First note, I didn't change that, I just captured the future into a name so I could use a for comprehension and have it read nicer, in my opinion.
About how this works, I hope the code is somewhat readable, but it is not simple. This is where a lot of the hard work of Execution is happening and it got more complicated over time due to finding and fixing issues with its use at Twitter. The thing we are working very hard to do here is not ever re-run a ToWrite aside from also not re-running ANY execution. The problem is, if things are zipped differently, the same ToWrite might end up in two different Executions. So we look through for all the ToWrites we didn't yet run, we run those in one call to the Writer, and we merge the counter results from the other previous writes (otherwise the counter results are wrong).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ya I was just a little confused by what was going on here in the old code. I think it makes sense now. I think the concept of "we" and "someoneElse" was not clear to me before.
Execution.write(writeFn, readFn, filesToDeleteFn) | ||
} | ||
def forceToDiskExecution: Execution[TypedPipe[T]] = | ||
Execution.forceToDisk(this) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is much easier to read :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:)
@@ -68,14 +73,51 @@ object AsyncFlowDefRunner { | |||
} | |||
|
|||
/** | |||
* This holds an internal thread to run | |||
* This holds an internal thread to submit run |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a merge issue I think... :/
s | ||
} | ||
private def getState: State = | ||
updateState { s => (s, s) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this a little hard to read because of the use of S
and s
and I anticipate people unfamiliar with this construct will find it confusing. Can we use R
and result
instead or some more verbose in the naming?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you be concrete about how you want to change this? These is only one variable here, the state and you are copying that to be the result. Can you write the code you want me to replace it with because I don't follow your comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure something along the lines of
private def updateState[R](modify: State => (State, R)): R =
mutex.synchronized {
val (newState, result) = modify(state)
state = newState
result
}
private def getState: State =
// Expose current state as the result value
updateState { state => (state, state) }
although as is it's probably OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll sweep this change into the next one if that's okay.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
* This holds an internal thread to submit run | ||
* a Config, Mode, FlowDef and return a Future holding the | ||
* JobStats | ||
*/ | ||
class AsyncFlowDefRunner extends Writer { self => | ||
import AsyncFlowDefRunner._ | ||
|
||
private[this] val filesToCleanup = mutable.Set[String]() | ||
private[this] val mutex = new AnyRef |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need the mutex rather than self?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
personal OCD. I don't like to lock things others can see. I always like to lock a private val.
s | ||
} | ||
private def getState: State = | ||
updateState { s => (s, s) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure something along the lines of
private def updateState[R](modify: State => (State, R)): R =
mutex.synchronized {
val (newState, result) = modify(state)
state = newState
result
}
private def getState: State =
// Expose current state as the result value
updateState { state => (state, state) }
although as is it's probably OK.
val summedCounters = (fdCounters :: lCounters).reduce(_ ++ _) | ||
(fn(conf, mode), summedCounters) | ||
} | ||
val bothFutures = failFastZip(otherResult, localFlowDefCountersFuture) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ya I was just a little confused by what was going on here in the old code. I think it makes sense now. I think the concept of "we" and "someoneElse" was not clear to me before.
cache.writer match { | ||
case ar: AsyncFlowDefRunner => | ||
ar.validateAndRun(conf, mode)(result).map { m => ((), Map(m)) } | ||
case other => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do have a few #fromFn
uses in our code, specifically in a couple places where we have FlowDef => Unit
functions to accommodate both Job
& Execution
. Deprecation is definitely a good idea though.
lgtm |
…tter#1682) * Pull cascading flowDef thread into cascading_backend * Make ToWrite public * remove unused imports * Move toIterableExecution and forceToDiskExecution into Execution * use Writer API a bit more
This is a follow up to #1681
Here we have done almost all of the work to make Execution delegate to a
Writer
that handles all the side effects.With this we can basically do 100% of having an in-memory or spark backend as long as users don't use certain cascading only features (toPipe, TypedPipe from a Pipe, or Execution from a FlowDef).
We are getting very close to where I want to be for the next published version. I want to make an example in-memory backend just to exercise everything to see if we really can run non-trivial jobs. We can leave a spark or flink backend to a future work.
I would like to remove the Execution from a FlowDef, I doubt it was ever used. I'd like to move toPipe to an implicit enrichment that users opt into if they really need it (which I doubt they would).