A different approach to parallelization with mapme.biodiversity #136
Replies: 21 comments
-
The approach described above works out of the box as soon as the user has …
For the latter, a simple solution could be to replace lines 87-97 in R/calc_indicator.R:

if (processing_mode == "asset") {
  # apply function with parameters and add hidden id column
  results <- pbapply::pblapply(1:nrow(x), function(i) {
    .prep_and_compute(x[i, ], params, i)
  }, cl = cores)
}

with:

if (processing_mode == "asset") {
  if (cores == "future") {
    # apply function with parameters and add hidden id column
    results <- pbapply::pblapply(1:nrow(x), function(i) {
      .prep_and_compute(x[i, ], params, i)
    }, cl = cores, future.seed = TRUE)
  } else {
    # apply function with parameters and add hidden id column
    results <- pbapply::pblapply(1:nrow(x), function(i) {
      .prep_and_compute(x[i, ], params, i)
    }, cl = cores)
  }
}

Note that I have not tested this solution yet. Maybe future.seed = TRUE can be added even when future is not used, in which case the conditional would be unnecessary.
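For context on what future.seed = TRUE does: in future.apply it sets up parallel-safe L'Ecuyer-CMRG RNG streams, so code that draws random numbers inside workers stays reproducible and warning-free. A standalone illustration (this uses future.apply directly; whether pbapply forwards the argument exactly this way is an assumption):

library(future.apply)

plan(multisession, workers = 2)
# each element gets its own pre-allocated, statistically sound RNG
# stream; results are reproducible regardless of worker scheduling
set.seed(42)
res <- future_lapply(1:4, function(i) rnorm(1), future.seed = TRUE)
plan(sequential)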
-
Hi! This is awesome!!! I would opt to move …
-
@Shirobakaidou Could you try this and report back?
-
Below I provide some benchmarks with processing times, first for a subset and then for all terrestrial Madagascar protected areas (note that I use a source other than WDPA, which I made available on this repo a while ago).
The tests were run on SSP Cloud, a data processing platform open to data scientists from the French public sector. It is a Kubernetes instance hosted by the French statistical institute INSEE, with Onyxia running on top as the user interface. This code was used for the tests without future:

library(tictoc)
library(dplyr)
library(sf)
library(mapme.biodiversity)
my_shp <- "https://github.com/mapme-initiative/mapme.biodiversity/files/9746104/AP_Vahatra_shp.zip"
download.file(my_shp, destfile = "Vahatra98AP.zip")
unzip("Vahatra98AP.zip", exdir = "Vahatra")
PA_mada <- st_read("Vahatra/AP_Vahatra.shp", quiet = TRUE) %>%
  # The projection is missing (no .prj file), so we set it manually
st_set_crs("EPSG:4326")
# Discard points and cast multipolygons as polygons
PA_poly <- PA_mada %>%
st_make_valid() %>%
filter(st_geometry_type(.) == "MULTIPOLYGON") %>%
st_cast("POLYGON")
# Subset on small PAs (note: no filter is applied here, so all PAs are kept)
PA_poly_small <- PA_poly
# Test with 4 cores
PA_poly_small <- init_portfolio(x = PA_poly_small,
years = 2000:2020,
outdir = "data_s3/mapme",
cores = 4,
add_resources = TRUE,
verbose = TRUE)
# Get GFW data
PA_poly_small <- get_resources(x = PA_poly_small,
resources = c("gfw_treecover", "gfw_lossyear",
"gfw_emissions"))
# With 4 cores (portfolio initialization not included in the timing)
start_4 <- tic()
PA_poly_small <- calc_indicators(x = PA_poly_small,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
duration_4 <- toc()
# With 8 cores
start_8 <- tic()
PA_poly_small_8 <- init_portfolio(x = PA_poly_small,
years = 2000:2020,
outdir = "data_s3/mapme",
cores = 8,
add_resources = TRUE,
verbose = TRUE)
PA_poly_small_8 <- calc_indicators(x = PA_poly_small_8,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=43m 47s
# With 16 cores
PA_poly_small_16 <- init_portfolio(x = PA_poly_small,
years = 2000:2020,
outdir = "data_s3/mapme",
cores = 16,
add_resources = TRUE,
verbose = TRUE)
PA_poly_small_16 <- calc_indicators(x = PA_poly_small_16,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=37m 56s
# With 24 cores
PA_poly_small_24 <- init_portfolio(x = PA_poly_small,
years = 2000:2020,
outdir = "data_s3/mapme",
cores = 24,
add_resources = TRUE,
verbose = TRUE)
PA_poly_small_24 <- calc_indicators(x = PA_poly_small_24,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=34m 20s
duration_4$callback_msg # without init_portfolio included in timer
# [1] "3588.133 sec elapsed"
duration_8$callback_msg
# [1] "2626.842 sec elapsed"
duration_16$callback_msg
# [1] "2275.712 sec elapsed"
duration_24$callback_msg
# [1] "2060.465 sec elapsed" This code was used for the tests with future: remove.packages("mapme.biodiversity")
# the general version
remotes::install_github("https://github.com/mapme-initiative/mapme.biodiversity")
# my fork
# remotes::install_github("https://github.com/fBedecarrats/mapme.biodiversity/tree/new_parallel")
install.packages("tictoc")
install.packages('future.apply')
library(tictoc)
library(dplyr)
library(sf)
library(future)
library(mapme.biodiversity)
my_shp <- "https://github.com/mapme-initiative/mapme.biodiversity/files/9746104/AP_Vahatra_shp.zip"
download.file(my_shp, destfile = "Vahatra98AP.zip")
unzip("Vahatra98AP.zip", exdir = "Vahatra")
PA_mada <- st_read("Vahatra/AP_Vahatra.shp", quiet = TRUE) %>%
  # The projection is missing (no .prj file), so we set it manually
st_set_crs("EPSG:4326")
# Discard points and cast multipolygons as polygons
PA_poly <- PA_mada %>%
st_make_valid() %>%
filter(st_geometry_type(.) == "MULTIPOLYGON") %>%
st_cast("POLYGON")
# Subset on small PAs
PA_poly_small <- PA_poly %>%
filter(hectars <= 50000)
PA_poly_small_future <- init_portfolio(x = PA_poly_small,
years = 2000:2020,
outdir = "/home/onyxia/work/new_test/my_future",
tmpdir = "/home/onyxia/work/new_test/tmp",
add_resources = TRUE,
verbose = TRUE,
cores = "future")
# Get GFW data
PA_poly_small_future <- get_resources(x = PA_poly_small_future,
resources = c("gfw_treecover", "gfw_lossyear",
"gfw_emissions"))
dir.create("my_future/test")
plan(sequential)
plan(multicore, workers = 16)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# Warning message:
# In supportsMulticoreAndRStudio(...) :
# [ONE-TIME WARNING] Forked processing ('multicore') is not supported when
# running R from RStudio because it is considered unstable. For more details,
# how to control forked processing or not, and how to silence this warning in
# future R sessions, see ?parallelly::supportsMulticore
# elapsed=03m 18s
plan(sequential)
plan(multicore, workers = 8)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=03m 14s
plan(sequential)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=03m 13s
plan(sequential)
plan(multisession, workers = 16)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=01m 49s
plan(sequential)
plan(multisession, workers = 4)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=02m 23s
plan(sequential)
plan(cluster, workers = 8)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# 100% elapsed=01m 46s
plan(sequential)
plan(cluster, workers = 16)
PA_poly_small_future <- calc_indicators(x = PA_poly_small_future,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=01m 50s
###########################################################################
PA_poly <- init_portfolio(x = PA_poly,
years = 2000:2020,
outdir = "my_future",
tmpdir = "tmp",
add_resources = TRUE,
verbose = TRUE,
cores = "future")
plan(sequential)
plan(multisession, workers = 16)
PA_poly<- calc_indicators(x = PA_poly,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=36m 21s
plan(sequential)
plan(multisession, workers = 8)
PA_poly <- calc_indicators(x = PA_poly,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=41m 21s
plan(sequential)
plan(cluster, workers = 16)
PA_poly<- calc_indicators(x = PA_poly,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=37m 18s
plan(sequential)
plan(cluster, workers = 8)
PA_poly<- calc_indicators(x = PA_poly,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=40m 52s
plan(sequential)
plan(multisession, workers = 24)
PA_poly<- calc_indicators(x = PA_poly,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=34m 02s
plan(sequential)
plan(cluster, workers = 24)
PA_poly<- calc_indicators(x = PA_poly,
indicators = "treecover_area_and_emissions",
min_cover = 10, min_size = 1)
# elapsed=35m 31s
-
Thanks @fBedecarrats for this approach. We have discussed it and I think there are two things we would need to test.
I think we will need a bit of time and testing to dig into this.
-
This approach doesn't change the package behaviour regarding which calculations get parallelized and which remain sequential (single core). It only handles the already parallelized cases with a different backend.
-
Note that, judging from the benchmarks above, this doesn't affect performance.
-
The problem is that the previous freezes or slowdowns appeared somewhat randomly, and I am unable to reproduce these occurrences on purpose, although @karpfen and @Shirobakaidou indicated they had similar experiences (cf. https://github.com/openkfw/mapme.pa-impact-evaluation/issues/8 or #90).
-
I agree that the current suggestion by @fBedecarrats simply introduces another backend to the already implemented simplistic parallelization strategy (that is, we simply parallelize over assets in a portfolio). For that reason, I do not see any issues with including the proposed changes. Looking at the bigger picture, the future package is the way to go if we would like to improve our approach to parallelization. It provides a fairly straightforward interface to parallelize not only on a local machine but also across multiple machines/clusters. Also, it supports a pattern of nested parallelization that could be interesting for mapme.biodiversity. However, both parallelization across multiple machines and nested parallelization are not trivial to set up from a package-development perspective. That applies to any package in general, and specifically to mapme.biodiversity, since we deal with very large datasets in the background which are only available on the host machine. I still think it is worth exploring the possibilities that future provides, because we want to support the most efficient processing possible for global portfolios.
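For illustration, a sketch of the kind of topology future supports out of the box (hostnames are placeholders, not a tested setup):

library(future)
# Nested plan: first distribute work across two remote machines, then
# across 8 local processes on each of them (2 x 8 = 16 workers total).
plan(list(
  tweak(cluster, workers = c("node1.example.org", "node2.example.org")),
  tweak(multisession, workers = 8)
))
# ... run nested futures here ...
plan(sequential)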
-
@goergen95 I'd be happy to think about it with you. I think that the furrr package provides a nice API to vectorize and parallelize operations with future. Maybe you could point out one of the indicator calculation functions that we could try to refactor with this approach, as a proof of concept.
-
I would be glad if we can figure something out together! Parallelization on the indicator level is just one out of three levels of parallelization I have in mind, and it is the one where I expect the smallest efficiency gains (except maybe for vector-raster operations). We could think about parallelization on these levels: …
-
Great! The idea is to replace the for loops by nested future_map() calls.
-
Below is a minimal PoC of what I currently have in mind. Note that in this case we actually don't get any speed-up compared to setting …

library(terra)
library(sf)
library(future)
library(tibble)
library(dplyr)
library(furrr)
# construct globally distributed polygons
(aois <- c(
"POLYGON ((-127.4604 59.78938, -127.1124 59.79392, -127.1237 60.13207, -127.4339 60.13207, -127.4604 59.78938))",
"POLYGON ((-91.62243 15.84818, -91.52449 15.84076, -91.52375 15.95687, -91.60944 15.95353, -91.62243 15.84818))",
"POLYGON ((-48.37485 -13.55735, -48.61121 -13.56249, -48.60864 -13.68581, -48.362 -13.71664, -48.37485 -13.55735))",
"POLYGON ((34.54681 1.639019, 34.87915 1.663101, 34.92249 1.889474, 34.68649 1.932822, 34.54681 1.639019))",
"POLYGON ((120.3057 46.61764, 120.6169 46.65425, 120.5986 46.86782, 120.3789 46.84341, 120.3057 46.61764))",
"POLYGON ((151.2656 -32.10844, 151.6211 -32.09528, 151.608 -31.8056, 151.371 -31.8056, 151.2656 -32.10844))"
) |>
st_as_sfc(crs = st_crs(4326)) |>
st_as_sf())
#> Simple feature collection with 6 features and 0 fields
#> Geometry type: POLYGON
#> Dimension: XY
#> Bounding box: xmin: -127.4604 ymin: -32.10844 xmax: 151.6211 ymax: 60.13207
#> Geodetic CRS: WGS 84
#> x
#> 1 POLYGON ((-127.4604 59.7893...
#> 2 POLYGON ((-91.62243 15.8481...
#> 3 POLYGON ((-48.37485 -13.557...
#> 4 POLYGON ((34.54681 1.639019...
#> 5 POLYGON ((120.3057 46.61764...
#> 6 POLYGON ((151.2656 -32.1084...
aois$id <- 1:nrow(aois)
# print areas
units::set_units(st_area(aois), ha)
#> Units: [ha]
#> [1] 69344.81 12106.23 40886.77 92429.55 48699.86 91747.18
# get centroids
centroids <- st_centroid(aois)
#> Warning: st_centroid assumes attributes are constant over geometries
# read the COG file from openlandmap, the true file size is about ~200 GB
dem_url <- "/vsicurl/https://s3.eu-central-1.wasabisys.com/openlandmap/dtm/dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210.tif"
(dem <- rast(dem_url))
#> class : SpatRaster
#> dimensions : 524999, 1296000, 1 (nrow, ncol, nlyr)
#> resolution : 0.0002777778, 0.0002777778 (x, y)
#> extent : -180, 180, -62.00056, 83.8325 (xmin, xmax, ymin, ymax)
#> coord. ref. : lon/lat WGS 84 (EPSG:4326)
#> source : dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210.tif
#> name : dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210
# plot dem and position of polygons
plot(dem, colNA = "steelblue", smooth = TRUE, range = c(-1e2, 1e4))
plot(centroids, col="red", add = TRUE, pch = 4, cex = 2) # define example indicator functions
# mean height
dem_mean <- function(poly, dem_url){
rast(dem_url) |>
crop(poly) |>
mask(poly) |>
global("mean", na.rm = TRUE) |>
as.numeric() -> dem_mean
tibble(parameter = "dem",
value = dem_mean,
child.pid = Sys.getpid()
)
}
# mean tri
tri_mean <- function(poly, dem_url){
rast(dem_url) |>
crop(poly) |>
mask(poly) |>
terrain(v = "TRI") |>
global("mean", na.rm = TRUE) |>
as.numeric() -> tri_mean
tibble(parameter = "tri",
value = tri_mean,
child.pid = Sys.getpid()
)
}
# define main function
main <- function(portfolio, indicators, dem_url){
future_map_dfr(indicators,
function(indicator, portfolio, dem_url){
fun <- switch(indicator,
"dem" = dem_mean,
"tri" = tri_mean)
portfolio <- portfolio |> dplyr::group_split(id)
result <- future_map_dfr(portfolio,
function(poly, fun, dem_url){
fun(poly, dem_url)
}, fun, dem_url)
result$parent.id <- Sys.getpid()
result
}, portfolio, dem_url, .options=furrr_options(
packages = c("terra", "sf")
)
)
}
# execute in sequence and parallel
plan(list(tweak(multisession, workers = 2), tweak(multisession, workers = 3)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
#> user system elapsed
#> 1.347 0.061 26.588
plan(sequential)
out
#> # A tibble: 12 × 4
#> parameter value child.pid parent.id
#> <chr> <dbl> <int> <int>
#> 1 dem 814. 14746 14630
#> 2 dem 2522. 14746 14630
#> 3 dem 574. 14748 14630
#> 4 dem 1506. 14748 14630
#> 5 dem 883. 14747 14630
#> 6 dem 1061. 14747 14630
#> 7 tri 3.05 14902 14631
#> 8 tri 7.92 14902 14631
#> 9 tri 4.62 14901 14631
#> 10 tri 4.51 14901 14631
#> 11 tri 3.96 14903 14631
#> 12 tri             6.62     14903     14631

Created on 2023-02-19 with reprex v2.0.2
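A note on the plan used above, assuming future's standard nested-plan semantics:

library(future)
# The outer list element is consumed by the outer future_map_dfr()
# (one worker per indicator, here 2), the inner element by the
# future_map_dfr() over assets (3 workers per indicator), so at most
# 2 x 3 = 6 R processes run concurrently.
plan(list(tweak(multisession, workers = 2),
          tweak(multisession, workers = 3)))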
-
Great work @goergen95! This is the kind of nesting of future_map functions I had in mind.
(*) This one I didn't run, but the sequential runtime must scale linearly with the number of assets, as the sequential tests with 6 and 60 assets confirm. Below is the full testing sequence:

plan(sequential)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 2.547 0.650 19.507
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 2.559 0.332 36.415
# Now let's try with more assets
many_aois <- bind_rows(replicate(10, aois, simplify = FALSE))
many_aois$id <- 1:nrow(many_aois)
plan(sequential)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 21.570 5.551 191.902
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 4.689 0.360 57.267
# Now let's try with more assets
very_many_aois <- bind_rows(replicate(100, aois, simplify = FALSE))
very_many_aois$id <- 1:nrow(very_many_aois)
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 29.505 2.516 311.656
-
Regarding bottlenecks from data transfer and read, my understanding is that Apache Arrow has sparked a real revolution during the last couple of years for tabular and vector spatial data, with orders-of-magnitude performance improvements (optimized storage with Parquet, data read and in-memory sharing with Arrow, and network transfer with Flight). I haven't found anything already available for raster data, although I see in this issue/discussion that the Arrow developers have been engaging with geospatial players about porting this approach to raster data (see this post in particular). If/when they finalize this, it would mean that we could leverage all the Arrow functionality available in R for raster processing.
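For the tabular side, that workflow is already usable from R today; a tiny sketch (file name hypothetical):

library(arrow)
library(dplyr)

# write once to the optimized columnar format ...
write_parquet(mtcars, "mtcars.parquet")

# ... then query lazily: filters and aggregations are pushed down to
# the Arrow engine and only the result is materialized in R
open_dataset("mtcars.parquet") |>
  filter(cyl == 4) |>
  summarise(mean_mpg = mean(mpg)) |>
  collect()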
-
Thanks for the replication and timing with more assets! That looks like the expected improvements. A small side note: with …

Discussing formats is not the priority for me right now. Instead, we need to understand how we want to handle the package-internal flow of data with parallelization. My example above delays reading any data from the raster source until the very last step. This means the only data transferred to child processes is the URL and the polygons, so it is the bare minimum that needs to be transferred. Because in this case the data source is a COG on a remote server, we can simply read the data for the respective polygon in the very last step. We can thus also parallelize between different machines. With the way we currently handle data in the package, this approach is only applicable on a single machine, because we won't find the downloaded resources on another server. Alternatively, we would transfer entire resources to the other machines, which could introduce a serious bottleneck (in the example above, the main bottleneck is the latency to the server hosting the data source).
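A minimal sketch of why only the URL (and not the opened raster) should be shipped to workers: terra's SpatRaster wraps an external pointer that does not survive serialization, so each child process has to re-create its own handle (extent chosen arbitrarily here):

library(future)
library(terra)

dem_url <- "/vsicurl/https://s3.eu-central-1.wasabisys.com/openlandmap/dtm/dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210.tif"

plan(multisession, workers = 2)
f <- future({
  library(terra)
  # re-open the raster inside the worker and read only the cropped window
  r <- crop(rast(dem_url), ext(34.5, 35.0, 1.6, 2.0))
  as.numeric(global(r, "mean", na.rm = TRUE))
})
value(f)
plan(sequential)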
-
Hi, I tested other configs yesterday (details in the git repo there), with the following results: …
I didn't see improvements at 2x8 compared to 4x4, but it may be due to cluster initialization overheads. I'll try later today with 600 assets to see how it behaves with a larger dataset.
-
So I completed the benchmark. Here are a graph, a table, and the full test scripts and outputs (I created a specific repo for these tests).
Below are the detailed table and the testing scripts and outputs.
library(terra)
library(sf)
library(future)
library(tibble)
library(dplyr)
library(furrr)
# construct globally distributed polygons
(aois <- c(
"POLYGON ((-127.4604 59.78938, -127.1124 59.79392, -127.1237 60.13207, -127.4339 60.13207, -127.4604 59.78938))",
"POLYGON ((-91.62243 15.84818, -91.52449 15.84076, -91.52375 15.95687, -91.60944 15.95353, -91.62243 15.84818))",
"POLYGON ((-48.37485 -13.55735, -48.61121 -13.56249, -48.60864 -13.68581, -48.362 -13.71664, -48.37485 -13.55735))",
"POLYGON ((34.54681 1.639019, 34.87915 1.663101, 34.92249 1.889474, 34.68649 1.932822, 34.54681 1.639019))",
"POLYGON ((120.3057 46.61764, 120.6169 46.65425, 120.5986 46.86782, 120.3789 46.84341, 120.3057 46.61764))",
"POLYGON ((151.2656 -32.10844, 151.6211 -32.09528, 151.608 -31.8056, 151.371 -31.8056, 151.2656 -32.10844))"
) |>
st_as_sfc(crs = st_crs(4326)) |>
st_as_sf())
#> Simple feature collection with 6 features and 0 fields
#> Geometry type: POLYGON
#> Dimension: XY
#> Bounding box: xmin: -127.4604 ymin: -32.10844 xmax: 151.6211 ymax: 60.13207
#> Geodetic CRS: WGS 84
#> x
#> 1 POLYGON ((-127.4604 59.7893...
#> 2 POLYGON ((-91.62243 15.8481...
#> 3 POLYGON ((-48.37485 -13.557...
#> 4 POLYGON ((34.54681 1.639019...
#> 5 POLYGON ((120.3057 46.61764...
#> 6 POLYGON ((151.2656 -32.1084...
aois$id <- 1:nrow(aois)
# print areas
units::set_units(st_area(aois), ha)
#> Units: [ha]
#> [1] 69344.81 12106.23 40886.77 92429.55 48699.86 91747.18
# get centroids
centroids <- st_centroid(aois)
#> Warning: st_centroid assumes attributes are constant over geometries
# read the COG file from openlandmap, the true file size is about ~200 GB
dem_url <- "/vsicurl/https://s3.eu-central-1.wasabisys.com/openlandmap/dtm/dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210.tif"
(dem <- rast(dem_url))
#> class : SpatRaster
#> dimensions : 524999, 1296000, 1 (nrow, ncol, nlyr)
#> resolution : 0.0002777778, 0.0002777778 (x, y)
#> extent : -180, 180, -62.00056, 83.8325 (xmin, xmax, ymin, ymax)
#> coord. ref. : lon/lat WGS 84 (EPSG:4326)
#> source : dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210.tif
#> name : dtm.bareearth_ensemble_p10_30m_s_2018_go_epsg4326_v20230210
# plot dem and position of polygons
plot(dem, colNA = "steelblue", smooth = TRUE, range = c(-1e2, 1e4))
plot(centroids, col="red", add = TRUE, pch = 4, cex = 2)
# define example indicator functions
# mean height
dem_mean <- function(poly, dem_url){
rast(dem_url) |>
crop(poly) |>
mask(poly) |>
global("mean", na.rm = TRUE) |>
as.numeric() -> dem_mean
tibble(parameter = "dem",
value = dem_mean,
child.pid = Sys.getpid()
)
}
# mean tri
tri_mean <- function(poly, dem_url){
rast(dem_url) |>
crop(poly) |>
mask(poly) |>
terrain(v = "TRI") |>
global("mean", na.rm = TRUE) |>
as.numeric() -> tri_mean
tibble(parameter = "tri",
value = tri_mean,
child.pid = Sys.getpid()
)
}
# define main function
main <- function(portfolio, indicators, dem_url){
future_map_dfr(indicators,
function(indicator, portfolio, dem_url){
fun <- switch(indicator,
"dem" = dem_mean,
"tri" = tri_mean)
portfolio <- portfolio |>
dplyr::group_split(id)
result <- future_map_dfr(portfolio,
function(poly, fun, dem_url){
fun(poly, dem_url)
}, fun, dem_url)
result$parent.id <- Sys.getpid()
result
}, portfolio, dem_url, .options=furrr_options(
packages = c("terra", "sf")
)
)
}
# Create larger datasets
# With 60 assets
many_aois <- bind_rows(replicate(10, aois, simplify = FALSE))
many_aois$id <- 1:nrow(many_aois)
# With 600 assets
very_many_aois <- bind_rows(replicate(100, aois, simplify = FALSE))
very_many_aois$id <- 1:nrow(very_many_aois)
# Sequential ------------------------------------------------------
plan(sequential)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 2.547 0.650 19.507
plan(sequential)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 21.570 5.551 191.902
plan(sequential)
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 189.744 48.133 1480.693
# 4 Workers -----------------------------------------------------------
## cluster 2x2 ---------------------------------
# very small dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 2)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 1.623 0.095 26.197
# small dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 2)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 8.049 5.468 65.004
# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 2)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
## Cluster 4 --------------------------------------
plan(sequential)
plan(cluster, workers = 4)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 1.009 0.053 19.999
plan(sequential)
plan(cluster, workers = 4)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 9.946 0.926 109.158
# larger dataset
plan(sequential)
plan(cluster, workers = 4)
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 81.295 6.770 879.906
# 8 workers ---------------------------------------------------------------
## Cluster 2x4 --------------------------------------
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 4)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 2.043 0.128 31.238
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 4)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 5.589 0.565 67.473
# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 4)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 101.255 8.462 1042.591
## Cluster 4x2 -----------------------------------------
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 2)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 1.446 0.211 24.726
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 2)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 5.983 0.600 73.072
# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 2)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 121.529 11.115 1314.205
## Cluster 8 ---------------------------------------
plan(sequential)
plan(cluster, workers = 8)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 1.334 0.132 24.333
plan(sequential)
plan(cluster, workers = 8)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 6.725 0.770 74.830
# larger dataset
plan(sequential)
plan(cluster, workers = 8)
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 51.439 5.110 567.372
# 8 workers ---------------------------------------------------------------
## Multisession 4x4 --------------------------
plan(sequential)
plan(list(tweak(multisession, workers = 4), tweak(multisession, workers = 4)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 2.287 0.238 34.425
plan(sequential)
plan(list(tweak(multisession, workers = 4), tweak(multisession, workers = 4)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 5.064 0.410 61.576
# larger dataset
plan(sequential)
plan(list(tweak(multisession, workers = 4), tweak(multisession, workers = 4)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 28.309 2.233 317.105
## cluster 4x4 ----------------------------------------
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 2.559 0.332 36.415
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 4.689 0.360 57.267
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 4)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 29.505 2.516 311.656
## cluster 2x8 ---------------------------------
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 8)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 2.830 0.680 41.274
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 8)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 6.459 0.846 70.975
# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 8)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 61.611 4.638 807.943
## cluster 16 -----------------------
plan(sequential)
plan(cluster, workers = 16)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 1.152 0.074 21.172
plan(sequential)
plan(cluster, workers = 16)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 5.960 0.548 73.368
# larger dataset
plan(sequential)
plan(cluster, workers = 16)
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 65.716 2.881 971.648
# 24 workers -----------------------------------------------------------
## cluster 3x8 -----------------------------------
plan(sequential)
plan(list(tweak(cluster, workers = 3), tweak(cluster, workers = 8)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 3.807 0.274 47.897
# Smaller dataset
plan(sequential)
plan(list(tweak(cluster, workers = 3), tweak(cluster, workers = 8)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 5.207 0.453 66.653
# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 3), tweak(cluster, workers = 8)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 27.404 2.322 310.140
## Cluster 4x6 ------------------------------
# Very small dataset
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 6)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 2.740 0.264 38.223
# Smaller dataset
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 6)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 4.118 0.364 56.342
# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 4), tweak(cluster, workers = 6)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 37.542 3.022 442.073
## cluster 2 x 12 ----------------------------------
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 12)))
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 3.501 0.436 43.203
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 12)))
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 6.926 0.754 80.899
# larger dataset
plan(sequential)
plan(list(tweak(cluster, workers = 2), tweak(cluster, workers = 12)))
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 20.459 1.965 222.440
## cluster 24 ------------------------------------------
plan(sequential)
plan(cluster, workers = 24)
system.time(out <- main(aois, c("dem", "tri"), dem_url))
# user system elapsed
# 0.954 0.071 20.891
plan(sequential)
plan(cluster, workers = 24)
system.time(out <- main(many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 8.095 0.828 102.298
# larger dataset
plan(sequential)
plan(cluster, workers = 24)
system.time(out <- main(very_many_aois, c("dem", "tri"), dem_url))
# user system elapsed
# 55.393 4.377 810.612
-
Hi everyone, I have just run a comparison of a sequential extract and a parallel one with 6 cores. I calculated the travel time in DRC at a 10 km² resolution, so I have about 24,000 units of observation. The sequential extract took 13.26 minutes, while the 6-core run took only 2.46 minutes, roughly 5 times faster. That is great! Once again, the benefits of parallelisation come with a lot of cells.
-
Hi @fBedecarrats, I took the liberty of adapting one of your early examples in this thread against the current dev branch:

remotes::install_github("mapme-initiative/mapme.biodiversity", ref = "dev")
library(mapme.biodiversity)
library(future)
library(progressr)
library(tictoc)
library(sf)
outdir <- "./mapme-test"
dir.create(outdir)
src <- "/vsizip//vsicurl/https://github.com/mapme-initiative/mapme.biodiversity/files/9746104/AP_Vahatra_shp.zip/AP_Vahatra.shp"
data <- st_read(src)
data <- st_set_crs(data, "EPSG:4326")
data <- st_make_valid(data)
mapme_options(outdir = outdir)
plan(multisession(workers = 3)) # the aoi here only matches 3 tiles
with_progress({
get_resources(
data,
get_gfw_treecover(version = "GFC-2022-v1.10"),
get_gfw_lossyear(version = "GFC-2022-v1.10"),
get_gfw_emissions())
})
plan(sequential)
plan(multisession(workers = 4))
start <- tic()
with_progress({
inds <- calc_indicators(
data,
calc_treecover_area_and_emissions(
years = 2000:2022,
min_size = 1,
min_cover = 10
))
})
end <- toc()
#> 128.676 sec elapsed
plan(sequential)
-
Wow, am I understanding correctly, does that mean a ~30-fold performance increase?! From ~60 minutes to ~2 minutes?!
-
TL;DR: This issue describes an alternative way to specify cores with mapme.biodiversity that uses the future package backend. I include benchmarks for different configurations and numbers of parallel workers.

Demo: Usually we do the following with cores = n. But the following (with cores = "future") works too, and is likely more consistent and flexible across configurations, including Windows (see the sketch below).
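A minimal, untested sketch of both patterns, mirroring the benchmark scripts later in this thread (portfolio arguments trimmed for brevity):

library(future)
library(mapme.biodiversity)

# the usual way: pbapply spawns n workers itself
x <- init_portfolio(x, years = 2000:2020, outdir = "data", cores = 4)
x <- calc_indicators(x, indicators = "treecover_area_and_emissions",
                     min_cover = 10, min_size = 1)

# the alternative: choose any future backend, then delegate with cores = "future"
plan(multisession, workers = 4)
x <- init_portfolio(x, years = 2000:2020, outdir = "data", cores = "future")
x <- calc_indicators(x, indicators = "treecover_area_and_emissions",
                     min_cover = 10, min_size = 1)
plan(sequential)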
How does it work?
With the package mapme.biodiversity, we usually define the number of cores to use for parallelization with init_portfolio(cores = n), where n is an integer. Under the hood, the cores argument is stored and passed at each subsequent calc_indicators() call to the pbapply::pbapply(cl = n) function; see the code of mapme.biodiversity::calc_indicators(), lines 89-92. Since its 1.7 version from January 2023, pbapply accepts a different way to specify the number of parallelization workers, with pbapply::pbapply(cl = "future"). When doing so, pbapply uses the future backend (see the future documentation).

I think that this approach has several advantages:

Below I will add two more comments: