diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala index d8829b2c9..31b7e7c75 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala @@ -41,6 +41,7 @@ class KMeansDALImpl ( instr.foreach(_.logInfo(s"Processing partitions with $executorNum executors")) val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext) + val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) // repartition to executorNum if not enough partitions val dataForConversion = if (data.getNumPartitions < executorNum) { @@ -113,7 +114,7 @@ class KMeansDALImpl ( val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, rank, executorIPAddress) + OneCCL.init(executorNum, rank, kvsIP) val initCentroids = OneDAL.makeNumericTable(centers) val result = new KMeansResult() diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala index 6f9aaa442..33dbe8349 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala @@ -47,10 +47,11 @@ class PCADALImpl ( val coalescedTables = OneDAL.rddVectorToNumericTables(normalizedData, executorNum) val executorIPAddress = Utils.sparkFirstExecutorIP(input.sparkContext) + val kvsIP = input.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress) val results = coalescedTables.mapPartitionsWithIndex { (rank, table) => val tableArr = table.next() - OneCCL.init(executorNum, rank, executorIPAddress) + OneCCL.init(executorNum, rank, kvsIP) val result = new PCAResult() cPCATrainDAL(