main.py
#!/usr/bin/env python
import sys
import pandas as pd
import numpy as np
from joblib import load
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, KBinsDiscretizer, FunctionTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline, make_pipeline, FeatureUnion
from sklearn.model_selection import train_test_split, GridSearchCV
import great_expectations as ge
from datetime import datetime
from lib.constants import *
from lib.transformers import ftransformer_cut, ColumnSelector
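# NOTE: the SPECIES, COLOR, and BEAK_RATIO category lists used below come in
# through the wildcard import from lib.constants; ftransformer_cut and
# ColumnSelector are this project's custom transformer helpers from
# lib.transformers.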
if __name__ == "__main__":
    # Check if the user supplied any cases which demonstrate the ability of
    # Great Expectations to catch differences in what is expected from the data
    if len(sys.argv) == 2:
        passed_scenario = sys.argv[1]
    else:
        passed_scenario = None

    # Create our great_expectations context
    context = ge.data_context.DataContext()
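    # (DataContext() loads the project configuration and expectation suites
    # from the local great_expectations/ directory.)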

    # Validate raw data -------------------------------------------------------------------
    # Check that the data inputs are what we expect before preprocessing and modeling
    # NOTE: The expectations for this data asset were defined in the
    # file entitled ''. That file must be run to create and save the expectation
    # suite that is being used here to validate the new batches.
    if passed_scenario == 'missing-column':
        raw_dat = pd.read_csv('scenarios/raw-data-missing-column.csv')
    else:
        raw_dat = pd.read_csv('data/raw-data.csv')
    data_asset_name = "raw-data"
    expectation_suite_name = "default"
    batch = context.get_batch(data_asset_name, expectation_suite_name, raw_dat)
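    # A single run_id is generated below and reused for the modeling-data and
    # holdout-error validations as well, so all three validation results can be
    # tied back to the same pipeline run.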
    run_id = datetime.utcnow().isoformat().replace(":", "") + "Z"
    validation_result_raw_dat = batch.validate(run_id=run_id)
    if validation_result_raw_dat["success"]:
        print('Successfully validated raw data.')
    else:
        print('The following raw data expectations failed:')
        for k in validation_result_raw_dat['results']:
            if not k['success']:
                print(k['expectation_config'])
    assert validation_result_raw_dat["success"]

    # Transform data ----------------------------------------------------------------------
    # Now proceed to transform the data before modeling it using the concept of
    # a scikit-learn pipeline which chains together "transformers"

    # Create dummy variables transformer across species and color
    ohe = OneHotEncoder(categories=[SPECIES, COLOR], drop='first', sparse=False)
    ohe_cols = raw_dat.loc[:, ['species', 'color']]
    ohe.fit(ohe_cols)

    # Create ordinal variable transformer for beak_ratio
    oe = OrdinalEncoder(categories=[BEAK_RATIO])
    oe_cols = raw_dat.loc[:, ['beak_ratio']]
    oe.fit(oe_cols)

    # Create a transformer to bin claw_length across np.linspace(0, 1, 5),
    # i.e. the bin edges [0, 0.25, 0.5, 0.75, 1]
    ft = FunctionTransformer(ftransformer_cut,
                             kw_args={'bins': np.linspace(0, 1, 5)})
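    # (FunctionTransformer forwards kw_args to ftransformer_cut, so the custom
    # function receives the bin edges defined above.)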

    if passed_scenario == 'different-transformer':
        # Load an unexpected transformer that bins wing_density into 4 quantiles
        kbd = load('./scenarios/kbd.joblib')
    else:
        # Create a transformer to bin wing_density into 2 quantiles
        kbd = KBinsDiscretizer(n_bins=2, encode='ordinal', strategy='quantile')
    kbd_cols = raw_dat.loc[:, ['wing_density']]
    kbd.fit(kbd_cols)

    # String together the created transformers into a sklearn "pipeline"
    preprocess_pipeline = make_pipeline(
        FeatureUnion(transformer_list=[
            ("ohe", make_pipeline(
                ColumnSelector(columns=['species', 'color']),
                ohe
            )),
            ("oe", make_pipeline(
                ColumnSelector(columns=['beak_ratio']),
                oe
            )),
            ("ft", make_pipeline(
                ColumnSelector(columns=['claw_length']),
                ft
            )),
            ("kbd", make_pipeline(
                ColumnSelector(columns=['wing_density']),
                kbd
            )),
            ("label", make_pipeline(
                ColumnSelector(columns=['weight'])
            ))
        ])
    )
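    # The "label" branch is last in the FeatureUnion, so the target column
    # (weight) comes out as the final column of the transformed array; the
    # train/test split below relies on that ordering.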

    # Apply the pipeline to the raw data
    # NOTE: The transformers are fitted based on the data loaded here when
    # main.py is run, but could be pickled and loaded so that they are
    # consistent across runs and batches.
    modeling_dat = preprocess_pipeline.transform(raw_dat)

    # Convert the numpy ndarray to a dataframe to write out as CSV and
    # check the expectations
    modeling_cols = ['V' + str(x) for x in range(modeling_dat.shape[1])]
    modeling_dat_as_df = pd.DataFrame(modeling_dat, columns=modeling_cols)
    modeling_dat_as_df.to_csv('./output/modeling-data.csv', index=False)

    # Validate modeling data ---------------------------------------------------------------
    # Check that the transformed data are what we expect before modeling
    # NOTE: The expectations for this data asset were defined in the
    # file entitled ''. That file must be run to create and save the expectation
    # suite that is being used here to validate the new batches.
    data_asset_name = "modeling-data"
    expectation_suite_name = "default"
    batch = context.get_batch(data_asset_name, expectation_suite_name, modeling_dat_as_df)
    validation_result_modeling_dat = batch.validate(run_id=run_id)
    if validation_result_modeling_dat["success"]:
        print('Successfully validated modeling data.')
    else:
        print('The following modeling data expectations failed:')
        for k in validation_result_modeling_dat['results']:
            if not k['success']:
                print(k['expectation_config'])
    assert validation_result_modeling_dat["success"]

    # Model the data -----------------------------------------------------------------------
    # Split up the data into train and test, assuming last column is target
    X = modeling_dat[:, :-1]
    y = modeling_dat[:, -1]
    X_train, X_test, \
        y_train, y_test = train_test_split(X, y,
                                           train_size=0.75,
                                           test_size=0.25,
                                           random_state=21)

    # Build the final estimator step of our pipeline
    # NOTE: This estimator does not need to be wrapped in its own pipeline, but
    # it is done here for consistency with the preprocessing pipeline
    rf = RandomForestRegressor(n_estimators=100, random_state=22)
    modeling_pipeline = Pipeline([('rf', rf)])
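    # Naming the step 'rf' is what lets the 'rf__*' keys in param_grid below
    # address the RandomForestRegressor's parameters.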

    # Tune the RandomForest parameters and make some predictions on the holdout test dataset
    # NOTE: The parameter grid and two-fold cross-validation are chosen purely
    # for demonstration, to work with this small training dataset
    param_grid = {'rf__n_estimators': [50, 100, 150], 'rf__max_depth': [10, 20, 30]}
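    # (3 x 3 = 9 hyperparameter combinations; with cv=2 each is fit twice during
    # the search, followed by a refit of the best combination on X_train.)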
    rf_cv = GridSearchCV(modeling_pipeline, param_grid, iid=False, cv=2)
    rf_cv.fit(X_train, y_train)
    rf_tuned_hyperparameters = rf_cv.best_params_

    if passed_scenario == 'holdout-outlier':
        # Update the test value to be a complete outlier so that the expectations
        # will not pass on the holdout error data.
        y_test[0] = 999.99

    # Save off the holdout predictions and errors
    y_pred_tuned_forest = rf_cv.predict(X_test)
    holdout_error_dat = pd.DataFrame({'actual': y_test, 'pred': y_pred_tuned_forest})
    holdout_error_dat['error'] = holdout_error_dat['actual'] - holdout_error_dat['pred']
    holdout_error_dat.to_csv('./output/holdout-error-data.csv', index=False)

    # Validate holdout errors ---------------------------------------------------------------
    # Check that the holdout errors are what we would typically see for this model
    # NOTE: The expectations for this data asset were defined in the
    # file entitled ''. That file must be run to create and save the expectation
    # suite that is being used here to validate the new batches.
    data_asset_name = "holdout-error-data"
    expectation_suite_name = "default"
    batch = context.get_batch(data_asset_name, expectation_suite_name, holdout_error_dat)
    validation_result_holdout_error_dat = batch.validate(run_id=run_id)
    if validation_result_holdout_error_dat["success"]:
        print('Successfully validated holdout error data.')
    else:
        print('The following holdout error data expectations failed:')
        for k in validation_result_holdout_error_dat['results']:
            if not k['success']:
                print(k['expectation_config'])
    assert validation_result_holdout_error_dat["success"]