SPARK-3883: SSL support for HttpServer and Akka
- Introduced the SSLOptions object
- SSLOptions is created by SecurityManager
- SSLOptions configures Akka and Jetty to use SSL
- Provided utility methods to determine the proper Akka protocol for Akka requests and to configure the SSL socket factory for URL connections
- Added test cases for AkkaUtils, FileServer, SSLOptions and SecurityManager
- Added a way for executors and the driver to use node-local SSL configuration (sketched below)
- Made CoarseGrainedExecutorBackend not overwrite settings that are part of the executor startup configuration - they are passed from the Worker anyway
jacek-lewandowski committed Feb 2, 2015
1 parent 63dfe21 commit 93050f4
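The spark.ssl.* keys below are the ones SSLOptions.parse reads (see the new SSLOptions.scala in this diff); all values are placeholders. A minimal sketch of supplying the node-local configuration:

import org.apache.spark.{SecurityManager, SparkConf}

// Placeholder values throughout; only the key names come from SSLOptions.parse.
val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore.jks")
  .set("spark.ssl.keyStorePassword", "changeit")
  .set("spark.ssl.keyPassword", "changeit")
  .set("spark.ssl.trustStore", "/path/to/truststore.jks")
  .set("spark.ssl.trustStorePassword", "changeit")
  .set("spark.ssl.protocol", "TLSv1")  // handed to SSLContext.getInstance and Jetty
  .set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA")

// SecurityManager parses the spark.ssl namespace into an SSLOptions instance.
val securityManager = new SecurityManager(conf)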
Showing 35 changed files with 852 additions and 76 deletions.
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark

import java.io.File

import org.eclipse.jetty.server.ssl.SslSocketConnector
import org.eclipse.jetty.util.security.{Constraint, Password}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
@@ -72,7 +73,10 @@ private[spark] class HttpServer(
   */
  private def doStart(startPort: Int): (Server, Int) = {
    val server = new Server()
    val connector = new SocketConnector

    val connector = securityManager.sslOptions.createJettySslContextFactory()
      .map(new SslSocketConnector(_)).getOrElse(new SocketConnector)

    connector.setMaxIdleTime(60 * 1000)
    connector.setSoLingerTime(-1)
    connector.setPort(startPort)
@@ -149,13 +153,14 @@ private[spark] class HttpServer(
  }

  /**
   * Get the URI of this HTTP server (http://host:port)
   * Get the URI of this HTTP server (http://host:port or https://host:port)
   */
  def uri: String = {
    if (server == null) {
      throw new ServerStateException("Server is not started")
    } else {
      "http://" + Utils.localIpAddress + ":" + port
      val scheme = if (securityManager.sslOptions.enabled) "https" else "http"
      s"$scheme://${Utils.localIpAddress}:$port"
    }
  }
}
124 changes: 124 additions & 0 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -0,0 +1,124 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.File

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory

private[spark] case class SSLOptions(
    enabled: Boolean = false,
    keyStore: Option[File] = None,
    keyStorePassword: Option[String] = None,
    keyPassword: Option[String] = None,
    trustStore: Option[File] = None,
    trustStorePassword: Option[String] = None,
    protocol: Option[String] = None,
    enabledAlgorithms: Set[String] = Set.empty) {

  /**
   * Creates a Jetty SSL context factory according to the SSL settings represented by this object.
   */
  def createJettySslContextFactory(): Option[SslContextFactory] = {
    if (enabled) {
      val sslContextFactory = new SslContextFactory()

      keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
      trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
      keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
      trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
      keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
      protocol.foreach(sslContextFactory.setProtocol)
      sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)

      Some(sslContextFactory)
    } else {
      None
    }
  }

  /**
   * Creates an Akka configuration object which contains all the SSL settings represented by this
   * object. It can then be used to compose the ultimate Akka configuration.
   */
  def createAkkaConfig: Option[Config] = {
    import scala.collection.JavaConversions._
    if (enabled) {
      Some(ConfigFactory.empty()
        .withValue("akka.remote.netty.tcp.security.key-store",
          ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.key-store-password",
          ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.trust-store",
          ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.trust-store-password",
          ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.key-password",
          ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.random-number-generator",
          ConfigValueFactory.fromAnyRef(""))
        .withValue("akka.remote.netty.tcp.security.protocol",
          ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
        .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
          ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
        .withValue("akka.remote.netty.tcp.enable-ssl",
          ConfigValueFactory.fromAnyRef(true)))
    } else {
      None
    }
  }

  override def toString: String = s"SSLOptions{enabled=$enabled, " +
      s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
      s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
      s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"

}

private[spark] object SSLOptions extends Logging {

  /**
   * Resolves SSLOptions settings from a given Spark configuration object at a given namespace.
   * The parent directory of that location is used as a base directory to resolve relative paths
   * to keystore and truststore.
   */
  def parse(conf: SparkConf, ns: String): SSLOptions = {
    val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = false)
    val keyStore = conf.getOption(s"$ns.keyStore").map(new File(_))
    val keyStorePassword = conf.getOption(s"$ns.keyStorePassword")
    val keyPassword = conf.getOption(s"$ns.keyPassword")
    val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
    val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
    val protocol = conf.getOption(s"$ns.protocol")
    val enabledAlgorithms = conf.get(s"$ns.enabledAlgorithms", defaultValue = "")
      .split(",").map(_.trim).filter(_.nonEmpty).toSet

    new SSLOptions(
      enabled,
      keyStore,
      keyStorePassword,
      keyPassword,
      trustStore,
      trustStorePassword,
      protocol,
      enabledAlgorithms)
  }

}
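A short usage sketch tying the SSLOptions API together, reusing the placeholder conf from the sketch near the top. Composing the Akka settings with ConfigFactory.load() and the Jetty import paths are assumptions about the caller, not code from this commit:

import com.typesafe.config.ConfigFactory
import org.eclipse.jetty.server.bio.SocketConnector
import org.eclipse.jetty.server.ssl.SslSocketConnector

val sslOptions = SSLOptions.parse(conf, "spark.ssl")

// Jetty: an SslSocketConnector when SSL is enabled, a plain SocketConnector otherwise.
val connector = sslOptions.createJettySslContextFactory()
  .map(new SslSocketConnector(_))
  .getOrElse(new SocketConnector)

// Akka: overlay the SSL settings on top of a base configuration.
val akkaConf = sslOptions.createAkkaConfig
  .map(_.withFallback(ConfigFactory.load()))
  .getOrElse(ConfigFactory.load())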

50 changes: 49 additions & 1 deletion core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -18,7 +18,11 @@
package org.apache.spark

import java.net.{Authenticator, PasswordAuthentication}
import java.security.KeyStore
import java.security.cert.X509Certificate
import javax.net.ssl._

import com.google.common.io.Files
import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil
@@ -144,7 +148,8 @@ import org.apache.spark.network.sasl.SecretKeyHolder
 * can take place.
 */

private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
private[spark] class SecurityManager(sparkConf: SparkConf)
  extends Logging with SecretKeyHolder {

  // key used to store the spark secret in the Hadoop UGI
  private val sparkSecretLookupKey = "sparkCookie"
@@ -196,6 +201,49 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with
    )
  }

  val sslOptions = SSLOptions.parse(sparkConf, "spark.ssl")
  logDebug(s"SSLConfiguration: $sslOptions")

  val (sslSocketFactory, hostnameVerifier) = if (sslOptions.enabled) {
    val trustStoreManagers =
      for (trustStore <- sslOptions.trustStore) yield {
        val input = Files.asByteSource(trustStore).openStream()

        try {
          val ks = KeyStore.getInstance(KeyStore.getDefaultType)
          ks.load(input, sslOptions.trustStorePassword.map(_.toCharArray).orNull)

          val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
          tmf.init(ks)
          tmf.getTrustManagers
        } finally {
          input.close()
        }
      }

    lazy val credulousTrustStoreManagers = Array({
      logWarning("Using 'accept-all' trust manager for SSL connections.")
      new X509TrustManager {
        override def getAcceptedIssuers: Array[X509Certificate] = null

        override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {}

        override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {}
      }: TrustManager
    })

    val sslContext = SSLContext.getInstance(sslOptions.protocol.getOrElse("Default"))
    sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null)

    val hostVerifier = new HostnameVerifier {
      override def verify(s: String, sslSession: SSLSession): Boolean = true
    }

    (Some(sslContext.getSocketFactory), Some(hostVerifier))
  } else {
    (None, None)
  }

  /**
   * Split a comma separated String, filter out any empty items, and return a Set of strings
   */
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -369,6 +369,7 @@ private[spark] object SparkConf {
    isAkkaConf(name) ||
    name.startsWith("spark.akka") ||
    name.startsWith("spark.auth") ||
    name.startsWith("spark.ssl") ||
    isSparkPortConf(name)
  }

@@ -199,6 +199,7 @@ private[broadcast] object HttpBroadcast extends Logging {
      uc = new URL(url).openConnection()
      uc.setConnectTimeout(httpReadTimeout)
    }
    Utils.setupSecureURLConnection(uc, securityManager)

    val in = {
      uc.setReadTimeout(httpReadTimeout)
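Utils.setupSecureURLConnection is called above but its body is not part of this excerpt; a plausible sketch, assuming it applies the socket factory and hostname verifier built by SecurityManager to HTTPS connections (the actual implementation in Utils may differ):

import java.net.URLConnection
import javax.net.ssl.HttpsURLConnection

// Hypothetical reconstruction: no-op for plain HTTP, secured for HTTPS.
def setupSecureURLConnection(urlConnection: URLConnection, sm: SecurityManager): URLConnection = {
  urlConnection match {
    case https: HttpsURLConnection =>
      sm.sslSocketFactory.foreach(https.setSSLSocketFactory)
      sm.hostnameVerifier.foreach(https.setHostnameVerifier)
      https
    case connection => connection
  }
}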
@@ -28,5 +28,14 @@ private[spark] class ApplicationDescription(

  val user = System.getProperty("user.name", "<unknown>")

  def copy(
      name: String = name,
      maxCores: Option[Int] = maxCores,
      memoryPerSlave: Int = memoryPerSlave,
      command: Command = command,
      appUiUrl: String = appUiUrl,
      eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
    new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)

  override def toString: String = "ApplicationDescription(" + name + ")"
}
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -39,7 +39,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
  val timeout = AkkaUtils.askTimeout(conf)

  override def preStart() = {
    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master, conf))

    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

@@ -161,7 +161,7 @@ object Client {
      "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
    Master.toAkkaUrl(driverArgs.master)
    Master.toAkkaUrl(driverArgs.master, conf)
    actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

    actorSystem.awaitTermination()
@@ -25,5 +25,13 @@ private[spark] class DriverDescription(
    val command: Command)
  extends Serializable {

  def copy(
      jarUrl: String = jarUrl,
      mem: Int = mem,
      cores: Int = cores,
      supervise: Boolean = supervise,
      command: Command = command): DriverDescription =
    new DriverDescription(jarUrl, mem, cores, supervise, command)

  override def toString: String = s"DriverDescription (${command.mainClass})"
}
@@ -47,7 +47,7 @@ private[spark] class AppClient(
    conf: SparkConf)
  extends Logging {

  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl)
  val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, conf))

  val REGISTRATION_TIMEOUT = 20.seconds
  val REGISTRATION_RETRIES = 3
@@ -107,8 +107,8 @@ private[spark] class AppClient(
    def changeMaster(url: String) {
      // activeMasterUrl is a valid Spark url since we receive it from master.
      activeMasterUrl = url
      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
      masterAddress = Master.toAkkaAddress(activeMasterUrl)
      master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl, conf))
      masterAddress = Master.toAkkaAddress(activeMasterUrl, conf)
    }

    private def isPossibleMaster(remoteUrl: Address) = {
Expand Up @@ -860,19 +860,19 @@ private[spark] object Master extends Logging {
   *
   * @throws SparkException if the url is invalid
   */
  def toAkkaUrl(sparkUrl: String): String = {
  def toAkkaUrl(sparkUrl: String, conf: SparkConf): String = {
    val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
    "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
    AkkaUtils.address(systemName, host, port, actorName, conf)
  }

  /**
   * Returns an akka `Address` for the Master actor given a sparkUrl `spark://host:port`.
   *
   * @throws SparkException if the url is invalid
   */
  def toAkkaAddress(sparkUrl: String): Address = {
  def toAkkaAddress(sparkUrl: String, conf: SparkConf): Address = {
    val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
    Address("akka.tcp", systemName, host, port)
    Address(AkkaUtils.protocol(conf), systemName, host, port)
  }

  def startSystemAndActor(
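AkkaUtils.protocol and AkkaUtils.address are used above but their bodies are not part of this excerpt; a plausible sketch, consistent with the call sites and the commit message, assuming SSL-enabled Akka remoting uses the akka.ssl.tcp scheme (the actual AkkaUtils may differ):

// Hypothetical reconstruction of the helpers referenced by Master.toAkkaUrl/toAkkaAddress.
def protocol(conf: SparkConf): String = {
  // Akka remoting addresses use akka.ssl.tcp when SSL is enabled, akka.tcp otherwise.
  if (conf.getBoolean("spark.ssl.enabled", defaultValue = false)) "akka.ssl.tcp" else "akka.tcp"
}

def address(systemName: String, host: String, port: Int, actorName: String, conf: SparkConf): String = {
  s"${protocol(conf)}://$systemName@$host:$port/user/$actorName"
}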
@@ -26,7 +26,7 @@ import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files

import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.util.logging.FileAppender
