From 0659dcdbe52778acd140287a7ad9a1350e2e3036 Mon Sep 17 00:00:00 2001 From: Sweaty Handshake Date: Mon, 16 Dec 2024 19:10:57 +0000 Subject: [PATCH] Address non-spatial parquets --- .../ccao/ccao-condominium-pin_condo_char.R | 437 +++++++++--------- .../ccao/ccao-condominium_parking.R | 1 + .../census/census-acs.R | 5 +- .../census/census-decennial.R | 3 +- .../environment/environment-airport_noise.R | 1 + .../spatial/spatial-environment_road.R | 6 +- 6 files changed, 231 insertions(+), 222 deletions(-) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium-pin_condo_char.R b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium-pin_condo_char.R index 8c381b26e..34d2fae98 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium-pin_condo_char.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium-pin_condo_char.R @@ -1,218 +1,219 @@ -# This script cleans and combines raw condo characteristics data for the -# warehouse -library(arrow) -library(aws.s3) -library(DBI) -library(data.table) -library(dplyr) -library(glue) -library(noctua) -library(purrr) -library(stringr) -library(tidyr) -source("utils.R") - -# Declare raw and clean condo data locations -AWS_S3_RAW_BUCKET <- Sys.getenv("AWS_S3_RAW_BUCKET") -AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv("AWS_S3_WAREHOUSE_BUCKET") -output_bucket <- file.path( - AWS_S3_WAREHOUSE_BUCKET, - "ccao", "condominium", "pin_condo_char" -) - -# Connect to Athena -AWS_ATHENA_CONN_NOCTUA <- dbConnect(noctua::athena()) - -# Get S3 file addresses -files <- grep( - ".parquet", - file.path( - AWS_S3_RAW_BUCKET, - aws.s3::get_bucket_df( - AWS_S3_RAW_BUCKET, - prefix = "ccao/condominium/pin_condo_char/" - )$Key - ), - value = TRUE -) - -# Grab sales/spatial data -classes <- dbGetQuery( - conn = AWS_ATHENA_CONN_NOCTUA, " - SELECT DISTINCT - parid AS pin, - class - FROM iasworld.pardat - WHERE taxyr = (SELECT MAX(taxyr) FROM iasworld.pardat) - AND class IN ('299', '399') - " -) - -# Grab all years of previously assembled condo data already present on Athena -years <- dbGetQuery( - conn = AWS_ATHENA_CONN_NOCTUA, " - SELECT DISTINCT year FROM ccao.pin_condo_char - " -) %>% - pull(year) - -# Function to grab chars data from Athena if it's already available -athena_chars <- function(x) { - dbGetQuery( - conn = AWS_ATHENA_CONN_NOCTUA, glue(" - SELECT * FROM ccao.pin_condo_char - WHERE year = '{x}' - ") - ) -} - -# A place to store characteristics data so we can stack it -chars <- list() - -# We use tax year, valuations uses year the work was done -for (i in c("2021", "2022", "2023")) { - if (!("2021" %in% years) && i == "2021") { - # If clean 2021 data is not already in Athena, load and clean it - chars[[i]] <- map( - grep("2022", files, value = TRUE), function(x) { - read_parquet(x) %>% - tibble(.name_repair = "unique") %>% - rename_with(~ tolower(.x)) %>% - mutate(pin = str_pad(parid, 14, side = "left", pad = "0")) %>% - select(contains(c("pin", "sqft", "bed", "source"))) %>% - select(-contains(c("x", "all", "search"))) %>% - rename_with(~"bedrooms", contains("bed")) %>% - rename_with(~"unit_sf", contains("unit")) %>% - rename_with(~"building_sf", contains("building")) - } - ) %>% - rbindlist(fill = TRUE) %>% - inner_join(classes) %>% - mutate(across(c(unit_sf, building_sf), ~ na_if(., "0"))) %>% - mutate(across(c(unit_sf, building_sf), ~ na_if(., "1"))) %>% - mutate( - across(c(building_sf, unit_sf, bedrooms), ~ gsub("[^0-9.-]", "", .)) - ) %>% - mutate(across(.cols = everything(), ~ trimws(., which = "both"))) %>% - na_if("") %>% - mutate( - bedrooms = case_when( - is.na(unit_sf) & bedrooms == "0" ~ NA_character_, - TRUE ~ bedrooms - ) - ) %>% - mutate(across(c(building_sf, unit_sf, bedrooms), ~ as.numeric(.))) %>% - mutate( - bedrooms = ceiling(bedrooms), - parking_pin = str_detect(source, "(?i)parking|garage") & - is.na(unit_sf) & is.na(building_sf), - year = "2021" - ) %>% - select(-c(class, source)) %>% - # These are obvious typos - mutate(unit_sf = case_when( - unit_sf == 28002000 ~ 2800, - unit_sf == 20002800 ~ 2000, - unit_sf == 182901 ~ 1829, - TRUE ~ unit_sf - )) - } else if (!("2022" %in% years) && i == "2022") { - # If clean 2022 data is not already in Athena, load and clean it - chars[[i]] <- lapply(grep("2023", files, value = TRUE), function(x) { - raw <- read_parquet(x)[, 1:20] - - names <- tolower(names(raw)) - names(raw) <- make.unique(names) - - raw %>% - select(!contains("pin")) %>% - rename_with(~ str_replace(.x, "iasworold", "iasworld")) %>% - mutate(pin = str_pad(iasworld_parid, 14, side = "left", pad = "0")) %>% - rename_with(~ str_replace_all(.x, "[[:space:]]", "")) %>% - rename_with(~ str_replace_all(.x, "\\.{4}", "")) %>% - select(!contains(c("1", "2", "all"))) %>% - select(contains(c("pin", "sq", "bed", "bath"))) %>% - rename_with(~"bedrooms", contains("bed")) %>% - rename_with(~"unit_sf", contains("unit")) %>% - rename_with(~"building_sf", contains(c("building", "bldg"))) %>% - rename_with(~"half_baths", contains("half")) %>% - rename_with(~"full_baths", contains("full")) %>% - mutate( - across(!contains("pin"), as.numeric), - year = "2022", - # Define a parking pin as a unit with only 0 or NA values for - # characteristics - parking_pin = case_when( - (bedrooms == 0 | unit_sf == 0) & - rowSums( - across(c(unit_sf, bedrooms, full_baths, half_baths)), - na.rm = TRUE - ) == 0 ~ TRUE, - TRUE ~ FALSE - ), - # Really low unit_sf should be considered NA - unit_sf = case_when( - unit_sf < 5 & !parking_pin ~ NA_real_, - TRUE ~ unit_sf - ), - # Assume missing half_baths value is 0 if there is full bathroom data - # for PIN - half_baths = case_when( - is.na(half_baths) & !is.na(full_baths) & full_baths > 0 ~ 0, - TRUE ~ half_baths - ), - # Make beds and baths are integers - across(c(half_baths, full_baths, bedrooms), ~ ceiling(.x)), - # Set all characteristics to NA for parking pins - across( - c(bedrooms, unit_sf, half_baths, full_baths), - ~ ifelse(parking_pin, NA, .x) - ) - ) - }) %>% - bind_rows() %>% - group_by(pin) %>% - arrange(unit_sf) %>% - filter(row_number() == 1) %>% - ungroup() %>% - filter(!is.na(pin)) - } else if (!("2023" %in% years) && i == "2023") { - chars[[i]] <- lapply(grep("2024", files, value = TRUE), function(x) { - read_parquet(x) %>% - select( - pin = "14.Digit.PIN", - building_sf = "Building.Square.Footage", - unit_sf = "Unit.Square.Footage", - bedrooms = "Bedrooms", - parking_pin = "Parking.Space.Change", - full_baths = "Full.Baths", - half_baths = "Half.Baths" - ) %>% - mutate( - pin = gsub("[^0-9]", "", pin), - parking_pin = if_all( - c(unit_sf, bedrooms, full_baths, half_baths), is.na - ) & !is.na(parking_pin), - year = "2023", - bedrooms = case_when(bedrooms > 15 ~ NA_real_, TRUE ~ bedrooms), - full_baths = case_when(full_baths > 10 ~ NA_real_, TRUE ~ full_baths), - unit_sf = case_when(unit_sf < 5 ~ NA_real_, TRUE ~ unit_sf) - ) - }) %>% - bind_rows() - } else { - # If data is already in Athena, just take it from there - chars[[i]] <- athena_chars(i) - } -} - -# Upload cleaned data to S3 -chars %>% - bind_rows() %>% - group_by(year) %>% - arrow::write_dataset( - path = output_bucket, - format = "parquet", - hive_style = TRUE, - compression = "snappy" - ) +# This script cleans and combines raw condo characteristics data for the +# warehouse +library(arrow) +library(aws.s3) +library(DBI) +library(data.table) +library(dplyr) +library(glue) +library(noctua) +library(purrr) +library(stringr) +library(tidyr) +source("utils.R") + +# Declare raw and clean condo data locations +AWS_S3_RAW_BUCKET <- Sys.getenv("AWS_S3_RAW_BUCKET") +AWS_S3_WAREHOUSE_BUCKET <- Sys.getenv("AWS_S3_WAREHOUSE_BUCKET") +output_bucket <- file.path( + AWS_S3_WAREHOUSE_BUCKET, + "ccao", "condominium", "pin_condo_char" +) + +# Connect to Athena +AWS_ATHENA_CONN_NOCTUA <- dbConnect(noctua::athena()) + +# Get S3 file addresses +files <- grep( + ".parquet", + file.path( + AWS_S3_RAW_BUCKET, + aws.s3::get_bucket_df( + AWS_S3_RAW_BUCKET, + prefix = "ccao/condominium/pin_condo_char/" + )$Key + ), + value = TRUE +) + +# Grab sales/spatial data +classes <- dbGetQuery( + conn = AWS_ATHENA_CONN_NOCTUA, " + SELECT DISTINCT + parid AS pin, + class + FROM iasworld.pardat + WHERE taxyr = (SELECT MAX(taxyr) FROM iasworld.pardat) + AND class IN ('299', '399') + " +) + +# Grab all years of previously assembled condo data already present on Athena +years <- dbGetQuery( + conn = AWS_ATHENA_CONN_NOCTUA, " + SELECT DISTINCT year FROM ccao.pin_condo_char + " +) %>% + pull(year) + +# Function to grab chars data from Athena if it's already available +athena_chars <- function(x) { + dbGetQuery( + conn = AWS_ATHENA_CONN_NOCTUA, glue(" + SELECT * FROM ccao.pin_condo_char + WHERE year = '{x}' + ") + ) +} + +# A place to store characteristics data so we can stack it +chars <- list() + +# We use tax year, valuations uses year the work was done +for (i in c("2021", "2022", "2023")) { + if (!("2021" %in% years) && i == "2021") { + # If clean 2021 data is not already in Athena, load and clean it + chars[[i]] <- map( + grep("2022", files, value = TRUE), function(x) { + read_parquet(x) %>% + tibble(.name_repair = "unique") %>% + rename_with(~ tolower(.x)) %>% + mutate(pin = str_pad(parid, 14, side = "left", pad = "0")) %>% + select(contains(c("pin", "sqft", "bed", "source"))) %>% + select(-contains(c("x", "all", "search"))) %>% + rename_with(~"bedrooms", contains("bed")) %>% + rename_with(~"unit_sf", contains("unit")) %>% + rename_with(~"building_sf", contains("building")) + } + ) %>% + rbindlist(fill = TRUE) %>% + inner_join(classes) %>% + mutate(across(c(unit_sf, building_sf), ~ na_if(., "0"))) %>% + mutate(across(c(unit_sf, building_sf), ~ na_if(., "1"))) %>% + mutate( + across(c(building_sf, unit_sf, bedrooms), ~ gsub("[^0-9.-]", "", .)) + ) %>% + mutate(across(.cols = everything(), ~ trimws(., which = "both"))) %>% + na_if("") %>% + mutate( + bedrooms = case_when( + is.na(unit_sf) & bedrooms == "0" ~ NA_character_, + TRUE ~ bedrooms + ) + ) %>% + mutate(across(c(building_sf, unit_sf, bedrooms), ~ as.numeric(.))) %>% + mutate( + bedrooms = ceiling(bedrooms), + parking_pin = str_detect(source, "(?i)parking|garage") & + is.na(unit_sf) & is.na(building_sf), + year = "2021" + ) %>% + select(-c(class, source)) %>% + # These are obvious typos + mutate(unit_sf = case_when( + unit_sf == 28002000 ~ 2800, + unit_sf == 20002800 ~ 2000, + unit_sf == 182901 ~ 1829, + TRUE ~ unit_sf + )) + } else if (!("2022" %in% years) && i == "2022") { + # If clean 2022 data is not already in Athena, load and clean it + chars[[i]] <- lapply(grep("2023", files, value = TRUE), function(x) { + raw <- read_parquet(x)[, 1:20] + + names <- tolower(names(raw)) + names(raw) <- make.unique(names) + + raw %>% + select(!contains("pin")) %>% + rename_with(~ str_replace(.x, "iasworold", "iasworld")) %>% + mutate(pin = str_pad(iasworld_parid, 14, side = "left", pad = "0")) %>% + rename_with(~ str_replace_all(.x, "[[:space:]]", "")) %>% + rename_with(~ str_replace_all(.x, "\\.{4}", "")) %>% + select(!contains(c("1", "2", "all"))) %>% + select(contains(c("pin", "sq", "bed", "bath"))) %>% + rename_with(~"bedrooms", contains("bed")) %>% + rename_with(~"unit_sf", contains("unit")) %>% + rename_with(~"building_sf", contains(c("building", "bldg"))) %>% + rename_with(~"half_baths", contains("half")) %>% + rename_with(~"full_baths", contains("full")) %>% + mutate( + across(!contains("pin"), as.numeric), + year = "2022", + # Define a parking pin as a unit with only 0 or NA values for + # characteristics + parking_pin = case_when( + (bedrooms == 0 | unit_sf == 0) & + rowSums( + across(c(unit_sf, bedrooms, full_baths, half_baths)), + na.rm = TRUE + ) == 0 ~ TRUE, + TRUE ~ FALSE + ), + # Really low unit_sf should be considered NA + unit_sf = case_when( + unit_sf < 5 & !parking_pin ~ NA_real_, + TRUE ~ unit_sf + ), + # Assume missing half_baths value is 0 if there is full bathroom data + # for PIN + half_baths = case_when( + is.na(half_baths) & !is.na(full_baths) & full_baths > 0 ~ 0, + TRUE ~ half_baths + ), + # Make beds and baths are integers + across(c(half_baths, full_baths, bedrooms), ~ ceiling(.x)), + # Set all characteristics to NA for parking pins + across( + c(bedrooms, unit_sf, half_baths, full_baths), + ~ ifelse(parking_pin, NA, .x) + ) + ) + }) %>% + bind_rows() %>% + group_by(pin) %>% + arrange(unit_sf) %>% + filter(row_number() == 1) %>% + ungroup() %>% + filter(!is.na(pin)) + } else if (!("2023" %in% years) && i == "2023") { + chars[[i]] <- lapply(grep("2024", files, value = TRUE), function(x) { + read_parquet(x) %>% + select( + pin = "14.Digit.PIN", + building_sf = "Building.Square.Footage", + unit_sf = "Unit.Square.Footage", + bedrooms = "Bedrooms", + parking_pin = "Parking.Space.Change", + full_baths = "Full.Baths", + half_baths = "Half.Baths" + ) %>% + mutate( + pin = gsub("[^0-9]", "", pin), + parking_pin = if_all( + c(unit_sf, bedrooms, full_baths, half_baths), is.na + ) & !is.na(parking_pin), + year = "2023", + bedrooms = case_when(bedrooms > 15 ~ NA_real_, TRUE ~ bedrooms), + full_baths = case_when(full_baths > 10 ~ NA_real_, TRUE ~ full_baths), + unit_sf = case_when(unit_sf < 5 ~ NA_real_, TRUE ~ unit_sf) + ) + }) %>% + bind_rows() + } else { + # If data is already in Athena, just take it from there + chars[[i]] <- athena_chars(i) + } +} + +# Upload cleaned data to S3 +chars %>% + bind_rows() %>% + mutate(loaded_at = as.character(Sys.time())) %>% + group_by(year) %>% + arrow::write_dataset( + path = output_bucket, + format = "parquet", + hive_style = TRUE, + compression = "snappy" + ) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium_parking.R b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium_parking.R index cee094c33..61b2dc5f6 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium_parking.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/ccao/ccao-condominium_parking.R @@ -65,6 +65,7 @@ nonlivable[["neg_pred"]] <- map( # Upload all nonlivable spaces to nonlivable table nonlivable %>% bind_rows() %>% + mutate(loaded_at = as.character(Sys.time())) %>% group_by(year) %>% arrow::write_dataset( path = output_bucket, diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/census/census-acs.R b/etl/scripts-ccao-data-warehouse-us-east-1/census/census-acs.R index a17ba3bcf..fa467133c 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/census/census-acs.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/census/census-acs.R @@ -133,10 +133,11 @@ pull_and_write_acs <- function( )) %>% rename(any_of(c("GEOID" = "GEOID...1"))) %>% select(-starts_with("GEOID..."), -starts_with("NAME")) %>% - filter(!str_detect(GEOID, "Z")) + filter(!str_detect(GEOID, "Z")) %>% + mutate(loaded_at = as.character(Sys.time())) # Write to S3 - arrow::write_parquet(df, remote_file) + write_parquet(df, remote_file) } } diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/census/census-decennial.R b/etl/scripts-ccao-data-warehouse-us-east-1/census/census-decennial.R index 50977f0ba..7825c7c70 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/census/census-decennial.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/census/census-decennial.R @@ -110,7 +110,8 @@ pull_and_write_dec <- function(s3_bucket_uri, survey, folder, geography, year) { cache_table = TRUE ) %>% select(-NAME) %>% - rename_with(~ rename_to_2020(.x, year), .cols = !GEOID) + rename_with(~ rename_to_2020(.x, year), .cols = !GEOID) %>% + mutate(loaded_at = as.character(Sys.time())) # Write to S3 arrow::write_parquet(df, remote_file) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/environment/environment-airport_noise.R b/etl/scripts-ccao-data-warehouse-us-east-1/environment/environment-airport_noise.R index 3dd104e15..f712ec843 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/environment/environment-airport_noise.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/environment/environment-airport_noise.R @@ -84,6 +84,7 @@ pins %>% select(pin10, airport_noise_dnl) %>% mutate(airport_noise_dnl = replace_na(airport_noise_dnl, 52.5)) %>% st_drop_geometry() %>% + mutate(loaded_at = as.character(Sys.time())) %>% write_parquet( file.path(output_bucket, paste0("year=omp"), "part-0.parquet") ) diff --git a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment_road.R b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment_road.R index 97699a44c..fdaba273a 100644 --- a/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment_road.R +++ b/etl/scripts-ccao-data-warehouse-us-east-1/spatial/spatial-environment_road.R @@ -290,7 +290,11 @@ walk(parquet_files, \(file_key) { )) %>% select(-year) - output_path <- file.path(output_bucket, paste0("year=", tools::file_path_sans_ext(basename(file_key))), "part-0.parquet") + output_path <- file.path( + output_bucket, + paste0("year=", tools::file_path_sans_ext(basename(file_key))), + "part-0.parquet" + ) geoparquet_to_s3(shapefile_data, output_path) print(paste(file_key, "cleaned and uploaded."))