Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
Merge pull request #16 from Eugene-Mark/support-3.1.1
Browse files Browse the repository at this point in the history
[pmem-shuffle-15] Make pmem-shuffle support Spark3.1.1
  • Loading branch information
Jian-Zhang authored Apr 25, 2021
2 parents e8c7aee + 9ef3e4d commit b5b46e3
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 19 deletions.
8 changes: 4 additions & 4 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
<version>3.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_2.12</artifactId>
<version>3.0.0</version>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
<version>3.1.1</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
Expand All @@ -47,7 +47,7 @@
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.12</artifactId>
<version>3.0.3</version>
<version>3.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ private[spark] class PmofShuffleManager(conf: SparkConf) extends ShuffleManager
}
}

override def getReader[K, C](handle: _root_.org.apache.spark.shuffle.ShuffleHandle, startPartition: Int, endPartition: Int, context: _root_.org.apache.spark.TaskContext, readMetrics: ShuffleReadMetricsReporter): _root_.org.apache.spark.shuffle.ShuffleReader[K, C] = {
override def getReader[K, C](handle: _root_.org.apache.spark.shuffle.ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: _root_.org.apache.spark.TaskContext, readMetrics: ShuffleReadMetricsReporter): _root_.org.apache.spark.shuffle.ShuffleReader[K, C] = {
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startPartition, endPartition)
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
if (pmofConf.enableRdma) {
new RdmaShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]],
startPartition, endPartition, context, pmofConf)
startMapIndex, endMapIndex, startPartition, endPartition, context, pmofConf)
} else {
new BaseShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, startPartition, endPartition, context, readMetrics, pmofConf)
Expand Down Expand Up @@ -100,5 +100,4 @@ private[spark] class PmofShuffleManager(conf: SparkConf) extends ShuffleManager
new IndexShuffleBlockResolver(conf)
}

override def getReaderForRange[K, C](handle: ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import org.apache.spark.util.configuration.pmof.PmofConf
* requesting them from other nodes' block stores.
*/
private[spark] class RdmaShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
Expand All @@ -36,7 +38,7 @@ private[spark] class RdmaShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C],
context,
PmofTransferService.getTransferServiceInstance(pmofConf, blockManager),
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition),
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import org.apache.spark.util.configuration.pmof.PmofConf
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import scala.util.control.Breaks._

private[spark] class PersistentMemoryHandler(
val pmofConf: PmofConf,
val root_dir: String,
val path_list: List[String],
val shuffleId: String,
Expand Down Expand Up @@ -87,21 +90,45 @@ private[spark] class PersistentMemoryHandler(
def getPartitionManagedBuffer(blockId: String): ManagedBuffer = {
new PmemManagedBuffer(this, blockId)
}

def fileDeletion(path: String): Unit = synchronized {
try {
if (new File(path).delete()) {
logInfo("File deleted successfully: " + poolFile)
} else {
logWarning("Failed to delete file: " + poolFile)
}
} catch {
case e: Exception => e.printStackTrace()
}
}

def close(): Unit = synchronized {
val timeout = pmofConf.fileEmptyTimeout
val interval = pmofConf.fileEmptyInterval
val startTime = System.currentTimeMillis()
var first = true
if (isFsdaxFile) {
try {
if (new File(poolFile).delete()) {
logInfo("File deleted successfully: " + poolFile)
} else {
logWarning("Failed to delete file: " + poolFile)
var currentTime = System.currentTimeMillis()
breakable{
while(currentTime - startTime < timeout * 1000){
if(!Files.exists(Paths.get(poolFile))){
break
}
if (!first){
/**
The slept thread will be terminated immediately
Thread.sleep(interval * 1000)
**/
}
fileDeletion(poolFile)
currentTime = System.currentTimeMillis()
first = false
}
} catch {
case e: Exception => e.printStackTrace()
}
}
} else {
pmpool.close()
pmMetaHandler.remove()
Runtime.getRuntime.exec("pmempool rm " + poolFile)
}
}

Expand All @@ -119,7 +146,7 @@ object PersistentMemoryHandler {
private var stopped: Boolean = _
def getPersistentMemoryHandler(pmofConf: PmofConf, root_dir: String, path_arg: List[String], shuffleBlockId: String, pmPoolSize: Long): PersistentMemoryHandler = synchronized {
if (persistentMemoryHandler == null) {
persistentMemoryHandler = new PersistentMemoryHandler(root_dir, path_arg, shuffleBlockId, pmPoolSize)
persistentMemoryHandler = new PersistentMemoryHandler(pmofConf, root_dir, path_arg, shuffleBlockId, pmPoolSize)
persistentMemoryHandler.log("Use persistentMemoryHandler Object: " + this)
if (pmofConf.enableRdma) {
val blockManager = SparkEnv.get.blockManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ class PmofConf(conf: SparkConf) {
val shuffleBlockSize: Int = conf.getInt("spark.shuffle.pmof.shuffle_block_size", defaultValue = 2048)
val pmemCapacity: Long = conf.getLong("spark.shuffle.pmof.pmem_capacity", defaultValue = 264239054848L)
val pmemCoreMap = conf.get("spark.shuffle.pmof.dev_core_set", defaultValue = "/dev/dax0.0:0-17,36-53").split(";").map(_.trim).map(_.split(":")).map(arr => arr(0) -> arr(1)).toMap
val fileEmptyTimeout: Int = conf.getInt("spark.shuffle.pmof.file_empty_timeout", defaultValue = 30)
val fileEmptyInterval: Int = conf.getInt("spark.shuffle.pmof.file_empty_interval", defaultValue = 5)
}

0 comments on commit b5b46e3

Please sign in to comment.