diff --git a/core/pom.xml b/core/pom.xml
index 1df685c8..3e5ee1e7 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -19,18 +19,18 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.12</artifactId>
-      <version>3.0.0</version>
+      <version>3.1.1</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-network-common_2.12</artifactId>
-      <version>3.0.0</version>
+      <version>3.1.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.12</artifactId>
-      <version>3.0.0</version>
+      <version>3.1.1</version>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
@@ -47,7 +47,7 @@
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_2.12</artifactId>
-      <version>3.0.3</version>
+      <version>3.1.1</version>
       <scope>test</scope>
     </dependency>
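
Note on the ScalaTest bump: 3.1.x relocates the core style traits, so test sources written against 3.0.3 may need a mechanical rename. A minimal sketch of that migration, assuming a suite that previously extended org.scalatest.FunSuite (the suite name below is illustrative, not from this repo):

    // ScalaTest 3.0.x:  import org.scalatest.FunSuite
    // ScalaTest 3.1.x:  FunSuite becomes AnyFunSuite in a new package.
    import org.scalatest.funsuite.AnyFunSuite

    class PmofSanitySuite extends AnyFunSuite {
      test("sanity") {
        assert(1 + 1 == 2)
      }
    }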
diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala
index cf411061..63b8241b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala
@@ -59,12 +59,12 @@ private[spark] class PmofShuffleManager(conf: SparkConf) extends ShuffleManager
     }
   }

-  override def getReader[K, C](handle: _root_.org.apache.spark.shuffle.ShuffleHandle, startPartition: Int, endPartition: Int, context: _root_.org.apache.spark.TaskContext, readMetrics: ShuffleReadMetricsReporter): _root_.org.apache.spark.shuffle.ShuffleReader[K, C] = {
+  override def getReader[K, C](handle: _root_.org.apache.spark.shuffle.ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: _root_.org.apache.spark.TaskContext, readMetrics: ShuffleReadMetricsReporter): _root_.org.apache.spark.shuffle.ShuffleReader[K, C] = {
     val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-      handle.shuffleId, startPartition, endPartition)
+      handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
     if (pmofConf.enableRdma) {
       new RdmaShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
-        startPartition, endPartition, context, pmofConf)
+        startMapIndex, endMapIndex, startPartition, endPartition, context, pmofConf)
     } else {
       new BaseShuffleReader(
         handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, startPartition, endPartition, context, readMetrics, pmofConf)
@@ -100,5 +100,4 @@ private[spark] class PmofShuffleManager(conf: SparkConf) extends ShuffleManager
     new IndexShuffleBlockResolver(conf)
   }

-  override def getReaderForRange[K, C](handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = ???
 }
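
For reference, the override above has to line up with the Spark 3.1.x ShuffleManager contract, where the former getReaderForRange was folded into getReader (hence the removal of the ??? stub). A paraphrased sketch of the 3.1.x entry point this code compiles against (trait name here is illustrative; see the Spark source for the authoritative definition):

    import org.apache.spark.TaskContext
    import org.apache.spark.shuffle.{ShuffleHandle, ShuffleReader, ShuffleReadMetricsReporter}

    // Spark 3.1.x: a single reader entry point carrying both the
    // map-index range and the reduce-partition range.
    trait Spark31ReaderContract {
      def getReader[K, C](
          handle: ShuffleHandle,
          startMapIndex: Int,
          endMapIndex: Int,
          startPartition: Int,
          endPartition: Int,
          context: TaskContext,
          metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
    }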
diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala
index 92af5417..c2270592 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala
@@ -17,6 +17,8 @@ import org.apache.spark.util.configuration.pmof.PmofConf
  * requesting them from other nodes' block stores.
  */
 private[spark] class RdmaShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
+                                             startMapIndex: Int,
+                                             endMapIndex: Int,
                                              startPartition: Int,
                                              endPartition: Int,
                                              context: TaskContext,
@@ -36,7 +38,7 @@ private[spark] class RdmaShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
       context,
       PmofTransferService.getTransferServiceInstance(pmofConf, blockManager),
       blockManager,
-      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
+      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition),
       serializerManager.wrapStream,
       // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
       SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
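
Worth keeping in mind when reviewing the reader changes: when Spark wants every map output for a reduce partition it passes 0 and Int.MaxValue as the map-index range (MapOutputTracker treats endMapIndex = Int.MaxValue as "up to the number of registered outputs"), so the readers must handle that sentinel. A caller-side sketch of how the new signature is exercised; shuffleManager, handle, context, readMetrics and rid are stand-ins, not names from this repo:

    def readOnePartition[K, C](
        shuffleManager: org.apache.spark.shuffle.ShuffleManager,
        handle: org.apache.spark.shuffle.ShuffleHandle,
        context: org.apache.spark.TaskContext,
        readMetrics: org.apache.spark.shuffle.ShuffleReadMetricsReporter,
        rid: Int): Iterator[Product2[K, C]] = {
      // 0 / Int.MaxValue selects all map outputs, mirroring how
      // ShuffledRDD.compute issues reads in Spark 3.1.x; both the
      // map-index and partition upper bounds are exclusive.
      shuffleManager.getReader[K, C](
        handle, 0, Int.MaxValue, rid, rid + 1, context, readMetrics).read()
    }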
diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala
index 8d6900fd..73a58e89 100644
--- a/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala
+++ b/core/src/main/scala/org/apache/spark/storage/pmof/PersistentMemoryHandler.scala
@@ -18,7 +18,10 @@ import org.apache.spark.util.configuration.pmof.PmofConf
 import scala.concurrent.Future
 import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.control.Breaks._
+
 private[spark] class PersistentMemoryHandler(
+    val pmofConf: PmofConf,
     val root_dir: String,
     val path_list: List[String],
     val shuffleId: String,
@@ -87,21 +90,45 @@ private[spark] class PersistentMemoryHandler(
   def getPartitionManagedBuffer(blockId: String): ManagedBuffer = {
     new PmemManagedBuffer(this, blockId)
   }
+
+  def fileDeletion(path: String): Unit = synchronized {
+    try {
+      if (new File(path).delete()) {
+        logInfo("File deleted successfully: " + path)
+      } else {
+        logWarning("Failed to delete file: " + path)
+      }
+    } catch {
+      case e: Exception => e.printStackTrace()
+    }
+  }
   def close(): Unit = synchronized {
+    val timeout = pmofConf.fileEmptyTimeout
+    val interval = pmofConf.fileEmptyInterval
+    val startTime = System.currentTimeMillis()
+    var first = true
     if (isFsdaxFile) {
-      try {
-        if (new File(poolFile).delete()) {
-          logInfo("File deleted successfully: " + poolFile)
-        } else {
-          logWarning("Failed to delete file: " + poolFile)
+      var currentTime = System.currentTimeMillis()
+      breakable {
+        while (currentTime - startTime < timeout * 1000) {
+          if (!Files.exists(Paths.get(poolFile))) {
+            break
+          }
+          if (!first) {
+            // A retry pause was considered here:
+            //   Thread.sleep(interval * 1000)
+            // but it stays disabled: a sleeping thread is terminated
+            // immediately during executor shutdown, so the loop polls instead.
+          }
+          fileDeletion(poolFile)
+          currentTime = System.currentTimeMillis()
+          first = false
         }
-      } catch {
-        case e: Exception => e.printStackTrace()
-      }
+      }
     } else {
-      pmpool.close()
       pmMetaHandler.remove()
+      Runtime.getRuntime.exec("pmempool rm " + poolFile)
     }
   }
@@ -119,7 +146,7 @@ object PersistentMemoryHandler {
   private var stopped: Boolean = _
   def getPersistentMemoryHandler(pmofConf: PmofConf, root_dir: String, path_arg: List[String], shuffleBlockId: String, pmPoolSize: Long): PersistentMemoryHandler = synchronized {
     if (persistentMemoryHandler == null) {
-      persistentMemoryHandler = new PersistentMemoryHandler(root_dir, path_arg, shuffleBlockId, pmPoolSize)
+      persistentMemoryHandler = new PersistentMemoryHandler(pmofConf, root_dir, path_arg, shuffleBlockId, pmPoolSize)
       persistentMemoryHandler.log("Use persistentMemoryHandler Object: " + this)
       if (pmofConf.enableRdma) {
         val blockManager = SparkEnv.get.blockManager
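
One caveat on the non-fsdax cleanup branch above: Runtime.getRuntime.exec returns as soon as the pmempool process is spawned, so a failed removal goes unnoticed. A defensive variant (illustrative only, not part of this patch) would wait for the exit code inside close():

    val proc = Runtime.getRuntime.exec(Array("pmempool", "rm", poolFile))
    val exitCode = proc.waitFor()
    if (exitCode != 0) {
      logWarning(s"pmempool rm $poolFile exited with code $exitCode")
    }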
diff --git a/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala b/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala
index f3551704..fe8c0453 100644
--- a/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala
+++ b/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala
@@ -27,4 +27,6 @@ class PmofConf(conf: SparkConf) {
   val shuffleBlockSize: Int = conf.getInt("spark.shuffle.pmof.shuffle_block_size", defaultValue = 2048)
   val pmemCapacity: Long = conf.getLong("spark.shuffle.pmof.pmem_capacity", defaultValue = 264239054848L)
   val pmemCoreMap = conf.get("spark.shuffle.pmof.dev_core_set", defaultValue = "/dev/dax0.0:0-17,36-53").split(";").map(_.trim).map(_.split(":")).map(arr => arr(0) -> arr(1)).toMap
+  val fileEmptyTimeout: Int = conf.getInt("spark.shuffle.pmof.file_empty_timeout", defaultValue = 30)
+  val fileEmptyInterval: Int = conf.getInt("spark.shuffle.pmof.file_empty_interval", defaultValue = 5)
 }
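
The two new knobs give the fsdax cleanup loop in PersistentMemoryHandler.close() its budget: it keeps retrying the delete until the pool file is gone or file_empty_timeout seconds elapse, while file_empty_interval is plumbed through for a retry pause that is currently disabled (see the comment in close()). A minimal sketch of overriding them, assuming the usual SparkConf plumbing:

    import org.apache.spark.SparkConf

    // Defaults are 30 (timeout) and 5 (interval), both in seconds.
    val conf = new SparkConf()
      .set("spark.shuffle.pmof.file_empty_timeout", "60")
      .set("spark.shuffle.pmof.file_empty_interval", "2")
    val pmofConf = new PmofConf(conf)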