Skip to content

Commit

Permalink
SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent P…
Browse files Browse the repository at this point in the history
…luggable

Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #771 from ScrapCodes/deploy-failover-pluggable and squashes the following commits:

29ba440 [Prashant Sharma] fixed a compilation error
fef35ec [Prashant Sharma] Code review
57ee6f0 [Prashant Sharma] SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable.
  • Loading branch information
ScrapCodes authored and aarondav committed Nov 11, 2014
1 parent 6e03de3 commit deefd9d
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer

import akka.actor.ActorRef

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.ApplicationDescription
import org.apache.spark.util.Utils

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy.master

import java.util.Date

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.DriverDescription
import org.apache.spark.util.Utils

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.spark.deploy.master

import java.io._

import akka.serialization.Serialization
import java.nio.ByteBuffer

import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer

import scala.reflect.ClassTag

/**
* Stores data in a single on-disk directory with one file per application and worker.
Expand All @@ -32,65 +34,39 @@ import org.apache.spark.Logging
*/
private[spark] class FileSystemPersistenceEngine(
val dir: String,
val serialization: Serialization)
val serialization: Serializer)
extends PersistenceEngine with Logging {

val serializer = serialization.newInstance()
new File(dir).mkdir()

override def addApplication(app: ApplicationInfo) {
val appFile = new File(dir + File.separator + "app_" + app.id)
serializeIntoFile(appFile, app)
}

override def removeApplication(app: ApplicationInfo) {
new File(dir + File.separator + "app_" + app.id).delete()
}

override def addDriver(driver: DriverInfo) {
val driverFile = new File(dir + File.separator + "driver_" + driver.id)
serializeIntoFile(driverFile, driver)
override def persist(name: String, obj: Object): Unit = {
serializeIntoFile(new File(dir + File.separator + name), obj)
}

override def removeDriver(driver: DriverInfo) {
new File(dir + File.separator + "driver_" + driver.id).delete()
override def unpersist(name: String): Unit = {
new File(dir + File.separator + name).delete()
}

override def addWorker(worker: WorkerInfo) {
val workerFile = new File(dir + File.separator + "worker_" + worker.id)
serializeIntoFile(workerFile, worker)
}

override def removeWorker(worker: WorkerInfo) {
new File(dir + File.separator + "worker_" + worker.id).delete()
}

override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
val driverFiles = sortedFiles.filter(_.getName.startsWith("driver_"))
val drivers = driverFiles.map(deserializeFromFile[DriverInfo])
val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
(apps, drivers, workers)
override def read[T: ClassTag](prefix: String) = {
val files = new File(dir).listFiles().filter(_.getName.startsWith(prefix))
files.map(deserializeFromFile[T])
}

private def serializeIntoFile(file: File, value: AnyRef) {
val created = file.createNewFile()
if (!created) { throw new IllegalStateException("Could not create file: " + file) }

val serializer = serialization.findSerializerFor(value)
val serialized = serializer.toBinary(value)

val out = new FileOutputStream(file)
val out = serializer.serializeStream(new FileOutputStream(file))
try {
out.write(serialized)
out.writeObject(value)
} finally {
out.close()
}

}

def deserializeFromFile[T](file: File)(implicit m: Manifest[T]): T = {
def deserializeFromFile[T](file: File): T = {
val fileData = new Array[Byte](file.length().asInstanceOf[Int])
val dis = new DataInputStream(new FileInputStream(file))
try {
Expand All @@ -99,8 +75,6 @@ private[spark] class FileSystemPersistenceEngine(
dis.close()
}

val clazz = m.runtimeClass.asInstanceOf[Class[T]]
val serializer = serialization.serializerFor(clazz)
serializer.fromBinary(fileData).asInstanceOf[T]
serializer.deserializeStream(dis).readObject()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,27 @@

package org.apache.spark.deploy.master

import akka.actor.{Actor, ActorRef}

import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
import org.apache.spark.annotation.DeveloperApi

/**
* A LeaderElectionAgent keeps track of whether the current Master is the leader, meaning it
* is the only Master serving requests.
* In addition to the API provided, the LeaderElectionAgent will use of the following messages
* to inform the Master of leader changes:
* [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
* [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
* :: DeveloperApi ::
*
* A LeaderElectionAgent tracks current master and is a common interface for all election Agents.
*/
private[spark] trait LeaderElectionAgent extends Actor {
// TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
val masterActor: ActorRef
@DeveloperApi
trait LeaderElectionAgent {
val masterActor: LeaderElectable
def stop() {} // to avoid noops in implementations.
}

/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
override def preStart() {
masterActor ! ElectedLeader
}
@DeveloperApi
trait LeaderElectable {
def electedLeader()
def revokedLeadership()
}

override def receive = {
case _ =>
}
/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
private[spark] class MonarchyLeaderAgent(val masterActor: LeaderElectable)
extends LeaderElectionAgent {
masterActor.electedLeader()
}
40 changes: 24 additions & 16 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[spark] class Master(
port: Int,
webUiPort: Int,
val securityMgr: SecurityManager)
extends Actor with ActorLogReceive with Logging {
extends Actor with ActorLogReceive with Logging with LeaderElectable {

import context.dispatcher // to use Akka's scheduler.schedule()

Expand All @@ -61,7 +61,6 @@ private[spark] class Master(
val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "")
val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")

val workers = new HashSet[WorkerInfo]
Expand Down Expand Up @@ -103,7 +102,7 @@ private[spark] class Master(

var persistenceEngine: PersistenceEngine = _

var leaderElectionAgent: ActorRef = _
var leaderElectionAgent: LeaderElectionAgent = _

private var recoveryCompletionTask: Cancellable = _

Expand All @@ -130,23 +129,24 @@ private[spark] class Master(
masterMetricsSystem.start()
applicationMetricsSystem.start()

persistenceEngine = RECOVERY_MODE match {
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
val zkFactory = new ZooKeeperRecoveryModeFactory(conf)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
val fsFactory = new FileSystemRecoveryModeFactory(conf)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(conf.getClass)
.newInstance(conf).asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
new BlackHolePersistenceEngine()
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}

leaderElectionAgent = RECOVERY_MODE match {
case "ZOOKEEPER" =>
context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
case _ =>
context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}

override def preRestart(reason: Throwable, message: Option[Any]) {
Expand All @@ -165,7 +165,15 @@ private[spark] class Master(
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
context.stop(leaderElectionAgent)
leaderElectionAgent.stop()
}

override def electedLeader() {
self ! ElectedLeader
}

override def revokedLeadership() {
self ! RevokedLeadership
}

override def receiveWithLogging = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.spark.deploy.master

import org.apache.spark.annotation.DeveloperApi

import scala.reflect.ClassTag

/**
* Allows Master to persist any state that is necessary in order to recover from a failure.
* The following semantics are required:
Expand All @@ -25,36 +29,70 @@ package org.apache.spark.deploy.master
* Given these two requirements, we will have all apps and workers persisted, but
* we might not have yet deleted apps or workers that finished (so their liveness must be verified
* during recovery).
*
* The implementation of this trait defines how name-object pairs are stored or retrieved.
*/
private[spark] trait PersistenceEngine {
def addApplication(app: ApplicationInfo)
@DeveloperApi
trait PersistenceEngine {

def removeApplication(app: ApplicationInfo)
/**
* Defines how the object is serialized and persisted. Implementation will
* depend on the store used.
*/
def persist(name: String, obj: Object)

def addWorker(worker: WorkerInfo)
/**
* Defines how the object referred by its name is removed from the store.
*/
def unpersist(name: String)

def removeWorker(worker: WorkerInfo)
/**
* Gives all objects, matching a prefix. This defines how objects are
* read/deserialized back.
*/
def read[T: ClassTag](prefix: String): Seq[T]

def addDriver(driver: DriverInfo)
final def addApplication(app: ApplicationInfo): Unit = {
persist("app_" + app.id, app)
}

def removeDriver(driver: DriverInfo)
final def removeApplication(app: ApplicationInfo): Unit = {
unpersist("app_" + app.id)
}

final def addWorker(worker: WorkerInfo): Unit = {
persist("worker_" + worker.id, worker)
}

final def removeWorker(worker: WorkerInfo): Unit = {
unpersist("worker_" + worker.id)
}

final def addDriver(driver: DriverInfo): Unit = {
persist("driver_" + driver.id, driver)
}

final def removeDriver(driver: DriverInfo): Unit = {
unpersist("driver_" + driver.id)
}

/**
* Returns the persisted data sorted by their respective ids (which implies that they're
* sorted by time of creation).
*/
def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo])
final def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
(read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
}

def close() {}
}

private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
override def addApplication(app: ApplicationInfo) {}
override def removeApplication(app: ApplicationInfo) {}
override def addWorker(worker: WorkerInfo) {}
override def removeWorker(worker: WorkerInfo) {}
override def addDriver(driver: DriverInfo) {}
override def removeDriver(driver: DriverInfo) {}

override def readPersistedData() = (Nil, Nil, Nil)

override def persist(name: String, obj: Object): Unit = {}

override def unpersist(name: String): Unit = {}

override def read[T: ClassTag](name: String): Seq[T] = Nil

}
Loading

0 comments on commit deefd9d

Please sign in to comment.