diff --git a/NEWS.md b/NEWS.md index 65ddeb458..c6184fa50 100644 --- a/NEWS.md +++ b/NEWS.md @@ -6,7 +6,9 @@ ### New features -* Added support for native execution on the macOS with the M1 ARM-based CPU. +* Added support for native execution on the macOS with the M1 ARM-based CPU. (#375) + +* Added `threads` argument for `$optimize()` and `$variational()`. (#369) # cmdstanr 0.2.1 diff --git a/R/model.R b/R/model.R index 9b8e037a8..c64d792c6 100644 --- a/R/model.R +++ b/R/model.R @@ -865,7 +865,7 @@ sample_method <- function(data = NULL, } if (!is.null(num_cores)) { warning("'num_cores' is deprecated. Please use 'parallel_chains' instead.") - cores <- num_cores + parallel_chains <- num_cores } if (!is.null(num_chains)) { warning("'num_chains' is deprecated. Please use 'chains' instead.") @@ -988,6 +988,7 @@ CmdStanModel$set("public", name = "sample", value = sample_method) #' init = NULL, #' save_latent_dynamics = FALSE, #' output_dir = NULL, +#' threads = NULL, #' algorithm = NULL, #' init_alpha = NULL, #' iter = NULL, @@ -1002,6 +1003,10 @@ CmdStanModel$set("public", name = "sample", value = sample_method) #' in the CmdStan manual. Arguments left at `NULL` default to the default used #' by the installed version of CmdStan. #' +#' * `threads`: (positive integer) If the model was +#' [compiled][model-method-compile] with threading support, the number of +#' threads to use in parallelized sections (e.g., when +#' using the Stan functions `reduce_sum()` or `map_rect()`). #' * `algorithm`: (string) The optimization algorithm. One of `"lbfgs"`, #' `"bfgs"`, or `"newton"`. #' * `iter`: (positive integer) The number of iterations. @@ -1021,10 +1026,26 @@ optimize_method <- function(data = NULL, init = NULL, save_latent_dynamics = FALSE, output_dir = NULL, + threads = NULL, algorithm = NULL, init_alpha = NULL, iter = NULL, sig_figs = NULL) { + checkmate::assert_integerish(threads, lower = 1, len = 1, null.ok = TRUE) + if (is.null(self$cpp_options()[["stan_threads"]])) { + if (!is.null(threads)) { + warning("'threads' is set but the model was not compiled with ", + "'cpp_options = list(stan_threads = TRUE)' so 'threads' will have no effect!", + call. = FALSE) + threads <- NULL + } + } else { + if (is.null(threads)) { + stop("The model was compiled with 'cpp_options = list(stan_threads = TRUE)' ", + "but 'threads' was not set!", + call. = FALSE) + } + } optimize_args <- OptimizeArgs$new( algorithm = algorithm, init_alpha = init_alpha, @@ -1044,7 +1065,11 @@ optimize_method <- function(data = NULL, sig_figs = sig_figs ) - cmdstan_procs <- CmdStanProcs$new(num_procs = 1, show_stdout_messages = (is.null(refresh) || refresh != 0)) + cmdstan_procs <- CmdStanProcs$new( + num_procs = 1, + show_stdout_messages = (is.null(refresh) || refresh != 0), + threads_per_proc = threads + ) runset <- CmdStanRun$new(args = cmdstan_args, procs = cmdstan_procs) runset$run_cmdstan() CmdStanMLE$new(runset) @@ -1079,6 +1104,7 @@ CmdStanModel$set("public", name = "optimize", value = optimize_method) #' init = NULL, #' save_latent_dynamics = FALSE, #' output_dir = NULL, +#' threads = NULL, #' algorithm = NULL, #' iter = NULL, #' grad_samples = NULL, @@ -1100,6 +1126,10 @@ CmdStanModel$set("public", name = "optimize", value = optimize_method) #' in the CmdStan manual. Arguments left at `NULL` default to the default used #' by the installed version of CmdStan. #' +#' * `threads`: (positive integer) If the model was +#' [compiled][model-method-compile] with threading support, the number of +#' threads to use in parallelized sections (e.g., when +#' using the Stan functions `reduce_sum()` or `map_rect()`). #' * `algorithm`: (string) The algorithm. Either `"meanfield"` or `"fullrank"`. #' * `iter`: (positive integer) The _maximum_ number of iterations. #' * `grad_samples`: (positive integer) The number of samples for Monte Carlo @@ -1117,6 +1147,7 @@ CmdStanModel$set("public", name = "optimize", value = optimize_method) #' * `output_samples:` (positive integer) Number of posterior samples to draw #' and save. #' +#' #' @section Value: The `$variational()` method returns a [`CmdStanVB`] object. #' #' @template seealso-docs @@ -1130,6 +1161,7 @@ variational_method <- function(data = NULL, init = NULL, save_latent_dynamics = FALSE, output_dir = NULL, + threads = NULL, algorithm = NULL, iter = NULL, grad_samples = NULL, @@ -1141,6 +1173,21 @@ variational_method <- function(data = NULL, eval_elbo = NULL, output_samples = NULL, sig_figs = NULL) { + checkmate::assert_integerish(threads, lower = 1, len = 1, null.ok = TRUE) + if (is.null(self$cpp_options()[["stan_threads"]])) { + if (!is.null(threads)) { + warning("'threads' is set but the model was not compiled with ", + "'cpp_options = list(stan_threads = TRUE)' so 'threads' will have no effect!", + call. = FALSE) + threads <- NULL + } + } else { + if (is.null(threads)) { + stop("The model was compiled with 'cpp_options = list(stan_threads = TRUE)' ", + "but 'threads' was not set!", + call. = FALSE) + } + } variational_args <- VariationalArgs$new( algorithm = algorithm, iter = iter, @@ -1167,7 +1214,11 @@ variational_method <- function(data = NULL, sig_figs = sig_figs ) - cmdstan_procs <- CmdStanProcs$new(num_procs = 1, show_stdout_messages = (is.null(refresh) || refresh != 0)) + cmdstan_procs <- CmdStanProcs$new( + num_procs = 1, + show_stdout_messages = (is.null(refresh) || refresh != 0), + threads_per_proc = threads + ) runset <- CmdStanRun$new(args = cmdstan_args, procs = cmdstan_procs) runset$run_cmdstan() CmdStanVB$new(runset) @@ -1191,9 +1242,9 @@ CmdStanModel$set("public", name = "variational", value = variational_method) #' data = NULL, #' seed = NULL, #' output_dir = NULL, -#' sig_figs = NULL, #' parallel_chains = getOption("mc.cores", 1), -#' threads_per_chain = NULL +#' threads_per_chain = NULL, +#' sig_figs = NULL #' ) #' ``` #' @@ -1202,7 +1253,7 @@ CmdStanModel$set("public", name = "variational", value = variational_method) #' - A [CmdStanMCMC] fitted model object. #' - A character vector of paths to CmdStan CSV output files containing #' parameter draws. -#' * `data`, `seed`, `output_dir`, `parallel_chains`, `threads_per_chain`: +#' * `data`, `seed`, `output_dir`, `parallel_chains`, `threads_per_chain`, `sig_figs`: #' Same as for the [`$sample()`][model-method-sample] method. #' #' @section Value: The `$generate_quantities()` method returns a [`CmdStanGQ`] object. @@ -1258,10 +1309,26 @@ generate_quantities_method <- function(fitted_params, data = NULL, seed = NULL, output_dir = NULL, - sig_figs = NULL, parallel_chains = getOption("mc.cores", 1), - threads_per_chain = NULL) { + threads_per_chain = NULL, + sig_figs = NULL) { checkmate::assert_integerish(parallel_chains, lower = 1, null.ok = TRUE) + checkmate::assert_integerish(threads_per_chain, lower = 1, len = 1, null.ok = TRUE) + if (is.null(self$cpp_options()[["stan_threads"]])) { + if (!is.null(threads_per_chain)) { + warning("'threads_per_chain' is set but the model was not compiled with ", + "'cpp_options = list(stan_threads = TRUE)' so 'threads_per_chain' will have no effect!", + call. = FALSE) + threads_per_chain <- NULL + } + } else { + if (is.null(threads_per_chain)) { + stop("The model was compiled with 'cpp_options = list(stan_threads = TRUE)' ", + "but 'threads_per_chain' was not set!", + call. = FALSE) + } + } + fitted_params <- process_fitted_params(fitted_params) chains <- length(fitted_params) generate_quantities_args <- GenerateQuantitiesArgs$new( diff --git a/R/read_csv.R b/R/read_csv.R index ca75667e5..546312ff9 100644 --- a/R/read_csv.R +++ b/R/read_csv.R @@ -235,7 +235,7 @@ read_cmdstan_csv <- function(files, posterior::as_draws_array(draws[(num_warmup_draws+1):all_draws, variables, drop = FALSE]), along="chain" ) - } + } } if (length(sampler_diagnostics) > 0) { warmup_sampler_diagnostics_draws <- posterior::bind_draws( @@ -483,7 +483,11 @@ read_csv_metadata <- function(csv_file) { csv_file_info$step_size <- csv_file_info$stepsize csv_file_info$iter_warmup <- csv_file_info$num_warmup csv_file_info$iter_sampling <- csv_file_info$num_samples - csv_file_info$threads_per_chain <- csv_file_info$num_threads + if (csv_file_info$method == "variational" || csv_file_info$method == "optimize") { + csv_file_info$threads <- csv_file_info$num_threads + } else { + csv_file_info$threads_per_chain <- csv_file_info$num_threads + } csv_file_info$model <- NULL csv_file_info$engaged <- NULL csv_file_info$delta <- NULL diff --git a/R/run.R b/R/run.R index 94632ca03..44597c3b7 100644 --- a/R/run.R +++ b/R/run.R @@ -367,6 +367,9 @@ CmdStanRun$set("private", name = "run_generate_quantities_", value = .run_genera Sys.setenv(PATH = paste0(path_to_TBB, ";", Sys.getenv("PATH"))) } } + if (!is.null(procs$threads_per_proc())) { + Sys.setenv("STAN_NUM_THREADS" = as.integer(procs$threads_per_proc())) + } start_time <- Sys.time() id <- 1 procs$new_proc( diff --git a/man/model-method-generate-quantities.Rd b/man/model-method-generate-quantities.Rd index a158e046f..d89e39376 100644 --- a/man/model-method-generate-quantities.Rd +++ b/man/model-method-generate-quantities.Rd @@ -15,9 +15,9 @@ based on previously fitted parameters. data = NULL, seed = NULL, output_dir = NULL, - sig_figs = NULL, parallel_chains = getOption("mc.cores", 1), - threads_per_chain = NULL + threads_per_chain = NULL, + sig_figs = NULL ) } } @@ -31,7 +31,7 @@ based on previously fitted parameters. \item A character vector of paths to CmdStan CSV output files containing parameter draws. } -\item \code{data}, \code{seed}, \code{output_dir}, \code{parallel_chains}, \code{threads_per_chain}: +\item \code{data}, \code{seed}, \code{output_dir}, \code{parallel_chains}, \code{threads_per_chain}, \code{sig_figs}: Same as for the \code{\link[=model-method-sample]{$sample()}} method. } } diff --git a/man/model-method-optimize.Rd b/man/model-method-optimize.Rd index 76b84edef..6acf4e32d 100644 --- a/man/model-method-optimize.Rd +++ b/man/model-method-optimize.Rd @@ -27,6 +27,7 @@ variables. Thus modes correspond to modes of the model as written. init = NULL, save_latent_dynamics = FALSE, output_dir = NULL, + threads = NULL, algorithm = NULL, init_alpha = NULL, iter = NULL, @@ -110,6 +111,10 @@ arguments. These arguments are described briefly here and in greater detail in the CmdStan manual. Arguments left at \code{NULL} default to the default used by the installed version of CmdStan. \itemize{ +\item \code{threads}: (positive integer) If the model was +\link[=model-method-compile]{compiled} with threading support, the number of +threads to use in parallelized sections (e.g., when +using the Stan functions \code{reduce_sum()} or \code{map_rect()}). \item \code{algorithm}: (string) The optimization algorithm. One of \code{"lbfgs"}, \code{"bfgs"}, or \code{"newton"}. \item \code{iter}: (positive integer) The number of iterations. diff --git a/man/model-method-variational.Rd b/man/model-method-variational.Rd index e3ad66b84..4f6b2a7ec 100644 --- a/man/model-method-variational.Rd +++ b/man/model-method-variational.Rd @@ -26,6 +26,7 @@ matrix for the approximation. init = NULL, save_latent_dynamics = FALSE, output_dir = NULL, + threads = NULL, algorithm = NULL, iter = NULL, grad_samples = NULL, @@ -116,6 +117,10 @@ arguments. These arguments are described briefly here and in greater detail in the CmdStan manual. Arguments left at \code{NULL} default to the default used by the installed version of CmdStan. \itemize{ +\item \code{threads}: (positive integer) If the model was +\link[=model-method-compile]{compiled} with threading support, the number of +threads to use in parallelized sections (e.g., when +using the Stan functions \code{reduce_sum()} or \code{map_rect()}). \item \code{algorithm}: (string) The algorithm. Either \code{"meanfield"} or \code{"fullrank"}. \item \code{iter}: (positive integer) The \emph{maximum} number of iterations. \item \code{grad_samples}: (positive integer) The number of samples for Monte Carlo diff --git a/tests/testthat/test-model-generate_quantities.R b/tests/testthat/test-model-generate_quantities.R index b7d9da661..5e59c4a79 100644 --- a/tests/testthat/test-model-generate_quantities.R +++ b/tests/testthat/test-model-generate_quantities.R @@ -60,6 +60,7 @@ test_that("generate_quantities work for different chains and parallel_chains", { expect_gq_output( mod_gq$generate_quantities(data = data_list, fitted_params = fit, parallel_chains = 4) ) + mod_gq <- cmdstan_model(testing_stan_file("bernoulli_ppc"), cpp_options = list(stan_threads = TRUE)) expect_gq_output( mod_gq$generate_quantities(data = data_list, fitted_params = fit_1_chain, threads_per_chain = 2) ) diff --git a/tests/testthat/test-threads.R b/tests/testthat/test-threads.R index ee596d111..2672fefdd 100644 --- a/tests/testthat/test-threads.R +++ b/tests/testthat/test-threads.R @@ -3,6 +3,8 @@ context("threading") if (not_on_cran()) { set_cmdstan_path() stan_program <- testing_stan_file("bernoulli") + stan_gq_program <- testing_stan_file("bernoulli_ppc") + data_file_gq_json <- testing_data("bernoulli_ppc") data_file_json <- test_path("resources", "data", "bernoulli.data.json") } @@ -19,7 +21,7 @@ test_that("using threads_per_chain without stan_threads set in compile() warns", fixed = TRUE) }) -test_that("threading works", { +test_that("threading works with sample()", { skip_on_cran() mod <- cmdstan_model(stan_program, cpp_options = list(stan_threads = TRUE)) @@ -30,25 +32,135 @@ test_that("threading works", { ) expect_output( - mod$sample(data = data_file_json, parallel_chains = 4, threads_per_chain = 1), + f <- mod$sample(data = data_file_json, parallel_chains = 4, threads_per_chain = 1), "Running MCMC with 4 parallel chains, with 1 thread(s) per chain..", fixed = TRUE ) - num_threads <- as.integer(Sys.getenv("STAN_NUM_THREADS")) - expect_equal(num_threads, 1) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 1) + expect_equal(f$metadata()$threads_per_chain, 1) + expect_output( - mod$sample(data = data_file_json, parallel_chains = 4, threads_per_chain = 2), + f <- mod$sample(data = data_file_json, parallel_chains = 4, threads_per_chain = 2), "Running MCMC with 4 parallel chains, with 2 thread(s) per chain..", fixed = TRUE ) - num_threads <- as.integer(Sys.getenv("STAN_NUM_THREADS")) - expect_equal(num_threads, 2) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 2) + expect_equal(f$metadata()$threads_per_chain, 2) expect_output( - mod$sample(data = data_file_json, parallel_chains = 4, threads_per_chain = 4), + f <- mod$sample(data = data_file_json, parallel_chains = 4, threads_per_chain = 4), "Running MCMC with 4 parallel chains, with 4 thread(s) per chain..", fixed = TRUE ) - num_threads <- as.integer(Sys.getenv("STAN_NUM_THREADS")) - expect_equal(num_threads, 4) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 4) + expect_equal(f$metadata()$threads_per_chain, 4) +}) + +test_that("threading works with optimize()", { + skip_on_cran() + mod <- cmdstan_model(stan_program, cpp_options = list(stan_threads = TRUE)) + + expect_error( + mod$optimize(data = data_file_json), + "The model was compiled with 'cpp_options = list(stan_threads = TRUE)' but 'threads' was not set!", + fixed = TRUE + ) + + expect_output( + f <- mod$optimize(data = data_file_json, threads = 1, seed = 123), + "Optimization terminated normally", + fixed = TRUE + ) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 1) + expect_equal(f$metadata()$threads, 1) + + expect_output( + f <- mod$optimize(data = data_file_json, threads = 2, seed = 123), + "Optimization terminated normally", + fixed = TRUE + ) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 2) + expect_equal(f$metadata()$threads, 2) + + expect_output( + f <- mod$optimize(data = data_file_json, threads = 4, seed = 123), + "Optimization terminated normally", + fixed = TRUE + ) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 4) + expect_equal(f$metadata()$threads, 4) +}) + +test_that("threading works with variational()", { + skip_on_cran() + mod <- cmdstan_model(stan_program, cpp_options = list(stan_threads = TRUE)) + + expect_error( + mod$variational(data = data_file_json), + "The model was compiled with 'cpp_options = list(stan_threads = TRUE)' but 'threads' was not set!", + fixed = TRUE + ) + + expect_output( + f <- mod$variational(data = data_file_json, threads = 1, seed = 123), + "EXPERIMENTAL ALGORITHM", + fixed = TRUE + ) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 1) + expect_equal(f$metadata()$threads, 1) + + expect_output( + f <- mod$variational(data = data_file_json, threads = 2, seed = 123), + "EXPERIMENTAL ALGORITHM", + fixed = TRUE + ) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 2) + expect_equal(f$metadata()$threads, 2) + + expect_output( + f <- mod$variational(data = data_file_json, threads = 4, seed = 123), + "EXPERIMENTAL ALGORITHM", + fixed = TRUE + ) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 4) + expect_equal(f$metadata()$threads, 4) +}) + +test_that("threading works with generate_quantities()", { + skip_on_cran() + mod <- cmdstan_model(stan_program, cpp_options = list(stan_threads = TRUE)) + mod_gq <- cmdstan_model(stan_gq_program, cpp_options = list(stan_threads = TRUE)) + expect_output( + f <- mod$sample(data = data_file_json, parallel_chains = 4, threads_per_chain = 1), + "Running MCMC with 4 parallel chains, with 1 thread(s) per chain..", + fixed = TRUE + ) + expect_error( + mod_gq$generate_quantities(fitted_params = f, data = data_file_json), + "The model was compiled with 'cpp_options = list(stan_threads = TRUE)' but 'threads_per_chain' was not set!", + fixed = TRUE + ) + expect_output( + f_gq <- mod_gq$generate_quantities(fitted_params = f, data = data_file_gq_json, threads_per_chain = 1, seed = 123), + "Running standalone generated quantities after 4 MCMC chains", + fixed = TRUE + ) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 1) + expect_equal(f_gq$metadata()$threads_per_chain, 1) + + expect_output( + f_gq <- mod_gq$generate_quantities(fitted_params = f, data = data_file_gq_json, threads_per_chain = 2, seed = 123), + "Running standalone generated quantities after 4 MCMC chains", + fixed = TRUE + ) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 2) + expect_equal(f_gq$metadata()$threads_per_chain, 2) + + expect_output( + f_gq <- mod_gq$generate_quantities(fitted_params = f, data = data_file_gq_json, threads_per_chain = 4, seed = 123), + "Running standalone generated quantities after 4 MCMC chains", + fixed = TRUE + ) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 4) + expect_equal(f_gq$metadata()$threads_per_chain, 4) })