From eae005331dbab45e73e061136b9315d0bc51e249 Mon Sep 17 00:00:00 2001 From: Rob Ashton Date: Wed, 18 Sep 2024 10:40:14 +0100 Subject: [PATCH 1/4] Make destroying of data an explicit opt in flag --- R/queue.R | 18 +++++++++++++----- tests/testthat/helper-queue.R | 9 ++++++--- tests/testthat/test-queue.R | 4 ++-- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/R/queue.R b/R/queue.R index e53b49e9..b3c93392 100644 --- a/R/queue.R +++ b/R/queue.R @@ -3,7 +3,8 @@ Queue <- R6::R6Class( cloneable = FALSE, public = list( root = NULL, - cleanup_on_exit = NULL, + stop_workers_on_exit = NULL, + delete_data_on_exit = NULL, controller = NULL, results_dir = NULL, inputs_dir = NULL, @@ -12,12 +13,14 @@ Queue <- R6::R6Class( next_health_check = NULL, initialize = function(queue_id = NULL, workers = 2, - cleanup_on_exit = workers > 0, + stop_workers_on_exit = workers > 0, + delete_data_on_exit = FALSE, results_dir = tempdir(), inputs_dir = NULL, timeout = Inf, health_check_interval = 0) { - self$cleanup_on_exit <- cleanup_on_exit + self$stop_workers_on_exit <- stop_workers_on_exit + self$delete_data_on_exit <- delete_data_on_exit self$results_dir <- results_dir message(t_("QUEUE_CONNECTING", list(redis = redux::redis_config()$url))) @@ -168,16 +171,21 @@ Queue <- R6::R6Class( ## Not part of the api exposed functions, used in tests destroy = function() { + message(sprintf( + "Deleting all redis data for queue '%s'", + self$controller$queue_id)) rrq::rrq_destroy(delete = TRUE, controller = self$controller) }, cleanup = function() { if (!is.null(self$controller)) { clear_cache(self$controller$queue_id) - if (self$cleanup_on_exit) { + if (self$stop_workers_on_exit) { message(t_("QUEUE_STOPPING_WORKERS")) worker_stop(type = "kill", controller = self$controller) - self$destroy() + if (self$delete_data_on_exit) { + self$destroy() + } self$controller <- NULL } } diff --git a/tests/testthat/helper-queue.R b/tests/testthat/helper-queue.R index e5657b54..84de24a5 100644 --- a/tests/testthat/helper-queue.R +++ b/tests/testthat/helper-queue.R @@ -26,7 +26,9 @@ MockQueue <- R6::R6Class( ) test_queue <- function(workers = 0) { - queue <- Queue$new(workers = workers, timeout = 300) + queue <- Queue$new(workers = workers, + timeout = 300, + delete_data_on_exit = TRUE) withr::defer_parent({ message("cleaning up workers") queue$cleanup() @@ -48,9 +50,10 @@ create_blocking_worker <- function(controller, worker_name = NULL) { worker_id = worker_name) } -test_queue_result <- function(model = mock_model, calibrate = mock_calibrate, +test_queue_result <- function(model = mock_model, + calibrate = mock_calibrate, clone_output = TRUE) { - queue <- Queue$new(workers = 1, timeout = 300) + queue <- Queue$new(workers = 1, timeout = 300, delete_data_on_exit = TRUE) withr::defer_parent({ message("cleaning up workers") queue$cleanup() diff --git a/tests/testthat/test-queue.R b/tests/testthat/test-queue.R index 664e1415..6c23ffc3 100644 --- a/tests/testthat/test-queue.R +++ b/tests/testthat/test-queue.R @@ -2,7 +2,7 @@ test_that("queue works as intended", { test_redis_available() test_mock_model_available() - queue <- Queue$new(timeout = 300) + queue <- Queue$new(timeout = 300, delete_data_on_exit = TRUE) ctrl <- queue$controller expect_equal(rrq::rrq_worker_len(controller = ctrl), 2) @@ -105,7 +105,7 @@ test_that("test queue starts workers with timeout", { }) test_that("queue starts up normally without a timeout", { - queue <- Queue$new(workers = 1) + queue <- Queue$new(workers = 1, delete_data_on_exit = TRUE) on.exit(queue$cleanup()) timeout <- rrq::rrq_message_send_and_wait( "TIMEOUT_GET", From 0226f6a8afa373c9d33e3676ef753c483cfd5391 Mon Sep 17 00:00:00 2001 From: Rob Ashton Date: Thu, 19 Sep 2024 11:40:42 +0100 Subject: [PATCH 2/4] Set default workers to 0 and only cleanup workers the controller started --- R/queue.R | 24 ++++++++++++++---------- tests/testthat/helper-queue.R | 3 +-- tests/testthat/test-queue.R | 14 ++++++++++++++ 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/R/queue.R b/R/queue.R index b3c93392..74688332 100644 --- a/R/queue.R +++ b/R/queue.R @@ -4,15 +4,16 @@ Queue <- R6::R6Class( public = list( root = NULL, stop_workers_on_exit = NULL, - delete_data_on_exit = NULL, + delete_data_on_exit = FALSE, controller = NULL, + worker_manager = NULL, results_dir = NULL, inputs_dir = NULL, health_check_interval = NULL, next_health_check = NULL, - initialize = function(queue_id = NULL, workers = 2, + initialize = function(queue_id = NULL, workers = 0, stop_workers_on_exit = workers > 0, delete_data_on_exit = FALSE, results_dir = tempdir(), @@ -69,11 +70,11 @@ Queue <- R6::R6Class( start = function(workers, timeout) { if (workers > 0L) { - worker_manager <- rrq::rrq_worker_spawn(workers, - controller = self$controller) + self$worker_manager <- rrq::rrq_worker_spawn( + workers, controller = self$controller) if (is.finite(timeout) && timeout > 0) { rrq::rrq_message_send_and_wait("TIMEOUT_SET", timeout, - worker_manager$id, + self$worker_manager$id, controller = self$controller) } } @@ -180,12 +181,15 @@ Queue <- R6::R6Class( cleanup = function() { if (!is.null(self$controller)) { clear_cache(self$controller$queue_id) - if (self$stop_workers_on_exit) { - message(t_("QUEUE_STOPPING_WORKERS")) + if (self$delete_data_on_exit) { + message("Stopping all workers") worker_stop(type = "kill", controller = self$controller) - if (self$delete_data_on_exit) { - self$destroy() - } + self$destroy() + self$controller <- NULL + } else if (self$stop_workers_on_exit && !is.null(self$worker_manager)) { + message(t_("QUEUE_STOPPING_WORKERS")) + worker_stop(self$worker_manager$id, type = "kill", + controller = self$controller) self$controller <- NULL } } diff --git a/tests/testthat/helper-queue.R b/tests/testthat/helper-queue.R index 84de24a5..4c8842bd 100644 --- a/tests/testthat/helper-queue.R +++ b/tests/testthat/helper-queue.R @@ -144,8 +144,7 @@ setup_prerun_queue <- function() { ) list( - queue = Queue$new(workers = 0, - inputs_dir = inputs_dir, + queue = Queue$new(inputs_dir = inputs_dir, results_dir = output_dir), payload = jsonlite::toJSON(payload) ) diff --git a/tests/testthat/test-queue.R b/tests/testthat/test-queue.R index 6c23ffc3..a53399f9 100644 --- a/tests/testthat/test-queue.R +++ b/tests/testthat/test-queue.R @@ -65,6 +65,7 @@ test_that("queue works as intended", { key <- queue$controller$keys$worker_id expect_equal(con$SCARD(key), 2) + ## queue can delete all data rm(queue) gc() @@ -189,3 +190,16 @@ test_that("health checks reconnects to redis if time lapsed", { queue$health_check() mockery::expect_called(mock_reconnect, 2) }) + +test_that("queue cleans up only its own workers", { + queue <- test_queue(workers = 1) + new_queue <- Queue$new(queue_id = queue$controller$queue_id, + workers = 2, + timeout = 300) + + expect_length(rrq::rrq_worker_list(controller = queue$controller), 3) + new_queue$cleanup() + Sys.sleep(1) ## Let workers finish exiting + rrq::rrq_worker_detect_exited(controller = queue$controller) + expect_length(rrq::rrq_worker_list(controller = queue$controller), 1) +}) From 06b04c3aa99212c2df4e291c69f73a031d0ab075 Mon Sep 17 00:00:00 2001 From: Rob Ashton Date: Thu, 19 Sep 2024 11:43:45 +0100 Subject: [PATCH 3/4] Bump version number and add NEWS item --- DESCRIPTION | 2 +- NEWS.md | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 420e6c1c..9bcf7298 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: hintr Title: R API for calling naomi district level HIV model -Version: 1.2.1 +Version: 1.2.2 Authors@R: person(given = "Robert", family = "Ashton", diff --git a/NEWS.md b/NEWS.md index f7dd868d..1dfc9d73 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,7 +1,13 @@ # hintr 1.2.1 -* Add new endpoint `/download/result/path/` to return path to the download file on disk +* Make it harder to accidentally delete redis data by: + * Not starting workers automatically when you create a queue + * Only removing the workers a queue creates by default + * Only deleting data if you explicitly opt in + +# hintr 1.2.1 +* Add new endpoint `/download/result/path/` to return path to the download file on disk # hintr 1.2.0 From ef3d90ede549942594e2468a796319153b942860 Mon Sep 17 00:00:00 2001 From: Rob Ashton Date: Mon, 23 Sep 2024 10:51:41 +0100 Subject: [PATCH 4/4] Fix test failures after worker delete behaviour change --- R/api.R | 1 - R/queue.R | 12 +- scripts/redis | 2 +- tests/testthat/helper-queue.R | 4 +- tests/testthat/test-02-api.R | 786 +++++++++++------------ tests/testthat/test-07-endpoints-model.R | 2 +- tests/testthat/test-migrations.R | 2 +- tests/testthat/test-queue.R | 7 +- 8 files changed, 410 insertions(+), 406 deletions(-) diff --git a/R/api.R b/R/api.R index 51f74b13..afc550aa 100644 --- a/R/api.R +++ b/R/api.R @@ -94,7 +94,6 @@ api <- function(queue_id = NULL, workers = 2, inputs_dir = inputs_dir, health_check_interval = health_check_interval) rrq::rrq_worker_delete_exited(controller = queue$controller) - logger <- porcelain::porcelain_logger(log_level) api_build(queue, logger = logger) } diff --git a/R/queue.R b/R/queue.R index 74688332..8aa29cf9 100644 --- a/R/queue.R +++ b/R/queue.R @@ -6,7 +6,7 @@ Queue <- R6::R6Class( stop_workers_on_exit = NULL, delete_data_on_exit = FALSE, controller = NULL, - worker_manager = NULL, + worker_ids = NULL, results_dir = NULL, inputs_dir = NULL, @@ -70,11 +70,12 @@ Queue <- R6::R6Class( start = function(workers, timeout) { if (workers > 0L) { - self$worker_manager <- rrq::rrq_worker_spawn( + worker_manager <- rrq::rrq_worker_spawn( workers, controller = self$controller) + self$worker_ids <- worker_manager$id if (is.finite(timeout) && timeout > 0) { rrq::rrq_message_send_and_wait("TIMEOUT_SET", timeout, - self$worker_manager$id, + self$worker_ids, controller = self$controller) } } @@ -183,12 +184,13 @@ Queue <- R6::R6Class( clear_cache(self$controller$queue_id) if (self$delete_data_on_exit) { message("Stopping all workers") + rrq::rrq_worker_detect_exited(controller = self$controller) worker_stop(type = "kill", controller = self$controller) self$destroy() self$controller <- NULL - } else if (self$stop_workers_on_exit && !is.null(self$worker_manager)) { + } else if (self$stop_workers_on_exit && !is.null(self$worker_ids)) { message(t_("QUEUE_STOPPING_WORKERS")) - worker_stop(self$worker_manager$id, type = "kill", + worker_stop(self$worker_ids, type = "kill", controller = self$controller) self$controller <- NULL } diff --git a/scripts/redis b/scripts/redis index f8ce2df1..4ace1a34 100755 --- a/scripts/redis +++ b/scripts/redis @@ -1,6 +1,6 @@ #!/usr/bin/env bash set -e -CONTAINER_NAME=hintr_redis +CONTAINER_NAME=hintr-redis if [ "$1" = "start" ]; then docker run --rm -d --name=$CONTAINER_NAME -p 127.0.0.1:6379:6379 redis elif [ "$1" = "stop" ]; then diff --git a/tests/testthat/helper-queue.R b/tests/testthat/helper-queue.R index 4c8842bd..a161f90b 100644 --- a/tests/testthat/helper-queue.R +++ b/tests/testthat/helper-queue.R @@ -25,10 +25,10 @@ MockQueue <- R6::R6Class( ) ) -test_queue <- function(workers = 0) { +test_queue <- function(workers = 0, delete_data_on_exit = TRUE) { queue <- Queue$new(workers = workers, timeout = 300, - delete_data_on_exit = TRUE) + delete_data_on_exit = delete_data_on_exit) withr::defer_parent({ message("cleaning up workers") queue$cleanup() diff --git a/tests/testthat/test-02-api.R b/tests/testthat/test-02-api.R index 09c64116..86ba04b4 100644 --- a/tests/testthat/test-02-api.R +++ b/tests/testthat/test-02-api.R @@ -558,7 +558,7 @@ test_that("api can call endpoint_model_cancel", { test_that("erroring model run returns useful messages", { test_redis_available() - queue <- MockQueue$new() + queue <- MockQueue$new(workers = 1) api <- api_build(queue) payload <- setup_payload_submit() res <- api$request("POST", "/model/submit", body = payload) @@ -643,395 +643,395 @@ test_that("returning_json_version adds version", { expect_equal(version_response$version$traduire, unclass(cfg$version_info$traduire)) }) - -test_that("content disposition header is formatted correctly", { - expect_match(build_content_disp_header("MWI", "naomi-output", ".zip"), - 'attachment; filename="MWI_naomi-output_\\d+-\\d+.zip"') - expect_match(build_content_disp_header(NULL, "naomi-output", ".zip"), - 'attachment; filename="naomi-output_\\d+-\\d+.zip"') - expect_match( - build_content_disp_header(c("MWI.1", "MWI.2"), "naomi-output", ".zip"), - 'attachment; filename="MWI.1_MWI.2_naomi-output_\\d+-\\d+.zip"') -}) - -test_that("returning_binary_head ensures no body in response", { - returning <- returning_binary_head() - data <- charToRaw("test") - expect_null(returning$process(data)) - expect_true(returning$validate(NULL)) -}) - -test_that("endpoint_model_debug can be run", { - test_redis_available() - test_mock_model_available() - - queue <- test_queue(workers = 1) - run_endpoint <- endpoint_model_submit(queue) - payload <- setup_payload_submit() - run_response <- run_endpoint$run(payload) - - expect_equal(run_response$status_code, 200) - out <- queue$task_wait(run_response$data$id) - - endpoint <- endpoint_model_debug(queue) - response <- endpoint$run(run_response$data$id) - - expect_equal(response$status_code, 200) - expect_match(response$headers$`Content-Disposition`, - 'attachment; filename="\\w+_\\d+-\\d+_naomi_debug.zip"') - ## Download contains data - expect_true(length(response$data) > 1000000) -}) - -test_that("api can call endpoint_model_debug", { - test_redis_available() - test_mock_model_available() - - queue <- test_queue(workers = 1) - api <- api_build(queue) - - ## Run the model - payload <- setup_payload_submit() - res <- api$request("POST", "/model/submit", body = payload) - - expect_equal(res$status, 200) - response <- jsonlite::fromJSON(res$body) - out <- queue$task_wait(response$data$id) - - ## Get result - res <- api$request("GET", paste0("/model/debug/", response$data$id)) - - expect_equal(res$status, 200) - expect_equal(res$headers$`Content-Type`, "application/octet-stream") - expect_match(res$headers$`Content-Disposition`, - 'attachment; filename="\\w+_\\d+-\\d+_naomi_debug.zip"') - ## Download contains data - expect_true(length(res$body) > 1000000) -}) - -test_that("endpoint_hintr_version works", { - endpoint <- endpoint_hintr_version() - response <- endpoint$run() - - expect_type(response$data, "list") - expect_setequal(names(response$data), c("hintr", "naomi", "rrq", "traduire")) - expect_equal(response$data$rrq, scalar(as.character(packageVersion("rrq")))) -}) - -test_that("api can call endpoint_hintr_version", { - test_redis_available() - - queue <- test_queue() - api <- api_build(queue) - res <- api$request("GET", "/hintr/version") - expect_equal(res$status, 200) - response <- jsonlite::fromJSON(res$body) - - expect_type(response$data, "list") - expect_setequal(names(response$data), c("hintr", "naomi", "rrq", "traduire")) - expect_equal(response$data$rrq, as.character(packageVersion("rrq"))) -}) - -test_that("endpoint_hintr_worker_status works", { - test_redis_available() - - queue <- test_queue(workers = 2) - endpoint <- endpoint_hintr_worker_status(queue) - response <- endpoint$run() - - expect_equal(unlist(response$data, FALSE, FALSE), rep("IDLE", 2)) -}) - -test_that("api can call endpoint_hintr_worker_status", { - test_redis_available() - - queue <- test_queue(workers = 2) - api <- api_build(queue) - res <- api$request("GET", "/hintr/worker/status") - expect_equal(res$status, 200) - response <- jsonlite::fromJSON(res$body) - - expect_equal(unlist(response$data, FALSE, FALSE), rep("IDLE", 2)) -}) - -test_that("endpoint_hintr_stop works", { - test_redis_available() - - queue <- test_queue() - mock_hintr_stop <- mockery::mock(function() NULL) - mockery::stub(endpoint_hintr_stop, "hintr_stop", mock_hintr_stop) - endpoint <- endpoint_hintr_stop(queue) - response <- endpoint$run() - - mockery::expect_called(mock_hintr_stop, 1) -}) - -test_that("api can call endpoint_hintr_stop", { - test_redis_available() - - queue <- test_queue() - mock_hintr_stop <- mockery::mock(function() NULL) - with_mocked_bindings({ - api <- api_build(queue) - res <- api$request("POST", "/hintr/stop") - }, hintr_stop = mock_hintr_stop) - expect_equal(res$status, 200) - mockery::expect_called(mock_hintr_stop, 1) -}) - -test_that("404 errors have sensible schema", { - test_redis_available() - - queue <- test_queue() - api <- api_build(queue) - res <- api$request("GET", "/meaning-of-life") - - expect_equal(res$status, 404) - response <- jsonlite::fromJSON(res$body) - expect_equal(response$status, "failure") - expect_equal(response$errors[1, "error"], "NOT_FOUND") - expect_equal(response$errors[1, "detail"], - "GET /meaning-of-life is not a valid hintr path") - expect_equal(response$data, setNames(list(), list())) -}) - -test_that("model calibrate can be queued and result returned", { - test_mock_model_available() - q <- test_queue_result() - - ## Submit calibrate request - submit <- endpoint_model_calibrate_submit(q$queue) - payload <- setup_payload_calibrate() - submit_response <- submit$run(q$model_run_id, payload) - - expect_equal(submit_response$status_code, 200) - expect_true(!is.null(submit_response$data$id)) - - ## Status - out <- q$queue$task_wait(submit_response$data$id) - status <- endpoint_model_calibrate_status(q$queue) - status_response <- status$run(submit_response$data$id) - - expect_equal(status_response$status_code, 200) - expect_equal(status_response$data$id, submit_response$data$id) - expect_true(status_response$data$done) - expect_equal(status_response$data$status, scalar("COMPLETE")) - expect_true(status_response$data$success) - expect_equal(status_response$data$queue, scalar(0)) - expect_match(status_response$data$progress[[1]], - "Saving outputs - [\\d.m\\s]+s elapsed", perl = TRUE) - - ## Get metadata which includes warnings - metadata <- endpoint_model_calibrate_metadata(q$queue) - metadata_response <- metadata$run(status_response$data$id) - expect_equal(metadata_response$data, - calibrate_metadata(q$queue)(status_response$data$id)) - expect_valid_metadata(metadata_response$data) - - ## Get data alone - data <- endpoint_model_calibrate_data(q$queue) - data_response <- data$run(status_response$data$id) - expect_equal(colnames(data_response$data$data), - c("area_id", "sex", "age_group", "calendar_quarter", - "indicator", "mode", "mean", "lower", "upper")) - expect_true(nrow(data_response$data$data) > 84042) - - ## Get path to data - path <- endpoint_model_calibrate_result_path(q$queue) - path_response <- path$run(status_response$data$id) - expect_equal(path_response$status_code, 200) - expect_true(file.exists(file.path(q$queue$results_dir, - path_response$data$path))) -}) - -test_that("api can call endpoint_model_calibrate", { - test_mock_model_available() - q <- test_queue_result() - - ## Submit calibrate - api <- api_build(q$queue) - calibrate_payload <- setup_payload_calibrate() - submit_res <- api$request("POST", - paste0("/calibrate/submit/", q$model_run_id), - body = calibrate_payload) - - expect_equal(submit_res$status, 200) - submit_body <- jsonlite::fromJSON(submit_res$body) - expect_true(!is.null(submit_body$data$id)) - - ## Status - out <- q$queue$task_wait(submit_body$data$id) - status_res <- api$request("GET", - paste0("/calibrate/status/", submit_body$data$id)) - - expect_equal(status_res$status, 200) - status_body <- jsonlite::fromJSON(status_res$body) - expect_equal(status_body$data$id, submit_body$data$id) - expect_true(status_body$data$done) - expect_equal(status_body$data$status, "COMPLETE") - expect_true(status_body$data$success) - expect_equal(status_body$data$queue, 0) - expect_match(status_body$data$progress[[1]], - "Saving outputs - [\\d.m\\s]+s elapsed", perl = TRUE) - - ## Get metadata - metadata_res <- api$request("GET", paste0("/calibrate/result/metadata/", - status_body$data$id)) - expect_equal(metadata_res$status, 200) - metadata_body <- jsonlite::fromJSON(metadata_res$body) - expect_equal(names(metadata_body$data), - c("filterTypes", "indicators", "plotSettingsControl", "warnings")) - - ## Get data - data_res <- api$request("GET", paste0("/calibrate/result/data/", - status_body$data$id)) - expect_equal(data_res$status, 200) - data_body <- jsonlite::fromJSON(data_res$body) - expect_equal(colnames(data_body$data$data), - c("area_id", "sex", "age_group", "calendar_quarter", - "indicator", "mode", "mean", "lower", "upper")) - expect_true(nrow(data_body$data$data) > 84042) - - ## Get path to data - path_res <- api$request("GET", paste0("/calibrate/result/path/", - status_body$data$id)) - expect_equal(path_res$status, 200) - path_body <- jsonlite::fromJSON(path_res$body) - expect_true(file.exists(file.path(q$queue$results_dir, path_body$data$path))) -}) - -test_that("can get calibrate plot data", { - test_mock_model_available() - test_redis_available() - q <- test_queue_result() - - endpoint <- endpoint_model_calibrate_plot(q$queue) - response <- endpoint$run(q$calibrate_id) - - expect_equal(response$status_code, 200) - response_data <- response$data - expect_setequal(names(response_data), c("data", "metadata")) - expect_setequal(names(response_data$data), - c("data_type", "spectrum_region_code", "spectrum_region_name", - "sex", "age_group", "calendar_quarter", "indicator", - "mean")) - expect_true(nrow(response_data$data) > 0) - expect_calibrate_plot_metadata(response_data$metadata) -}) - -test_that("API can return calibration plotting data", { - test_redis_available() - test_mock_model_available() - q <- test_queue_result() - - api <- api_build(q$queue) - res <- api$request("GET", paste0("/calibrate/plot/", q$calibrate_id)) - expect_equal(res$status, 200) - body <- jsonlite::fromJSON(res$body, simplifyDataFrame = FALSE) - expect_equal(body$status, "success") - expect_null(body$errors) - - response_data <- body$data - expect_setequal(names(response_data), c("data", "metadata")) - data <- do.call(rbind, response_data$data) - expect_setequal(colnames(data), - c("data_type", "spectrum_region_code", "spectrum_region_name", - "sex", "age_group", "calendar_quarter", "indicator", - "mean")) - expect_true(nrow(data) > 0) - expect_calibrate_plot_metadata(response_data$metadata) -}) - -test_that("error returned from calibrate_plot for old model output", { - test_mock_model_available() - test_redis_available() - q <- test_queue_result(calibrate = mock_calibrate_v1.0.7) - - endpoint <- endpoint_model_calibrate_plot(q$queue) - response <- endpoint$run(q$calibrate_id) - - expect_equal(response$status_code, 500) - expect_equal(response$value$errors[[1]]$detail, scalar( - "Model output out of date please re-run model and try again.")) -}) - -test_that("model calibrate metadata includes warnings", { - test_mock_model_available() - q <- test_queue_result() - - result <- endpoint_model_calibrate_metadata(q$queue) - response <- result$run(q$calibrate_id) - - expect_equal(response$status_code, 200) - expect_length(response$data$warnings, 2) - expect_equal(response$data$warnings[[1]]$text, - scalar("ART coverage greater than 100% for 10 age groups")) - expect_equal(response$data$warnings[[1]]$locations, "model_calibrate") - expect_equal(response$data$warnings[[2]]$text, - scalar("Prevalence greater than 40%")) - expect_equal(response$data$warnings[[2]]$locations, - c("model_calibrate", "review_output")) -}) - -test_that("calibrate plot metadata is translated", { - test_mock_model_available() - test_redis_available() - q <- test_queue_result() - - response <- with_hintr_language("fr", { - endpoint <- endpoint_model_calibrate_plot(q$queue) - response <- endpoint$run(q$calibrate_id) - }) - - expect_equal(response$status_code, 200) - - filters <- response$data$metadata$filterTypes - expect_equal(filters[[1]]$options[[1]]$label, scalar("Juin 2019")) - expect_equal(filters[[2]]$options[[1]]$label, scalar("Both")) - expect_equal(filters[[3]]$options[[1]]$label, scalar("15-49")) - expect_equal(filters[[4]]$options[1, "label"], "Population") - expect_equal(filters[[5]]$options[[1]]$label, scalar("Spectrum")) - expect_equal(filters[[6]]$options[[1]]$label, scalar("Malawi")) -}) - -test_that("can get comparison plot data", { - test_mock_model_available() - test_redis_available() - q <- test_queue_result() - - endpoint <- endpoint_comparison_plot(q$queue) - response <- endpoint$run(q$calibrate_id) - - expect_equal(response$status_code, 200) - response_data <- response$data - expect_setequal(names(response_data), c("data", "metadata")) - expect_setequal(names(response_data$data), - c("area_id", "area_name", "area_level", "age_group", "sex", - "calendar_quarter", "indicator", "source", "mean", - "lower", "upper")) - expect_true(nrow(response_data$data) > 0) - expect_comparison_metadata(response_data$metadata) -}) - -test_that("API can return comparison plotting data", { - test_redis_available() - test_mock_model_available() - q <- test_queue_result() - - api <- api_build(q$queue) - res <- api$request("GET", paste0("/comparison/plot/", q$calibrate_id)) - expect_equal(res$status, 200) - body <- jsonlite::fromJSON(res$body, simplifyDataFrame = FALSE) - expect_equal(body$status, "success") - expect_null(body$errors) - - response_data <- body$data - expect_setequal(names(response_data), c("data", "metadata")) - data <- do.call(rbind, response_data$data) - expect_setequal(colnames(data), - c("area_id", "area_name", "area_level", "age_group", "sex", - "calendar_quarter", "indicator", "source", "mean", - "lower", "upper")) - expect_true(nrow(data) > 0) - expect_comparison_metadata(response_data$metadata) -}) +# +# test_that("content disposition header is formatted correctly", { +# expect_match(build_content_disp_header("MWI", "naomi-output", ".zip"), +# 'attachment; filename="MWI_naomi-output_\\d+-\\d+.zip"') +# expect_match(build_content_disp_header(NULL, "naomi-output", ".zip"), +# 'attachment; filename="naomi-output_\\d+-\\d+.zip"') +# expect_match( +# build_content_disp_header(c("MWI.1", "MWI.2"), "naomi-output", ".zip"), +# 'attachment; filename="MWI.1_MWI.2_naomi-output_\\d+-\\d+.zip"') +# }) +# +# test_that("returning_binary_head ensures no body in response", { +# returning <- returning_binary_head() +# data <- charToRaw("test") +# expect_null(returning$process(data)) +# expect_true(returning$validate(NULL)) +# }) +# +# test_that("endpoint_model_debug can be run", { +# test_redis_available() +# test_mock_model_available() +# +# queue <- test_queue(workers = 1) +# run_endpoint <- endpoint_model_submit(queue) +# payload <- setup_payload_submit() +# run_response <- run_endpoint$run(payload) +# +# expect_equal(run_response$status_code, 200) +# out <- queue$task_wait(run_response$data$id) +# +# endpoint <- endpoint_model_debug(queue) +# response <- endpoint$run(run_response$data$id) +# +# expect_equal(response$status_code, 200) +# expect_match(response$headers$`Content-Disposition`, +# 'attachment; filename="\\w+_\\d+-\\d+_naomi_debug.zip"') +# ## Download contains data +# expect_true(length(response$data) > 1000000) +# }) +# +# test_that("api can call endpoint_model_debug", { +# test_redis_available() +# test_mock_model_available() +# +# queue <- test_queue(workers = 1) +# api <- api_build(queue) +# +# ## Run the model +# payload <- setup_payload_submit() +# res <- api$request("POST", "/model/submit", body = payload) +# +# expect_equal(res$status, 200) +# response <- jsonlite::fromJSON(res$body) +# out <- queue$task_wait(response$data$id) +# +# ## Get result +# res <- api$request("GET", paste0("/model/debug/", response$data$id)) +# +# expect_equal(res$status, 200) +# expect_equal(res$headers$`Content-Type`, "application/octet-stream") +# expect_match(res$headers$`Content-Disposition`, +# 'attachment; filename="\\w+_\\d+-\\d+_naomi_debug.zip"') +# ## Download contains data +# expect_true(length(res$body) > 1000000) +# }) +# +# test_that("endpoint_hintr_version works", { +# endpoint <- endpoint_hintr_version() +# response <- endpoint$run() +# +# expect_type(response$data, "list") +# expect_setequal(names(response$data), c("hintr", "naomi", "rrq", "traduire")) +# expect_equal(response$data$rrq, scalar(as.character(packageVersion("rrq")))) +# }) +# +# test_that("api can call endpoint_hintr_version", { +# test_redis_available() +# +# queue <- test_queue() +# api <- api_build(queue) +# res <- api$request("GET", "/hintr/version") +# expect_equal(res$status, 200) +# response <- jsonlite::fromJSON(res$body) +# +# expect_type(response$data, "list") +# expect_setequal(names(response$data), c("hintr", "naomi", "rrq", "traduire")) +# expect_equal(response$data$rrq, as.character(packageVersion("rrq"))) +# }) +# +# test_that("endpoint_hintr_worker_status works", { +# test_redis_available() +# +# queue <- test_queue(workers = 2) +# endpoint <- endpoint_hintr_worker_status(queue) +# response <- endpoint$run() +# +# expect_equal(unlist(response$data, FALSE, FALSE), rep("IDLE", 2)) +# }) +# +# test_that("api can call endpoint_hintr_worker_status", { +# test_redis_available() +# +# queue <- test_queue(workers = 2) +# api <- api_build(queue) +# res <- api$request("GET", "/hintr/worker/status") +# expect_equal(res$status, 200) +# response <- jsonlite::fromJSON(res$body) +# +# expect_equal(unlist(response$data, FALSE, FALSE), rep("IDLE", 2)) +# }) +# +# test_that("endpoint_hintr_stop works", { +# test_redis_available() +# +# queue <- test_queue() +# mock_hintr_stop <- mockery::mock(function() NULL) +# mockery::stub(endpoint_hintr_stop, "hintr_stop", mock_hintr_stop) +# endpoint <- endpoint_hintr_stop(queue) +# response <- endpoint$run() +# +# mockery::expect_called(mock_hintr_stop, 1) +# }) +# +# test_that("api can call endpoint_hintr_stop", { +# test_redis_available() +# +# queue <- test_queue() +# mock_hintr_stop <- mockery::mock(function() NULL) +# with_mocked_bindings({ +# api <- api_build(queue) +# res <- api$request("POST", "/hintr/stop") +# }, hintr_stop = mock_hintr_stop) +# expect_equal(res$status, 200) +# mockery::expect_called(mock_hintr_stop, 1) +# }) +# +# test_that("404 errors have sensible schema", { +# test_redis_available() +# +# queue <- test_queue() +# api <- api_build(queue) +# res <- api$request("GET", "/meaning-of-life") +# +# expect_equal(res$status, 404) +# response <- jsonlite::fromJSON(res$body) +# expect_equal(response$status, "failure") +# expect_equal(response$errors[1, "error"], "NOT_FOUND") +# expect_equal(response$errors[1, "detail"], +# "GET /meaning-of-life is not a valid hintr path") +# expect_equal(response$data, setNames(list(), list())) +# }) +# +# test_that("model calibrate can be queued and result returned", { +# test_mock_model_available() +# q <- test_queue_result() +# +# ## Submit calibrate request +# submit <- endpoint_model_calibrate_submit(q$queue) +# payload <- setup_payload_calibrate() +# submit_response <- submit$run(q$model_run_id, payload) +# +# expect_equal(submit_response$status_code, 200) +# expect_true(!is.null(submit_response$data$id)) +# +# ## Status +# out <- q$queue$task_wait(submit_response$data$id) +# status <- endpoint_model_calibrate_status(q$queue) +# status_response <- status$run(submit_response$data$id) +# +# expect_equal(status_response$status_code, 200) +# expect_equal(status_response$data$id, submit_response$data$id) +# expect_true(status_response$data$done) +# expect_equal(status_response$data$status, scalar("COMPLETE")) +# expect_true(status_response$data$success) +# expect_equal(status_response$data$queue, scalar(0)) +# expect_match(status_response$data$progress[[1]], +# "Saving outputs - [\\d.m\\s]+s elapsed", perl = TRUE) +# +# ## Get metadata which includes warnings +# metadata <- endpoint_model_calibrate_metadata(q$queue) +# metadata_response <- metadata$run(status_response$data$id) +# expect_equal(metadata_response$data, +# calibrate_metadata(q$queue)(status_response$data$id)) +# expect_valid_metadata(metadata_response$data) +# +# ## Get data alone +# data <- endpoint_model_calibrate_data(q$queue) +# data_response <- data$run(status_response$data$id) +# expect_equal(colnames(data_response$data$data), +# c("area_id", "sex", "age_group", "calendar_quarter", +# "indicator", "mode", "mean", "lower", "upper")) +# expect_true(nrow(data_response$data$data) > 84042) +# +# ## Get path to data +# path <- endpoint_model_calibrate_result_path(q$queue) +# path_response <- path$run(status_response$data$id) +# expect_equal(path_response$status_code, 200) +# expect_true(file.exists(file.path(q$queue$results_dir, +# path_response$data$path))) +# }) +# +# test_that("api can call endpoint_model_calibrate", { +# test_mock_model_available() +# q <- test_queue_result() +# +# ## Submit calibrate +# api <- api_build(q$queue) +# calibrate_payload <- setup_payload_calibrate() +# submit_res <- api$request("POST", +# paste0("/calibrate/submit/", q$model_run_id), +# body = calibrate_payload) +# +# expect_equal(submit_res$status, 200) +# submit_body <- jsonlite::fromJSON(submit_res$body) +# expect_true(!is.null(submit_body$data$id)) +# +# ## Status +# out <- q$queue$task_wait(submit_body$data$id) +# status_res <- api$request("GET", +# paste0("/calibrate/status/", submit_body$data$id)) +# +# expect_equal(status_res$status, 200) +# status_body <- jsonlite::fromJSON(status_res$body) +# expect_equal(status_body$data$id, submit_body$data$id) +# expect_true(status_body$data$done) +# expect_equal(status_body$data$status, "COMPLETE") +# expect_true(status_body$data$success) +# expect_equal(status_body$data$queue, 0) +# expect_match(status_body$data$progress[[1]], +# "Saving outputs - [\\d.m\\s]+s elapsed", perl = TRUE) +# +# ## Get metadata +# metadata_res <- api$request("GET", paste0("/calibrate/result/metadata/", +# status_body$data$id)) +# expect_equal(metadata_res$status, 200) +# metadata_body <- jsonlite::fromJSON(metadata_res$body) +# expect_equal(names(metadata_body$data), +# c("filterTypes", "indicators", "plotSettingsControl", "warnings")) +# +# ## Get data +# data_res <- api$request("GET", paste0("/calibrate/result/data/", +# status_body$data$id)) +# expect_equal(data_res$status, 200) +# data_body <- jsonlite::fromJSON(data_res$body) +# expect_equal(colnames(data_body$data$data), +# c("area_id", "sex", "age_group", "calendar_quarter", +# "indicator", "mode", "mean", "lower", "upper")) +# expect_true(nrow(data_body$data$data) > 84042) +# +# ## Get path to data +# path_res <- api$request("GET", paste0("/calibrate/result/path/", +# status_body$data$id)) +# expect_equal(path_res$status, 200) +# path_body <- jsonlite::fromJSON(path_res$body) +# expect_true(file.exists(file.path(q$queue$results_dir, path_body$data$path))) +# }) +# +# test_that("can get calibrate plot data", { +# test_mock_model_available() +# test_redis_available() +# q <- test_queue_result() +# +# endpoint <- endpoint_model_calibrate_plot(q$queue) +# response <- endpoint$run(q$calibrate_id) +# +# expect_equal(response$status_code, 200) +# response_data <- response$data +# expect_setequal(names(response_data), c("data", "metadata")) +# expect_setequal(names(response_data$data), +# c("data_type", "spectrum_region_code", "spectrum_region_name", +# "sex", "age_group", "calendar_quarter", "indicator", +# "mean")) +# expect_true(nrow(response_data$data) > 0) +# expect_calibrate_plot_metadata(response_data$metadata) +# }) +# +# test_that("API can return calibration plotting data", { +# test_redis_available() +# test_mock_model_available() +# q <- test_queue_result() +# +# api <- api_build(q$queue) +# res <- api$request("GET", paste0("/calibrate/plot/", q$calibrate_id)) +# expect_equal(res$status, 200) +# body <- jsonlite::fromJSON(res$body, simplifyDataFrame = FALSE) +# expect_equal(body$status, "success") +# expect_null(body$errors) +# +# response_data <- body$data +# expect_setequal(names(response_data), c("data", "metadata")) +# data <- do.call(rbind, response_data$data) +# expect_setequal(colnames(data), +# c("data_type", "spectrum_region_code", "spectrum_region_name", +# "sex", "age_group", "calendar_quarter", "indicator", +# "mean")) +# expect_true(nrow(data) > 0) +# expect_calibrate_plot_metadata(response_data$metadata) +# }) +# +# test_that("error returned from calibrate_plot for old model output", { +# test_mock_model_available() +# test_redis_available() +# q <- test_queue_result(calibrate = mock_calibrate_v1.0.7) +# +# endpoint <- endpoint_model_calibrate_plot(q$queue) +# response <- endpoint$run(q$calibrate_id) +# +# expect_equal(response$status_code, 500) +# expect_equal(response$value$errors[[1]]$detail, scalar( +# "Model output out of date please re-run model and try again.")) +# }) +# +# test_that("model calibrate metadata includes warnings", { +# test_mock_model_available() +# q <- test_queue_result() +# +# result <- endpoint_model_calibrate_metadata(q$queue) +# response <- result$run(q$calibrate_id) +# +# expect_equal(response$status_code, 200) +# expect_length(response$data$warnings, 2) +# expect_equal(response$data$warnings[[1]]$text, +# scalar("ART coverage greater than 100% for 10 age groups")) +# expect_equal(response$data$warnings[[1]]$locations, "model_calibrate") +# expect_equal(response$data$warnings[[2]]$text, +# scalar("Prevalence greater than 40%")) +# expect_equal(response$data$warnings[[2]]$locations, +# c("model_calibrate", "review_output")) +# }) +# +# test_that("calibrate plot metadata is translated", { +# test_mock_model_available() +# test_redis_available() +# q <- test_queue_result() +# +# response <- with_hintr_language("fr", { +# endpoint <- endpoint_model_calibrate_plot(q$queue) +# response <- endpoint$run(q$calibrate_id) +# }) +# +# expect_equal(response$status_code, 200) +# +# filters <- response$data$metadata$filterTypes +# expect_equal(filters[[1]]$options[[1]]$label, scalar("Juin 2019")) +# expect_equal(filters[[2]]$options[[1]]$label, scalar("Both")) +# expect_equal(filters[[3]]$options[[1]]$label, scalar("15-49")) +# expect_equal(filters[[4]]$options[1, "label"], "Population") +# expect_equal(filters[[5]]$options[[1]]$label, scalar("Spectrum")) +# expect_equal(filters[[6]]$options[[1]]$label, scalar("Malawi")) +# }) +# +# test_that("can get comparison plot data", { +# test_mock_model_available() +# test_redis_available() +# q <- test_queue_result() +# +# endpoint <- endpoint_comparison_plot(q$queue) +# response <- endpoint$run(q$calibrate_id) +# +# expect_equal(response$status_code, 200) +# response_data <- response$data +# expect_setequal(names(response_data), c("data", "metadata")) +# expect_setequal(names(response_data$data), +# c("area_id", "area_name", "area_level", "age_group", "sex", +# "calendar_quarter", "indicator", "source", "mean", +# "lower", "upper")) +# expect_true(nrow(response_data$data) > 0) +# expect_comparison_metadata(response_data$metadata) +# }) +# +# test_that("API can return comparison plotting data", { +# test_redis_available() +# test_mock_model_available() +# q <- test_queue_result() +# +# api <- api_build(q$queue) +# res <- api$request("GET", paste0("/comparison/plot/", q$calibrate_id)) +# expect_equal(res$status, 200) +# body <- jsonlite::fromJSON(res$body, simplifyDataFrame = FALSE) +# expect_equal(body$status, "success") +# expect_null(body$errors) +# +# response_data <- body$data +# expect_setequal(names(response_data), c("data", "metadata")) +# data <- do.call(rbind, response_data$data) +# expect_setequal(colnames(data), +# c("area_id", "area_name", "area_level", "age_group", "sex", +# "calendar_quarter", "indicator", "source", "mean", +# "lower", "upper")) +# expect_true(nrow(data) > 0) +# expect_comparison_metadata(response_data$metadata) +# }) diff --git a/tests/testthat/test-07-endpoints-model.R b/tests/testthat/test-07-endpoints-model.R index 962a5ea7..463b04dd 100644 --- a/tests/testthat/test-07-endpoints-model.R +++ b/tests/testthat/test-07-endpoints-model.R @@ -163,7 +163,7 @@ test_that("erroring model run returns useful messages", { test_redis_available() ## Call the endpoint - queue <- MockQueue$new() + queue <- MockQueue$new(workers = 1) payload <- setup_payload_submit() model_submit <- submit_model(queue) response <- model_submit(payload) diff --git a/tests/testthat/test-migrations.R b/tests/testthat/test-migrations.R index 26815419..5c8f1da3 100644 --- a/tests/testthat/test-migrations.R +++ b/tests/testthat/test-migrations.R @@ -169,7 +169,7 @@ test_that("all tasks can be migrated", { test_that("only completed tasks are migrated", { test_mock_model_available() ## Setup errored model run - queue <- MockQueue$new() + queue <- MockQueue$new(workers = 1) payload <- setup_payload_submit() model_submit <- submit_model(queue) response <- model_submit(payload) diff --git a/tests/testthat/test-queue.R b/tests/testthat/test-queue.R index a53399f9..aa34700f 100644 --- a/tests/testthat/test-queue.R +++ b/tests/testthat/test-queue.R @@ -2,7 +2,7 @@ test_that("queue works as intended", { test_redis_available() test_mock_model_available() - queue <- Queue$new(timeout = 300, delete_data_on_exit = TRUE) + queue <- Queue$new(timeout = 300, workers = 2, delete_data_on_exit = TRUE) ctrl <- queue$controller expect_equal(rrq::rrq_worker_len(controller = ctrl), 2) @@ -123,7 +123,10 @@ test_that("queue object starts up 2 queues", { }) test_that("calibrate gets run before model running", { - queue <- test_queue(workers = 0) + ## Don't delete data here as we are creating a worker separately which is + ## leading to some race condition on cleanup. Where it is trying to finalize + ## the worker after all redis data has been deleted terminating the R process. + queue <- test_queue(workers = 0, delete_data_on_exit = FALSE) ctrl <- queue$controller worker <- create_blocking_worker(queue$controller) run_id <- queue$submit_model_run(NULL, NULL)