From 96d092f310ccbce71f395c50f0a9e2002254f362 Mon Sep 17 00:00:00 2001 From: minmingzhu <45281494+minmingzhu@users.noreply.github.com> Date: Wed, 23 Aug 2023 16:25:23 +0800 Subject: [PATCH] [ML-340] Make log output go to a specific path to support automatic result generation. (#341) * set specific path from system env for recording breakdown Signed-off-by: minmingzhu * update Signed-off-by: minmingzhu * Change get record path method Signed-off-by: minmingzhu * update Signed-off-by: minmingzhu --------- Signed-off-by: minmingzhu --- .../src/main/scala/com/intel/oap/mllib/Utils.scala | 12 ++++++++---- .../RandomForestClassifierDALImpl.scala | 2 +- .../intel/oap/mllib/clustering/KMeansDALImpl.scala | 2 +- .../com/intel/oap/mllib/feature/PCADALImpl.scala | 2 +- .../mllib/regression/LinearRegressionDALImpl.scala | 2 +- .../regression/RandomForestRegressorDALImpl.scala | 2 +- .../intel/oap/mllib/stat/CorrelationDALImpl.scala | 2 +- .../com/intel/oap/mllib/stat/SummarizerDALImpl.scala | 2 +- 8 files changed, 15 insertions(+), 11 deletions(-) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/Utils.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/Utils.scala index e471eba97..5dacd8420 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/Utils.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/Utils.scala @@ -20,8 +20,10 @@ import org.apache.spark.ml.linalg.Vector import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext} + import java.net.InetAddress -import java.io.{File, FileWriter, BufferedWriter} +import java.io.{BufferedWriter, File, FileWriter} +import java.nio.file.Paths import java.time.LocalDateTime import java.time.Duration import java.time.format.DateTimeFormatter @@ -80,17 +82,19 @@ object Utils { } } - class AlgoTimeMetrics(val algoName: String) { + class AlgoTimeMetrics(val algoName: String, val sparkContext: SparkContext) { val timeZoneName = List("Preprocessing", 
"Data Convertion", "Training") val algoTimeStampList = timeZoneName.map((x: String) => (x, new Utils.AlgoTimeStamp(x))).toMap val recorderName = Utils.GlobalTimeTable.register(this) val timeFileName = recorderName + "time_breakdown" - + val redirectPath = sparkContext.getConf.get("spark.oap.mllib.record.output.path", + Paths.get("").toAbsolutePath.toString) + val currentDirectory = redirectPath + "/" + timeFileName val timerEnabled = isTimerEnabled() def record(stampName: String): Unit = { if (timerEnabled) { - algoTimeStampList(stampName).update(timeFileName) + algoTimeStampList(stampName).update(currentDirectory) } } diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala index cb3ef3e52..2e427d056 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala @@ -53,9 +53,9 @@ class RandomForestClassifierDALImpl(val uid: String, labelCol: String, featuresCol: String): (util.Map[Integer, util.ArrayList[LearningNode]]) = { - val rfcTimer = new Utils.AlgoTimeMetrics("RandomForestClassifier") logInfo(s"RandomForestClassifierDALImpl executorNum : " + executorNum) val sparkContext = labeledPoints.rdd.sparkContext + val rfcTimer = new Utils.AlgoTimeMetrics("RandomForestClassifier", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) // used run Random Forest unit test val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index 3a23a1d65..f8652430f 100644 --- 
a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -37,8 +37,8 @@ class KMeansDALImpl(var nClusters: Int, ) extends Serializable with Logging { def train(data: RDD[Vector]): MLlibKMeansModel = { - val kmeansTimer = new Utils.AlgoTimeMetrics("KMeans") val sparkContext = data.sparkContext + val kmeansTimer = new Utils.AlgoTimeMetrics("KMeans", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) kmeansTimer.record("Preprocessing") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index 4f0807abd..c7a076a35 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -43,9 +43,9 @@ class PCADALImpl(val k: Int, extends Serializable with Logging { def train(data: RDD[Vector]): PCADALModel = { - val pcaTimer = new Utils.AlgoTimeMetrics("PCA") val normalizedData = normalizeData(data) val sparkContext = normalizedData.sparkContext + val pcaTimer = new Utils.AlgoTimeMetrics("PCA", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) pcaTimer.record("Preprocessing") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index ee93892b2..86d574557 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -70,8 +70,8 @@ class LinearRegressionDALImpl( val 
fitIntercept: Boolean, labelCol: String, featuresCol: String): LinearRegressionDALModel = { - val lrTimer = new Utils.AlgoTimeMetrics("LinearRegression") val sparkContext = labeledPoints.sparkSession.sparkContext + val lrTimer = new Utils.AlgoTimeMetrics("LinearRegression", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index a4e9a0f78..a31943ee2 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -45,9 +45,9 @@ class RandomForestRegressorDALImpl(val uid: String, def train(labeledPoints: Dataset[_], labelCol: String, featuresCol: String): (util.Map[Integer, util.ArrayList[LearningNode]]) = { - val rfrTimer = new Utils.AlgoTimeMetrics("RandomForestRegressor") logInfo(s"RandomForestRegressorDALImpl executorNum : " + executorNum) val sparkContext = labeledPoints.rdd.sparkContext + val rfrTimer = new Utils.AlgoTimeMetrics("RandomForestRegressor", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) // used run Random Forest unit test diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index 20362b896..9e770ab9d 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -30,8 +30,8 @@ class CorrelationDALImpl( extends Serializable with Logging { def 
computeCorrelationMatrix(data: RDD[Vector]): Matrix = { - val corTimer = new Utils.AlgoTimeMetrics("Correlation") val sparkContext = data.sparkContext + val corTimer = new Utils.AlgoTimeMetrics("Correlation", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) corTimer.record("Preprocessing") diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index 3f108364c..da66842de 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -31,8 +31,8 @@ class SummarizerDALImpl(val executorNum: Int, extends Serializable with Logging { def computeSummarizerMatrix(data: RDD[Vector]): Summary = { - val sumTimer = new Utils.AlgoTimeMetrics("Summarizer") val sparkContext = data.sparkContext + val sumTimer = new Utils.AlgoTimeMetrics("Summarizer", sparkContext) val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice) val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice) sumTimer.record("Preprocessing")