0.0.4dev: distribute mode 3
- Using large raster files with (relatively) smaller vector objects
- Inspired by HUC-NASS computation
- Currently accepts continuous variables only
- TODO: categorical raster
Insang Song committed Oct 31, 2023
1 parent 7ada648 commit a6279ba
Showing 11 changed files with 198 additions and 20 deletions.
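The headline addition is distribute_process_multirasters(), the third distribute mode: many large raster files on disk, one comparatively small vector object. Below is a minimal, hedged usage sketch; the file paths, the `sites` object, and the argument names forwarded through `...` (`points` and `id`, chosen only to match the `^id` / `^(points|poly)` detection in the post-processing step) are assumptions, and `fun` must be a scomps-supported function such as `extract_with`.

```r
# Hedged sketch of the new "mode 3": one large raster file per worker,
# a small vector object shared by all workers. Paths and object names
# below are hypothetical.
library(future)
library(scomps)

plan(multicore, workers = 4L)

raster_files <- list.files("/data/nlcd_tiles", pattern = "\\.tif$",
                           full.names = TRUE)
sites <- sf::st_read("/data/monitoring_sites.gpkg")  # small, confined extent

res <- distribute_process_multirasters(
  filenames = raster_files,
  fun = extract_with,   # a scomps extraction function (assumed here)
  points = sites,
  id = "site_id"
)
head(res)
```

Each worker opens its raster from disk via the terra proxy, so the full rasters are never loaded into memory at once; per the commit message, only continuous variables are supported so far, with categorical rasters left as a TODO.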
3 changes: 2 additions & 1 deletion .Rbuildignore
@@ -1,4 +1,5 @@
rds$
/tools/*
/.github
scomps*.html$
scomps*.html$
^LICENSE\.md$
1 change: 1 addition & 0 deletions DESCRIPTION
@@ -24,6 +24,7 @@ Imports:
testthat,
units
Suggests:
exactextractr,
future.batchtools,
igraph,
knitr,
3 changes: 2 additions & 1 deletion NAMESPACE
@@ -10,8 +10,9 @@ export(check_within_reference)
export(clip_as_extent)
export(clip_as_extent_ras)
export(clip_as_extent_ras2)
export(distribute_process)
export(distribute_process_grid)
export(distribute_process_hierarchy)
export(distribute_process_multirasters)
export(estimate_demands)
export(extent_to_polygon)
export(extract_with)
71 changes: 64 additions & 7 deletions R/scale_process.R
@@ -14,10 +14,10 @@
#' library(future)
#' plan(multicore, workers = 4)
#' # Does not run ...
#' # distribute_process()
#' # distribute_process_grid()
#' @import future
#' @export
distribute_process <- function(
distribute_process_grid <- function(
grids,
grid_target_id = NULL,
fun,
@@ -45,8 +45,6 @@ distribute_process <- function(
grids_target <- grids[grid_target_ids %in% unlist(grids[["CGRIDID"]]),]
grids_target_list <- split(grids_target, unlist(grids_target[["CGRIDID"]]))

# pgrs <- progressr::progressor(along = nrow(grids_target))

results_distributed <- future.apply::future_lapply(
grids_target_list,
\(x) {
@@ -70,7 +68,7 @@ distribute_process <- function(

# post-processing
detected_id <- grep("^id", names(par_fun), value = TRUE)
detected_point <- grep("^points", names(par_fun), value = TRUE)
detected_point <- grep("^(points|poly)", names(par_fun), value = TRUE)
names(results_distributed)[1] <- par_fun[[detected_id]]
results_distributed[[par_fun[[detected_id]]]] <-
unlist(par_fun[[detected_point]][[par_fun[[detected_id]]]])
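The widened pattern above (and the matching change in the hierarchy and multi-raster functions below) lets the post-processing step recover the unit identifier from polygon arguments (`poly = ...`) as well as point arguments. A minimal base-R sketch of the detection, with hypothetical argument names:

```r
# Hypothetical `...` arguments captured in par_fun: an id column name and a
# polygon layer carrying that column.
par_fun <- list(
  id_col = "site_id",
  poly_sites = data.frame(site_id = c("A", "B", "C"))
)
detected_id <- grep("^id", names(par_fun), value = TRUE)                # "id_col"
detected_point <- grep("^(points|poly)", names(par_fun), value = TRUE)  # "poly_sites"
par_fun[[detected_id]]                                        # "site_id"
unlist(par_fun[[detected_point]][[par_fun[[detected_id]]]])   # "A" "B" "C"
```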
@@ -93,7 +91,7 @@ distribute_process <- function(
#' library(future)
#' plan(multicore, workers = 4)
#' # Does not run ...
#' # distribute_process()
#' # distribute_process_hierarchy()
#' @import future
#' @import progressr
#' @export
@@ -133,11 +131,70 @@ distribute_process_hierarchy <- function(

# post-processing
detected_id <- grep("^id", names(par_fun), value = TRUE)
detected_point <- grep("^points", names(par_fun), value = TRUE)
detected_point <- grep("^(points|poly)", names(par_fun), value = TRUE)
names(results_distributed)[1] <- par_fun[[detected_id]]
results_distributed[[par_fun[[detected_id]]]] <-
unlist(par_fun[[detected_point]][[par_fun[[detected_id]]]])

return(results_distributed)
}




#' @title Process a given function over multiple large rasters
#'
#' @description Large raster files often exceed available memory, and even cropping a large raster to a small subset consumes considerable memory and adds processing time. This function leverages the terra SpatRaster proxy to distribute computation jobs across multiple cores. It assumes that users have multiple large raster files on disk; each file path is assigned to a worker, which reads raster values directly from disk through the C++ pointers that terra functions operate on. To avoid out-of-memory errors, it is strongly recommended to use vector data with a small, confined spatial extent, which may require subsetting the input vector objects in advance.
#' @param filenames character(n). A vector or list of full paths to raster files, where n is the total number of raster files.
#' @param fun A function supported by scomps.
#' @param ... Arguments passed to \code{fun}.
#' @return A data.frame with the computation results. For the contents of the results, consult the documentation of the function passed to \code{fun}.
#' @author Insang Song \email{geoissong@@gmail.com}
#'
#' @examples
#' library(future)
#' plan(multicore, workers = 4)
#' # Does not run ...
#' # distribute_process_multirasters()
#' @import future
#' @import progressr
#' @export
distribute_process_multirasters <- function(
filenames,
fun,
...) {
par_fun <- list(...)

if (any(sapply(filenames, \(x) !file.exists(x)))) {
stop("One or many of files do not exist in provided file paths. Check the paths again.\n")
}

file_list <- split(filenames, filenames)
results_distributed <- future.apply::future_lapply(
file_list,
\(x) {
sf::sf_use_s2(FALSE)

run_result <- tryCatch({
res <- fun(...)
return(res)
},
error = function(e) return(data.frame(ID = NA)))
return(run_result)
},
future.seed = TRUE,
future.packages = c("terra", "sf", "dplyr", "scomps", "future"))
results_distributed <- do.call(dplyr::bind_rows, results_distributed)
results_distributed <- results_distributed[!is.na(results_distributed[["ID"]]),]

# post-processing
detected_id <- grep("^id", names(par_fun), value = TRUE)
detected_point <- grep("^(points|poly)", names(par_fun), value = TRUE)
names(results_distributed)[1] <- par_fun[[detected_id]]
results_distributed[[par_fun[[detected_id]]]] <-
unlist(par_fun[[detected_point]][[par_fun[[detected_id]]]])

return(results_distributed)
}
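A short sketch of the dispatch-and-recovery pattern used here, outside the package (paths are hypothetical): split() produces a named list with one element per file path, so future_lapply() hands exactly one file to each worker, and workers that error return a placeholder row that the `!is.na(ID)` filter drops afterwards.

```r
# One list element per file path -> one file per worker. Paths are hypothetical.
filenames <- c("/data/r1.tif", "/data/r2.tif")
split(filenames, filenames)
#> $`/data/r1.tif`
#> [1] "/data/r1.tif"
#>
#> $`/data/r2.tif`
#> [1] "/data/r2.tif"

# A failed worker returns data.frame(ID = NA); after dplyr::bind_rows(),
# those placeholder rows are removed, so one bad file does not abort the run.
```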


32 changes: 32 additions & 0 deletions man/distribute_process_grid.Rd


2 changes: 1 addition & 1 deletion man/distribute_process_hierarchy.Rd


30 changes: 30 additions & 0 deletions man/distribute_process_multirasters.Rd


Binary file added scomps_0.0.4.11012023.tar.gz
Binary file not shown.
75 changes: 66 additions & 9 deletions scomps_rmarkdown_litr.rmd
@@ -1,7 +1,7 @@
---
title: "Creating the ``r params$package_name`` R package"
author: "Insang Song"
date: "2023-10-04"
date: "2023-10-31"
knit: litr::render
output: litr::litr_html_document
params:
@@ -45,6 +45,7 @@ usethis::use_package("units")
usethis::use_package("methods")
usethis::use_package("progressr")
usethis::use_package("logr", "Suggests") # Default is "Imports"
usethis::use_package("exactextractr", "Suggests") # Default is "Imports"
usethis::use_package("future")
usethis::use_package("future.apply")
usethis::use_package("igraph", "Suggests") # Default is "Imports"
@@ -1408,10 +1409,10 @@ extract_with_buffer_kernel <- function(
#' library(future)
#' plan(multicore, workers = 4)
#' # Does not run ...
#' # distribute_process()
#' # distribute_process_grid()
#' @import future
#' @export
distribute_process <- function(
distribute_process_grid <- function(
grids,
grid_target_id = NULL,
fun,
@@ -1439,8 +1440,6 @@ distribute_process <- function(
grids_target <- grids[grid_target_ids %in% unlist(grids[["CGRIDID"]]),]
grids_target_list <- split(grids_target, unlist(grids_target[["CGRIDID"]]))
# pgrs <- progressr::progressor(along = nrow(grids_target))
results_distributed <- future.apply::future_lapply(
grids_target_list,
\(x) {
@@ -1464,7 +1463,7 @@
# post-processing
detected_id <- grep("^id", names(par_fun), value = TRUE)
detected_point <- grep("^points", names(par_fun), value = TRUE)
detected_point <- grep("^(points|poly)", names(par_fun), value = TRUE)
names(results_distributed)[1] <- par_fun[[detected_id]]
results_distributed[[par_fun[[detected_id]]]] <-
unlist(par_fun[[detected_point]][[par_fun[[detected_id]]]])
@@ -1487,7 +1486,7 @@ distribute_process <- function(
#' library(future)
#' plan(multicore, workers = 4)
#' # Does not run ...
#' # distribute_process()
#' # distribute_process_hierarchy()
#' @import future
#' @import progressr
#' @export
@@ -1527,14 +1526,73 @@ distribute_process_hierarchy <- function(
# post-processing
detected_id <- grep("^id", names(par_fun), value = TRUE)
detected_point <- grep("^points", names(par_fun), value = TRUE)
detected_point <- grep("^(points|poly)", names(par_fun), value = TRUE)
names(results_distributed)[1] <- par_fun[[detected_id]]
results_distributed[[par_fun[[detected_id]]]] <-
unlist(par_fun[[detected_point]][[par_fun[[detected_id]]]])
return(results_distributed)
}
#' @title Process a given function over multiple large rasters
#'
#' @description Large raster files often exceed available memory, and even cropping a large raster to a small subset consumes considerable memory and adds processing time. This function leverages the terra SpatRaster proxy to distribute computation jobs across multiple cores. It assumes that users have multiple large raster files on disk; each file path is assigned to a worker, which reads raster values directly from disk through the C++ pointers that terra functions operate on. To avoid out-of-memory errors, it is strongly recommended to use vector data with a small, confined spatial extent, which may require subsetting the input vector objects in advance.
#' @param filenames character(n). A vector or list of full paths to raster files, where n is the total number of raster files.
#' @param fun A function supported by scomps.
#' @param ... Arguments passed to \code{fun}.
#' @return A data.frame with the computation results. For the contents of the results, consult the documentation of the function passed to \code{fun}.
#' @author Insang Song \email{geoissong@@gmail.com}
#'
#' @examples
#' library(future)
#' plan(multicore, workers = 4)
#' # Does not run ...
#' # distribute_process_multirasters()
#' @import future
#' @import progressr
#' @export
distribute_process_multirasters <- function(
filenames,
fun,
...) {
par_fun <- list(...)
if (any(sapply(filenames, \(x) !file.exists(x)))) {
stop("One or many of files do not exist in provided file paths. Check the paths again.\n")
}
file_list <- split(filenames, filenames)
results_distributed <- future.apply::future_lapply(
file_list,
\(x) {
sf::sf_use_s2(FALSE)
run_result <- tryCatch({
res <- fun(...)
return(res)
},
error = function(e) return(data.frame(ID = NA)))
return(run_result)
},
future.seed = TRUE,
future.packages = c("terra", "sf", "dplyr", "scomps", "future"))
results_distributed <- do.call(dplyr::bind_rows, results_distributed)
results_distributed <- results_distributed[!is.na(results_distributed[["ID"]]),]
# post-processing
detected_id <- grep("^id", names(par_fun), value = TRUE)
detected_point <- grep("^(points|poly)", names(par_fun), value = TRUE)
names(results_distributed)[1] <- par_fun[[detected_id]]
results_distributed[[par_fun[[detected_id]]]] <-
unlist(par_fun[[detected_point]][[par_fun[[detected_id]]]])
return(results_distributed)
}
```


@@ -1547,7 +1605,6 @@ testthat::test_that("Processes are properly spawned and compute", {
withr::local_package("dplyr")
withr::local_package("progressr")
withr::local_options(list(sf_use_s2 = FALSE))
progressr::handlers(global = TRUE)
ncpath <- system.file("shape/nc.shp", package = "sf")
ncpoly <- terra::vect(ncpath) |>
1 change: 0 additions & 1 deletion tests/testthat/tests.R
@@ -209,7 +209,6 @@ testthat::test_that("Processes are properly spawned and compute", {
withr::local_package("dplyr")
withr::local_package("progressr")
withr::local_options(list(sf_use_s2 = FALSE))
progressr::handlers(global = TRUE)

ncpath <- system.file("shape/nc.shp", package = "sf")
ncpoly <- terra::vect(ncpath) |>
Binary file modified tools/tarballs/scomps_0.0.4.11012023.tar.gz
Binary file not shown.
