Skip to content

Commit

Permalink
set ZE_AFFINITY_MASK=rankId
Browse files Browse the repository at this point in the history
  • Loading branch information
minmingzhu committed Aug 19, 2024
1 parent 4fe2416 commit 4481e2a
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 6 deletions.
8 changes: 2 additions & 6 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@ object OneCCL extends Logging {
}

// Run on Executor
def setExecutorEnv(): Unit = {
setEnv("CCL_ATL_TRANSPORT", "ofi")
// Set CCL_ROOT to workaround CCL_ROOT env read bug, should remove when upstream fix this
setEnv("CCL_ROOT", "/opt/intel/oneapi/ccl/latest")
// Uncomment this if you whant to debug oneCCL
// setEnv("CCL_LOG_LEVEL", "debug")
def setExecutorEnv(key: String, value: String): Unit = {
setEnv(key, value)
}

// Run on Executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ class RandomForestClassifierDALImpl(val uid: String,
rfcTimer.record("Data Convertion")
val kvsIPPort = getOneCCLIPPort(labeledPointsTables)

labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

labeledPointsTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ class KMeansDALImpl(var nClusters: Int,
kmeansTimer.record("Data Convertion")

val kvsIPPort = getOneCCLIPPort(coalescedTables)
coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

coalescedTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ class PCADALImpl(val k: Int,
val kvsIPPort = getOneCCLIPPort(coalescedTables)
pcaTimer.record("Data Convertion")

coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

coalescedTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
}
lrTimer.record("Data Convertion")

labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) =>
val (feature, label) = tables.next()
val (featureTabAddr : Long, featureRows : Long, featureColumns : Long) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ class RandomForestRegressorDALImpl(val uid: String,

val kvsIPPort = getOneCCLIPPort(labeledPointsTables)

labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

labeledPointsTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ class CorrelationDALImpl(
}.count()
corTimer.record("OneCCL Init")

coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
iter.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ class SummarizerDALImpl(val executorNum: Int,
}.count()
sumTimer.record("OneCCL Init")

coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val gpuIndices = if (useDevice == "GPU") {
val resources = TaskContext.get().resources()
resources("gpu").addresses.map(_.toInt)
} else {
null
}
OneCCL.setExecutorEnv("ZE_AFFINITY_MASK", gpuIndices(0).toString())
Iterator.empty
}.count()

val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
iter.next()
Expand Down

0 comments on commit 4481e2a

Please sign in to comment.