-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable #771
Conversation
Merged build triggered. |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14979/ |
Jenkins, test this please. |
Merged build triggered. |
Merged build started. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
@aarondav Mind taking a look ? |
@ScrapCodes, I definitely like the idea. I think we need to update the API a little before it's ready to be publicly accessible, though. First, LeaderElectionAgent and PersistenceEngine should be Second, LeaderElectionAgent should be refactored to not be an Actor (there is a TODO since the Curator refactor, I think it just needs to be done before we can finalize this API). This should be pretty straightforward to do, I would just add a trait like trait LeaderElectable {
def electedLeader()
def revokedLeadership()
} which Master implements by simply calling |
@@ -143,6 +146,9 @@ private[spark] class Master( | |||
leaderElectionAgent = RECOVERY_MODE match { | |||
case "ZOOKEEPER" => | |||
context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf)) | |||
case "CUSTOM" => | |||
val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.leaderAgentActor")) | |||
context.actorOf(Props(clazz, self, masterUrl, conf)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be a good idea to abstract out the creation of the persistenceEngine and leaderElectionAgent into something like
abstract class StandaloneRecoveryModeFactory(conf: SparkConf) {
def createPersistenceEngine(): PersistenceEngine
def createLeaderElectionAgent(master: LeaderElectable, masterUrl: String): LeaderElectionAgent
}
with 2 subclasses, FileSystemRecoveryModeFactory
and ZooKeeperRecoveryModeFactory
(maybe the name "RecoveryModeFactory" is not so good).
The advantage of this approach is that we formalize the arguments needed for constructing the 2 APIs, and centralize the creation of everything necessary for one recovery mode. Thoughts?
Merged build triggered. |
@aarondav I am not 100% sure about the idea but for a moment I felt may be we could club |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15479/ |
That does sound reasonable, especially since the LeaderElectionAgent relies on the behavior of the PersistenceEngine in order to actually enable recovery. |
Merged build triggered. |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15496/ |
Merged build triggered. |
Merged build started. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
Hey @aarondav, Do you think its worth having in its current condition ? I can rebase it ofcourse. I was actually unsure of changing it further. |
@aarondav could you give feedback one way or the other on this? |
val out = new FileOutputStream(file) | ||
out.write(serialized) | ||
out.write(serialized.array()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we could use serializeStream() instead to avoid relying on a ByteBuffer's array()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not understand, what you meant here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was just suggesting that we could do
val out = serializer.serializeStream(new FileOutputStream(file))
out.writeObject(value)
out.close()
It's a very minor point about ByteBuffers not always having a valid array().
@DeveloperApi | ||
trait PersistenceEngine { | ||
|
||
def persist(name: String, obj: Object) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add documentation to these three methods to make the requirements clear.
…luggable. Refactored Leader Election agent and added a RecoveryModeFactory. Implemented new proposal with some convenient modifications. Added a read method.
d2a1c62
to
fef35ec
Compare
QA tests have started for PR 771 at commit
|
QA tests have finished for PR 771 at commit
|
Test FAILed. |
QA tests have started for PR 771 at commit
|
QA tests have finished for PR 771 at commit
|
Test FAILed. |
Jenkins, retest this please. |
Test build #22127 has started for PR 771 at commit
|
Test build #22127 has finished for PR 771 at commit
|
Test PASSed. |
Did I miss something ? |
LGTM, merging into master. Thanks! |
…luggable Author: Prashant Sharma <prashant.s@imaginea.com> Closes apache#771 from ScrapCodes/deploy-failover-pluggable and squashes the following commits: 29ba440 [Prashant Sharma] fixed a compilation error fef35ec [Prashant Sharma] Code review 57ee6f0 [Prashant Sharma] SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable.
…luggable Author: Prashant Sharma <prashant.s@imaginea.com> Closes apache#771 from ScrapCodes/deploy-failover-pluggable and squashes the following commits: 29ba440 [Prashant Sharma] fixed a compilation error fef35ec [Prashant Sharma] Code review 57ee6f0 [Prashant Sharma] SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable.
…luggable Author: Prashant Sharma <prashant.s@imaginea.com> Closes apache#771 from ScrapCodes/deploy-failover-pluggable and squashes the following commits: 29ba440 [Prashant Sharma] fixed a compilation error fef35ec [Prashant Sharma] Code review 57ee6f0 [Prashant Sharma] SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable.
No description provided.