forked from LorenzoFramba/Flight_Delay_Prediction
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cleanData.py
125 lines (103 loc) · 5.26 KB
/
cleanData.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Bucketizer
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql import functions as sf
class Clean:
    """
    Preprocess a flight-records Spark DataFrame and prepare the feature stages.

    Construction runs the whole pipeline eagerly:
      1. ``changeVar``          -- cast selected columns to ``IntegerType``;
      2. ``filterAndTransform`` -- drop columns, filter rows, derive features;
      3. ``variable_selection`` -- pick the feature sets named by the config;
      4. ``OneHotEncoder``      -- build (but do not fit) the Bucketizer,
                                   StringIndexer and OneHotEncoder stages.

    Attributes set: ``cfg``, ``spark``, ``sc``, ``df`` (the transformed
    DataFrame), ``X`` (list of feature-set dicts), ``bucketizer``,
    ``varIdxer`` and ``oneHot``.
    """

    # Named candidate feature sets. Tuples keep the shared definitions
    # immutable; variable_selection() hands out fresh lists on every call.
    _FEATURE_SETS = {
        "X1": ("DepDelay", "TaxiOut"),
        "X2": ("DepDelay", "TaxiOut", "HotDepTime"),
        "X3": ("DepDelay", "TaxiOut", "HotDayOfWeek", "Speed"),
        "X4": ("DepDelay", "TaxiOut", "HotDayOfWeek", "Speed", "HotMonth"),
        "X5": ("DepDelay", "TaxiOut", "Speed", "HotDepTime", "HotCRSCatArrTime"),
    }

    # Feature sets selected by each accepted value of ``config.variables``,
    # in the order they are returned.
    _SELECTIONS = {
        "X1": ("X1",),
        "all": ("X1", "X2", "X3", "X4", "X5"),
        "best": ("X5",),
    }

    def __init__(self, config, df, spark, sc):
        """
        Run the preprocessing pipeline on ``df``.

        :param config: settings object; its ``variables`` attribute selects the
            feature sets (see ``variable_selection``).
        :param df: raw Spark DataFrame to clean.
        :param spark: stored as ``self.spark`` (not used by this class itself).
        :param sc: stored as ``self.sc`` (not used by this class itself).
        """
        self.cfg = config
        self.spark = spark
        self.sc = sc
        self.df = self.changeVar(df)
        self.df = self.filterAndTransform(self.df)
        self.X = self.variable_selection()
        self.OneHotEncoder()

    def filterAndTransform(self, df):
        """
        Drop unused columns, keep only operated flights and derive new features.

        Steps:
          * drop the columns listed below;
          * keep rows with ``Cancelled == 0``, then drop ``Cancelled`` and the
            categorical columns UniqueCarrier, DayofMonth and FlightNum;
          * drop every row that still contains a null;
          * replace ``Origin``/``Dest`` with ``OrigDest`` (``Origin + '_' + Dest``);
          * drop rows with ``CRSElapsedTime == 0`` and add
            ``Speed = round(Distance / CRSElapsedTime, 2)`` as a double.

        :param df: DataFrame as returned by ``changeVar``.
        :return: the transformed DataFrame.
        """
        # Columns the task says to remove, plus a few extra ones dropped by
        # choice (per the original notes, 'Year' among them). 'DepTime' is
        # deliberately kept: OneHotEncoder() bucketizes it.
        col_to_drop = ['ArrTime',
                       'ActualElapsedTime',
                       'AirTime',
                       'TaxiIn',
                       'Diverted',
                       'CarrierDelay',
                       'WeatherDelay',
                       'NASDelay',
                       'SecurityDelay',
                       'LateAircraftDelay',
                       'Year',
                       'TailNum',
                       'CancellationCode']
        df = df.drop(*col_to_drop)

        # Keep only flights that actually operated; the flag is then constant.
        df = df.filter("Cancelled == 0")
        df = df.drop("Cancelled")

        # Categorical variables judged unimportant for the model.
        df = df.drop("UniqueCarrier", "DayofMonth", "FlightNum")

        df = df.na.drop("any")

        df = df.withColumn('OrigDest',
                           sf.concat(sf.col('Origin'), sf.lit('_'), sf.col('Dest')))
        df = df.drop("Origin", "Dest")

        # Guard the division below: x / 0 evaluates to null with ANSI mode off
        # (slipping past the null removal above) and raises DIVIDE_BY_ZERO
        # with ANSI mode on.
        df = df.filter(col("CRSElapsedTime") != 0)
        df = df.withColumn(
            "Speed",
            sf.round(col("Distance") / col("CRSElapsedTime"), 2).cast(DoubleType()),
        )
        return df

    def changeVar(self, df):
        """
        Cast the columns that are used as numbers later on to ``IntegerType``.

        Per the original note, ``ArrDelay`` and ``DepDelay`` arrive as strings.
        With Spark ANSI mode off, values that cannot be parsed become null and
        are then removed by ``filterAndTransform``.

        :param df: raw input DataFrame.
        :return: DataFrame with the casts applied.
        """
        # NOTE(review): TaxiOut, Distance, CRSElapsedTime and Month are also used
        # numerically downstream but are not cast here -- confirm their types.
        for name in ("ArrDelay", "DepDelay", "CRSDepTime",
                     "CRSArrTime", "DepTime", "DayOfWeek"):
            df = df.withColumn(name, df[name].cast(IntegerType()))
        return df

    def OneHotEncoder(self):
        """
        Build the (unfitted) feature-encoding stages and store them on ``self``.

        * ``self.bucketizer`` -- splits CRSDepTime, CRSArrTime and DepTime into
          four buckets at 500, 1200 and 1700, writing CatCRSDepTime,
          CatCRSArrTime and CatDepTime;
        * ``self.varIdxer``   -- StringIndexer for OrigDest -> IndOrigDest;
          ``handleInvalid="skip"`` filters out rows with invalid labels;
        * ``self.oneHot``     -- one-hot encodes Month, DayOfWeek, the three
          bucket columns and IndOrigDest; ``handleInvalid="keep"`` maps
          invalid indices to an extra category.
        """
        # NOTE(review): the split points look like hhmm clock times
        # (05:00, 12:00, 17:00) -- confirm against the data.
        splits = [-float("inf"), 500, 1200, 1700, float("inf")]
        self.bucketizer = Bucketizer(splitsArray=[splits, splits, splits],
                                     inputCols=["CRSDepTime",
                                                "CRSArrTime",
                                                "DepTime"],
                                     outputCols=["CatCRSDepTime",
                                                 "CatCRSArrTime",
                                                 "CatDepTime"])
        self.varIdxer = StringIndexer(inputCol="OrigDest",
                                      outputCol="IndOrigDest").setHandleInvalid("skip")
        # `OneHotEncoder` here is the pyspark class imported at module level:
        # this method's own name lives in the class namespace, which function
        # bodies do not search, so there is no self-reference.
        self.oneHot = OneHotEncoder(inputCols=['Month',
                                               'DayOfWeek',
                                               'CatCRSDepTime',
                                               'CatCRSArrTime',
                                               'IndOrigDest',
                                               'CatDepTime'],
                                    outputCols=['HotMonth',
                                                'HotDayOfWeek',
                                                'HotCRSCatDepTime',
                                                'HotCRSCatArrTime',
                                                'HotIndOrigDest',
                                                'HotDepTime']).setHandleInvalid("keep")

    def variable_selection(self):
        """
        Return the feature sets named by ``self.cfg.variables``.

        ``'X1'`` selects X1, ``'all'`` selects X1..X5 in order, and ``'best'``
        selects X5. Any other value yields an empty list.

        :return: list of ``{"name": str, "variables": list[str]}`` dicts, freshly
            built on every call so callers may mutate them safely.
        """
        names = self._SELECTIONS.get(self.cfg.variables, ())
        return [{"name": name, "variables": list(self._FEATURE_SETS[name])}
                for name in names]