diff --git a/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java b/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java
index d8ea09a23..ed83f3fe8 100644
--- a/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java
+++ b/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java
@@ -21,8 +21,7 @@ import java.io.*;
 import java.util.UUID;
 import java.util.logging.Level;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.logging.Logger;
 
 import com.intel.daal.utils.LibUtils;
 
@@ -31,7 +30,8 @@ public final class LibLoader {
   // Make sure loading libraries from different temp directory for each process
   private final static String subDir = "MLlibDAL_" + UUID.randomUUID();
 
-  private static final Logger log = LoggerFactory.getLogger("LibLoader");
+  private static final Logger logger = Logger.getLogger(LibLoader.class.getName());
+  private static final Level logLevel = Level.INFO;
 
   /**
    * Get temp dir for exacting lib files
@@ -81,12 +81,12 @@ private static synchronized void loadLibMLlibDAL() throws IOException {
    * @param name library name
    */
   private static void loadFromJar(String path, String name) throws IOException {
-    log.debug("Loading " + name + " ...");
+    logger.log(logLevel, "Loading " + name + " ...");
 
     File fileOut = createTempFile(path, name);
     // File exists already
     if (fileOut == null) {
-      log.debug("DONE: Loading library as resource.");
+      logger.log(logLevel, "DONE: Loading library as resource.");
       return;
     }
 
@@ -96,7 +96,7 @@ private static void loadFromJar(String path, String name) throws IOException {
     }
 
     try (OutputStream streamOut = new FileOutputStream(fileOut)) {
-      log.debug("Writing resource to temp file.");
+      logger.log(logLevel, "Writing resource to temp file.");
 
       byte[] buffer = new byte[32768];
       while (true) {
@@ -115,7 +115,7 @@ private static void loadFromJar(String path, String name) throws IOException {
     }
 
     System.load(fileOut.toString());
-    log.debug("DONE: Loading library as resource.");
+    logger.log(logLevel, "DONE: Loading library as resource.");
   }
 
   /**
diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp
index 3927968e6..0f6c774c1 100644
--- a/mllib-dal/src/main/native/OneCCL.cpp
+++ b/mllib-dal/src/main/native/OneCCL.cpp
@@ -23,7 +23,7 @@ ccl::communicator &getComm() {
 JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init
   (JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port, jobject param) {
 
-  std::cerr << "OneCCL (native): init" << std::endl;
+  std::cout << "oneCCL (native): init" << std::endl;
 
   auto t1 = std::chrono::high_resolution_clock::now();
 
@@ -42,7 +42,7 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init
   auto t2 = std::chrono::high_resolution_clock::now();
   auto duration = std::chrono::duration_cast<std::chrono::seconds>( t2 - t1 ).count();
-  std::cerr << "OneCCL (native): init took " << duration << " secs" << std::endl;
+  std::cout << "oneCCL (native): init took " << duration << " secs" << std::endl;
 
   rank_id = getComm().rank();
   comm_size = getComm().size();
 
@@ -68,7 +68,7 @@ JNIEXPORT void JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1cleanup
 
   g_comms.pop_back();
 
-  std::cerr << "OneCCL (native): cleanup" << std::endl;
+  std::cout << "oneCCL (native): cleanup" << std::endl;
 
 }
 
@@ -112,24 +112,6 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_setEnv
   return err;
 }
 
-#define GET_IP_CMD "hostname -I"
-#define MAX_KVS_VAL_LENGTH 130
-#define READ_ONLY "r"
-
-static bool is_valid_ip(char ip[]) {
-  FILE *fp;
-  // TODO: use getifaddrs instead of popen
-  if ((fp = popen(GET_IP_CMD, READ_ONLY)) == NULL) {
-    printf("Can't get host IP\n");
-    exit(1);
-  }
-  char host_ips[MAX_KVS_VAL_LENGTH];
-  fgets(host_ips, MAX_KVS_VAL_LENGTH, fp);
-  pclose(fp);
-
-  return strstr(host_ips, ip) ? true : false;
-}
-
 /*
  * Class:     org_apache_spark_ml_util_OneCCL__
  * Method:    getAvailPort
@@ -138,18 +120,12 @@ static bool is_valid_ip(char ip[]) {
 JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_getAvailPort
   (JNIEnv *env, jobject obj, jstring localIP) {
 
-  // start from beginning of dynamic port
   const int port_start_base = 3000;
 
   char* local_host_ip = (char *) env->GetStringUTFChars(localIP, NULL);
 
-  // check if the input ip is one of host's ips
-  if (!is_valid_ip(local_host_ip))
-    return -1;
-
   struct sockaddr_in main_server_address;
   int server_listen_sock;
-  in_port_t port = port_start_base;
 
   if ((server_listen_sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
     perror("OneCCL (native) getAvailPort error!");
@@ -158,19 +134,17 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_getAvailPort
 
   main_server_address.sin_family = AF_INET;
   main_server_address.sin_addr.s_addr = inet_addr(local_host_ip);
-  main_server_address.sin_port = htons(port);
+  main_server_address.sin_port = port_start_base;
 
-  // search for available port
   while (bind(server_listen_sock,
               (const struct sockaddr *)&main_server_address,
               sizeof(main_server_address)) < 0) {
-    port++;
-    main_server_address.sin_port = htons(port);
+    main_server_address.sin_port++;
   }
 
-    close(server_listen_sock);
+  close(server_listen_sock);
 
   env->ReleaseStringUTFChars(localIP, local_host_ip);
 
-  return port;
+  return main_server_address.sin_port;
 }
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala
index f531b46a5..2ac551745 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala
@@ -43,7 +43,7 @@ class KMeansDALImpl (
 
     val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
     val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
-    val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
+    val kvsPortDetected = Utils.checkExecutorAvailPort(data.sparkContext, kvsIP)
    val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)
 
     val kvsIPPort = kvsIP+"_"+kvsPort
@@ -70,14 +70,14 @@ class KMeansDALImpl (
       val it = entry._3
       val numCols = partitionDims(index)._2
 
-      logDebug(s"KMeansDALImpl: Partition index: $index, numCols: $numCols, numRows: $numRows")
+      println(s"KMeansDALImpl: Partition index: $index, numCols: $numCols, numRows: $numRows")
 
       // Build DALMatrix, this will load libJavaAPI, libtbb, libtbbmalloc
       val context = new DaalContext()
       val matrix = new DALMatrix(context, classOf[java.lang.Double],
         numCols.toLong, numRows.toLong, NumericTable.AllocationFlag.DoAllocate)
 
-      logDebug("KMeansDALImpl: Loading native libraries" )
+      println("KMeansDALImpl: Loading native libraries" )
       // oneDAL libs should be loaded by now, extract libMLlibDAL.so to temp file and load
       LibLoader.loadLibraries()
 
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala
index 1b3f9ddf0..15ee0538e 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala
@@ -18,20 +18,19 @@
 package org.apache.spark.ml.feature
 
 import java.util.Arrays
+
 import com.intel.daal.data_management.data.{HomogenNumericTable, NumericTable}
-import org.apache.spark.internal.Logging
 import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.util.{OneCCL, OneDAL, Utils}
 import org.apache.spark.mllib.feature.{PCAModel => MLlibPCAModel}
 import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, Vectors => OldVectors}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.feature.{StandardScaler => MLlibStandardScaler}
+import org.apache.spark.mllib.feature.{ StandardScaler => MLlibStandardScaler }
 
 class PCADALImpl (
     val k: Int,
     val executorNum: Int,
-    val executorCores: Int)
-  extends Serializable with Logging {
+    val executorCores: Int) extends Serializable {
 
   // Normalize data before apply fitWithDAL
   private def normalizeData(input: RDD[Vector]) : RDD[Vector] = {
@@ -50,7 +49,7 @@ class PCADALImpl (
 
     val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
     val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
-    val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
+    val kvsPortDetected = Utils.checkExecutorAvailPort(data.sparkContext, kvsIP)
     val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)
 
     val kvsIPPort = kvsIP+"_"+kvsPort
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala
index 32b66a247..a0a1679e9 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala
@@ -17,24 +17,50 @@
 package org.apache.spark.ml.util
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.SparkConf
 
-object OneCCL extends Logging {
+object OneCCL {
 
   var cclParam = new CCLParam()
 
+//  var kvsIPPort = sys.env.getOrElse("CCL_KVS_IP_PORT", "")
+//  var worldSize = sys.env.getOrElse("CCL_WORLD_SIZE", "1").toInt
+
+//  var kvsPort = 5000
+
+//  private def checkEnv() {
+//    val altTransport = sys.env.getOrElse("CCL_ATL_TRANSPORT", "")
+//    val pmType = sys.env.getOrElse("CCL_PM_TYPE", "")
+//    val ipExchange = sys.env.getOrElse("CCL_KVS_IP_EXCHANGE", "")
+//
+//    assert(altTransport == "ofi")
+//    assert(pmType == "resizable")
+//    assert(ipExchange == "env")
+//    assert(kvsIPPort != "")
+//
+//  }
+
   // Run on Executor
-  def setExecutorEnv(): Unit = {
-    setEnv("CCL_ATL_TRANSPORT","ofi")
-    // Uncomment this if you whant to debug oneCCL
-    // setEnv("CCL_LOG_LEVEL", "2")
-  }
+//  def setExecutorEnv(executor_num: Int, ip: String, port: Int): Unit = {
+//    // Work around ccl by passings in a spark.executorEnv.CCL_KVS_IP_PORT.
+//    val ccl_kvs_ip_port = sys.env.getOrElse("CCL_KVS_IP_PORT", s"${ip}_${port}")
+//
+//    println(s"oneCCL: Initializing with CCL_KVS_IP_PORT: $ccl_kvs_ip_port")
+//
+//    setEnv("CCL_PM_TYPE", "resizable")
+//    setEnv("CCL_ATL_TRANSPORT","ofi")
+//    setEnv("CCL_ATL_TRANSPORT_PATH", LibLoader.getTempSubDir())
+//    setEnv("CCL_KVS_IP_EXCHANGE","env")
+//    setEnv("CCL_KVS_IP_PORT", ccl_kvs_ip_port)
+//    setEnv("CCL_WORLD_SIZE", s"${executor_num}")
+//    // Uncomment this if you whant to debug oneCCL
+//    // setEnv("CCL_LOG_LEVEL", "2")
+//  }
 
   def init(executor_num: Int, rank: Int, ip_port: String) = {
 
-    setExecutorEnv()
-
-    logInfo(s"Initializing with IP_PORT: ${ip_port}")
+//    setExecutorEnv(executor_num, ip, port)
+    println(s"oneCCL: Initializing with IP_PORT: ${ip_port}")
 
     // cclParam is output from native code
     c_init(executor_num, rank, ip_port, cclParam)
 
@@ -42,7 +68,11 @@ object OneCCL extends Logging {
     // executor number should equal to oneCCL world size
     assert(executor_num == cclParam.commSize, "executor number should equal to oneCCL world size")
 
-    logInfo(s"Initialized with executorNum: $executor_num, commSize, ${cclParam.commSize}, rankId: ${cclParam.rankId}")
+    println(s"oneCCL: Initialized with executorNum: $executor_num, commSize, ${cclParam.commSize}, rankId: ${cclParam.rankId}")
+
+    // Use a new port when calling init again
+//    kvsPort = kvsPort + 1
+
   }
 
   // Run on Executor
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala
index aa8eb8979..14cf1ab27 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala
@@ -71,13 +71,13 @@ object Utils {
     ip
   }
 
-  def checkExecutorAvailPort(data: RDD[_], localIP: String) : Int = {
-    val sc = data.sparkContext
-    val result = data.mapPartitions { p =>
+  def checkExecutorAvailPort(sc: SparkContext, localIP: String) : Int = {
+    val executor_num = Utils.sparkExecutorNum(sc)
+    val data = sc.parallelize(1 to executor_num, executor_num)
+    val result = data.mapPartitionsWithIndex { (index, p) =>
       LibLoader.loadLibraries()
-      val port = OneCCL.getAvailPort(localIP)
-      if (port != -1)
-        Iterator(port)
+      if (index == 0)
+        Iterator(OneCCL.getAvailPort(localIP))
       else
         Iterator()
     }.collect()