-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathTemporalRegressionWithSlidingWindow.scala
128 lines (105 loc) · 5.45 KB
/
TemporalRegressionWithSlidingWindow.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package examples
import org.apache.spark.sql.SparkSession
import org.incal.core.{PlotSetting, PlotlyPlotter}
import org.incal.spark_ml.SparkUtil._
import org.incal.spark_ml.models.regression._
import org.incal.spark_ml.models.result.RegressionResultsHolder
import org.incal.spark_ml.models.setting.{RegressionLearningSetting, TemporalRegressionLearningSetting}
import org.incal.spark_ml.models.TreeCore
import org.incal.spark_ml.{MLResultUtil, SparkMLApp, SparkMLService}
import scala.concurrent.ExecutionContext.Implicits.global
/**
 * Example: temporal (time-series) regression with a sliding window over S&P 500 data.
 *
 * Three regression models (linear regression, random regression forest, gradient-boost
 * regression tree) are run via `mlService.regressTimeSeries` with the same temporal learning
 * setting. Their mean training/test RMSE and MAE are printed, and the collected outputs are
 * plotted with `PlotlyPlotter`.
 */
object TemporalRegressionWithSlidingWindow extends SparkMLApp((session: SparkSession, mlService: SparkMLService) => {

  // Column names of the input data; only the *Change columns are referenced below.
  object Column extends Enumeration {
    val index, Date, SP500, Dividend, Earnings, ConsumerPriceIndex, LongInterestRate, RealPrice, RealDividend, RealEarnings, PE10,
    SP500Change, DividendChange, EarningsChange, ConsumerPriceIndexChange, LongInterestRateChange, RealPriceChange, RealDividendChange, RealEarningsChange, PE10Change = Value
  }

  // Features: the "...Change" columns.
  // NOTE(review): the output column (SP500Change) is listed as a feature too. Presumably this is
  // intended so the sliding window feeds lagged target values to the models; confirm the current
  // step's label is not used as its own feature.
  val featureColumnNames = Seq(
    Column.SP500Change, Column.DividendChange, Column.EarningsChange, Column.ConsumerPriceIndexChange,
    Column.LongInterestRateChange, Column.RealPriceChange, Column.RealDividendChange, Column.RealEarningsChange, Column.PE10Change
  ).map(_.toString)

  val outputColumnName = Column.SP500Change.toString

  // Read the remote CSV (S&P 500 data) into a data frame. The Column names above are expected to be
  // columns of the loaded data frame. The `true` flag presumably means "has header"; confirm.
  val url = "https://bit.ly/2OmhfOD"
  val df = remoteCsvToDataFrame(url, true)(session)

  // turn the data frame into an ML-ready one with features and a label
  val finalDf = prepFeaturesDataFrame(featureColumnNames.toSet, Some(outputColumnName))(df)

  // linear regression spec (the Right(...) values are candidate grids for cross-validation)
  val linearRegressionSpec = LinearRegression(
    maxIteration = Left(Some(200)),
    regularization = Right(Seq(1, 0.1, 0.01, 0.001)),
    elasticMixingRatio = Right(Seq(0, 0.5, 1))
  )

  // random regression forest spec
  val randomRegressionForestSpec = RandomRegressionForest(
    core = TreeCore(maxDepth = Right(Seq(4, 5, 6)))
  )

  // gradient-boost regression tree spec
  val gradientBoostRegressionTreeSpec = GradientBoostRegressionTree(
    core = TreeCore(maxDepth = Right(Seq(4, 5, 6))),
    maxIteration = Left(Some(50))
  )

  // learning setting shared by all three models
  val regressionLearningSetting = RegressionLearningSetting(
    trainingTestSplitRatio = Some(0.8),
    // featuresNormalizationType = Some(VectorScalerType.StandardScaler),
    crossValidationEvalMetric = Some(RegressionEvalMetric.rmse),
    crossValidationFolds = Some(5),
    collectOutputs = true // required by exportOutputs below
  )

  // temporal wrapper: predict one step ahead, trying sliding windows of size 6 and 12
  val temporalLearningSetting = TemporalRegressionLearningSetting(
    core = regressionLearningSetting,
    predictAhead = 1,
    slidingWindowSize = Right(Seq(6, 12))
  )

  /**
   * Means of the training and test RMSE and MAE statistics, as computed by
   * `MLResultUtil.calcMetricStats` over `results.performanceResults`.
   *
   * @return ((training RMSE, test RMSE), (training MAE, test MAE))
   * @throws IllegalStateException if the RMSE or MAE statistics (including test values) are missing
   */
  def calcMeanRMSEAndMAE(results: RegressionResultsHolder) = {
    val metricStatsMap = MLResultUtil.calcMetricStats(results.performanceResults)

    // Each entry is (training stats, optional test stats, _). The test stats are required here.
    val (trainingRMSE, testRMSE) = metricStatsMap.get(RegressionEvalMetric.rmse) match {
      case Some((training, Some(test), _)) => (training.mean, test.mean)
      case _ => throw new IllegalStateException("Training/test RMSE statistics are not available.")
    }

    val (trainingMAE, testMAE) = metricStatsMap.get(RegressionEvalMetric.mae) match {
      case Some((training, Some(test), _)) => (training.mean, test.mean)
      case _ => throw new IllegalStateException("Training/test MAE statistics are not available.")
    }

    ((trainingRMSE, testRMSE), (trainingMAE, testMAE))
  }

  /**
   * Plots the first entry of `results.expectedActualOutputs` with PlotlyPlotter. It writes one plot
   * for the training outputs ("training-" + fileName) and one for the test outputs
   * ("test-" + fileName). If no outputs were collected, a message is printed instead.
   *
   * @param results  regression results with collected outputs
   * @param fileName suffix of the output file names (callers pass *.html names)
   * @param size     maximum number of leading points plotted per series
   */
  def exportOutputs(results: RegressionResultsHolder, fileName: String, size: Int): Unit = {
    // Plots one series of (yhat, y) pairs: the first element is the model output, the second the actual value.
    def exportPlot(outputs: Seq[(Double, Double)], prefix: String): Unit = {
      val (yhat, y) = outputs.take(size).unzip

      PlotlyPlotter.plotLines(
        data = Seq(y, yhat),
        setting = PlotSetting(
          title = Some("Outputs"),
          xLabel = Some("Time"),
          yLabel = Some("Value"),
          showLegend = true,
          captions = Seq("Actual Output", "Expected Output")
        ),
        outputFileName = prefix + "-" + fileName
      )
    }

    results.expectedActualOutputs.headOption match {
      case Some(outputs) =>
        exportPlot(outputs._1, "training")
        exportPlot(outputs._2, "test")

      case None =>
        println(s"No outputs collected, skipping the export of '$fileName'.")
    }
  }

  // number of leading data points plotted per exported series
  val exportedOutputCount = 300

  // The models run one after another: each regressTimeSeries call starts only once the previous future completes.
  for {
    lrResults <- mlService.regressTimeSeries(finalDf, linearRegressionSpec, temporalLearningSetting)
    rrfResults <- mlService.regressTimeSeries(finalDf, randomRegressionForestSpec, temporalLearningSetting)
    gbrtResults <- mlService.regressTimeSeries(finalDf, gradientBoostRegressionTreeSpec, temporalLearningSetting)
  } yield {
    // (display name, results, export file name) per model
    val namedResults = Seq(
      ("Linear Regression", lrResults, "lrOutputs.html"),
      ("Random Regression Forest", rrfResults, "rrfOutputs.html"),
      ("Gradient Boost Regression", gbrtResults, "gbrtOutputs.html")
    )

    // print "<training> / <test>" mean metrics for each model
    namedResults.foreach { case (modelName, results, _) =>
      val ((trainingRMSE, testRMSE), (trainingMAE, testMAE)) = calcMeanRMSEAndMAE(results)
      println(s"$modelName RMSE: $trainingRMSE / $testRMSE")
      println(s"$modelName MAE: $trainingMAE / $testMAE")
    }

    namedResults.foreach { case (_, results, fileName) =>
      exportOutputs(results, fileName, exportedOutputCount)
    }
  }
})