[WIP][SPARK-29261][SQL][CORE] Support recover live entities from KVStore for (SQL)AppStatusListener #25943
Conversation
```scala
@@ -103,6 +104,81 @@ private[spark] class AppStatusListener(
  }
}

// visible for tests
private[spark] def recoverLiveEntities(): Unit = {
  if (!live) {
```
This may need to cooperate with SPARK-28594's config.

I still think it should be run by default to guarantee AppStatusListener and KVStore are in sync, but let's wait for other voices.
Hmm... I've found we don't recover AppStatusSource either, since it is not passed when live == false. If possible I'd add an assertion that the KVStore is empty when live == true, but it depends on whether that's feasible. If not, we may have to live with it.
> If possible I'd put assertion whether KVStore is empty when live == true then, but it depends on the possibility.

I think our current usage of a live (true) AppStatusListener guarantees an empty KVStore at the initialization step, so I don't understand "depends on the possibility". Am I missing something?

Also, it seems that we don't have an isEmpty API for KVStore. Otherwise, I could add an assertion to reach a compromise between us.
> I think our current usage of a live(true) AppStatusListener guarantees the empty KVStore at the initialization step. So I don't understand well for depends on the possibility?

That's a "context" we might have a chance to break eventually, and as a side effect it would break here. I'm in favor of defensive programming: if there are preconditions, they should be documented or asserted. But I agree we don't have an isEmpty API for KVStore, so let's leave it as it is.
Test build #111425 has finished for PR 25943 at commit.

It would be helpful if you could describe the remaining TODOs to remove.
Just skimmed once, except the suites. For now I'm not familiar with how live entities are calculated and handled, so it will take some time to learn before taking a deep look. After that, I guess I could take a look at the suites as well.
```scala
    stageData.info.status == v1.StageStatus.PENDING ||
      stageData.info.status == v1.StageStatus.ACTIVE
  }
  .map { stageData =>
```
nit: indentation is not consistent between filter and map; it could be appended to the previous line.
Do you mean?

```scala
.filter { stageData =>
  ...
}.map { stageData =>
  ...
}
```
Yes, see foreach below.
```scala
  val stageId = stageData.info.stageId
  val jobs = liveJobs.values.filter(_.stageIds.contains(stageId)).toSeq
  stageData.toLiveStage(jobs)
}.foreach { stage =>
```
nit: the indentation of the foreach block looks to be off.
```scala
}.foreach { stage =>
  val stageId = stage.info.stageId
  val stageAttempt = stage.info.attemptNumber()
  liveStages.put((stageId, stageAttempt), stage)
```
Define a stageKey, and reuse the key everywhere in this method.
Thanks for the good advice. Actually, I also noticed the varying usage of [stageId, stageAttempt] across Spark core. In AppStatusListener and EventLoggingListener, we have a Tuple2 for [stageId, stageAttempt], while in ExecutorAllocationManager we have a case class named StageAttempt for it. Maybe there are other formats in other places. So I'm wondering whether it would be better to unify those usages of [stageId, stageAttempt] across Spark core, e.g. with a global type definition named stageKey. That would be a more significant change, though.
That would be nice; it's beyond the scope of this PR though.
Yeah. But for now, I'd prefer to use a Tuple2 (stageId, stageAttempt) as AppStatusListener already does, and leave such a refactoring for a separate PR later.
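The unification floated above could be as small as a shared alias over the Tuple2 form; a hedged sketch (StageKeys and StageKey are invented names, not part of this PR):

```scala
// Hypothetical sketch: one shared key type for (stageId, stageAttempt),
// standardizing on the Tuple2 form AppStatusListener already uses.
object StageKeys {
  type StageKey = (Int, Int) // (stageId, stageAttemptNumber)

  def of(stageId: Int, attempt: Int): StageKey = (stageId, attempt)
}
```

Call sites like `liveStages.put((stageId, stageAttempt), stage)` would then read `liveStages.put(StageKeys.of(stageId, stageAttempt), stage)`.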
```scala
  .index(TaskIndexNames.STATUS)
  .first(TaskState.RUNNING.toString)
  .last(TaskState.RUNNING.toString)
  .closeableIterator().asScala
```
Just for the sake of understanding: any reason to use closeableIterator here and not for the others?
Previously, I mimicked the usage of KVStore from AppStatusStore. After taking a second look at this, I realized that closeableIterator() is only used together with Utils.tryWithResource (which requires a closeable resource). We don't really need it here; I'll remove it.
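For reference, the pattern being mimicked can be reduced to a small sketch; the tryWithResource below is a minimal stand-in for Spark's Utils.tryWithResource (assumed behavior, not the actual implementation):

```scala
// Minimal stand-in for Utils.tryWithResource: the closeable resource is
// released even if the body throws.
def tryWithResource[R <: AutoCloseable, T](createResource: => R)(f: R => T): T = {
  val resource = createResource
  try f(resource) finally resource.close()
}
```

With a KVStore view, this is why closeableIterator() matters: `tryWithResource(view.closeableIterator()) { it => ... }` guarantees the iterator is closed, while a plain iterator offers no handle to close.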
```diff
-      .foreach(stageMetrics.remove)
+      .foreach { stageId =>
+        stageMetrics.remove(stageId)
+        if (live) {
```
Is this a new requirement, or an existing bug? If it's an existing bug, it would be nice to separate the two cases and deal with it in another (small) PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's a new requirement.
```diff
@@ -424,13 +456,28 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
 
 }
 
-private class LiveStageMetrics(
+private[spark] class LiveStageMetrics (
```
nit: remove space
```scala
}

/**
 * Used to recover LiveStageMetrics in SQLAppStatusListener. It would be wrote into KVStore
```
nit: wrote -> written
```scala
@@ -181,6 +182,7 @@ class RDDStorageInfo private[spark](
    val partitions: Option[Seq[RDDPartitionInfo]])

class RDDDataDistribution private[spark](
    val executorId: String,
```
Are we changing the model in api/v1? It seems the change is needed, but we may also need to check that it doesn't break the existing one.
The API versioning requirements are listed here: https://spark.apache.org/docs/latest/monitoring.html#api-versioning-policy

> New fields may be added to existing endpoints

so it's OK to add new fields. But if we're adding things that really don't make sense as part of the API, then maybe we should just store a different object instead.
Hi @squito, thanks for providing the api versioning requirements. That's really helpful.
```scala
 * Return the StorageLevel object with the specified description.
 */
@DeveloperApi
def fromDescription(desc: String): StorageLevel = {
```
I'm a bit worried that we're relying on string parsing, which is easy to break. Is this covered by a UT or so?
Good suggestion, I'll add a unit test for it.
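A round-trip check is probably the simplest shape for that test; a hedged sketch (the test name and the chosen levels are placeholders, and it assumes the fromDescription method from the diff above):

```scala
// Hypothetical ScalaTest sketch: parsing a level's own description should
// return an equal StorageLevel, whatever the description format is.
test("StorageLevel.fromDescription round-trips description") {
  Seq(StorageLevel.MEMORY_ONLY, StorageLevel.DISK_ONLY,
      StorageLevel.MEMORY_AND_DISK_SER_2).foreach { level =>
    assert(StorageLevel.fromDescription(level.description) === level)
  }
}
```

A round trip avoids coupling the test to the exact wording of the description string.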
Still learning; just checked the conditions for entities to be "live".
```scala
  }
  stage.savedTasks.addAndGet(kvstore.count(classOf[TaskDataWrapper]).intValue())
}
kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive)
```
We may want to restore deadExecutors for the same reason (isActive == false).
```scala
kvstore.view(classOf[TaskDataWrapper])
  .parent(Array(stageId, stageAttempt))
  .index(TaskIndexNames.STATUS)
  .first(TaskState.RUNNING.toString)
```
Could we check that we are safe to ignore LAUNCHING? TaskInfo.isFinished is a safer approach to filter active tasks, but I also see the benefit of filtering with the index, so if LAUNCHING can be ignored then we're good to go.
Ignoring LAUNCHING is safe, because status in TaskDataWrapper actually comes from LiveTask.TaskInfo.status:

spark/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala, lines 96 to 112 in e1ea806:
```scala
def status: String = {
  if (running) {
    if (gettingResult) {
      "GET RESULT"
    } else {
      "RUNNING"
    }
  } else if (failed) {
    "FAILED"
  } else if (killed) {
    "KILLED"
  } else if (successful) {
    "SUCCESS"
  } else {
    "UNKNOWN"
  }
}
```
And there's no LAUNCHING.

Another possible running status is GET RESULT, but this also seems impossible: TaskInfo in LiveTask is only updated when SparkListenerTaskStart and SparkListenerTaskEnd events come, and these two events don't change a task's status to GET RESULT.
```scala
private[spark] def recoverLiveEntities(): Unit = {
  if (!live) {
    kvstore.view(classOf[JobDataWrapper])
      .asScala.filter(_.info.status == JobExecutionStatus.RUNNING)
```
A safer condition would be either UNKNOWN or RUNNING, but I guess it shouldn't be UNKNOWN, so this seems OK.
Yeah. I didn't see any place where we set the job status to UNKNOWN.
```scala
kvstore.view(classOf[StageDataWrapper]).asScala
  .filter { stageData =>
    stageData.info.status == v1.StageStatus.PENDING ||
```
Logically, a stage with activeTasks > 0 is counted in liveStages regardless of status:

spark/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala, lines 605 to 614 in 5a512e8:
```scala
// [SPARK-24415] Wait for all tasks to finish before removing stage from live list
val removeStage =
  stage.activeTasks == 0 &&
    (v1.StageStatus.COMPLETE.equals(stage.status) ||
      v1.StageStatus.FAILED.equals(stage.status))
if (removeStage) {
  update(stage, now, last = true)
} else {
  maybeUpdate(stage, now)
}
```
spark/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala, lines 743 to 748 in 5a512e8:
```scala
// Remove stage only if there are no active tasks remaining
val removeStage = stage.activeTasks == 0
update(stage, now, last = removeStage)
if (removeStage) {
  liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
}
```
... except when the status is SKIPPED:

spark/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala, lines 408 to 429 in 5a512e8:
```scala
// Check if there are any pending stages that match this job; mark those as skipped.
val it = liveStages.entrySet.iterator()
while (it.hasNext()) {
  val e = it.next()
  if (job.stageIds.contains(e.getKey()._1)) {
    val stage = e.getValue()
    if (v1.StageStatus.PENDING.equals(stage.status)) {
      stage.status = v1.StageStatus.SKIPPED
      job.skippedStages += stage.info.stageId
      job.skippedTasks += stage.info.numTasks
      job.activeStages -= 1
      pools.get(stage.schedulingPool).foreach { pool =>
        pool.stageIds = pool.stageIds - stage.info.stageId
        update(pool, now)
      }
      it.remove()
      update(stage, now, last = true)
    }
  }
}
```
So the condition is actually much more complicated than that.
Good catch! Here is another possible condition to attach for this:

```scala
(stageData.info.numActiveTasks > 0 && stageData.info.status != v1.StageStatus.SKIPPED)
```
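Put together, the filter discussed above could look like the following self-contained sketch (StageStatus and StageInfoLite here are stand-ins for the v1 API types, not the real classes):

```scala
// Stand-ins for the v1 types, just to make the predicate concrete.
object StageStatus extends Enumeration {
  val PENDING, ACTIVE, COMPLETE, FAILED, SKIPPED = Value
}
case class StageInfoLite(status: StageStatus.Value, numActiveTasks: Int)

// A stage is "live" if it is PENDING/ACTIVE, or if it still has active
// tasks and was not SKIPPED (covering the SPARK-24415 case quoted above).
def isLiveStage(info: StageInfoLite): Boolean = {
  info.status == StageStatus.PENDING ||
    info.status == StageStatus.ACTIVE ||
    (info.numActiveTasks > 0 && info.status != StageStatus.SKIPPED)
}
```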
```scala
@@ -76,6 +109,29 @@ private[spark] class JobDataWrapper(

  @JsonIgnore @KVIndex("completionTime")
  private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)

  def toLiveJob: LiveJob = {
```
completedIndices and completedStages are not assigned, and they're not computed from AppStatusListener.recoverLiveEntities. Is this missing, is there a valid reason, or is it a technical issue?
While converting a LiveJob to a JobDataWrapper, only completedIndices.size/completedStages.size are written into the KVStore, so it's impossible to recover the full completedIndices/completedStages. As a compromise, we only recover the counts in this case.

This brings the problem that the recovered live job won't be 100% the same as the previous one. But this should be acceptable when you look at how completedIndices and completedStages are used in AppStatusListener, as it doesn't distort the final result (which is a number).
You can get multiple TaskEnd events for the same taskInfo.index, however, so you actually need those indices.
Yeah, I can see how that would happen. But I think there would be only one successful task out of those ended tasks, right? And we only update completedIndices when we receive a successful task end:

spark/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala, lines 579 to 588 in d2f21b0:
```scala
val (completedDelta, failedDelta, killedDelta) = event.reason match {
  case Success =>
    (1, 0, 0)
  case _: TaskKilled =>
    (0, 0, 1)
  case _: TaskCommitDenied =>
    (0, 0, 1)
  case _ =>
    (0, 1, 0)
}
```
spark/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala, lines 621 to 623 in d2f21b0:
```scala
if (completedDelta > 0) {
  job.completedIndices.add(taskIndex)
}
```
WDYT ? @squito
Two speculative tasks can both complete successfully, and the scheduler intentionally posts TaskEnd events for both of them:

spark/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala, lines 1369 to 1375 in 6390f02:
```scala
if (!stageIdToStage.contains(task.stageId)) {
  // The stage may have already finished when we get this event -- eg. maybe it was a
  // speculative task. It is important that we send the TaskEnd event in any case, so listeners
  // are properly notified and can chose to handle it. For instance, some listeners are
  // doing their own accounting and if they don't get the task end event they think
  // tasks are still running when they really aren't.
  postTaskEnd(event)
```
Any thoughts here? @HeartSaVioR
I'm not sure we are OK with lossy information. We shouldn't let developers access information that can be lost after restoring, but how? By leaving comments? Whenever we modify AppStatusListener to add some feature, we may have to struggle with two things: compatibility with the old snapshot of state, and uncertainty about lossy information if we cannot control it well.

Ideally we would restore the state of the listener exactly as it was. If we have to snapshot the listener state in the driver, memory consumption and performance really matter, as they affect the live application. For the SHS, we will have less pressure on this, and memory consumption would matter less, since we are in fact not tied to storing the listener state in the "KVStore": it could be another file along with the snapshot file for the KVStore.
I haven't fully followed the discussion here, and haven't thought thoroughly about all the implications of the snapshotting approaches being discussed. But I just wanted to point out that the problems being talked about here (snapshotting a potentially large amount of data) are ridiculously worse on the SQL side. Because of the way SQL metrics are calculated, you need to have all data points, meaning that a snapshot taken in the middle of a stage of a SQL query can contain a humongous amount of data.

The 100k-task stage here could be encoded with 25k characters if you encode the bit set as hex characters (4 bits per char). That's not so bad. But on the SQL side, if you have, let's say, half of the tasks finished, you have to encode 50k longs * the number of metrics being tracked by the stage, which is a lot more data to encode in JSON.
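The "4 bits per char" estimate can be made concrete with a small sketch (a hypothetical helper, not code from this PR): a 100k-bit completion set packs into 25k hex characters.

```scala
// Pack a task-completion bit set into hex characters, 4 task bits per
// character (little-endian within each nibble).
def bitSetToHex(bits: java.util.BitSet, numTasks: Int): String = {
  val sb = new StringBuilder((numTasks + 3) / 4)
  var i = 0
  while (i < numTasks) {
    var nibble = 0
    var j = 0
    while (j < 4 && i + j < numTasks) {
      if (bits.get(i + j)) nibble |= (1 << j)
      j += 1
    }
    sb.append(Integer.toHexString(nibble))
    i += 4
  }
  sb.toString
}
```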
So, would it be acceptable if we only consider snapshotting in the SHS?
Well, the concerns about putting excess work on the driver are certainly not as important for the SHS. Performance still matters, of course, but there may be more options to speed things up (e.g., running multiple processes to create snapshots in parallel). And we're still concerned about keeping the snapshot file small (if it's too big, it won't be fast to load). We don't know for sure what that file size will be until we actually build it, but my gut is that we can store all the required info in the snapshot file and keep things reasonably sized.
```scala
var activeTasks = 0
var completedTasks = 0
var failedTasks = 0

// Holds both the stage ID and the task index, packed into a single long value.
val completedIndices = new OpenHashSet[Long]()
// will only be set when recover LiveJob is needed.
var completedIndicesNum = 0
```
Could we follow the naming style from JobData? It has fields for the number of XXX named numXXX.
Good idea!

I've reviewed only a small part yet, but I feel that supporting bi-directional conversions between a LiveEntity and the instance stored in the KVStore won't be easy, as a LiveEntity has more information than the stored instance in order to deal with incoming events. I've found a missing spot in converting JobDataWrapper to LiveJob (#25943 (comment)), and I'm guessing it is due to a technical reason. I'm not sure it is viable, but it might be much easier if we find a way to store/load live entities to the KVStore directly, while keeping the ability to convert live entities to stored instances and update them. (This may need a POC; I would be happy to experiment on it.) Then we would just need to store live entities to the KVStore once we flush the (SQL)AppStatusListener (to perform a snapshot, or to close the AppStatusListener), and restore live entities from the KVStore when initializing the (SQL)AppStatusListener.
Just went through a POC, and realized LiveEntities are not designed to be stored in a KVStore; I encountered many issues. Let's continue with this approach. Will continue reviewing.
```scala
var activeTasks = 0
var completedTasks = 0
var failedTasks = 0

// Holds both the stage ID and the task index, packed into a single long value.
val completedIndices = new OpenHashSet[Long]()
// will only be set when recover LiveJob is needed.
```
Could we add completedIndices and completedStages into a new KVStore entity class, and restore the LiveJob from both the JobDataWrapper and the new class? I guess you would like to avoid modifying JobDataWrapper, but I'd worry that the state of the restored instance differs from the original, so I'd like to explore a new option here.
I don't mind adding these to JobDataWrapper as well, though it could bring problems with backward compatibility. It sounds to me like a tradeoff between "possibly inaccurate" and "no backward compatibility".
I think adding completedIndices and completedStages to JobDataWrapper would be OK, since JobDataWrapper is not under the api.v1 package. And I think correctness is always the first thing we need to take care of.
```scala
@@ -370,6 +374,8 @@ private class LiveStage extends LiveEntity {
  var completedTasks = 0
  var failedTasks = 0
  val completedIndices = new OpenHashSet[Int]()
  // will only be set when recover LiveStage is needed.
```
Same here: could we add completedIndices into a new KVStore entity class, and restore the LiveStage from both the StageDataWrapper and the new class? Adding it to StageDataWrapper is also fine with me.
```scala
info.attemptId,
info.name,
info.numTasks,
Nil, // rddInfo
```
It seems rddInfo must be available, as toApi() requires it.
You're right. It seems that we should at least recover all the rddInfo ids.
```scala
info.name,
info.numTasks,
Nil, // rddInfo
Nil, // parentIds
```
parentIds, taskMetrics, taskLocalityPreference, and shuffleDepId are not assigned here either. We may want to leave comments to confirm we're OK with this.
```scala
liveStage.activeTasks = info.numActiveTasks
liveStage.completedTasks = info.numCompleteTasks
liveStage.failedTasks = info.numFailedTasks
liveStage.completedIndicesNum = info.numCompletedIndices
```
Here completedIndices is not assigned, as I commented elsewhere.
```scala
liveStage.killedSummary = info.killedTasksSummary
liveStage.firstLaunchTime = firstLaunchTime
liveStage.localitySummary = locality
liveStage.metrics = metrics
```
It might be better to leave a comment that executorSummaries, activeTasksPerExecutor, blackListedExecutors, and savedTasks are computed later by recoverLiveEntities.
Good idea!
Hi @HeartSaVioR, thanks for the review. Just to let you know, I've gone through part of the comments, and I think I can find the time to respond and update accordingly today.
Still only handled some of the comments; will continue. @HeartSaVioR
```scala
  val offHeapMemoryRemaining: Option[Long]) {

  def toLiveRDDDistribution(executors: scala.collection.Map[String, LiveExecutor])
    : LiveRDDDistribution = {
```
These added methods should probably be private[spark].
Is this necessary? It seems we already have private[spark] scope for RDDDataDistribution and the other objects in storeTypes that have toLiveXXX methods. So those objects can't be constructed outside Spark, and the toLiveXXX methods won't be accessible either.
The classes themselves are public. The private[spark] is only applied to the constructor (that's why it's in an unusual position; that's the Scala way of scoping the constructor).

Sorry, I'm getting caught up on a lot of stuff here. How is this related to #25577? They seem to be two different approaches to the same problem (though #25577 does a little bit more than what is here). Just for storing live entities, this approach seems more promising to me: using Java serialization is really bad for compatibility, since any change to the classes will break deserialization. If we want the snapshot/checkpoint files to entirely replace the event log files, we'll need that compatibility.
This PR just extracts the recovering-live-entities functionality from #25577.

Hmm... This PR doesn't store live entities; instead, it recovers (or say, restores) live entities from a KVStore. For the snapshot/checkpoint files, I think @HeartSaVioR has already posted a good solution in #25811, which has good compatibility for serialization/deserialization.
I see, this is probably the key part I was missing. So you're saying the KVStore already has enough info, and this PR just repopulates the in-memory version of all the LiveEntities?
Yeah. The snapshot is used to restore a KVStore, and then we recover live entities for AppStatusListener/SQLAppStatusListener from that restored KVStore.
Test build #111999 has finished for PR 25943 at commit.

Test build #112036 has finished for PR 25943 at commit.

Test build #112180 has finished for PR 25943 at commit.
@Ngone51 Based on the agreement, I feel we could just try to serialize live entities themselves (although we would have to recover the references between them) instead of recreating them from KVStore entities, which seems too complicated and likely lossy. Could you update your patch to simplify it based on this agreement? Actually, I have a POC commit to deal with this, and got feedback that it is too verbose (as in, "just let Jackson work for me"). I'd really appreciate it if you could take this up and simplify the code change. No need to retain authorship; it's yours if you are willing to take it up. Please also let me know if you are not willing to work on it, and I'll take up the issue. I apologize for the change of direction; I'm really sorry about missing a critical thing to check and leading us in the wrong direction. Please let me know if you have any questions or uncertainty. Thanks in advance.
@HeartSaVioR I've gone through your commit, and I see it's a big change and a totally different way to deal with live entities. So I feel it would be more appropriate for you to take this up, since you're more familiar with your own code. I can help review then. Never mind the effort paid on this PR; at least we've moved to a more viable approach.
@Ngone51 Thanks for understanding, and thanks again for bearing with me and the situation around this. Btw, I'm exploring an idea for avoiding reliance on both the KVStore models and the live entity models. It's just a sketched idea, and I have to go through a POC to verify whether it works. I'll share it if the idea seems to work; otherwise I'll just proceed with the adjusted plan (no guarantee of compatibility).
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?

This PR enables SQLAppStatusListener and AppStatusListener to recover live entities from a KVStore (InMemoryStore or LevelDB). It mainly adds a method named recoverLiveEntities(), which does the recovery work, in (SQL)AppStatusListener. It also adds a series of toLiveXXX() methods for KVStore object types, in order to convert a KVStore object into a live entity.

Why are the changes needed?

SPARK-28594 and SPARK-28867 both plan to do incremental replay in the HistoryServer based on a snapshotting mechanism. When a KVStore is restored from a snapshot, we need to recover the still-running objects (e.g. jobs, stages, tasks) into live entities. Then we can continue to replay the following events.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added tests in AppStatusListenerSuite and SQLAppStatusListenerSuite.