
Commit

Merge pull request #1576 from apache/master
GulajavaMinistudio authored Oct 25, 2023
2 parents 0967f4a + 7db9b22 commit 14e9435
Showing 82 changed files with 2,037 additions and 282 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build_and_test.yml
@@ -141,11 +141,11 @@ jobs:
# Note that the modules below are from sparktestsupport/modules.py.
modules:
- >-
core, unsafe, kvstore, avro,
core, unsafe, kvstore, avro, utils,
network-common, network-shuffle, repl, launcher,
examples, sketch, graphx
- >-
catalyst, hive-thriftserver
api, catalyst, hive-thriftserver
- >-
mllib-local,mllib
- >-
14 changes: 9 additions & 5 deletions .github/workflows/publish_snapshot.yml
@@ -22,6 +22,13 @@ name: Publish Snapshot
on:
schedule:
- cron: '0 0 * * *'
workflow_dispatch:
inputs:
branch:
description: 'list of branches to publish (JSON)'
required: true
# keep in sync with default value of strategy matrix 'branch'
default: '["master", "branch-3.5", "branch-3.4", "branch-3.3"]'

jobs:
publish-snapshot:
@@ -30,11 +37,8 @@ jobs:
strategy:
fail-fast: false
matrix:
branch:
- master
- branch-3.5
- branch-3.4
- branch-3.3
# keep in sync with default value of workflow_dispatch input 'branch'
branch: ${{ fromJSON( inputs.branch || '["master", "branch-3.5", "branch-3.4", "branch-3.3"]' ) }}
steps:
- name: Checkout Spark repository
uses: actions/checkout@v3
11 changes: 6 additions & 5 deletions common/utils/src/main/resources/error/error-classes.json
@@ -2745,6 +2745,12 @@
],
"sqlState" : "42K0G"
},
"PYTHON_DATA_SOURCE_FAILED_TO_PLAN_IN_PYTHON" : {
"message" : [
"Failed to plan Python data source <type> in Python: <msg>"
],
"sqlState" : "38000"
},
"RECURSIVE_PROTOBUF_SCHEMA" : {
"message" : [
"Found recursive reference in Protobuf schema, which can not be processed by Spark by default: <fieldDescriptor>. try setting the option `recursive.fields.max.depth` 0 to 10. Going beyond 10 levels of recursion is not allowed."
@@ -4184,11 +4190,6 @@
"ALTER COLUMN cannot find column <colName> in v1 table. Available: <fieldNames>."
]
},
"_LEGACY_ERROR_TEMP_1055" : {
"message" : [
"The database name is not valid: <quoted>."
]
},
"_LEGACY_ERROR_TEMP_1057" : {
"message" : [
"SHOW COLUMNS with conflicting databases: '<dbA>' != '<dbB>'."
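The new PYTHON_DATA_SOURCE_FAILED_TO_PLAN_IN_PYTHON entry follows Spark's error-class convention: the <type> and <msg> placeholders are filled in at the throw site, and SQLSTATE class 38 marks an external-routine exception. A minimal sketch of how such an entry is typically raised (this call site and its parameter values are illustrative, not taken from the patch):

    import org.apache.spark.SparkException

    // Hypothetical call site: "read" and the traceback text stand in for the values
    // the Python data source planner would supply for <type> and <msg>.
    throw new SparkException(
      errorClass = "PYTHON_DATA_SOURCE_FAILED_TO_PLAN_IN_PYTHON",
      messageParameters = Map("type" -> "read", "msg" -> "Traceback (most recent call last): ..."),
      cause = null)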

@@ -56,10 +56,10 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {

conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits BIT(10), "
+ "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci DECIMAL(40,20), flt FLOAT, "
+ "dbl DOUBLE)").executeUpdate()
+ "dbl DOUBLE, tiny TINYINT)").executeUpdate()
conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', "
+ "17, 77777, 123456789, 123456789012345, 123456789012345.123456789012345, "
+ "42.75, 1.0000000000000002)").executeUpdate()
+ "42.75, 1.0000000000000002, -128)").executeUpdate()

conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts TIMESTAMP, "
+ "yr YEAR)").executeUpdate()
@@ -89,7 +89,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
val rows = df.collect()
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
assert(types.length == 9)
assert(types.length == 10)
assert(types(0).equals("class java.lang.Boolean"))
assert(types(1).equals("class java.lang.Long"))
assert(types(2).equals("class java.lang.Integer"))
@@ -99,6 +99,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(types(6).equals("class java.math.BigDecimal"))
assert(types(7).equals("class java.lang.Double"))
assert(types(8).equals("class java.lang.Double"))
assert(types(9).equals("class java.lang.Byte"))
assert(rows(0).getBoolean(0) == false)
assert(rows(0).getLong(1) == 0x225)
assert(rows(0).getInt(2) == 17)
@@ -109,6 +110,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(rows(0).getAs[BigDecimal](6).equals(bd))
assert(rows(0).getDouble(7) == 42.75)
assert(rows(0).getDouble(8) == 1.0000000000000002)
assert(rows(0).getByte(9) == 0x80.toByte)
}

test("Date types") {
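The new column exercises the MySQL TINYINT mapping: the assertions above expect it to surface as java.lang.Byte, and 0x80.toByte is -128, the smallest signed byte. A short sketch of reading the same table through the JDBC data source (the session and URL are placeholders for what the suite wires up):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("tinyint-check").getOrCreate()
    val jdbcUrl = "jdbc:mysql://localhost:3306/test?user=root&password=secret"  // placeholder URL
    val df = spark.read.jdbc(jdbcUrl, "numbers", new java.util.Properties())
    val tinyType = df.schema("tiny").dataType  // ByteType, matching the java.lang.Byte assertion
    val tinyValue = df.head().getByte(9)       // -128 for the row inserted above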

@@ -120,7 +120,7 @@ private[producer] class InternalKafkaProducerPool(
val curTimeNs = clock.nanoTime()
val producers = new mutable.ArrayBuffer[CachedProducerEntry]()
synchronized {
cache.retain { case (_, v) =>
cache.filterInPlace { case (_, v) =>
if (v.expired(curTimeNs)) {
producers += v
false
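The cache.retain call is updated because Scala 2.13 deprecated mutable.Map#retain in favour of filterInPlace; both mutate the map in place, keeping only the entries for which the predicate returns true. A self-contained sketch of the replacement API:

    import scala.collection.mutable

    val cache = mutable.HashMap("a" -> 1, "b" -> 2, "c" -> 3)
    cache.filterInPlace { case (_, v) => v < 3 }  // mutates in place, dropping "c" -> 3
    // cache now contains only "a" -> 1 and "b" -> 2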

@@ -138,6 +138,7 @@ class ProtobufCatalystDataConversionSuite
data != null &&
(data.get(0) == defaultValue ||
(dt.fields(0).dataType == BinaryType &&
data.get(0) != null &&
data.get(0).asInstanceOf[Array[Byte]].isEmpty)))
data = generator().asInstanceOf[Row]


@@ -630,7 +630,7 @@ private[spark] class ExecutorAllocationManager(
private def onSchedulerQueueEmpty(): Unit = synchronized {
logDebug("Clearing timer to add executors because there are no more pending tasks")
addTime = NOT_SET
numExecutorsToAddPerResourceProfileId.transform { case (_, _) => 1 }
numExecutorsToAddPerResourceProfileId.mapValuesInPlace { case (_, _) => 1 }
}

private case class StageAttempt(stageId: Int, stageAttemptId: Int) {
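Like the retain to filterInPlace change above, mutable.Map#transform is deprecated in Scala 2.13 and mapValuesInPlace is its replacement: it rewrites every value in place from a (key, value) function, which is how the allocation manager resets each per-profile counter to 1. A minimal sketch:

    import scala.collection.mutable

    val numExecutorsToAdd = mutable.HashMap(0 -> 4, 1 -> 8)  // resource profile id -> counter
    numExecutorsToAdd.mapValuesInPlace { case (_, _) => 1 }   // resets every counter to 1
    // numExecutorsToAdd now maps both profile ids to 1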
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -292,6 +292,12 @@ private[spark] class SecurityManager(
*/
def isSslRpcEnabled(): Boolean = sslRpcEnabled

/**
* Returns the SSLOptions object for the RPC namespace
* @return the SSLOptions object for the RPC namespace
*/
def getRpcSSLOptions(): SSLOptions = rpcSSLOptions

/**
* Gets the user used for authenticating SASL connections.
* For now use a single hardcoded user.
7 changes: 6 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -374,7 +374,12 @@ object SparkEnv extends Logging {
}

val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
val transConf = SparkTransportConf.fromSparkConf(
conf,
"shuffle",
numUsableCores,
sslOptions = Some(securityManager.getRpcSSLOptions())
)
Some(new ExternalBlockStoreClient(transConf, securityManager,
securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)))
} else {
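This SparkEnv change is the first of several call sites in this commit that share one shape: resolve the RPC SSL options from the SecurityManager and hand them to SparkTransportConf.fromSparkConf, so the shuffle and RPC transports pick up the spark.ssl.rpc settings. A condensed sketch of that shape (SecurityManager and SparkTransportConf are Spark-internal, so this only compiles inside Spark's own packages; the core count is illustrative):

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.network.netty.SparkTransportConf

    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    val transportConf = SparkTransportConf.fromSparkConf(
      conf,
      "shuffle",
      numUsableCores = 2,  // illustrative value
      sslOptions = Some(securityManager.getRpcSSLOptions()))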

@@ -53,7 +53,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val registeredExecutorsDB = "registeredExecutors"

private val transportConf =
SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
SparkTransportConf.fromSparkConf(
sparkConf,
"shuffle",
numUsableCores = 0,
sslOptions = Some(securityManager.getRpcSSLOptions()))
private val blockHandler = newShuffleBlockHandler(transportConf)
private var transportContext: TransportContext = _


@@ -184,7 +184,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
*/
private def clearInaccessibleList(expireTimeInSeconds: Long): Unit = {
val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
inaccessibleList.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
inaccessibleList.asScala.filterInPlace((_, creationTime) => creationTime >= expiredThreshold)
}

private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()

@@ -90,7 +90,9 @@ private[spark] class CoarseGrainedExecutorBackend(

logInfo("Connecting to driver: " + driverUrl)
try {
val shuffleClientTransportConf = SparkTransportConf.fromSparkConf(env.conf, "shuffle")
val securityManager = new SecurityManager(env.conf)
val shuffleClientTransportConf = SparkTransportConf.fromSparkConf(
env.conf, "shuffle", sslOptions = Some(securityManager.getRpcSSLOptions()))
if (NettyUtils.preferDirectBufs(shuffleClientTransportConf) &&
PlatformDependent.maxDirectMemory() < env.conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)) {
throw new SparkException(s"Netty direct memory should at least be bigger than " +

@@ -70,7 +70,11 @@ private[spark] class NettyBlockTransferService(
val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
var serverBootstrap: Option[TransportServerBootstrap] = None
var clientBootstrap: Option[TransportClientBootstrap] = None
this.transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numCores)
this.transportConf = SparkTransportConf.fromSparkConf(
conf,
"shuffle",
numCores,
sslOptions = Some(securityManager.getRpcSSLOptions()))
if (authEnabled) {
serverBootstrap = Some(new AuthServerBootstrap(transportConf, securityManager))
clientBootstrap = Some(new AuthClientBootstrap(transportConf, conf.getAppId, securityManager))
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -56,7 +56,9 @@ private[netty] class NettyRpcEnv(
conf.clone.set(RPC_IO_NUM_CONNECTIONS_PER_PEER, 1),
"rpc",
conf.get(RPC_IO_THREADS).getOrElse(numUsableCores),
role)
role,
sslOptions = Some(securityManager.getRpcSSLOptions())
)

private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

@@ -391,7 +393,11 @@
}

val ioThreads = clone.getInt("spark.files.io.threads", 1)
val downloadConf = SparkTransportConf.fromSparkConf(clone, module, ioThreads)
val downloadConf = SparkTransportConf.fromSparkConf(
clone,
module,
ioThreads,
sslOptions = Some(securityManager.getRpcSSLOptions()))
val downloadContext = new TransportContext(downloadConf, new NoOpRpcHandler(), true)
fileDownloadFactory = downloadContext.createClientFactory(createClientBootstraps())
}

@@ -24,7 +24,7 @@ import java.nio.file.Files

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.io.NioBufferedFileInputStream
@@ -58,7 +58,11 @@ private[spark] class IndexShuffleBlockResolver(

private lazy val blockManager = Option(_blockManager).getOrElse(SparkEnv.get.blockManager)

private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
private val transportConf = {
val securityManager = new SecurityManager(conf)
SparkTransportConf.fromSparkConf(
conf, "shuffle", sslOptions = Some(securityManager.getRpcSSLOptions()))
}

private val remoteShuffleMaxDisk: Option[Long] =
conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE)

@@ -25,7 +25,7 @@ import java.util.concurrent.ExecutorService
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.util.control.NonFatal

import org.apache.spark.{ShuffleDependency, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.{SecurityManager, ShuffleDependency, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.annotation.Since
import org.apache.spark.executor.{CoarseGrainedExecutorBackend, ExecutorBackend}
import org.apache.spark.internal.Logging
@@ -108,7 +108,9 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
dep: ShuffleDependency[_, _, _],
mapIndex: Int): Unit = {
val numPartitions = dep.partitioner.numPartitions
val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
val securityManager = new SecurityManager(conf)
val transportConf = SparkTransportConf.fromSparkConf(
conf, "shuffle", sslOptions = Some(securityManager.getRpcSSLOptions()))
this.shuffleId = dep.shuffleId
this.shuffleMergeId = dep.shuffleMergeId
this.mapIndex = mapIndex

@@ -869,7 +869,7 @@ private[spark] class AppStatusListener(
}

// remove any dead executors that were not running for any currently active stages
deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec))
deadExecutors.filterInPlace((execId, exec) => isExecutorActiveForLiveStages(exec))
}

private def removeExcludedStageFrom(exec: LiveExecutor, stageId: Int, now: Long) = {

@@ -1252,7 +1252,8 @@ private[spark] class BlockManager(
new EncryptedBlockData(file, blockSize, conf, key))

case _ =>
val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
val transportConf = SparkTransportConf.fromSparkConf(
conf, "shuffle", sslOptions = Some(securityManager.getRpcSSLOptions()))
new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
}
Some(managedBuffer)

@@ -277,7 +277,8 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We

def render(request: HttpServletRequest): Seq[Node] = {
val appInfo = store.applicationInfo()
val startTime = appInfo.attempts.head.startTime.getTime()
val startDate = appInfo.attempts.head.startTime
val startTime = startDate.getTime()
val endTime = appInfo.attempts.head.endTime.getTime()

val activeJobs = new ListBuffer[v1.JobData]()
@@ -327,6 +328,10 @@
<strong>User:</strong>
{parent.getSparkUser}
</li>
<li>
<strong>Started At:</strong>
{UIUtils.formatDate(startDate)}
</li>
<li>
<strong>Total Uptime:</strong>
{
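Keeping the Date around (rather than only its epoch millis) lets the new "Started At" row reuse the formatter the rest of the UI uses while the uptime math still works from startTime. A small sketch of that helper (UIUtils is Spark-internal and the exact output format shown is indicative):

    import java.util.Date
    import org.apache.spark.ui.UIUtils

    val startDate = new Date()
    val startedAt = UIUtils.formatDate(startDate)  // e.g. "2023/10/25 08:30:15"
    val uptimeMs = System.currentTimeMillis() - startDate.getTime()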

@@ -49,8 +49,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
var transportContext: TransportContext = _
var rpcHandler: ExternalBlockHandler = _

override def beforeAll(): Unit = {
super.beforeAll()
protected def initializeHandlers(): Unit = {
val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 2)
rpcHandler = new ExternalBlockHandler(transportConf, null)
transportContext = new TransportContext(transportConf, rpcHandler)
@@ -61,6 +60,11 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
conf.set(config.SHUFFLE_SERVICE_PORT, server.getPort)
}

override def beforeAll(): Unit = {
super.beforeAll()
initializeHandlers()
}

override def afterAll(): Unit = {
Utils.tryLogNonFatalError{
server.close()
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.config
import org.apache.spark.network.TransportContext
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.ExternalBlockHandler

/**
* This suite creates an external shuffle server and routes all shuffle fetches through it.
* Note that failures in this suite may arise due to changes in Spark that invalidate expectations
* set up in `ExternalShuffleBlockHandler`, such as changing the format of shuffle files or how
* we hash files into folders.
*/
class SslExternalShuffleServiceSuite extends ExternalShuffleServiceSuite {

override def initializeHandlers(): Unit = {
SslTestUtils.updateWithSSLConfig(conf)
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf);
// Show that we can successfully inherit options defined in the `spark.ssl` namespace
val defaultSslOptions = SSLOptions.parse(conf, hadoopConf, "spark.ssl")
val sslOptions = SSLOptions.parse(
conf, hadoopConf, "spark.ssl.rpc", defaults = Some(defaultSslOptions))
val transportConf = SparkTransportConf.fromSparkConf(
conf, "shuffle", numUsableCores = 2, sslOptions = Some(sslOptions))

rpcHandler = new ExternalBlockHandler(transportConf, null)
transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()

conf.set(config.SHUFFLE_MANAGER, "sort")
conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
conf.set(config.SHUFFLE_SERVICE_PORT, server.getPort)
}
}
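The comment in initializeHandlers points at the SSLOptions namespace inheritance this suite verifies: keys under spark.ssl.* act as defaults, and spark.ssl.rpc.* entries override them, which is what SSLOptions.parse(..., defaults = Some(...)) wires together. A hedged sketch of that inheritance (paths and passwords are placeholders; SSLOptions is Spark-internal, so this only compiles inside the org.apache.spark package):

    import org.apache.spark.{SparkConf, SSLOptions}
    import org.apache.spark.deploy.SparkHadoopUtil

    val conf = new SparkConf()
      .set("spark.ssl.enabled", "true")
      .set("spark.ssl.keyStore", "/path/to/keystore.jks")              // placeholder path
      .set("spark.ssl.keyStorePassword", "changeit")                   // placeholder secret
      .set("spark.ssl.rpc.trustStore", "/path/to/rpc-truststore.jks")  // rpc-specific override

    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
    val defaults = SSLOptions.parse(conf, hadoopConf, "spark.ssl")
    val rpcSslOptions = SSLOptions.parse(
      conf, hadoopConf, "spark.ssl.rpc", defaults = Some(defaults))
    // rpcSslOptions inherits the keystore settings from spark.ssl.* and layers the
    // spark.ssl.rpc.* overrides on top.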
