diff --git a/core/pom.xml b/core/pom.xml
index 1df685c8..3e5ee1e7 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -19,18 +19,18 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.12</artifactId>
-      <version>3.0.0</version>
+      <version>3.1.1</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-network-common_2.12</artifactId>
-      <version>3.0.0</version>
+      <version>3.1.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.12</artifactId>
-      <version>3.0.0</version>
+      <version>3.1.1</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
@@ -47,7 +47,7 @@
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_2.12</artifactId>
-      <version>3.0.3</version>
+      <version>3.1.1</version>
       <scope>test</scope>
     </dependency>
diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala
index cf411061..63b8241b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala
@@ -59,12 +59,12 @@ private[spark] class PmofShuffleManager(conf: SparkConf) extends ShuffleManager
     }
   }

-  override def getReader[K, C](handle: _root_.org.apache.spark.shuffle.ShuffleHandle, startPartition: Int, endPartition: Int, context: _root_.org.apache.spark.TaskContext, readMetrics: ShuffleReadMetricsReporter): _root_.org.apache.spark.shuffle.ShuffleReader[K, C] = {
+  override def getReader[K, C](handle: _root_.org.apache.spark.shuffle.ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: _root_.org.apache.spark.TaskContext, readMetrics: ShuffleReadMetricsReporter): _root_.org.apache.spark.shuffle.ShuffleReader[K, C] = {
     val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-      handle.shuffleId, startPartition, endPartition)
+      handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
     if (pmofConf.enableRdma) {
       new RdmaShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
-        startPartition, endPartition, context, pmofConf)
+        startMapIndex, endMapIndex, startPartition, endPartition, context, pmofConf)
     } else {
       new BaseShuffleReader(
         handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, startPartition, endPartition, context, readMetrics, pmofConf)
@@ -100,5 +100,4 @@ private[spark] class PmofShuffleManager(conf: SparkConf) extends ShuffleManager
     new IndexShuffleBlockResolver(conf)
   }

-  override def getReaderForRange[K, C](handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = ???
 }
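Context for the PmofShuffleManager change: Spark 3.1 folds the former getReaderForRange into a single getReader that also takes a map-index range, which is why the override gains startMapIndex/endMapIndex, getMapSizesByExecutorId receives the extended argument list, and the unimplemented getReaderForRange stub can be dropped. Below is a minimal caller-side sketch of the unified entry point under that assumption; the ReaderSketch object and readOnePartition helper are illustrative and not part of this patch. A full read of all map outputs corresponds to startMapIndex = 0 and endMapIndex = Int.MaxValue.

```scala
package org.apache.spark.shuffle.pmof

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleReadMetricsReporter}

// Illustrative sketch: how a reduce task would obtain a reader through the
// unified Spark 3.1 getReader. A map-index range of [0, Int.MaxValue) means
// "read every map output", which is what the old five-argument getReader
// implied; a narrower range covers what getReaderForRange used to handle.
object ReaderSketch {
  def readOnePartition[K, C](
      handle: ShuffleHandle,
      reduceId: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): Iterator[Product2[K, C]] = {
    val reader = SparkEnv.get.shuffleManager.getReader[K, C](
      handle,
      0,              // startMapIndex
      Int.MaxValue,   // endMapIndex: Int.MaxValue = through the last map output
      reduceId,       // startPartition
      reduceId + 1,   // endPartition (exclusive)
      context,
      metrics)
    reader.read()
  }
}
```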
diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala
index 92af5417..c2270592 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala
@@ -17,6 +17,8 @@ import org.apache.spark.util.configuration.pmof.PmofConf
  * requesting them from other nodes' block stores.
  */
 private[spark] class RdmaShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
+                                             startMapIndex: Int,
+                                             endMapIndex: Int,
                                              startPartition: Int,
                                              endPartition: Int,
                                              context: TaskContext,
@@ -36,7 +38,7 @@ private[spark] class RdmaShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
       context,
       PmofTransferService.getTransferServiceInstance(pmofConf, blockManager),
       blockManager,
-      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
+      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition),
       serializerManager.wrapStream,
       // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
       SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala
index 8d6900fd..73a58e89 100644
--- a/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala
+++ b/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala
@@ -18,7 +18,10 @@ import org.apache.spark.util.configuration.pmof.PmofConf
 import scala.concurrent.Future
 import scala.concurrent.ExecutionContext.Implicits.global

+import scala.util.control.Breaks._
+
 private[spark] class PersistentMemoryHandler(
+    val pmofConf: PmofConf,
     val root_dir: String,
     val path_list: List[String],
     val shuffleId: String,
@@ -87,21 +90,45 @@ private[spark] class PersistentMemoryHandler(
   def getPartitionManagedBuffer(blockId: String): ManagedBuffer = {
     new PmemManagedBuffer(this, blockId)
   }
+
+  def fileDeletion(path: String): Unit = synchronized {
+    try {
+      if (new File(path).delete()) {
+        logInfo("File deleted successfully: " + poolFile)
+      } else {
+        logWarning("Failed to delete file: " + poolFile)
+      }
+    } catch {
+      case e: Exception => e.printStackTrace()
+    }
+  }

   def close(): Unit = synchronized {
+    val timeout = pmofConf.fileEmptyTimeout
+    val interval = pmofConf.fileEmptyInterval
+    val startTime = System.currentTimeMillis()
+    var first = true
     if (isFsdaxFile) {
-      try {
-        if (new File(poolFile).delete()) {
-          logInfo("File deleted successfully: " + poolFile)
-        } else {
-          logWarning("Failed to delete file: " + poolFile)
+      var currentTime = System.currentTimeMillis()
+      breakable{
+        while(currentTime - startTime < timeout * 1000){
+          if(!Files.exists(Paths.get(poolFile))){
+            break
+          }
+          if (!first){
+            /**
+            The slept thread will be terminated immediately
+            Thread.sleep(interval * 1000)
+            **/
+          }
+          fileDeletion(poolFile)
+          currentTime = System.currentTimeMillis()
+          first = false
         }
-      } catch {
-        case e: Exception => e.printStackTrace()
-      }
+      }
     } else {
-      pmpool.close()
       pmMetaHandler.remove()
+      Runtime.getRuntime.exec("pmempool rm " + poolFile)
     }
   }

@@ -119,7 +146,7 @@ object PersistentMemoryHandler {
   private var stopped: Boolean = _
   def getPersistentMemoryHandler(pmofConf: PmofConf, root_dir: String, path_arg: List[String], shuffleBlockId: String, pmPoolSize: Long): PersistentMemoryHandler = synchronized {
     if (persistentMemoryHandler == null) {
-      persistentMemoryHandler = new PersistentMemoryHandler(root_dir, path_arg, shuffleBlockId, pmPoolSize)
+      persistentMemoryHandler = new PersistentMemoryHandler(pmofConf, root_dir, path_arg, shuffleBlockId, pmPoolSize)
       persistentMemoryHandler.log("Use persistentMemoryHandler Object: " + this)
       if (pmofConf.enableRdma) {
         val blockManager = SparkEnv.get.blockManager
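The rewritten close() no longer gives up after a single delete attempt: for an fsdax pool file it keeps calling fileDeletion until the file is actually gone or pmofConf.fileEmptyTimeout seconds elapse (the Thread.sleep between attempts is commented out, with the patch noting that a sleeping thread would be terminated immediately), and on the non-fsdax path it removes the pool with `pmempool rm` instead of closing it in-process. Below is a standalone sketch of the same bounded-retry idea; the PoolFileCleanupSketch object, the deleteUntilGone helper, and its parameters are illustrative and not part of the patch.

```scala
import java.io.File
import java.nio.file.{Files, Paths}

object PoolFileCleanupSketch {
  // Illustrative helper mirroring the retry loop in close(): keep attempting to
  // delete `path` until it no longer exists or `timeoutSec` seconds have elapsed.
  // Returns true if the file is gone when the loop stops.
  def deleteUntilGone(path: String, timeoutSec: Int): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutSec * 1000L
    while (Files.exists(Paths.get(path)) && System.currentTimeMillis() < deadline) {
      new File(path).delete() // best effort; may fail while another reader still holds the file
    }
    !Files.exists(Paths.get(path))
  }
}
```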
diff --git a/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala b/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala
index f3551704..fe8c0453 100644
--- a/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala
+++ b/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala
@@ -27,4 +27,6 @@ class PmofConf(conf: SparkConf) {
   val shuffleBlockSize: Int = conf.getInt("spark.shuffle.pmof.shuffle_block_size", defaultValue = 2048)
   val pmemCapacity: Long = conf.getLong("spark.shuffle.pmof.pmem_capacity", defaultValue = 264239054848L)
   val pmemCoreMap = conf.get("spark.shuffle.pmof.dev_core_set", defaultValue = "/dev/dax0.0:0-17,36-53").split(";").map(_.trim).map(_.split(":")).map(arr => arr(0) -> arr(1)).toMap
+  val fileEmptyTimeout: Int = conf.getInt("spark.shuffle.pmof.file_empty_timeout", defaultValue = 30)
+  val fileEmptyInterval: Int = conf.getInt("spark.shuffle.pmof.file_empty_interval", defaultValue = 5)
 }
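The two new PmofConf entries expose that cleanup behaviour as user-facing settings: spark.shuffle.pmof.file_empty_timeout bounds how long close() keeps retrying (default 30 seconds), and spark.shuffle.pmof.file_empty_interval is the intended pause between retries (default 5 seconds; currently unused while the sleep stays commented out). A hedged example of overriding the defaults from application code is below; the values are illustrative, and the spark.shuffle.manager line assumes the plugin is enabled the way the project README describes.

```scala
import org.apache.spark.SparkConf

object PmofConfExample {
  // Illustrative overrides of the new cleanup settings (defaults: 30 and 5 seconds).
  val conf: SparkConf = new SparkConf()
    .set("spark.shuffle.manager", "org.apache.spark.shuffle.pmof.PmofShuffleManager")
    .set("spark.shuffle.pmof.file_empty_timeout", "60") // allow up to 60 s for the pool file to disappear
    .set("spark.shuffle.pmof.file_empty_interval", "5") // intended retry interval, in seconds
}
```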