Skip to content

Commit

Permalink
[SPARK-26420][K8S] Generate more unique IDs when creating k8s resourc…
Browse files Browse the repository at this point in the history
…e names.

Using the current time as an ID is more prone to clashes than people generally
realize, so try to make things a bit more unique without necessarily using a
UUID, which would eat too much space in the names otherwise.

The implemented approach uses some bits from the current time, plus some random
bits, which should be more resistant to clashes.

Closes #23805 from vanzin/SPARK-26420.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
Marcelo Vanzin authored and dongjoon-hyun committed Mar 1, 2019
1 parent 8e5f999 commit 14f714f
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ private[spark] object KubernetesConf {
}

def getResourceNamePrefix(appName: String): String = {
val launchTime = System.currentTimeMillis()
s"$appName-$launchTime"
val id = KubernetesUtils.uniqueID()
s"$appName-$id"
.trim
.toLowerCase(Locale.ROOT)
.replaceAll("\\s+", "-")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@
package org.apache.spark.deploy.k8s

import java.io.File
import java.security.SecureRandom

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.commons.codec.binary.Hex

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, SystemClock, Utils}

private[spark] object KubernetesUtils extends Logging {

private val systemClock = new SystemClock()
private lazy val RNG = new SecureRandom()

/**
* Extract and parse Spark configuration properties with a given name prefix and
* return the result as a Map. Keys must not have more than one value.
Expand Down Expand Up @@ -185,4 +190,23 @@ private[spark] object KubernetesUtils extends Logging {
def formatTime(time: String): String = {
if (time != null) time else "N/A"
}

/**
* Generates a unique ID to be used as part of identifiers. The returned ID is a hex string
* of a 64-bit value containing the 40 LSBs from the current time + 24 random bits from a
* cryptographically strong RNG. (40 bits gives about 30 years worth of "unique" timestamps.)
*
* This avoids using a UUID for uniqueness (too long), and relying solely on the current time
* (not unique enough).
*/
def uniqueID(clock: Clock = systemClock): String = {
val random = new Array[Byte](3)
synchronized {
RNG.nextBytes(random)
}

val time = java.lang.Long.toHexString(clock.getTimeMillis() & 0xFFFFFFFFFFL)
Hex.encodeHexString(random) + time
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}

import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.{Clock, SystemClock}

private[spark] class DriverServiceFeatureStep(
kubernetesConf: KubernetesDriverConf,
clock: Clock = new SystemClock)
clock: Clock = new SystemClock())
extends KubernetesFeatureConfigStep with Logging {
import DriverServiceFeatureStep._

Expand All @@ -42,7 +42,7 @@ private[spark] class DriverServiceFeatureStep(
private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
preferredServiceName
} else {
val randomServiceId = clock.getTimeMillis()
val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import com.google.common.net.InternetDomainName
import io.fabric8.kubernetes.api.model.Service

import org.apache.spark.{SparkConf, SparkFunSuite}
Expand Down Expand Up @@ -71,7 +72,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
val expectedServiceName = kconf.resourceNamePrefix + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
val expectedHostName = s"$expectedServiceName.my-namespace.svc"
val additionalProps = configurationStep.getAdditionalPodSystemProperties()
verifySparkConfHostNames(additionalProps, expectedHostName)
assert(additionalProps(DRIVER_HOST_ADDRESS.key) === expectedHostName)
}

test("Ports should resolve to defaults in SparkConf and in the service.") {
Expand All @@ -91,26 +92,37 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === DEFAULT_BLOCKMANAGER_PORT.toString)
}

test("Long prefixes should switch to using a generated name.") {
val clock = new ManualClock()
clock.setTime(10000)
test("Long prefixes should switch to using a generated unique name.") {
val sparkConf = new SparkConf(false)
.set(KUBERNETES_NAMESPACE, "my-namespace")
val configurationStep = new DriverServiceFeatureStep(
KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
labels = DRIVER_LABELS),
clock)
val driverService = configurationStep
.getAdditionalKubernetesResources()
.head
.asInstanceOf[Service]
val expectedServiceName = s"spark-10000${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}"
assert(driverService.getMetadata.getName === expectedServiceName)
val expectedHostName = s"$expectedServiceName.my-namespace.svc"
val additionalProps = configurationStep.getAdditionalPodSystemProperties()
verifySparkConfHostNames(additionalProps, expectedHostName)
val kconf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
labels = DRIVER_LABELS)
val clock = new ManualClock()

// Ensure that multiple services created at the same time generate unique names.
val services = (1 to 10).map { _ =>
val configurationStep = new DriverServiceFeatureStep(kconf, clock = clock)
val serviceName = configurationStep
.getAdditionalKubernetesResources()
.head
.asInstanceOf[Service]
.getMetadata
.getName

val hostAddress = configurationStep
.getAdditionalPodSystemProperties()(DRIVER_HOST_ADDRESS.key)

(serviceName -> hostAddress)
}.toMap

assert(services.size === 10)
services.foreach { case (name, address) =>
assert(!name.startsWith(kconf.resourceNamePrefix))
assert(!address.startsWith(kconf.resourceNamePrefix))
assert(InternetDomainName.isValid(address))
}
}

test("Disallow bind address and driver host to be set explicitly.") {
Expand Down Expand Up @@ -156,10 +168,4 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
assert(driverServicePorts(1).getPort.intValue() === blockManagerPort)
assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort)
}

private def verifySparkConfHostNames(
driverSparkConf: Map[String, String], expectedHostName: String): Unit = {
assert(driverSparkConf(
org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key) === expectedHostName)
}
}

0 comments on commit 14f714f

Please sign in to comment.