-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathccao-condominium-pin_condo_char.R
218 lines (207 loc) · 7.14 KB
/
ccao-condominium-pin_condo_char.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# This script cleans and combines raw condo characteristics data for the
# warehouse
library(arrow)
library(aws.s3)
library(DBI)
library(data.table)
library(dplyr)
library(glue)
library(noctua)
library(purrr)
library(stringr)
library(tidyr)
source("utils.R")
# S3 locations, both read from environment variables: raw inputs live under
# the raw bucket, cleaned output is written under the warehouse bucket
AWS_S3_RAW_BUCKET <- Sys.getenv(x = "AWS_S3_RAW_BUCKET")
AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv(x = "AWS_S3_WAREHOUSE_BUCKET")

# Destination for the cleaned condo characteristics dataset
output_bucket <- file.path(
  AWS_S3_WAREHOUSE_BUCKET,
  "ccao/condominium/pin_condo_char"
)

# Open the Athena connection (noctua driver) used by the queries in this script
AWS_ATHENA_CONN_NOCTUA <- dbConnect(drv = noctua::athena())
# Get S3 file addresses: list every object under the raw condo characteristics
# prefix, prepend the raw bucket to each key, and keep only parquet files
files <- grep(
  # The dot is escaped and the pattern anchored to the end of the path. An
  # unescaped ".parquet" is a regex in which "." matches any character, and it
  # would also match the text anywhere inside a key rather than as an extension
  "\\.parquet$",
  file.path(
    AWS_S3_RAW_BUCKET,
    aws.s3::get_bucket_df(
      AWS_S3_RAW_BUCKET,
      prefix = "ccao/condominium/pin_condo_char/"
    )$Key
  ),
  value = TRUE
)
# Grab the PIN and class of every parcel with class 299 or 399 in the most
# recent tax year of iasworld.pardat. The 2021 cleaning step below inner-joins
# against this table. The SQL text is kept flush-left so the query string sent
# to Athena is unchanged.
classes <- dbGetQuery(
  conn = AWS_ATHENA_CONN_NOCTUA,
  statement = "
SELECT DISTINCT
parid AS pin,
class
FROM iasworld.pardat
WHERE taxyr = (SELECT MAX(taxyr) FROM iasworld.pardat)
AND class IN ('299', '399')
"
)

# Distinct years of previously assembled condo data already present in
# ccao.pin_condo_char on Athena, extracted as a plain vector
years <- pull(
  dbGetQuery(
    conn = AWS_ATHENA_CONN_NOCTUA,
    statement = "
SELECT DISTINCT year FROM ccao.pin_condo_char
"
  ),
  year
)
# Fetch already-cleaned condo characteristics for a single year from Athena.
#
# x: the year to fetch; it is interpolated into the query between single
#    quotes, i.e. compared as a string literal.
# Returns whatever dbGetQuery() returns for the query: every column of
# ccao.pin_condo_char for the rows whose year equals x.
athena_chars <- function(x) {
  year_query <- glue("
SELECT * FROM ccao.pin_condo_char
WHERE year = '{x}'
")
  dbGetQuery(conn = AWS_ATHENA_CONN_NOCTUA, statement = year_query)
}
# A place to store characteristics data so we can stack it. One element per
# tax year, keyed by the year string.
chars <- list()

# We use tax year, valuations uses year the work was done, so the raw files
# for tax year N are the ones whose path contains N + 1. For each year: if
# cleaned data is not already in Athena, build it from the raw parquet files;
# otherwise reuse the data Athena already has.
for (i in c("2021", "2022", "2023")) {
  if (!("2021" %in% years) && i == "2021") {
    # If clean 2021 data is not already in Athena, load and clean it
    chars[[i]] <- map(
      grep("2022", files, value = TRUE), function(x) {
        read_parquet(x) %>%
          tibble(.name_repair = "unique") %>%
          rename_with(~ tolower(.x)) %>%
          # PINs are left-padded with zeros to 14 characters
          mutate(pin = str_pad(parid, 14, side = "left", pad = "0")) %>%
          # Keep only the columns of interest, then normalize their names
          select(contains(c("pin", "sqft", "bed", "source"))) %>%
          select(-contains(c("x", "all", "search"))) %>%
          rename_with(~"bedrooms", contains("bed")) %>%
          rename_with(~"unit_sf", contains("unit")) %>%
          rename_with(~"building_sf", contains("building"))
      }
    ) %>%
      rbindlist(fill = TRUE) %>%
      # Restrict to PINs present in the class 299/399 lookup
      inner_join(classes) %>%
      # Square footage of "0" or "1" is treated as missing
      mutate(across(c(unit_sf, building_sf), ~ na_if(., "0"))) %>%
      mutate(across(c(unit_sf, building_sf), ~ na_if(., "1"))) %>%
      # Strip everything except digits, decimal points and minus signs
      mutate(
        across(c(building_sf, unit_sf, bedrooms), ~ gsub("[^0-9.-]", "", .))
      ) %>%
      mutate(across(.cols = everything(), ~ trimws(., which = "both"))) %>%
      # Convert empty strings to NA column by column. Calling na_if() on the
      # whole data frame errors under dplyr >= 1.1.0, which casts the
      # replacement value to the type of its first argument. Every column is
      # character at this point because of the trimws() step above.
      mutate(across(.cols = everything(), ~ na_if(., ""))) %>%
      # A bedroom count of "0" with no unit square footage is treated as
      # missing rather than as a real zero
      mutate(
        bedrooms = case_when(
          is.na(unit_sf) & bedrooms == "0" ~ NA_character_,
          TRUE ~ bedrooms
        )
      ) %>%
      mutate(across(c(building_sf, unit_sf, bedrooms), ~ as.numeric(.))) %>%
      mutate(
        bedrooms = ceiling(bedrooms),
        # Parking PIN: source mentions parking/garage (case-insensitive) and
        # both square footage fields are missing
        parking_pin = str_detect(source, "(?i)parking|garage") &
          is.na(unit_sf) & is.na(building_sf),
        year = "2021"
      ) %>%
      select(-c(class, source)) %>%
      # These are obvious typos
      mutate(unit_sf = case_when(
        unit_sf == 28002000 ~ 2800,
        unit_sf == 20002800 ~ 2000,
        unit_sf == 182901 ~ 1829,
        TRUE ~ unit_sf
      ))
  } else if (!("2022" %in% years) && i == "2022") {
    # If clean 2022 data is not already in Athena, load and clean it
    chars[[i]] <- lapply(grep("2023", files, value = TRUE), function(x) {
      # Only the first 20 columns of each raw file are used
      raw <- read_parquet(x)[, 1:20]
      # Lower-case the column names and de-duplicate any repeats
      col_names <- tolower(names(raw))
      names(raw) <- make.unique(col_names)
      raw %>%
        # Drop existing pin-like columns; pin is rebuilt from the iasWorld
        # parid column below
        select(!contains("pin")) %>%
        # Fix a misspelled column prefix present in the raw data
        rename_with(~ str_replace(.x, "iasworold", "iasworld")) %>%
        mutate(
          pin = str_pad(iasworld_parid, 14, side = "left", pad = "0")
        ) %>%
        # Remove whitespace and runs of four dots from column names
        rename_with(~ str_replace_all(.x, "[[:space:]]", "")) %>%
        rename_with(~ str_replace_all(.x, "\\.{4}", "")) %>%
        select(!contains(c("1", "2", "all"))) %>%
        select(contains(c("pin", "sq", "bed", "bath"))) %>%
        rename_with(~"bedrooms", contains("bed")) %>%
        rename_with(~"unit_sf", contains("unit")) %>%
        rename_with(~"building_sf", contains(c("building", "bldg"))) %>%
        rename_with(~"half_baths", contains("half")) %>%
        rename_with(~"full_baths", contains("full")) %>%
        mutate(
          across(!contains("pin"), as.numeric),
          year = "2022",
          # Define a parking pin as a unit with only 0 or NA values for
          # characteristics
          parking_pin = case_when(
            (bedrooms == 0 | unit_sf == 0) &
              rowSums(
                across(c(unit_sf, bedrooms, full_baths, half_baths)),
                na.rm = TRUE
              ) == 0 ~ TRUE,
            TRUE ~ FALSE
          ),
          # Really low unit_sf should be considered NA
          unit_sf = case_when(
            unit_sf < 5 & !parking_pin ~ NA_real_,
            TRUE ~ unit_sf
          ),
          # Assume missing half_baths value is 0 if there is full bathroom
          # data for PIN
          half_baths = case_when(
            is.na(half_baths) & !is.na(full_baths) & full_baths > 0 ~ 0,
            TRUE ~ half_baths
          ),
          # Make sure beds and baths are integers
          across(c(half_baths, full_baths, bedrooms), ~ ceiling(.x)),
          # Set all characteristics to NA for parking pins
          across(
            c(bedrooms, unit_sf, half_baths, full_baths),
            ~ ifelse(parking_pin, NA, .x)
          )
        )
    }) %>%
      bind_rows() %>%
      # Keep a single row per PIN: after sorting by unit_sf, the first row
      # within each PIN is retained
      group_by(pin) %>%
      arrange(unit_sf) %>%
      filter(row_number() == 1) %>%
      ungroup() %>%
      filter(!is.na(pin))
  } else if (!("2023" %in% years) && i == "2023") {
    # If clean 2023 data is not already in Athena, load and clean it
    chars[[i]] <- lapply(grep("2024", files, value = TRUE), function(x) {
      read_parquet(x) %>%
        select(
          pin = "14.Digit.PIN",
          building_sf = "Building.Square.Footage",
          unit_sf = "Unit.Square.Footage",
          bedrooms = "Bedrooms",
          parking_pin = "Parking.Space.Change",
          full_baths = "Full.Baths",
          half_baths = "Half.Baths"
        ) %>%
        mutate(
          # Strip any non-digit characters from the PIN
          pin = gsub("[^0-9]", "", pin),
          # Parking PIN: every unit characteristic is missing and the raw
          # parking space change field is populated
          parking_pin = if_all(
            c(unit_sf, bedrooms, full_baths, half_baths), is.na
          ) & !is.na(parking_pin),
          year = "2023",
          # Implausible values are set to NA
          bedrooms = case_when(bedrooms > 15 ~ NA_real_, TRUE ~ bedrooms),
          full_baths = case_when(
            full_baths > 10 ~ NA_real_,
            TRUE ~ full_baths
          ),
          unit_sf = case_when(unit_sf < 5 ~ NA_real_, TRUE ~ unit_sf)
        )
    }) %>%
      bind_rows()
  } else {
    # If data is already in Athena, just take it from there
    chars[[i]] <- athena_chars(i)
  }
}
# Stack every year's characteristics into one table, group it by year, and
# write it to the warehouse location as snappy-compressed parquet with
# hive-style paths
arrow::write_dataset(
  dataset = group_by(bind_rows(chars), year),
  path = output_bucket,
  format = "parquet",
  hive_style = TRUE,
  compression = "snappy"
)