#- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# 1. Setup ---------------------------------------------------------------------
#- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# NOTE: See DESCRIPTION for library dependencies and R/setup.R for
# variables used in each pipeline stage
# NOTE: This script requires CCAO employee access. See wiki for S3 credentials
# setup and multi-factor authentication help
# Load libraries, helpers, and recipes from files
purrr::walk(list.files("R/", "\\.R$", full.names = TRUE), source)
# Load various parameters as defined in the `finalize` step
metadata <- read_parquet(paths$output$metadata$local)
run_id <- metadata$run_id
#- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# 2. Upload --------------------------------------------------------------------
#- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
message("Uploading run artifacts")
# Only upload files if explicitly enabled
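# (upload_enable and the other *_enable flags come from R/setup.R; see NOTE above)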
if (upload_enable) {
# Initialize a dictionary of local paths and S3 URIs specific to the run ID and year
paths <- model_file_dict(
run_id = run_id,
year = params$assessment$working_year
)
## 2.1. Train ----------------------------------------------------------------
# Upload lightgbm fit
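# (aws.s3::put_object takes the local file first, then the S3 destination)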
aws.s3::put_object(
paths$output$workflow_fit$local,
paths$output$workflow_fit$s3
)
# Upload Tidymodels recipe
aws.s3::put_object(
paths$output$workflow_recipe$local,
paths$output$workflow_recipe$s3
)
# Upload finalized run parameters
read_parquet(paths$output$parameter_final$local) %>%
mutate(run_id = run_id) %>%
relocate(run_id) %>%
# Max_depth is set by lightsnip if link_max_depth is true, so we need to
# back out its value. Otherwise, use whichever value is chosen by CV
mutate(max_depth = {
if (link_max_depth) {
as.integer(floor(log2(num_leaves)) + add_to_linked_depth)
} else if (!is.null(.[["max_depth"]])) {
.$max_depth
} else {
NULL
}
}) %>%
write_parquet(paths$output$parameter_final$s3)
# Upload the test set predictions
read_parquet(paths$output$test_card$local) %>%
mutate(run_id = run_id) %>%
relocate(run_id) %>%
write_parquet(paths$output$test_card$s3)
# Upload the parameter search objects if CV was enabled. Requires some
# cleaning since the Tidymodels output is stored as a nested data frame
if (cv_enable) {
message("Uploading cross-validation artifacts")
# Upload the raw parameters object to S3 in case we need to use it later
aws.s3::put_object(
paths$output$parameter_raw$local,
paths$output$parameter_raw$s3
)
# Upload the parameter ranges used for CV
read_parquet(paths$output$parameter_range$local) %>%
mutate(run_id = run_id) %>%
relocate(run_id) %>%
write_parquet(paths$output$parameter_range$s3)
# Clean and unnest the raw parameters data, then write the results to S3
bind_cols(
read_parquet(paths$output$parameter_raw$local) %>%
tidyr::unnest(cols = .metrics) %>%
mutate(run_id = run_id) %>%
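# Attach fold-level notes (warnings/errors) by unnesting .notes and joining
# them back onto the metrics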
left_join(
rename(., notes = .notes) %>%
tidyr::unnest(cols = notes) %>%
rename(notes = note)
) %>%
select(-.notes) %>%
rename_with(~ gsub("^\\.", "", .x)) %>%
tidyr::pivot_wider(names_from = "metric", values_from = "estimate") %>%
relocate(
all_of(c(
"run_id",
"iteration" = "iter",
"configuration" = "config", "fold_id" = "id"
))
) %>%
relocate(c(location, type, notes), .after = everything()),
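# The .extracts column is nested twice (a list of tibbles, each with its own
# .extracts column), so unnest it twice to recover the extracted num_iterations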
read_parquet(paths$output$parameter_raw$local) %>%
tidyr::unnest(cols = .extracts) %>%
tidyr::unnest(cols = .extracts) %>%
dplyr::select(num_iterations = .extracts)
) %>%
dplyr::select(-any_of(c("estimator", "trees")), -extracts) %>%
write_parquet(paths$output$parameter_search$s3)
}
# 2.2. Assess ----------------------------------------------------------------
message("Uploading final assessment results")
# Upload PIN and card-level values. These outputs are very large, so to help
# reduce file size and improve query performance we partition them by year,
# run ID, and township
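# (arrow::write_dataset uses the group_by columns as Hive-style partition keys)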
read_parquet(paths$output$assessment_card$local) %>%
mutate(run_id = run_id, year = params$assessment$working_year) %>%
group_by(year, run_id, township_code) %>%
arrow::write_dataset(
path = paths$output$assessment_card$s3,
format = "parquet",
hive_style = TRUE,
compression = "snappy"
)
read_parquet(paths$output$assessment_pin$local) %>%
mutate(run_id = run_id, year = params$assessment$working_year) %>%
group_by(year, run_id, township_code) %>%
arrow::write_dataset(
path = paths$output$assessment_pin$s3,
format = "parquet",
hive_style = TRUE,
compression = "snappy"
)
# 2.3. Evaluate --------------------------------------------------------------
# Upload test set performance
message("Uploading test set evaluation")
read_parquet(paths$output$performance_test$local) %>%
mutate(run_id = run_id) %>%
relocate(run_id) %>%
write_parquet(paths$output$performance_test$s3)
read_parquet(paths$output$performance_quantile_test$local) %>%
mutate(run_id = run_id) %>%
relocate(run_id) %>%
write_parquet(paths$output$performance_quantile_test$s3)
message("Uploading test linear baseline")
read_parquet(paths$output$performance_test_linear$local) %>%
mutate(run_id = run_id) %>%
relocate(run_id) %>%
write_parquet(paths$output$performance_test_linear$s3)
read_parquet(paths$output$performance_quantile_test_linear$local) %>%
mutate(run_id = run_id) %>%
relocate(run_id) %>%
write_parquet(paths$output$performance_quantile_test_linear$s3)
# Upload assessment set performance
message("Uploading assessment set evaluation")
read_parquet(paths$output$performance_assessment$local) %>%
mutate(run_id = run_id) %>%
relocate(run_id) %>%
write_parquet(paths$output$performance_assessment$s3)
read_parquet(paths$output$performance_quantile_assessment$local) %>%
mutate(run_id = run_id) %>%
relocate(run_id) %>%
write_parquet(paths$output$performance_quantile_assessment$s3)
# 2.4. Interpret -------------------------------------------------------------
# Upload SHAP values. One row per card and one column per feature, so the
# output is very large. Therefore, we partition the data by year, run ID,
# and township
if (shap_enable) {
message("Uploading SHAP values")
read_parquet(paths$output$shap$local) %>%
mutate(run_id = run_id, year = params$assessment$working_year) %>%
group_by(year, run_id, township_code) %>%
arrow::write_dataset(
path = paths$output$shap$s3,
format = "parquet",
hive_style = TRUE,
compression = "snappy"
)
}
# Upload feature importance metrics
message("Uploading feature importance metrics")
read_parquet(paths$output$feature_importance$local) %>%
mutate(run_id = run_id) %>%
relocate(run_id) %>%
write_parquet(paths$output$feature_importance$s3)
# Upload comps
if (comp_enable) {
message("Uploading comps")
read_parquet(paths$output$comp$local) %>%
mutate(run_id = run_id, year = params$assessment$working_year) %>%
group_by(year, run_id) %>%
arrow::write_dataset(
path = paths$output$comp$s3,
format = "parquet",
hive_style = TRUE,
compression = "snappy"
)
}
# 2.5. Finalize --------------------------------------------------------------
message("Uploading run metadata, timings, and reports")
# Upload metadata
aws.s3::put_object(
paths$output$metadata$local,
paths$output$metadata$s3
)
# Upload finalized timings
aws.s3::put_object(
paths$output$timing$local,
paths$output$timing$s3
)
# Upload performance report
aws.s3::put_object(
paths$output$report_performance$local,
paths$output$report_performance$s3
)
# Upload PIN report(s)
pin_report_files <- list.files(
paths$output$report_pin$local,
pattern = paste0(report_pins, collapse = "|"),
full.names = TRUE
)
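# Collapse any repeated slashes in the paths, but leave the "//" in "s3://"
# intact (the (?<!:) lookbehind skips slashes preceded by a colon)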
pin_report_files <- gsub("(?<!:)/+", "/", pin_report_files, perl = TRUE)
for (local_path in pin_report_files) {
s3_path <- gsub("(?<!:)/+", "/", file.path(
paths$output$report_pin$s3,
basename(local_path)
), perl = TRUE)
aws.s3::put_object(local_path, s3_path)
}
}
#- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# 3. Crawl and notify ----------------------------------------------------------
#- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# This will run a Glue crawler to update schemas and send an email to any SNS
# subscribers. Only run when actually uploading
if (upload_enable) {
message("Sending run email and running model crawler")
# If values were uploaded, trigger a Glue crawler to find any new partitions
glue_srv <- paws.analytics::glue(
config = list(region = Sys.getenv("AWS_REGION"))
)
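# Don't let a crawler failure stop the pipeline; just log the error or warning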
tryCatch(
glue_srv$start_crawler("ccao-model-results-crawler"),
error = function(e) message(e),
warning = function(w) message(w)
)
# If SNS ARN is available, notify subscribers via email upon run completion
if (!is.na(Sys.getenv("AWS_SNS_ARN_MODEL_STATUS", unset = NA))) {
pipeline_sns <- paws.application.integration::sns(
config = list(region = Sys.getenv("AWS_REGION"))
)
# Get pipeline total run time from file
pipeline_sns_total_time <- read_parquet(paths$output$timing$local) %>%
mutate(dur = lubridate::seconds_to_period(round(overall_sec_elapsed))) %>%
dplyr::pull(dur)
# Get overall stats by township for the triad of interest, collapsed into
# a plaintext table
pipeline_sns_results <- arrow::read_parquet(
paths$output$performance_test$local,
col_select = c("geography_type", "geography_id", "by_class", "cod")
) %>%
filter(
tolower(town_get_triad(geography_id, name = TRUE)) ==
params$assessment$triad,
!by_class, geography_type == "township_code"
) %>%
mutate(township_name = town_convert(geography_id)) %>%
select(cod, township_name) %>%
mutate(across(where(is.numeric), ~ round(.x, 2))) %>%
arrange(cod) %>%
knitr::kable(format = "rst") %>%
as.character() %>%
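# Drop the "=" border lines that kable adds around RST tables so that only
# the plaintext rows remain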
.[!grepl("=", .)] %>%
paste0(collapse = "\n")
# Get a link to the uploaded Quarto report
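# Splitting "s3://bucket/key/..." on "/" yields c("s3:", "", bucket, key parts)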
report_path_parts <- strsplit(
paths$output$report_performance$s3[1],
"/"
)[[1]]
report_bucket <- report_path_parts[3]
report_path <- report_path_parts[4:length(report_path_parts)] %>%
paste(collapse = "/")
# Use a direct link to the S3 console instead of to the object itself so that
# we don't have to bother with signed URLs
report_url <- paste0(
"https://s3.console.aws.amazon.com/s3/object/",
"{report_bucket}/{report_path}?region=us-east-1&tab=overview"
) %>%
glue::glue()
# Publish to SNS
pipeline_sns$publish(
Subject = paste("Model Run Complete:", run_id),
Message = paste0(
"Model run: ", run_id, " complete\n",
"Finished in: ", pipeline_sns_total_time, "\n",
"Run note: ", metadata$run_note, "\n\n",
"Report link: ", report_url, "\n\n",
pipeline_sns_results
),
TopicArn = Sys.getenv("AWS_SNS_ARN_MODEL_STATUS")
)
}
}