SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable #771

Closed

Changes from all commits
core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import akka.actor.ActorRef
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.ApplicationDescription
 
 private[spark] class ApplicationInfo(
core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.master
 
 import java.util.Date
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.DriverDescription
 
 private[spark] class DriverInfo(
core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.deploy.master
 
 import java.io._
-
-import akka.serialization.Serialization
+import java.nio.ByteBuffer
 
 import org.apache.spark.Logging
+import org.apache.spark.serializer.Serializer
+
+import scala.reflect.ClassTag
 
 /**
  * Stores data in a single on-disk directory with one file per application and worker.
@@ -32,65 +34,39 @@ import org.apache.spark.Logging
  */
 private[spark] class FileSystemPersistenceEngine(
     val dir: String,
-    val serialization: Serialization)
+    val serialization: Serializer)
   extends PersistenceEngine with Logging {
 
+  val serializer = serialization.newInstance()
   new File(dir).mkdir()
 
-  override def addApplication(app: ApplicationInfo) {
-    val appFile = new File(dir + File.separator + "app_" + app.id)
-    serializeIntoFile(appFile, app)
-  }
-
-  override def removeApplication(app: ApplicationInfo) {
-    new File(dir + File.separator + "app_" + app.id).delete()
-  }
-
-  override def addDriver(driver: DriverInfo) {
-    val driverFile = new File(dir + File.separator + "driver_" + driver.id)
-    serializeIntoFile(driverFile, driver)
+  override def persist(name: String, obj: Object): Unit = {
+    serializeIntoFile(new File(dir + File.separator + name), obj)
   }
 
-  override def removeDriver(driver: DriverInfo) {
-    new File(dir + File.separator + "driver_" + driver.id).delete()
+  override def unpersist(name: String): Unit = {
+    new File(dir + File.separator + name).delete()
   }
 
-  override def addWorker(worker: WorkerInfo) {
-    val workerFile = new File(dir + File.separator + "worker_" + worker.id)
-    serializeIntoFile(workerFile, worker)
-  }
-
-  override def removeWorker(worker: WorkerInfo) {
-    new File(dir + File.separator + "worker_" + worker.id).delete()
-  }
-
-  override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
-    val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
-    val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
-    val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
-    val driverFiles = sortedFiles.filter(_.getName.startsWith("driver_"))
-    val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
-    val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
-    val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
-    (apps, drivers, workers)
+  override def read[T: ClassTag](prefix: String) = {
+    val files = new File(dir).listFiles().filter(_.getName.startsWith(prefix))
+    files.map(deserializeFromFile[T])
   }
 
   private def serializeIntoFile(file: File, value: AnyRef) {
     val created = file.createNewFile()
     if (!created) { throw new IllegalStateException("Could not create file: " + file) }
 
-    val serializer = serialization.findSerializerFor(value)
-    val serialized = serializer.toBinary(value)
-
-    val out = new FileOutputStream(file)
+    val out = serializer.serializeStream(new FileOutputStream(file))
     try {
-      out.write(serialized)
+      out.writeObject(value)
     } finally {
       out.close()
     }
+
   }
 
-  def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
+  def deserializeFromFile[T](file: File): T = {
     val fileData = new Array[Byte](file.length().asInstanceOf[Int])
     val dis = new DataInputStream(new FileInputStream(file))
     try {
@@ -99,8 +75,6 @@ private[spark] class FileSystemPersistenceEngine(
       dis.close()
     }
 
-    val clazz = m.runtimeClass.asInstanceOf[Class[T]]
-    val serializer = serialization.serializerFor(clazz)
-    serializer.fromBinary(fileData).asInstanceOf[T]
+    serializer.deserializeStream(dis).readObject()
   }
 }
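With this change, FileSystemPersistenceEngine no longer knows about apps, drivers, or workers; it just maps names to serialized objects, with the "app_"/"driver_"/"worker_" prefixes supplied by the PersistenceEngine trait. A rough round-trip sketch under the new contract (hypothetical directory and names; JavaSerializer stands in for whatever Serializer the Master passes):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Sketch only: FileSystemPersistenceEngine is private[spark], so this would have
// to live in the org.apache.spark.deploy.master package (e.g. in a test).
val engine = new FileSystemPersistenceEngine(
  "/tmp/spark-recovery", new JavaSerializer(new SparkConf()))

engine.persist("app_demo", "some serializable state") // any AnyRef the Serializer can handle
val restored = engine.read[String]("app_")            // Seq("some serializable state")
engine.unpersist("app_demo")
```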
core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -17,30 +17,27 @@
 
 package org.apache.spark.deploy.master
 
-import akka.actor.{Actor, ActorRef}
-
-import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
+import org.apache.spark.annotation.DeveloperApi
 
 /**
- * A LeaderElectionAgent keeps track of whether the current Master is the leader, meaning it
- * is the only Master serving requests.
- * In addition to the API provided, the LeaderElectionAgent will use of the following messages
- * to inform the Master of leader changes:
- * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
- * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
+ * :: DeveloperApi ::
+ *
+ * A LeaderElectionAgent tracks current master and is a common interface for all election Agents.
  */
-private[spark] trait LeaderElectionAgent extends Actor {
-  // TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
-  val masterActor: ActorRef
+@DeveloperApi
+trait LeaderElectionAgent {
+  val masterActor: LeaderElectable
+  def stop() {} // to avoid noops in implementations.
 }
 
-/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
-private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
-  override def preStart() {
-    masterActor ! ElectedLeader
-  }
+@DeveloperApi
+trait LeaderElectable {
+  def electedLeader()
+  def revokedLeadership()
+}
 
-  override def receive = {
-    case _ =>
-  }
+/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
+private[spark] class MonarchyLeaderAgent(val masterActor: LeaderElectable)
  extends LeaderElectionAgent {
+  masterActor.electedLeader()
 }
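Decoupling the agent from Akka means a third-party election agent only has to call back into the LeaderElectable it is given. A minimal sketch of a custom agent (hypothetical class, not part of this PR):

```scala
// Hypothetical agent driven by some external coordination service; the wiring
// to that service is elided, only the LeaderElectable callbacks matter here.
private[spark] class ExternalCoordinationAgent(val masterActor: LeaderElectable)
  extends LeaderElectionAgent {

  // Called by the (elided) coordination service when this node wins/loses election.
  def onGainedLeadership(): Unit = masterActor.electedLeader()
  def onLostLeadership(): Unit = masterActor.revokedLeadership()

  override def stop(): Unit = { /* disconnect from the coordination service */ }
}
```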
core/src/main/scala/org/apache/spark/deploy/master/Master.scala (24 additions, 16 deletions)
@@ -50,7 +50,7 @@ private[spark] class Master(
     port: Int,
     webUiPort: Int,
     val securityMgr: SecurityManager)
-  extends Actor with ActorLogReceive with Logging {
+  extends Actor with ActorLogReceive with Logging with LeaderElectable {
 
   import context.dispatcher // to use Akka's scheduler.schedule()
 
@@ -61,7 +61,6 @@
   val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
   val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
   val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
-  val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
   val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
 
   val workers = new HashSet[WorkerInfo]
@@ -103,7 +102,7 @@ private[spark] class Master(
 
   var persistenceEngine: PersistenceEngine = _
 
-  var leaderElectionAgent: ActorRef = _
+  var leaderElectionAgent: LeaderElectionAgent = _
 
   private var recoveryCompletionTask: Cancellable = _
 
@@ -130,23 +129,24 @@
     masterMetricsSystem.start()
     applicationMetricsSystem.start()
 
-    persistenceEngine = RECOVERY_MODE match {
+    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
       case "ZOOKEEPER" =>
         logInfo("Persisting recovery state to ZooKeeper")
-        new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
+        val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
+        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
       case "FILESYSTEM" =>
-        logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
-        new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
+        val fsFactory = new FileSystemRecoveryModeFactory(conf)
+        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
+      case "CUSTOM" =>
+        val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
+        val factory = clazz.getConstructor(conf.getClass)
+          .newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
+        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
       case _ =>
-        new BlackHolePersistenceEngine()
+        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
     }
-
-    leaderElectionAgent = RECOVERY_MODE match {
-      case "ZOOKEEPER" =>
-        context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
-      case _ =>
-        context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
-    }
+    persistenceEngine = persistenceEngine_
+    leaderElectionAgent = leaderElectionAgent_
   }
 
   override def preRestart(reason: Throwable, message: Option[Any]) {
@@ -165,7 +165,15 @@
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
     persistenceEngine.close()
-    context.stop(leaderElectionAgent)
+    leaderElectionAgent.stop()
   }
 
+  override def electedLeader() {
+    self ! ElectedLeader
+  }
+
+  override def revokedLeadership() {
+    self ! RevokedLeadership
+  }
+
   override def receiveWithLogging = {
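With the CUSTOM branch above, a deployment can swap in its own StandaloneRecoveryModeFactory by class name; the Master reflectively invokes the factory's single-SparkConf constructor. A sketch of the opt-in configuration (the factory class name is hypothetical):

```scala
import org.apache.spark.SparkConf

// Hypothetical opt-in: the named class must extend StandaloneRecoveryModeFactory
// and expose a constructor taking a SparkConf, per the reflective call above.
val conf = new SparkConf()
  .set("spark.deploy.recoveryMode", "CUSTOM")
  .set("spark.deploy.recoveryMode.factory", "com.example.MyRecoveryModeFactory")
```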
core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.deploy.master
 
+import org.apache.spark.annotation.DeveloperApi
+
+import scala.reflect.ClassTag
+
 /**
  * Allows Master to persist any state that is necessary in order to recover from a failure.
  * The following semantics are required:
@@ -25,36 +29,70 @@ package org.apache.spark.deploy.master
  * Given these two requirements, we will have all apps and workers persisted, but
  * we might not have yet deleted apps or workers that finished (so their liveness must be verified
  * during recovery).
+ *
+ * The implementation of this trait defines how name-object pairs are stored or retrieved.
  */
-private[spark] trait PersistenceEngine {
-  def addApplication(app: ApplicationInfo)
+@DeveloperApi
+trait PersistenceEngine {
 
-  def removeApplication(app: ApplicationInfo)
+  /**
+   * Defines how the object is serialized and persisted. Implementation will
+   * depend on the store used.
+   */
+  def persist(name: String, obj: Object)
 
-  def addWorker(worker: WorkerInfo)
+  /**
+   * Defines how the object referred by its name is removed from the store.
+   */
+  def unpersist(name: String)
 
-  def removeWorker(worker: WorkerInfo)
+  /**
+   * Gives all objects, matching a prefix. This defines how objects are
+   * read/deserialized back.
+   */
+  def read[T: ClassTag](prefix: String): Seq[T]
 
-  def addDriver(driver: DriverInfo)
+  final def addApplication(app: ApplicationInfo): Unit = {
+    persist("app_" + app.id, app)
+  }
 
-  def removeDriver(driver: DriverInfo)
+  final def removeApplication(app: ApplicationInfo): Unit = {
+    unpersist("app_" + app.id)
+  }
+
+  final def addWorker(worker: WorkerInfo): Unit = {
+    persist("worker_" + worker.id, worker)
+  }
+
+  final def removeWorker(worker: WorkerInfo): Unit = {
+    unpersist("worker_" + worker.id)
+  }
+
+  final def addDriver(driver: DriverInfo): Unit = {
+    persist("driver_" + driver.id, driver)
+  }
+
+  final def removeDriver(driver: DriverInfo): Unit = {
+    unpersist("driver_" + driver.id)
+  }
 
   /**
    * Returns the persisted data sorted by their respective ids (which implies that they're
    * sorted by time of creation).
    */
-  def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
+  final def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
+    (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
+  }
 
   def close() {}
 }
 
 private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
-  override def addApplication(app: ApplicationInfo) {}
-  override def removeApplication(app: ApplicationInfo) {}
-  override def addWorker(worker: WorkerInfo) {}
-  override def removeWorker(worker: WorkerInfo) {}
-  override def addDriver(driver: DriverInfo) {}
-  override def removeDriver(driver: DriverInfo) {}
 
-  override def readPersistedData() = (Nil, Nil, Nil)
+  override def persist(name: String, obj: Object): Unit = {}
+
+  override def unpersist(name: String): Unit = {}
+
+  override def read[T: ClassTag](name: String): Seq[T] = Nil
+
 }
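Because addApplication, removeWorker, readPersistedData, and the other typed helpers are now final methods layered on persist/unpersist/read, a custom engine only has to implement those three abstract methods. A minimal in-memory sketch (hypothetical class, e.g. for tests):

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical in-memory store keyed by the same "app_"/"driver_"/"worker_"
// names that the trait's final helpers generate.
class InMemoryPersistenceEngine extends PersistenceEngine {
  private val store = new mutable.HashMap[String, Object]

  override def persist(name: String, obj: Object): Unit = store(name) = obj

  override def unpersist(name: String): Unit = store -= name

  override def read[T: ClassTag](prefix: String): Seq[T] =
    store.collect { case (name, obj) if name.startsWith(prefix) => obj.asInstanceOf[T] }.toSeq
}
```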