This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[pmem-shuffle-15] Make pmem-shuffle support Spark3.1.1 #16

Merged
core/pom.xml (8 changes: 4 additions & 4 deletions)
@@ -19,18 +19,18 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
-<version>3.0.0</version>
+<version>3.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_2.12</artifactId>
-<version>3.0.0</version>
+<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
-<version>3.0.0</version>
+<version>3.1.1</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
@@ -47,7 +47,7 @@
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.12</artifactId>
-<version>3.0.3</version>
+<version>3.1.1</version>
<scope>test</scope>
</dependency>
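Note on the ScalaTest bump: 3.1 moved the core suite types to new packages (the old org.scalatest.FunSuite spelling survives only as a deprecated alias), so test sources will typically want the new import after this change. A minimal sketch with a throwaway suite name:

```scala
// ScalaTest 3.1.x style: AnyFunSuite replaces the deprecated org.scalatest.FunSuite.
import org.scalatest.funsuite.AnyFunSuite

class VersionSanitySpec extends AnyFunSuite {
  test("the bumped Spark version string is well-formed") {
    assert("3.1.1".matches("""\d+\.\d+\.\d+"""))
  }
}
```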
PmofShuffleManager.scala
@@ -59,12 +59,12 @@ private[spark] class PmofShuffleManager(conf: SparkConf) extends ShuffleManager
}
}

-override def getReader[K, C](handle: _root_.org.apache.spark.shuffle.ShuffleHandle, startPartition: Int, endPartition: Int, context: _root_.org.apache.spark.TaskContext, readMetrics: ShuffleReadMetricsReporter): _root_.org.apache.spark.shuffle.ShuffleReader[K, C] = {
+override def getReader[K, C](handle: _root_.org.apache.spark.shuffle.ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: _root_.org.apache.spark.TaskContext, readMetrics: ShuffleReadMetricsReporter): _root_.org.apache.spark.shuffle.ShuffleReader[K, C] = {
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
-handle.shuffleId, startPartition, endPartition)
+handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
if (pmofConf.enableRdma) {
new RdmaShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
-startPartition, endPartition, context, pmofConf)
+startMapIndex, endMapIndex, startPartition, endPartition, context, pmofConf)
} else {
new BaseShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, startPartition, endPartition, context, readMetrics, pmofConf)
@@ -100,5 +100,4 @@ private[spark] class PmofShuffleManager(conf: SparkConf) extends ShuffleManager
new IndexShuffleBlockResolver(conf)
}

-override def getReaderForRange[K, C](handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = ???
}
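Note for context: Spark 3.0 exposed two reader hooks on ShuffleManager, getReader and getReaderForRange; Spark 3.1 unified them into a single getReader that also carries a map-index range, which is why the override above gains startMapIndex/endMapIndex and the getReaderForRange stub is deleted. A sketch of the unified shape (the trait name here is illustrative; the real hook lives on org.apache.spark.shuffle.ShuffleManager):

```scala
// The package matters: ShuffleReader and ShuffleReadMetricsReporter are
// private[spark], so this only compiles under an org.apache.spark subpackage.
package org.apache.spark.shuffle.pmof

import org.apache.spark.TaskContext
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleReader, ShuffleReadMetricsReporter}

// Illustrative stand-in for the Spark 3.1 reader hook.
trait UnifiedReaderApi {
  def getReader[K, C](
      handle: ShuffleHandle,
      startMapIndex: Int,  // new in the unified 3.1 signature
      endMapIndex: Int,    // new in the unified 3.1 signature
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
}
```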
RdmaShuffleReader.scala
@@ -17,6 +17,8 @@ import org.apache.spark.util.configuration.pmof.PmofConf
* requesting them from other nodes' block stores.
*/
private[spark] class RdmaShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
+    startMapIndex: Int,
+    endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
@@ -36,7 +38,7 @@ private[spark] class RdmaShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
context,
PmofTransferService.getTransferServiceInstance(pmofConf, blockManager),
blockManager,
-mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
+mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition),
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
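The reader's metadata lookup mirrors this: in Spark 3.1, MapOutputTracker.getMapSizesByExecutorId also takes the map-index range, as the call above shows. A minimal sketch of the call shape, assuming the 3.1 return type of (block id, block size, map index) triples grouped by executor; like the trait sketch earlier, it must sit in an org.apache.spark subpackage because the tracker is private[spark]:

```scala
package org.apache.spark.shuffle.pmof // MapOutputTracker is private[spark]

import org.apache.spark.SparkEnv
import org.apache.spark.storage.{BlockId, BlockManagerId}

object MapSizesLookup {
  // Enumerate shuffle blocks written by maps [startMapIndex, endMapIndex)
  // for reduce partitions [startPartition, endPartition).
  def mapSizes(shuffleId: Int, startMapIndex: Int, endMapIndex: Int,
               startPartition: Int, endPartition: Int)
      : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] =
    SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
      shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
}
```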
PersistentMemoryHandler.scala
@@ -18,7 +18,10 @@ import org.apache.spark.util.configuration.pmof.PmofConf
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

+import scala.util.control.Breaks._

private[spark] class PersistentMemoryHandler(
+    val pmofConf: PmofConf,
val root_dir: String,
val path_list: List[String],
val shuffleId: String,
@@ -87,21 +90,45 @@ private[spark] class PersistentMemoryHandler(
def getPartitionManagedBuffer(blockId: String): ManagedBuffer = {
new PmemManagedBuffer(this, blockId)
}

+  def fileDeletion(path: String): Unit = synchronized {
+    try {
+      if (new File(path).delete()) {
+        logInfo("File deleted successfully: " + path)
+      } else {
+        logWarning("Failed to delete file: " + path)
+      }
+    } catch {
+      case e: Exception => e.printStackTrace()
+    }
+  }

  def close(): Unit = synchronized {
+    val timeout = pmofConf.fileEmptyTimeout
+    val interval = pmofConf.fileEmptyInterval
+    val startTime = System.currentTimeMillis()
+    var first = true
    if (isFsdaxFile) {
      try {
-        if (new File(poolFile).delete()) {
-          logInfo("File deleted successfully: " + poolFile)
-        } else {
-          logWarning("Failed to delete file: " + poolFile)
-        }
+        var currentTime = System.currentTimeMillis()
+        breakable {
+          while (currentTime - startTime < timeout * 1000) {
+            if (!Files.exists(Paths.get(poolFile))) {
+              break
+            }
+            if (!first) {
+              // Thread.sleep(interval * 1000) is deliberately skipped here:
+              // a thread sleeping at this point would be terminated immediately.
+            }
+            fileDeletion(poolFile)
+            currentTime = System.currentTimeMillis()
+            first = false
+          }
+        }
      } catch {
        case e: Exception => e.printStackTrace()
      }
    } else {
      pmpool.close()
      pmMetaHandler.remove()
      Runtime.getRuntime.exec("pmempool rm " + poolFile)
    }
  }
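Restated as a standalone sketch, the cleanup above is a bounded retry loop: keep attempting deletion until the pool file is gone or file_empty_timeout elapses, with break short-circuiting once the file disappears. (Files and Paths come from java.nio.file, presumably already imported elsewhere in this file.) The version below re-enables the inter-attempt sleep for illustration, which the PR deliberately leaves commented out, so treat it as a sketch of the intent rather than the shipped behavior:

```scala
import java.io.File
import java.nio.file.{Files, Paths}
import scala.util.control.Breaks._

object BoundedDelete {
  // Returns true once the file no longer exists; gives up after timeoutSec.
  def deleteWithTimeout(path: String, timeoutSec: Int, intervalSec: Int): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutSec * 1000L
    breakable {
      while (System.currentTimeMillis() < deadline) {
        if (!Files.exists(Paths.get(path))) break()
        new File(path).delete()            // retry the deletion
        Thread.sleep(intervalSec * 1000L)  // pause between attempts (disabled in the PR)
      }
    }
    !Files.exists(Paths.get(path))
  }

  def main(args: Array[String]): Unit =
    println(deleteWithTimeout("/tmp/pmem_pool_example", timeoutSec = 30, intervalSec = 5))
}
```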

@@ -119,7 +146,7 @@ object PersistentMemoryHandler {
private var stopped: Boolean = _
def getPersistentMemoryHandler(pmofConf: PmofConf, root_dir: String, path_arg: List[String], shuffleBlockId: String, pmPoolSize: Long): PersistentMemoryHandler = synchronized {
if (persistentMemoryHandler == null) {
-      persistentMemoryHandler = new PersistentMemoryHandler(root_dir, path_arg, shuffleBlockId, pmPoolSize)
+      persistentMemoryHandler = new PersistentMemoryHandler(pmofConf, root_dir, path_arg, shuffleBlockId, pmPoolSize)
persistentMemoryHandler.log("Use persistentMemoryHandler Object: " + this)
if (pmofConf.enableRdma) {
val blockManager = SparkEnv.get.blockManager
PmofConf.scala
@@ -27,4 +27,6 @@ class PmofConf(conf: SparkConf) {
val shuffleBlockSize: Int = conf.getInt("spark.shuffle.pmof.shuffle_block_size", defaultValue = 2048)
val pmemCapacity: Long = conf.getLong("spark.shuffle.pmof.pmem_capacity", defaultValue = 264239054848L)
val pmemCoreMap = conf.get("spark.shuffle.pmof.dev_core_set", defaultValue = "/dev/dax0.0:0-17,36-53").split(";").map(_.trim).map(_.split(":")).map(arr => arr(0) -> arr(1)).toMap
+  val fileEmptyTimeout: Int = conf.getInt("spark.shuffle.pmof.file_empty_timeout", defaultValue = 30)
+  val fileEmptyInterval: Int = conf.getInt("spark.shuffle.pmof.file_empty_interval", defaultValue = 5)
}
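The two new keys feed the close() retry loop above and can be overridden like any other Spark conf. A hypothetical tuning snippet (key names from PmofConf; the values are illustrative, against defaults of 30 and 5 seconds):

```scala
import org.apache.spark.SparkConf

object PmofCleanupTuning {
  // Illustrative: stretch the fsdax cleanup window beyond its 30 s default.
  def withSlowerCleanup(base: SparkConf): SparkConf = base
    .set("spark.shuffle.pmof.file_empty_timeout", "60")   // total seconds to keep retrying
    .set("spark.shuffle.pmof.file_empty_interval", "10")  // seconds between attempts
}
```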