Skip to content

Commit

Permalink
Add a future example app (#495)
Browse files Browse the repository at this point in the history
* init future example app

* white space

* add hello world future example with notes

* add second future example

* add manual tests and example output

* Update test-future.R

* Update test-future.R

* add backticks
  • Loading branch information
schloerke authored Oct 31, 2019
1 parent 52f2950 commit 23f81b8
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 1 deletion.
2 changes: 1 addition & 1 deletion R/async.R
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ runStepsUntil <- function(initialValue, errorHandlerStep, conditionFn, steps) {
runStep <- function() {

while (TRUE) {

if (nextStepPos > stepCount) {
return(x)
}
Expand Down
81 changes: 81 additions & 0 deletions inst/examples/14-future/plumber.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@

library(promises)
library(future)

future::plan("multiprocess") # use all available cores
# future::plan(future::multiprocess(workers = 2)) # only two cores

# Quick manual test:
# Within 10 seconds...
# 1. Visit /future
# 2. While /future is loading, visit /sync many times
# /future will not block /sync from being able to be loaded.


#' @json(auto_unbox = TRUE)
#' @get /sync
function() {
# print route, time, and worker pid
paste0("/sync; ", Sys.time(), "; pid:", Sys.getpid())
}

#' @contentType list(type = "text/html")
#' @json(auto_unbox = TRUE)
#' @get /future
function() {

future({
# perform large computations
Sys.sleep(10)

# print route, time, and worker pid
paste0("/future; ", Sys.time(), "; pid:", Sys.getpid())
})
}


# -----------------------------------


# Originally by @antoine-sachet from https://github.com/rstudio/plumber/issues/389
#' @get /divide
#' @json(auto_unbox = TRUE)
#' @param a number
#' @param b number
function(a = NA, b = NA) {
future({
a <- as.numeric(a)
b <- as.numeric(b)
if (is.na(a)) stop("a is missing")
if (is.na(b)) stop("b is missing")
if (b == 0) stop("Cannot divide by 0")

a / b
})
}

#' @get /divide-catch
#' @json(auto_unbox = TRUE)
#' @param a number
#' @param b number
function(a = NA, b = NA) {
future({
a <- as.numeric(a)
b <- as.numeric(b)
if (is.na(a)) stop("a is missing")
if (is.na(b)) stop("b is missing")
if (b == 0) stop("Cannot divide by 0")

a / b
}) %>%
# Handle `future` errors
promises::catch(function(error) {
# handle error here!
if (error$message == "b is missing") {
return(Inf)
}

# rethrow original error
stop(error)
})
}
105 changes: 105 additions & 0 deletions inst/examples/14-future/test-future.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# Instructions:
# 1. `plumb` API - `plumb(system.file("examples/14-future/plumber.R", package = "plumber"))$run(port = 1234)`
# 2. In separate R session, source the test file - `source(system.file("examples/14-future/test-future.R", package = "plumber"))`

local(withAutoprint({ # print when sourced

read_url <- function(...) {
try(readLines(..., warn = FALSE))
}

# example
read_url("http://127.0.0.1:1234/divide?a=6&b=3") # 2
read_url("http://127.0.0.1:1234/divide-catch?a=6&b=3") # 2

# missing 'b' param
read_url("http://127.0.0.1:1234/divide?a=6") # fails
read_url("http://127.0.0.1:1234/divide-catch?a=6") # handles error; returns Inf

# missing 'a' param
read_url("http://127.0.0.1:1234/divide?b=3") # fails
read_url("http://127.0.0.1:1234/divide-catch?b=3") # fails

}))

# --------------------------
# --------------------------

local(withAutoprint({ # print when sourced

counter <- 1
log_file <- "_timings.log"
cat_file <- function(..., append = TRUE) {
Sys.sleep(0.5) # for clean logging purposes only
cat(..., "\n", file = log_file, append = append)
}
curl_route <- function(route) {
Sys.sleep(0.5) # for clean logging purposes only
local_counter <- counter
counter <<- counter + 1
system(
paste0("curl 127.0.0.1:1234/", route, " >> ", log_file, " && echo ' - ", local_counter, "' >> ", log_file),
wait = FALSE
)
}


# Serially call each route
cat_file("--START route requests", append = FALSE)
curl_route("sync")
curl_route("future")
curl_route("sync")
curl_route("sync")
curl_route("sync")
curl_route("future")
curl_route("future") # will wait to execute if only two future processes allowed
curl_route("future")
curl_route("sync")
curl_route("sync")
curl_route("sync")
cat_file("--END route requests\n")


Sys.sleep(21) # wait for everything to finish

# display curl'ed output
cat(readLines(log_file), sep = "\n")


}))
# --------------------------

## Sample output using future::plan(future::multiprocess(workers = 2)) # only two cores
# --START route requests
# "/sync; 2019-10-07 13:11:06; pid:82424" - 1
# "/sync; 2019-10-07 13:11:07; pid:82424" - 3
# "/sync; 2019-10-07 13:11:07; pid:82424" - 4
# "/sync; 2019-10-07 13:11:08; pid:82424" - 5
# --END route requests
#
# "/sync; 2019-10-07 13:11:19; pid:82424" - 9
# "/sync; 2019-10-07 13:11:19; pid:82424" - 10
# "/sync; 2019-10-07 13:11:19; pid:82424" - 11
# "/future; 2019-10-07 13:11:16; pid:37135" - 2
# "/future; 2019-10-07 13:11:18; pid:37148" - 6
# "/future; 2019-10-07 13:11:27; pid:37272" - 7
# "/future; 2019-10-07 13:11:29; pid:37273" - 8


# --------------------------

## Sample output using future::plan("multiprocess") # use all available cores
# --START route requests
# "/sync; 2019-10-07 13:16:22; pid:82424" - 1
# "/sync; 2019-10-07 13:16:23; pid:82424" - 3
# "/sync; 2019-10-07 13:16:24; pid:82424" - 4
# "/sync; 2019-10-07 13:16:25; pid:82424" - 5
# "/sync; 2019-10-07 13:16:27; pid:82424" - 9
# "/sync; 2019-10-07 13:16:27; pid:82424" - 10
# "/sync; 2019-10-07 13:16:28; pid:82424" - 11
# --END route requests
#
# "/future; 2019-10-07 13:16:33; pid:40613" - 2
# "/future; 2019-10-07 13:16:35; pid:40626" - 6
# "/future; 2019-10-07 13:16:36; pid:40630" - 7
# "/future; 2019-10-07 13:16:36; pid:40634" - 8

0 comments on commit 23f81b8

Please sign in to comment.