Skip to content

Commit

Permalink
use spark.oap.mllib.oneccl.kvs.ip to workaround KVS IP hang issue
Browse files Browse the repository at this point in the history
  • Loading branch information
xwu99 committed Feb 2, 2021
1 parent 2d87d07 commit 318cae1
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class KMeansDALImpl (
instr.foreach(_.logInfo(s"Processing partitions with $executorNum executors"))

val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)

// repartition to executorNum if not enough partitions
val dataForConversion = if (data.getNumPartitions < executorNum) {
Expand Down Expand Up @@ -113,7 +114,7 @@ class KMeansDALImpl (

val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
val tableArr = table.next()
OneCCL.init(executorNum, rank, executorIPAddress)
OneCCL.init(executorNum, rank, kvsIP)

val initCentroids = OneDAL.makeNumericTable(centers)
val result = new KMeansResult()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ class PCADALImpl (
val coalescedTables = OneDAL.rddVectorToNumericTables(normalizedData, executorNum)

val executorIPAddress = Utils.sparkFirstExecutorIP(input.sparkContext)
val kvsIP = input.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)

val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
val tableArr = table.next()
OneCCL.init(executorNum, rank, executorIPAddress)
OneCCL.init(executorNum, rank, kvsIP)

val result = new PCAResult()
cPCATrainDAL(
Expand Down

0 comments on commit 318cae1

Please sign in to comment.