Commit

[ML-340] Make the log output use a specific path to support automatic result generation (#341)

* Set a specific path from the system env for recording the breakdown

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* Change the method for getting the record path

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

---------

Signed-off-by: minmingzhu <minming.zhu@intel.com>
minmingzhu authored Aug 23, 2023
1 parent ce66e17 commit 96d092f
Showing 8 changed files with 15 additions and 11 deletions.
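As a usage note: after this commit, the directory for the time-breakdown files is read from the Spark configuration. A minimal sketch of setting it at session creation (the app name and path below are arbitrary examples; only the "spark.oap.mllib.record.output.path" key comes from this commit):

import org.apache.spark.sql.SparkSession

// Point the per-algorithm time-breakdown files at a fixed directory.
val spark = SparkSession.builder()
  .appName("oap-mllib-example")
  .config("spark.oap.mllib.record.output.path", "/tmp/oap-mllib-breakdown")
  .getOrCreate()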
12 changes: 8 additions & 4 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/Utils.scala
@@ -20,8 +20,10 @@ import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}

 import java.net.InetAddress
-import java.io.{File, FileWriter, BufferedWriter}
+import java.io.{BufferedWriter, File, FileWriter}
+import java.nio.file.Paths
 import java.time.LocalDateTime
 import java.time.Duration
 import java.time.format.DateTimeFormatter
@@ -80,17 +82,19 @@ object Utils {
     }
   }

-  class AlgoTimeMetrics(val algoName: String) {
+  class AlgoTimeMetrics(val algoName: String, val sparkContext: SparkContext) {
     val timeZoneName = List("Preprocessing", "Data Convertion", "Training")
     val algoTimeStampList = timeZoneName.map((x: String) => (x, new Utils.AlgoTimeStamp(x))).toMap
     val recorderName = Utils.GlobalTimeTable.register(this)
     val timeFileName = recorderName + "time_breakdown"
+    val redirectPath = sparkContext.getConf.get("spark.oap.mllib.record.output.path",
+      Paths.get("").toAbsolutePath.toString)
+    val currentDirectory = redirectPath + "/" + timeFileName
     val timerEnabled = isTimerEnabled()

     def record(stampName: String): Unit = {
       if (timerEnabled) {
-        algoTimeStampList(stampName).update(timeFileName)
+        algoTimeStampList(stampName).update(currentDirectory)
       }
     }
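The resolution logic above falls back to the driver's current working directory when the config key is unset. A standalone sketch of the same behavior (the helper name resolveRecordPath is hypothetical, introduced only for illustration):

import java.nio.file.Paths
import org.apache.spark.SparkContext

// Hypothetical helper mirroring the AlgoTimeMetrics fields above:
// prefer the configured output path, else the current working directory,
// then append the per-recorder file name.
def resolveRecordPath(sc: SparkContext, timeFileName: String): String = {
  val redirectPath = sc.getConf.get("spark.oap.mllib.record.output.path",
    Paths.get("").toAbsolutePath.toString)
  redirectPath + "/" + timeFileName
}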
2 changes: 1 addition & 1 deletion RandomForestClassifierDALImpl.scala
@@ -53,9 +53,9 @@ class RandomForestClassifierDALImpl(val uid: String,
             labelCol: String,
             featuresCol: String): (util.Map[Integer, util.ArrayList[LearningNode]]) = {

-    val rfcTimer = new Utils.AlgoTimeMetrics("RandomForestClassifier")
     logInfo(s"RandomForestClassifierDALImpl executorNum : " + executorNum)
     val sparkContext = labeledPoints.rdd.sparkContext
+    val rfcTimer = new Utils.AlgoTimeMetrics("RandomForestClassifier", sparkContext)
     val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
     // used to run the Random Forest unit test
     val isTest = sparkContext.getConf.getBoolean("spark.oap.mllib.isTest", false)
2 changes: 1 addition & 1 deletion KMeansDALImpl.scala
@@ -37,8 +37,8 @@ class KMeansDALImpl(var nClusters: Int,
   ) extends Serializable with Logging {

   def train(data: RDD[Vector]): MLlibKMeansModel = {
-    val kmeansTimer = new Utils.AlgoTimeMetrics("KMeans")
     val sparkContext = data.sparkContext
+    val kmeansTimer = new Utils.AlgoTimeMetrics("KMeans", sparkContext)
     val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
     val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
     kmeansTimer.record("Preprocessing")
2 changes: 1 addition & 1 deletion PCADALImpl.scala
@@ -43,9 +43,9 @@ class PCADALImpl(val k: Int,
   extends Serializable with Logging {

   def train(data: RDD[Vector]): PCADALModel = {
-    val pcaTimer = new Utils.AlgoTimeMetrics("PCA")
     val normalizedData = normalizeData(data)
     val sparkContext = normalizedData.sparkContext
+    val pcaTimer = new Utils.AlgoTimeMetrics("PCA", sparkContext)
     val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
     val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
     pcaTimer.record("Preprocessing")
2 changes: 1 addition & 1 deletion LinearRegressionDALImpl.scala
@@ -70,8 +70,8 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
             labelCol: String,
             featuresCol: String): LinearRegressionDALModel = {

-    val lrTimer = new Utils.AlgoTimeMetrics("LinearRegression")
     val sparkContext = labeledPoints.sparkSession.sparkContext
+    val lrTimer = new Utils.AlgoTimeMetrics("LinearRegression", sparkContext)
     val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
     val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
2 changes: 1 addition & 1 deletion RandomForestRegressorDALImpl.scala
@@ -45,9 +45,9 @@ class RandomForestRegressorDALImpl(val uid: String,
   def train(labeledPoints: Dataset[_],
             labelCol: String,
             featuresCol: String): (util.Map[Integer, util.ArrayList[LearningNode]]) = {
-    val rfrTimer = new Utils.AlgoTimeMetrics("RandomForestRegressor")
     logInfo(s"RandomForestRegressorDALImpl executorNum : " + executorNum)
     val sparkContext = labeledPoints.rdd.sparkContext
+    val rfrTimer = new Utils.AlgoTimeMetrics("RandomForestRegressor", sparkContext)
     val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
     val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
     // used to run the Random Forest unit test
2 changes: 1 addition & 1 deletion CorrelationDALImpl.scala
@@ -30,8 +30,8 @@ class CorrelationDALImpl(
   extends Serializable with Logging {

   def computeCorrelationMatrix(data: RDD[Vector]): Matrix = {
-    val corTimer = new Utils.AlgoTimeMetrics("Correlation")
     val sparkContext = data.sparkContext
+    val corTimer = new Utils.AlgoTimeMetrics("Correlation", sparkContext)
     val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
     val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
     corTimer.record("Preprocessing")
2 changes: 1 addition & 1 deletion SummarizerDALImpl.scala
@@ -31,8 +31,8 @@ class SummarizerDALImpl(val executorNum: Int,
   extends Serializable with Logging {

   def computeSummarizerMatrix(data: RDD[Vector]): Summary = {
-    val sumTimer = new Utils.AlgoTimeMetrics("Summarizer")
     val sparkContext = data.sparkContext
+    val sumTimer = new Utils.AlgoTimeMetrics("Summarizer", sparkContext)
     val useDevice = sparkContext.getConf.get("spark.oap.mllib.device", Utils.DefaultComputeDevice)
     val computeDevice = Common.ComputeDevice.getDeviceByName(useDevice)
     sumTimer.record("Preprocessing")
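The seven call-site changes above all follow one pattern: obtain the SparkContext first, then pass it to the timer so the output path can be read from the conf. A condensed sketch of that pattern (here data stands in for any input RDD):

// Common caller pattern after this commit: the timer needs the
// SparkContext to resolve "spark.oap.mllib.record.output.path".
val sparkContext = data.sparkContext
val timer = new Utils.AlgoTimeMetrics("KMeans", sparkContext)
timer.record("Preprocessing")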
