This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Commit

Use CountDownLatch instead of SettableFuture
ash211 committed Jan 31, 2017
1 parent 2cbe11f commit a69b865
Showing 2 changed files with 9 additions and 12 deletions.
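
The change in both files follows one pattern: a CountDownLatch initialized to 1 takes the place of a Guava SettableFuture[Boolean], so the submitting thread blocks on await() until the pod watcher signals completion with countDown(). A minimal, self-contained sketch of that pattern (illustrative names only, not the repository's code):

import java.util.concurrent.CountDownLatch

object LatchSketch {
  def main(args: Array[String]): Unit = {
    // A latch with a count of 1 plays the role SettableFuture[Boolean] used to play:
    // the waiter blocks, and the watcher releases it exactly once.
    val driverPodCompletedLatch = new CountDownLatch(1)

    // Stand-in for the pod watcher thread.
    val watcher = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(500)                   // simulate the pod reaching "Succeeded" or "Failed"
        driverPodCompletedLatch.countDown() // was: podCompletedFuture.set(true)
      }
    })
    watcher.start()

    driverPodCompletedLatch.await()         // was: driverPodCompletedFuture.get()
    println("Application finished.")
  }
}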
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes

 import java.io.{File, FileInputStream}
 import java.security.{KeyStore, SecureRandom}
-import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}

@@ -120,8 +120,8 @@ private[spark] class Client(
     val containerPorts = buildContainerPorts()

     // start outer watch for status logging of driver pod
-    val driverPodCompletedFuture = SettableFuture.create[Boolean]
-    val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedFuture, kubernetesAppId,
+    val driverPodCompletedLatch = new CountDownLatch(1)
+    val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
       sparkConf.get(REPORT_INTERVAL))
     Utils.tryWithResource(kubernetesClient
       .pods()
@@ -201,7 +201,7 @@ private[spark] class Client(
       // wait if configured to do so
       if (waitForAppCompletion) {
         logInfo(s"Waiting for application $kubernetesAppId to finish...")
-        driverPodCompletedFuture.get()
+        driverPodCompletedLatch.await()
         logInfo(s"Application $kubernetesAppId finished.")
       }
     }
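
The Client still imports TimeoutException and TimeUnit, and a latch also supports a bounded wait if one were wanted; a hypothetical variant, not part of this commit:

import java.util.concurrent.{CountDownLatch, TimeUnit}

object AwaitSketch {
  // Hypothetical helper (not in the commit): returns false if the latch was not
  // counted down within the timeout, rather than blocking indefinitely.
  def awaitDriverCompletion(latch: CountDownLatch, timeoutSeconds: Long): Boolean =
    latch.await(timeoutSeconds, TimeUnit.SECONDS)
}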
@@ -16,28 +16,25 @@
  */
 package org.apache.spark.deploy.kubernetes

-import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

 import scala.collection.JavaConverters._

 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler
-import com.google.common.util.concurrent.SettableFuture
-import io.fabric8.kubernetes.api.model.{Pod, PodStatus}
+import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action

-import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging

 /**
  * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
  * every state change and also at an interval for liveness.
  *
- * @param podCompletedFuture a SettableFuture that is set to true when the watched pod finishes
+ * @param podCompletedFuture a CountDownLatch that is counted down when the watched pod finishes
  * @param appId
  * @param interval ms between each state request
  */
-private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: SettableFuture[Boolean],
+private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch,
     appId: String,
     interval: Long)
   extends Watcher[Pod] with Logging {
@@ -63,7 +60,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: SettableFu
     prevPhase = phase

     if (phase == "Succeeded" || phase == "Failed") {
-      podCompletedFuture.set(true)
+      podCompletedFuture.countDown()
     }
   }

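On the watcher side, counting down a latch that is already at zero is a no-op, so receiving more than one terminal-phase event is harmless, much as SettableFuture.set simply returns false once a value has been set. A small sketch of that property, using only the JDK:

import java.util.concurrent.CountDownLatch

object CountDownIdempotenceSketch extends App {
  val latch = new CountDownLatch(1)

  latch.countDown()           // releases anyone blocked in await()
  latch.countDown()           // no-op: the count never goes below zero

  assert(latch.getCount == 0) // a repeated terminal-phase event would be harmless
}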
