Skip to content

Commit

Permalink
[ML-33] Optimize oneCCL port detecting (#34)
Browse files Browse the repository at this point in the history
* Add missing build.sh

* Add ALS with oneDAL backend

* Add IntelALSSuite

* Fix the shuffle_all2all function declaration

* Rename ALS rank to nFactors to avoid a name conflict with oneCCL rank

* Fix test.sh

* Use repartition to work around uneven partitioning

* Use getifaddr instead of hostname -I

* add synchronized to getAvailPort and use dataForConversion
  • Loading branch information
xwu99 authored Mar 8, 2021
1 parent 6fe5d3e commit c07d70c
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 44 deletions.
79 changes: 66 additions & 13 deletions mllib-dal/src/main/native/OneCCL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
#include <sys/socket.h>
#include <unistd.h>

#include <list>
#include <ifaddrs.h>
#include <netdb.h>

#include <oneapi/ccl.hpp>

#include "org_apache_spark_ml_util_OneCCL__.h"
Expand Down Expand Up @@ -112,30 +116,79 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_setEnv
return err;
}

#define GET_IP_CMD "hostname -I"
#define MAX_KVS_VAL_LENGTH 130
#define READ_ONLY "r"
static const int CCL_IP_LEN = 128;
std::list<std::string> local_host_ips;

// Collects the IPv4 addresses of every non-loopback network interface
// into the file-level local_host_ips list (cleared first).
// Returns 0 on success, -1 on failure (getifaddrs error, getnameinfo
// error, or no usable interface found).
static int fill_local_host_ip() {
    struct ifaddrs *ifaddr, *ifa;
    char local_ip[CCL_IP_LEN];

    if (getifaddrs(&ifaddr) < 0) {
        // LOG_ERROR("fill_local_host_ip: can not get host IP");
        return -1;
    }

    const char iface_name[] = "lo";
    local_host_ips.clear();

    for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
        if (ifa->ifa_addr == NULL)
            continue;
        // Skip loopback ("lo*") interfaces; only IPv4 addresses are collected.
        if (strstr(ifa->ifa_name, iface_name) == NULL &&
            ifa->ifa_addr->sa_family == AF_INET) {
            memset(local_ip, 0, CCL_IP_LEN);
            int res = getnameinfo(ifa->ifa_addr,
                                  sizeof(struct sockaddr_in),
                                  local_ip,
                                  CCL_IP_LEN,
                                  NULL,
                                  0,
                                  NI_NUMERICHOST);
            if (res != 0) {
                std::string s("fill_local_host_ip: getnameinfo error > ");
                s.append(gai_strerror(res));
                // LOG_ERROR(s.c_str());
                // Fix: free the interface list before the early return
                // (the original leaked it on this path).
                freeifaddrs(ifaddr);
                return -1;
            }
            local_host_ips.push_back(local_ip);
        }
    }

    // Fix: free unconditionally once allocated; the original skipped this
    // when local_host_ips was empty.
    freeifaddrs(ifaddr);

    if (local_host_ips.empty()) {
        // LOG_ERROR("fill_local_host_ip: can't find interface to get host IP");
        return -1;
    }

    return 0;
}

// Returns true iff the given numeric IP string matches one of this host's
// non-loopback IPv4 addresses as gathered by fill_local_host_ip().
// NOTE(review): this span contained leftover lines from the removed
// popen("hostname -I") implementation interleaved with the new one
// (plus a stray ';'); this is the coherent getifaddrs-based version.
static bool is_valid_ip(char ip[]) {
    // Refresh the cached address list; on failure the list may be empty
    // and the lookup below simply yields false.
    if (fill_local_host_ip() == -1) {
        std::cerr << "fill_local_host_ip error" << std::endl;
    }

    for (std::list<std::string>::iterator it = local_host_ips.begin();
         it != local_host_ips.end(); ++it) {
        if (*it == ip) {
            return true;
        }
    }

    return false;
}

/*
* Class: org_apache_spark_ml_util_OneCCL__
* Method: getAvailPort
* Signature: (Ljava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_getAvailPort
JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1getAvailPort
(JNIEnv *env, jobject obj, jstring localIP) {

// start from beginning of dynamic port
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,20 @@ class KMeansDALImpl (

instr.foreach(_.logInfo(s"Processing partitions with $executorNum executors"))

val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)

val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val kvsIPPort = kvsIP+"_"+kvsPort

// repartition to executorNum if not enough partitions
val dataForConversion = if (data.getNumPartitions < executorNum) {
data.repartition(executorNum).setName("Repartitioned for conversion").cache()
} else {
data
}

val executorIPAddress = Utils.sparkFirstExecutorIP(dataForConversion.sparkContext)
val kvsIP = dataForConversion.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
val kvsPortDetected = Utils.checkExecutorAvailPort(dataForConversion, kvsIP)
val kvsPort = dataForConversion.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val kvsIPPort = kvsIP+"_"+kvsPort

val partitionDims = Utils.getPartitionDims(dataForConversion)

// filter the empty partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class PCADALImpl (

val coalescedTables = OneDAL.rddVectorToNumericTables(normalizedData, executorNum)

val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
val executorIPAddress = Utils.sparkFirstExecutorIP(coalescedTables.sparkContext)
val kvsIP = coalescedTables.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)

val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)
val kvsPortDetected = Utils.checkExecutorAvailPort(coalescedTables, kvsIP)
val kvsPort = coalescedTables.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val kvsIPPort = kvsIP+"_"+kvsPort

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,16 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag](

logInfo(s"ALSDAL fit using $executorNum Executors for $nVectors vectors and $nFeatures features")

val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
val numericTables = data.repartition(executorNum).setName("Repartitioned for conversion").cache()

val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)
val executorIPAddress = Utils.sparkFirstExecutorIP(numericTables.sparkContext)
val kvsIP = numericTables.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)

val kvsIPPort = kvsIP+"_"+kvsPort
val kvsPortDetected = Utils.checkExecutorAvailPort(numericTables, kvsIP)
val kvsPort = numericTables.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)

val numericTables = data.repartition(executorNum).setName("Repartitioned for conversion").cache()
val kvsIPPort = kvsIP+"_"+kvsPort

/*
val numericTables = if (data.getNumPartitions < executorNum) {
data.repartition(executorNum).setName("Repartitioned for conversion").cache()
} else {
data.coalesce(executorNum).setName("Coalesced for conversion").cache()
}
*/
val results = numericTables
// Transpose the dataset
.map { p =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ object OneCCL extends Logging {
c_cleanup()
}

// Detects an available port on the given local IP for the oneCCL KVS.
// synchronized so concurrent callers within the same JVM do not race
// and detect the same port; delegates to the native c_getAvailPort.
def getAvailPort(localIP: String): Int = synchronized {
c_getAvailPort(localIP)
}

// Initializes the oneCCL communicator for this rank; fills `param` with
// the resulting CCL parameters and returns a native status code.
@native private def c_init(size: Int, rank: Int, ip_port: String, param: CCLParam) : Int
// Tears down the native oneCCL state initialized by c_init.
@native private def c_cleanup() : Unit

// True when this process is the oneCCL root rank.
@native def isRoot() : Boolean
// This process's oneCCL rank id.
@native def rankID() : Int

// Sets a native environment variable; returns the native setenv status.
@native def setEnv(key: String, value: String, overwrite: Boolean = true): Int
// Native port probe; call via the synchronized getAvailPort wrapper
// rather than directly, to avoid duplicate port detection.
@native def c_getAvailPort(localIP: String): Int
}
6 changes: 3 additions & 3 deletions mllib-dal/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ export LD_PRELOAD=$JAVA_HOME/jre/lib/amd64/libjsig.so
# -Dtest=none to turn off the Java tests

# Test all
mvn -Dtest=none -Dmaven.test.skip=false test
# mvn -Dtest=none -Dmaven.test.skip=false test

# Individual test
# mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.clustering.IntelKMeansSuite test
# mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.feature.IntelPCASuite test
mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.clustering.IntelKMeansSuite test
mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.feature.IntelPCASuite test
# mvn -Dtest=none -DwildcardSuites=org.apache.spark.ml.recommendation.IntelALSSuite test

0 comments on commit c07d70c

Please sign in to comment.