[SPARK-46957][CORE] Decommission migrated shuffle files should be able to cleanup from executor

### What changes were proposed in this pull request?

This PR uses `SortShuffleManager#taskIdMapsForShuffle` to track the migrated shuffle files on the destination executor.
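For illustration only, here is a minimal, self-contained sketch of the tracking idea — not the actual Spark classes. A shared `shuffleId -> map task id` registry is owned by the shuffle manager and handed to the block resolver, so blocks received through migration get recorded the same way as locally written ones. `Resolver`, `Manager`, and `putMigratedBlock` are simplified stand-ins for `IndexShuffleBlockResolver`, `SortShuffleManager`, and the migration write path.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

object ShuffleTrackingSketch {
  // Shared registry: shuffleId -> ids of map tasks whose shuffle files live on this executor.
  type TaskIdMaps = ConcurrentHashMap[Int, mutable.Set[Long]]

  // Simplified stand-in for the block resolver: it records the mapId whenever it
  // persists a migrated index/data block, mirroring what normal map-task writes do.
  final class Resolver(val taskIdMapsForShuffle: TaskIdMaps) {
    def putMigratedBlock(shuffleId: Int, mapId: Long): Unit = {
      // ... write the received bytes to the local data/index file ...
      val mapTaskIds =
        taskIdMapsForShuffle.computeIfAbsent(shuffleId, _ => mutable.Set.empty[Long])
      mapTaskIds.synchronized { mapTaskIds += mapId } // now shuffle removal can find the file
    }
  }

  // Simplified stand-in for the shuffle manager: it owns the registry and shares
  // it with the resolver, which is the essence of this change.
  final class Manager {
    val taskIdMapsForShuffle = new TaskIdMaps()
    val resolver = new Resolver(taskIdMapsForShuffle)
  }
}
```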

### Why are the changes needed?

This is a long-standing bug in decommission where the migrated shuffle files can't be cleaned up from the executor. Normally, shuffle files are tracked in `taskIdMapsForShuffle` while the map tasks execute. Upon receiving a `RemoveShuffle(shuffleId)` request from the driver, the executor cleans up those shuffle files by looking them up in `taskIdMapsForShuffle`. However, shuffle files migrated to another executor during decommission are not tracked in the destination executor's `taskIdMapsForShuffle`, so they can never be deleted.

Note that this bug only affects shuffle removal on the executor. For shuffle removal via the external shuffle service (when `spark.shuffle.service.removeShuffle` is enabled and the executor that stored the shuffle files is gone), we don't rely on `taskIdMapsForShuffle` but instead use the specific shuffle block id to locate the shuffle files directly, so that path is not affected.
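To make the failure mode concrete, below is a hedged sketch of the executor-side removal path (simplified; the real logic lives around the shuffle manager's unregistration and the block resolver's file removal). When the tracking map has no entry for a migrated shuffle, the loop never runs and the files leak; with this PR the entry exists and the files are deleted. The `shuffleFile` parameter is a hypothetical helper standing in for the disk block manager lookup.

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

object ShuffleCleanupSketch {
  /** Delete the data/index files of `shuffleId` that are tracked on this executor. */
  def removeShuffle(
      shuffleId: Int,
      taskIdMapsForShuffle: ConcurrentHashMap[Int, mutable.Set[Long]],
      shuffleFile: (Int, Long, String) => File): Unit = {
    // Before this fix, a migrated shuffle had no entry here, so `remove` returned
    // null, the loop below never ran, and the migrated files were leaked.
    Option(taskIdMapsForShuffle.remove(shuffleId)).foreach { mapTaskIds =>
      mapTaskIds.foreach { mapId =>
        shuffleFile(shuffleId, mapId, "data").delete()
        shuffleFile(shuffleId, mapId, "index").delete()
      }
    }
  }
}
```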

### Does this PR introduce _any_ user-facing change?

No. (Regular users won't notice any difference under the hood.)

### How was this patch tested?

Added a unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47037 from Ngone51/SPARK-46957.

Authored-by: Yi Wu <yi.wu@databricks.com>
Signed-off-by: Yi Wu <yi.wu@databricks.com>
Ngone51 committed Jun 27, 2024
1 parent b11608c commit b5a55e4
Showing 5 changed files with 103 additions and 5 deletions.
@@ -56,7 +56,8 @@ public void initializeExecutor(String appId, String execId, Map<String, String>
if (blockManager == null) {
throw new IllegalStateException("No blockManager available from the SparkEnv.");
}
blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
blockResolver =
new IndexShuffleBlockResolver(sparkConf, blockManager, Map.of() /* Shouldn't be accessed */);
}

@Override
@@ -21,6 +21,7 @@ import java.io._
import java.nio.ByteBuffer
import java.nio.channels.Channels
import java.nio.file.Files
import java.util.{Map => JMap}

import scala.collection.mutable.ArrayBuffer

@@ -40,6 +41,7 @@ import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
import org.apache.spark.storage._
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.OpenHashSet

/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -55,7 +57,8 @@ import org.apache.spark.util.Utils
private[spark] class IndexShuffleBlockResolver(
conf: SparkConf,
// var for testing
var _blockManager: BlockManager = null)
var _blockManager: BlockManager = null,
val taskIdMapsForShuffle: JMap[Int, OpenHashSet[Long]] = JMap.of())
extends ShuffleBlockResolver
with Logging with MigratableResolver {

@@ -285,6 +288,21 @@
throw SparkCoreErrors.failedRenameTempFileError(fileTmp, file)
}
}
blockId match {
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
shuffleId, _ => new OpenHashSet[Long](8)
)
mapTaskIds.add(mapId)

case ShuffleDataBlockId(shuffleId, mapId, _) =>
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
shuffleId, _ => new OpenHashSet[Long](8)
)
mapTaskIds.add(mapId)

case _ => // Unreachable
}
blockManager.reportBlockStatus(blockId, BlockStatus(StorageLevel.DISK_ONLY, 0, diskSize))
}

@@ -87,7 +87,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager

private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)

override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
override val shuffleBlockResolver =
new IndexShuffleBlockResolver(conf, taskIdMapsForShuffle = taskIdMapsForShuffle)

/**
* Obtains a [[ShuffleHandle]] to pass to tasks.
@@ -314,7 +314,8 @@ public void writeWithoutSpilling() throws Exception {

@Test
public void writeChecksumFileWithoutSpill() throws Exception {
IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
IndexShuffleBlockResolver blockResolver =
new IndexShuffleBlockResolver(conf, blockManager, Map.of());
ShuffleChecksumBlockId checksumBlockId =
new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -344,7 +345,8 @@

@Test
public void writeChecksumFileWithSpill() throws Exception {
IndexShuffleBlockResolver blockResolver = new IndexShuffleBlockResolver(conf, blockManager);
IndexShuffleBlockResolver blockResolver =
new IndexShuffleBlockResolver(conf, blockManager, Map.of());
ShuffleChecksumBlockId checksumBlockId =
new ShuffleChecksumBlockId(0, 0, IndexShuffleBlockResolver.NOOP_REDUCE_ID());
String checksumAlgorithm = conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ALGORITHM());
@@ -17,12 +17,14 @@

package org.apache.spark.storage

import java.io.File
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore, TimeUnit}

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

import org.apache.commons.io.FileUtils
import org.scalatest.concurrent.Eventually

import org.apache.spark._
@@ -353,4 +355,78 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
import scala.language.reflectiveCalls
assert(listener.removeReasonValidated)
}

test("SPARK-46957: Migrated shuffle files should be able to cleanup from executor") {

val sparkTempDir = System.getProperty("java.io.tmpdir")

def shuffleFiles: Seq[File] = {
FileUtils
.listFiles(new File(sparkTempDir), Array("data", "index"), true)
.asScala
.toSeq
}

val existingShuffleFiles = shuffleFiles

val conf = new SparkConf()
.setAppName("SPARK-46957")
.setMaster("local-cluster[2,1,1024]")
.set(config.DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
val shuffleBlockUpdates = new ArrayBuffer[BlockId]()
var isDecommissionedExecutorRemoved = false
val execToDecommission = sc.getExecutorIds().head
sc.addSparkListener(new SparkListener {
override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
if (blockUpdated.blockUpdatedInfo.blockId.isShuffle) {
shuffleBlockUpdates += blockUpdated.blockUpdatedInfo.blockId
}
}

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
assert(execToDecommission === executorRemoved.executorId)
isDecommissionedExecutorRemoved = true
}
})

// Run a job to create shuffle data
val result = sc.parallelize(1 to 1000, 10)
.map { i => (i % 2, i) }
.reduceByKey(_ + _).collect()

assert(result.head === (0, 250500))
assert(result.tail.head === (1, 250000))
sc.schedulerBackend
.asInstanceOf[StandaloneSchedulerBackend]
.decommissionExecutor(
execToDecommission,
ExecutorDecommissionInfo("test", None),
adjustTargetNumExecutors = true
)

eventually(timeout(1.minute), interval(10.milliseconds)) {
assert(isDecommissionedExecutorRemoved)
// Ensure that some shuffle data has been migrated
assert(shuffleBlockUpdates.size >= 2)
}

val shuffleId = shuffleBlockUpdates
.find(_.isInstanceOf[ShuffleIndexBlockId])
.map(_.asInstanceOf[ShuffleIndexBlockId].shuffleId)
.get

val newShuffleFiles = shuffleFiles.diff(existingShuffleFiles)
assert(newShuffleFiles.size >= shuffleBlockUpdates.size)

// Remove the shuffle data
sc.shuffleDriverComponents.removeShuffle(shuffleId, true)

eventually(timeout(1.minute), interval(10.milliseconds)) {
assert(newShuffleFiles.intersect(shuffleFiles).isEmpty)
}
}
}
