From 4b3bb0e43ca7e1a27308516608419487b6a844e6 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 8 May 2015 12:24:06 -0700 Subject: [PATCH] [SPARK-6627] Finished rename to ShuffleBlockResolver The previous cleanup-commit for SPARK-6627 renamed ShuffleBlockManager to ShuffleBlockResolver, but didn't rename the associated subclasses and variables; this commit does that. I'm unsure whether it's ok to rename ExternalShuffleBlockManager, since that's technically a public class? cc pwendell Author: Kay Ousterhout Closes #5764 from kayousterhout/SPARK-6627 and squashes the following commits: 43add1e [Kay Ousterhout] Spacing fix 96080bf [Kay Ousterhout] Test fixes d8a5d36 [Kay Ousterhout] [SPARK-6627] Finished rename to ShuffleBlockResolver --- ...r.scala => FileShuffleBlockResolver.scala} | 11 ++---- ....scala => IndexShuffleBlockResolver.scala} | 10 ++--- .../shuffle/hash/HashShuffleManager.scala | 6 +-- .../shuffle/hash/HashShuffleWriter.scala | 4 +- .../shuffle/sort/SortShuffleManager.scala | 6 +-- .../shuffle/sort/SortShuffleWriter.scala | 12 +++--- .../org/apache/spark/storage/BlockId.scala | 2 +- .../apache/spark/storage/BlockManager.scala | 5 ++- .../spark/storage/DiskBlockManager.scala | 2 +- .../hash/HashShuffleManagerSuite.scala | 18 ++++----- .../shuffle/ExternalShuffleBlockHandler.java | 6 +-- ...java => ExternalShuffleBlockResolver.java} | 16 ++++---- .../ExternalShuffleBlockHandlerSuite.java | 16 ++++---- ...=> ExternalShuffleBlockResolverSuite.java} | 30 +++++++-------- .../shuffle/ExternalShuffleCleanupSuite.java | 37 ++++++++++--------- .../shuffle/TestShuffleDataContext.java | 8 ++-- 16 files changed, 94 insertions(+), 95 deletions(-) rename core/src/main/scala/org/apache/spark/shuffle/{FileShuffleBlockManager.scala => FileShuffleBlockResolver.scala} (97%) rename core/src/main/scala/org/apache/spark/shuffle/{IndexShuffleBlockManager.scala => IndexShuffleBlockResolver.scala} (93%) rename network/shuffle/src/main/java/org/apache/spark/network/shuffle/{ExternalShuffleBlockManager.java => ExternalShuffleBlockResolver.java} (95%) rename network/shuffle/src/test/java/org/apache/spark/network/shuffle/{ExternalShuffleBlockManagerSuite.java => ExternalShuffleBlockResolverSuite.java} (77%) diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala similarity index 97% rename from core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala rename to core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala index e9b4e2b955dc8..6ad427bcac7f9 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala @@ -18,7 +18,6 @@ package org.apache.spark.shuffle import java.io.File -import java.nio.ByteBuffer import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger @@ -29,7 +28,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.serializer.Serializer -import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup +import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup import org.apache.spark.storage._ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap} import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector} @@ -64,9 +63,8 @@ private[spark] trait ShuffleWriterGroup { * files within a ShuffleFileGroups associated with the block's reducer. */ // Note: Changes to the format in this file should be kept in sync with -// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData(). -private[spark] -class FileShuffleBlockManager(conf: SparkConf) +// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData(). +private[spark] class FileShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver with Logging { private val transportConf = SparkTransportConf.fromSparkConf(conf) @@ -242,8 +240,7 @@ class FileShuffleBlockManager(conf: SparkConf) } } -private[spark] -object FileShuffleBlockManager { +private[spark] object FileShuffleBlockResolver { /** * A group of shuffle files, one per reducer. * A particular mapper will be assigned a single ShuffleFileGroup to write its output to. diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala similarity index 93% rename from core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala rename to core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala index a1741e2875c16..d9c63b6e7bbb9 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala @@ -18,7 +18,6 @@ package org.apache.spark.shuffle import java.io._ -import java.nio.ByteBuffer import com.google.common.io.ByteStreams @@ -28,7 +27,7 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.storage._ import org.apache.spark.util.Utils -import IndexShuffleBlockManager.NOOP_REDUCE_ID +import IndexShuffleBlockResolver.NOOP_REDUCE_ID /** * Create and maintain the shuffle blocks' mapping between logic block and physical file location. @@ -40,9 +39,8 @@ import IndexShuffleBlockManager.NOOP_REDUCE_ID * */ // Note: Changes to the format in this file should be kept in sync with -// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData(). -private[spark] -class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver { +// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData(). +private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver { private lazy val blockManager = SparkEnv.get.blockManager @@ -115,7 +113,7 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver { override def stop(): Unit = {} } -private[spark] object IndexShuffleBlockManager { +private[spark] object IndexShuffleBlockResolver { // No-op reduce ID used in interactions with disk store and BlockObjectWriter. // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort // shuffle outputs for several reduces are glommed into a single file. diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala index 2a7df8dd5bd83..c089088f409dd 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala @@ -26,7 +26,7 @@ import org.apache.spark.shuffle._ */ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager { - private val fileShuffleBlockManager = new FileShuffleBlockManager(conf) + private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf) /* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */ override def registerShuffle[K, V, C]( @@ -61,8 +61,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager shuffleBlockResolver.removeShuffle(shuffleId) } - override def shuffleBlockResolver: FileShuffleBlockManager = { - fileShuffleBlockManager + override def shuffleBlockResolver: FileShuffleBlockResolver = { + fileShuffleBlockResolver } /** Shut down this ShuffleManager. */ diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala index cd27c9e07a3cd..897f0a5dc5bcc 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala @@ -25,7 +25,7 @@ import org.apache.spark.shuffle._ import org.apache.spark.storage.BlockObjectWriter private[spark] class HashShuffleWriter[K, V]( - shuffleBlockManager: FileShuffleBlockManager, + shuffleBlockResolver: FileShuffleBlockResolver, handle: BaseShuffleHandle[K, V, _], mapId: Int, context: TaskContext) @@ -45,7 +45,7 @@ private[spark] class HashShuffleWriter[K, V]( private val blockManager = SparkEnv.get.blockManager private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null)) - private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, + private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser, writeMetrics) /** Write a bunch of records to this task's output */ diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 0497036192154..15842941daaab 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager { - private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf) + private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf) private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]() /** @@ -72,8 +72,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager true } - override def shuffleBlockResolver: IndexShuffleBlockManager = { - indexShuffleBlockManager + override def shuffleBlockResolver: IndexShuffleBlockResolver = { + indexShuffleBlockResolver } /** Shut down this ShuffleManager. */ diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index a066435df6fb0..add2656294ca2 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -20,12 +20,12 @@ package org.apache.spark.shuffle.sort import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.scheduler.MapStatus -import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle} +import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle} import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.collection.ExternalSorter private[spark] class SortShuffleWriter[K, V, C]( - shuffleBlockManager: IndexShuffleBlockManager, + shuffleBlockResolver: IndexShuffleBlockResolver, handle: BaseShuffleHandle[K, V, C], mapId: Int, context: TaskContext) @@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C]( // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). - val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) - val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID) + val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) + val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) - shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) + shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths) mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) } @@ -84,7 +84,7 @@ private[spark] class SortShuffleWriter[K, V, C]( return Option(mapStatus) } else { // The map task failed, so delete our output data. - shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId) + shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId) return None } } finally { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index c186fd360fef6..524f6970992a5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -54,7 +54,7 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId { } // Format of the shuffle block ids (including data and index) should be kept in sync with -// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData(). +// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData(). @DeveloperApi case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a46fecd2274ef..cc794e5c90ffa 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -431,10 +431,11 @@ private[spark] class BlockManager( // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work if (blockId.isShuffle) { - val shuffleBlockManager = shuffleManager.shuffleBlockResolver + val shuffleBlockResolver = shuffleManager.shuffleBlockResolver // TODO: This should gracefully handle case where local block is not available. Currently // downstream code will throw an exception. - Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) + Option( + shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()) } else { doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 5764c16902c66..2a4447705fa65 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -55,7 +55,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon /** Looks up a file by hashing it into one of our local subdirectories. */ // This method should be kept in sync with - // org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile(). + // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile(). def getFile(filename: String): File = { // Figure out which local directory it hashes to, and which subdirectory in that val hash = Utils.nonNegativeHash(filename) diff --git a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala index 84384bb48999a..0537bf66ad020 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/hash/HashShuffleManagerSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.serializer.JavaSerializer -import org.apache.spark.shuffle.FileShuffleBlockManager +import org.apache.spark.shuffle.FileShuffleBlockResolver import org.apache.spark.storage.{ShuffleBlockId, FileSegment} class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { @@ -53,10 +53,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test", conf) - val shuffleBlockManager = - SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager] + val shuffleBlockResolver = + SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver] - val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf), + val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf), new ShuffleWriteMetrics) for (writer <- shuffle1.writers) { writer.write("test1", "value") @@ -69,7 +69,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { val shuffle1Segment = shuffle1.writers(0).fileSegment() shuffle1.releaseWriters(success = true) - val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf), + val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf), new ShuffleWriteMetrics) for (writer <- shuffle2.writers) { @@ -88,7 +88,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { // of block based on remaining data in file : which could mess things up when there is // concurrent read and writes happening to the same shuffle group. - val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf), + val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf), new ShuffleWriteMetrics) for (writer <- shuffle3.writers) { writer.write("test3", "value") @@ -98,10 +98,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext { writer.commitAndClose() } // check before we register. - checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0))) + checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0))) shuffle3.releaseWriters(success = true) - checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0))) - shuffleBlockManager.removeShuffle(1) + checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0))) + shuffleBlockResolver.removeShuffle(1) } def writeToFile(file: File, numBytes: Int) { diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 46ca9708621b9..e4faaf8854fc7 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -46,18 +46,18 @@ public class ExternalShuffleBlockHandler extends RpcHandler { private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class); - private final ExternalShuffleBlockManager blockManager; + private final ExternalShuffleBlockResolver blockManager; private final OneForOneStreamManager streamManager; public ExternalShuffleBlockHandler(TransportConf conf) { - this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf)); + this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf)); } /** Enables mocking out the StreamManager and BlockManager. */ @VisibleForTesting ExternalShuffleBlockHandler( OneForOneStreamManager streamManager, - ExternalShuffleBlockManager blockManager) { + ExternalShuffleBlockResolver blockManager) { this.streamManager = streamManager; this.blockManager = blockManager; } diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java similarity index 95% rename from network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java rename to network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 93e6fdd7161fa..dd08e24cade23 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -44,13 +44,13 @@ * Manages converting shuffle BlockIds into physical segments of local files, from a process outside * of Executors. Each Executor must register its own configuration about where it stores its files * (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated - * from Spark's FileShuffleBlockManager and IndexShuffleBlockManager. + * from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver. * * Executors with shuffle file consolidation are not currently supported, as the index is stored in - * the Executor's memory, unlike the IndexShuffleBlockManager. + * the Executor's memory, unlike the IndexShuffleBlockResolver. */ -public class ExternalShuffleBlockManager { - private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class); +public class ExternalShuffleBlockResolver { + private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class); // Map containing all registered executors' metadata. private final ConcurrentMap executors; @@ -60,7 +60,7 @@ public class ExternalShuffleBlockManager { private final TransportConf conf; - public ExternalShuffleBlockManager(TransportConf conf) { + public ExternalShuffleBlockResolver(TransportConf conf) { this(conf, Executors.newSingleThreadExecutor( // Add `spark` prefix because it will run in NM in Yarn mode. NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner"))); @@ -68,7 +68,7 @@ public ExternalShuffleBlockManager(TransportConf conf) { // Allows tests to have more control over when directories are cleaned up. @VisibleForTesting - ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) { + ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) { this.conf = conf; this.executors = Maps.newConcurrentMap(); this.directoryCleaner = directoryCleaner; @@ -168,7 +168,7 @@ private void deleteExecutorDirs(String[] dirs) { /** * Hash-based shuffle data is simply stored as one file per block. - * This logic is from FileShuffleBlockManager. + * This logic is from FileShuffleBlockResolver. */ // TODO: Support consolidated hash shuffle files private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) { @@ -178,7 +178,7 @@ private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, /** * Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file - * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockManager, + * called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver, * and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId. */ private ManagedBuffer getSortBasedShuffleBlockData( diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java index 3f9fe1681cf27..73374cdc77a23 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandlerSuite.java @@ -45,14 +45,14 @@ public class ExternalShuffleBlockHandlerSuite { TransportClient client = mock(TransportClient.class); OneForOneStreamManager streamManager; - ExternalShuffleBlockManager blockManager; + ExternalShuffleBlockResolver blockResolver; RpcHandler handler; @Before public void beforeEach() { streamManager = mock(OneForOneStreamManager.class); - blockManager = mock(ExternalShuffleBlockManager.class); - handler = new ExternalShuffleBlockHandler(streamManager, blockManager); + blockResolver = mock(ExternalShuffleBlockResolver.class); + handler = new ExternalShuffleBlockHandler(streamManager, blockResolver); } @Test @@ -62,7 +62,7 @@ public void testRegisterExecutor() { ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort"); byte[] registerMessage = new RegisterExecutor("app0", "exec1", config).toByteArray(); handler.receive(client, registerMessage, callback); - verify(blockManager, times(1)).registerExecutor("app0", "exec1", config); + verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config); verify(callback, times(1)).onSuccess((byte[]) any()); verify(callback, never()).onFailure((Throwable) any()); @@ -75,12 +75,12 @@ public void testOpenShuffleBlocks() { ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3])); ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7])); - when(blockManager.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker); - when(blockManager.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker); + when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker); + when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker); byte[] openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" }).toByteArray(); handler.receive(client, openBlocks, callback); - verify(blockManager, times(1)).getBlockData("app0", "exec1", "b0"); - verify(blockManager, times(1)).getBlockData("app0", "exec1", "b1"); + verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0"); + verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1"); ArgumentCaptor response = ArgumentCaptor.forClass(byte[].class); verify(callback, times(1)).onSuccess(response.capture()); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java similarity index 77% rename from network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java rename to network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index dad6428a836fc..d02f4f0fdb682 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManagerSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -30,7 +30,7 @@ import static org.junit.Assert.*; -public class ExternalShuffleBlockManagerSuite { +public class ExternalShuffleBlockResolverSuite { static String sortBlock0 = "Hello!"; static String sortBlock1 = "World!"; @@ -60,29 +60,29 @@ public static void afterAll() { @Test public void testBadRequests() { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); + ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf); // Unregistered executor try { - manager.getBlockData("app0", "exec1", "shuffle_1_1_0"); + resolver.getBlockData("app0", "exec1", "shuffle_1_1_0"); fail("Should have failed"); } catch (RuntimeException e) { assertTrue("Bad error message: " + e, e.getMessage().contains("not registered")); } // Invalid shuffle manager - manager.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar")); + resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar")); try { - manager.getBlockData("app0", "exec2", "shuffle_1_1_0"); + resolver.getBlockData("app0", "exec2", "shuffle_1_1_0"); fail("Should have failed"); } catch (UnsupportedOperationException e) { // pass } // Nonexistent shuffle block - manager.registerExecutor("app0", "exec3", + resolver.registerExecutor("app0", "exec3", dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager")); try { - manager.getBlockData("app0", "exec3", "shuffle_1_1_0"); + resolver.getBlockData("app0", "exec3", "shuffle_1_1_0"); fail("Should have failed"); } catch (Exception e) { // pass @@ -91,18 +91,18 @@ public void testBadRequests() { @Test public void testSortShuffleBlocks() throws IOException { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); - manager.registerExecutor("app0", "exec0", + ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf); + resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.sort.SortShuffleManager")); InputStream block0Stream = - manager.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream(); + resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream(); String block0 = CharStreams.toString(new InputStreamReader(block0Stream)); block0Stream.close(); assertEquals(sortBlock0, block0); InputStream block1Stream = - manager.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream(); + resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream(); String block1 = CharStreams.toString(new InputStreamReader(block1Stream)); block1Stream.close(); assertEquals(sortBlock1, block1); @@ -110,18 +110,18 @@ public void testSortShuffleBlocks() throws IOException { @Test public void testHashShuffleBlocks() throws IOException { - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf); - manager.registerExecutor("app0", "exec0", + ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf); + resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo("org.apache.spark.shuffle.hash.HashShuffleManager")); InputStream block0Stream = - manager.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream(); + resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream(); String block0 = CharStreams.toString(new InputStreamReader(block0Stream)); block0Stream.close(); assertEquals(hashBlock0, block0); InputStream block1Stream = - manager.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream(); + resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream(); String block1 = CharStreams.toString(new InputStreamReader(block1Stream)); block1Stream.close(); assertEquals(hashBlock1, block1); diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index 254e3a7a32b98..d9d9c1bf2f17a 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -41,14 +41,15 @@ public class ExternalShuffleCleanupSuite { public void noCleanupAndCleanup() throws IOException { TestShuffleDataContext dataContext = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); - manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); - manager.applicationRemoved("app", false /* cleanup */); + ExternalShuffleBlockResolver resolver = + new ExternalShuffleBlockResolver(conf, sameThreadExecutor); + resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); + resolver.applicationRemoved("app", false /* cleanup */); assertStillThere(dataContext); - manager.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr")); - manager.applicationRemoved("app", true /* cleanup */); + resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr")); + resolver.applicationRemoved("app", true /* cleanup */); assertCleanedUp(dataContext); } @@ -64,7 +65,7 @@ public void cleanupUsesExecutor() throws IOException { @Override public void execute(Runnable runnable) { cleanupCalled.set(true); } }; - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, noThreadExecutor); + ExternalShuffleBlockResolver manager = new ExternalShuffleBlockResolver(conf, noThreadExecutor); manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr")); manager.applicationRemoved("app", true); @@ -81,11 +82,12 @@ public void cleanupMultipleExecutors() throws IOException { TestShuffleDataContext dataContext0 = createSomeData(); TestShuffleDataContext dataContext1 = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); + ExternalShuffleBlockResolver resolver = + new ExternalShuffleBlockResolver(conf, sameThreadExecutor); - manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); - manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr")); - manager.applicationRemoved("app", true); + resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); + resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr")); + resolver.applicationRemoved("app", true); assertCleanedUp(dataContext0); assertCleanedUp(dataContext1); @@ -96,25 +98,26 @@ public void cleanupOnlyRemovedApp() throws IOException { TestShuffleDataContext dataContext0 = createSomeData(); TestShuffleDataContext dataContext1 = createSomeData(); - ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(conf, sameThreadExecutor); + ExternalShuffleBlockResolver resolver = + new ExternalShuffleBlockResolver(conf, sameThreadExecutor); - manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); - manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr")); + resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr")); + resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr")); - manager.applicationRemoved("app-nonexistent", true); + resolver.applicationRemoved("app-nonexistent", true); assertStillThere(dataContext0); assertStillThere(dataContext1); - manager.applicationRemoved("app-0", true); + resolver.applicationRemoved("app-0", true); assertCleanedUp(dataContext0); assertStillThere(dataContext1); - manager.applicationRemoved("app-1", true); + resolver.applicationRemoved("app-1", true); assertCleanedUp(dataContext0); assertCleanedUp(dataContext1); // Make sure it's not an error to cleanup multiple times - manager.applicationRemoved("app-1", true); + resolver.applicationRemoved("app-1", true); assertCleanedUp(dataContext0); assertCleanedUp(dataContext1); } diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java index 76639114df5d9..3fdde054ab6c7 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java @@ -29,7 +29,7 @@ /** * Manages some sort- and hash-based shuffle data, including the creation - * and cleanup of directories that can be read by the {@link ExternalShuffleBlockManager}. + * and cleanup of directories that can be read by the {@link ExternalShuffleBlockResolver}. */ public class TestShuffleDataContext { public final String[] localDirs; @@ -61,9 +61,9 @@ public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) thr String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0"; OutputStream dataStream = new FileOutputStream( - ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, blockId + ".data")); + ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".data")); DataOutputStream indexStream = new DataOutputStream(new FileOutputStream( - ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, blockId + ".index"))); + ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId + ".index"))); long offset = 0; indexStream.writeLong(offset); @@ -82,7 +82,7 @@ public void insertHashShuffleData(int shuffleId, int mapId, byte[][] blocks) thr for (int i = 0; i < blocks.length; i ++) { String blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + i; Files.write(blocks[i], - ExternalShuffleBlockManager.getFile(localDirs, subDirsPerLocalDir, blockId)); + ExternalShuffleBlockResolver.getFile(localDirs, subDirsPerLocalDir, blockId)); } }