From 07deee18dc558041b5b273705c3b805a44b14611 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Thu, 23 Jan 2025 17:37:29 -0600 Subject: [PATCH 1/2] Refine `resp_stream_sse()` It's a bit difficult to translate the spec to the behaviour that we should implement, but I think this bullet: > If the data buffer is an empty string, set the data buffer and the event type buffer to the empty string and return. Implies that if the event has no data, we should automatically wait for an event with data. This allows APIs to stream SSEs containing only comments to keep the stream alive. I think it makes sense to handle this hear, rather than requiring the end user to deal with it. `event$data` is also now guaranteed to be a single string, which I think is easier to deal with. Fixes #650 --- NEWS.md | 2 + R/resp-stream.R | 103 +++++++++++++++++++++++++----- man/resp_stream_raw.Rd | 4 +- tests/testthat/test-resp-stream.R | 45 +++++++++++-- 4 files changed, 129 insertions(+), 25 deletions(-) diff --git a/NEWS.md b/NEWS.md index 91d41266..80dfc03a 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,7 @@ # httr2 (development version) +* `resp_stream_sse()` now automatically retrieves the next event if the current event contains no data. The data is now returned as a single string (#650). + # httr2 1.1.0 ## Lifecycle changes diff --git a/R/resp-stream.R b/R/resp-stream.R index 7b2c08c1..52d7b734 100644 --- a/R/resp-stream.R +++ b/R/resp-stream.R @@ -14,7 +14,9 @@ #' @returns #' * `resp_stream_raw()`: a raw vector. #' * `resp_stream_lines()`: a character vector. -#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id` +#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id`. +#' `type`, `data`, and `id` are always strings; `data` and `id` may be empty +#' strings. #' * `resp_stream_aws()`: a list with components `headers` and `body`. #' `body` will be automatically parsed if the event contents a `:content-type` #' header with `application/json`. @@ -101,12 +103,26 @@ resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) { #' @rdname resp_stream_raw #' @order 1 resp_stream_sse <- function(resp, max_size = Inf) { - event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE) - if (is.null(event_bytes)) { - return() + + repeat { + event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE) + if (is.null(event_bytes)) { + return() + } + + if (resp_stream_show_buffer(resp)) { + log_stream( + "Raw server sent event: ----------------------\n", + charToRaw(event_bytes), + "-----------------------------\n", + ) + } + + event <- parse_event(event_bytes) + if (!is.null(event)) + break } - event <- parse_event(event_bytes) if (resp_stream_show_body(resp)) { for (key in names(event)) { log_stream(cli::style_bold(key), ": ", pretty_json(event[[key]])) @@ -329,31 +345,84 @@ resp_boundary_pushback <- function(resp, max_size, boundary_func, include_traile } } +# https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation parse_event <- function(event_data) { - # always treat event_data as UTF-8, it's in the spec - str_data <- rawToChar(event_data) - Encoding(str_data) <- "UTF-8" - # The spec says \r\n, \r, and \n are all valid separators + if (is.raw(event_data)) { + # Streams must be decoded using the UTF-8 decode algorithm. + str_data <- rawToChar(event_data) + Encoding(str_data) <- "UTF-8" + } else { + # for testing + str_data <- event_data + } + + # The stream must then be parsed by reading everything line by line, with a + # U+000D CARRIAGE RETURN U+000A LINE FEED (CRLF) character pair, a single + # U+000A LINE FEED (LF) character not preceded by a U+000D CARRIAGE RETURN + # (CR) character, and a single U+000D CARRIAGE RETURN (CR) character not + # followed by a U+000A LINE FEED (LF) character being the ways in + # which a line can end. lines <- strsplit(str_data, "\r\n|\r|\n")[[1]] + # When a stream is parsed, a data buffer, an event type buffer, and a + # last event ID buffer must be associated with it. They must be initialized + # to the empty string. + data <- "" + type <- "" + last_id <- "" + + # If the line starts with a U+003A COLON character (:) - Ignore the line. + lines <- lines[!grepl("^:", lines)] + + # If the line contains a U+003A COLON character (:) + # * Collect the characters on the line before the first U+003A COLON + # character (:), and let field be that string. + # * Collect the characters on the line after the first U+003A COLON character + # (:), and let value be that string. If value starts with a U+0020 SPACE + # character, remove it from value. m <- regexec("([^:]*)(: ?)?(.*)", lines) matches <- regmatches(lines, m) keys <- c("event", vapply(matches, function(x) x[2], character(1))) values <- c("message", vapply(matches, function(x) x[4], character(1))) - remove_dupes <- duplicated(keys, fromLast = TRUE) & keys != "data" - keys <- keys[!remove_dupes] - values <- values[!remove_dupes] + for (i in seq_along(matches)) { + key <- matches[[i]][2] + value <- matches[[i]][4] + + if (key == "event") { + # Set the event type buffer to field value. + type <- value + } else if (key == "data") { + # Append the field value to the data buffer, then append a single + # U+000A LINE FEED (LF) character to the data buffer. + data <- paste0(data, value, "\n") + } else if (key == "id") { + # If the field value does not contain U+0000 NULL, then set the last + # event ID buffer to the field value. Otherwise, ignore the field. + last_id <- value + } + } - event_type <- values[keys == "event"] - data <- values[keys == "data"] - id <- values[keys == "id"] + # If the data buffer is an empty string, set the data buffer and the event + # type buffer to the empty string and return. + if (data == "") { + return() + } + + # If the data buffer's last character is a U+000A LINE FEED (LF) character, + # then remove the last character from the data buffer. + if (grepl("\n$", data)) { + data <- substr(data, 1, nchar(data) - 1) + } + if (type == "") { + type <- "message" + } list( - type = event_type, + type = type, data = data, - id = id + id = last_id ) } diff --git a/man/resp_stream_raw.Rd b/man/resp_stream_raw.Rd index c6a43b6b..48dba2d6 100644 --- a/man/resp_stream_raw.Rd +++ b/man/resp_stream_raw.Rd @@ -40,7 +40,9 @@ EOL.} \itemize{ \item \code{resp_stream_raw()}: a raw vector. \item \code{resp_stream_lines()}: a character vector. -\item \code{resp_stream_sse()}: a list with components \code{type}, \code{data}, and \code{id} +\item \code{resp_stream_sse()}: a list with components \code{type}, \code{data}, and \code{id}. +\code{type}, \code{data}, and \code{id} are always strings; \code{data} and \code{id} may be empty +strings. \item \code{resp_stream_aws()}: a list with components \code{headers} and \code{body}. \code{body} will be automatically parsed if the event contents a \verb{:content-type} header with \code{application/json}. diff --git a/tests/testthat/test-resp-stream.R b/tests/testthat/test-resp-stream.R index a476de30..0fec2246 100644 --- a/tests/testthat/test-resp-stream.R +++ b/tests/testthat/test-resp-stream.R @@ -42,7 +42,7 @@ test_that("can determine if incomplete data is complete", { }) con <- req %>% req_perform_connection(blocking = TRUE) - expect_equal(resp_stream_sse(con, 10), list(type = "message", data = "1", id = character())) + expect_equal(resp_stream_sse(con, 10), list(type = "message", data = "1", id = "")) expect_snapshot(expect_equal(resp_stream_sse(con), NULL)) expect_true(resp_stream_is_complete(con)) close(con) @@ -188,17 +188,31 @@ test_that("can feed sse events one at a time", { expect_equal( resp_stream_sse(resp), - list(type = "message", data = "1", id = character()) + list(type = "message", data = "1", id = "") ) expect_equal( resp_stream_sse(resp), - list(type = "message", data = "2", id = character()) + list(type = "message", data = "2", id = "") ) resp_stream_sse(resp) expect_equal(resp_stream_sse(resp), NULL) }) +test_that("ignores events with no data", { + req <- local_app_request(function(req, res) { + res$send_chunk(": comment\n\n") + res$send_chunk("data: 1\n\n") + }) + resp <- req_perform_connection(req) + withr::defer(close(resp)) + + expect_equal( + resp_stream_sse(resp), + list(type = "message", data = "1", id = "") + ) +}) + test_that("can join sse events across multiple reads", { req <- local_app_request(function(req, res) { res$send_chunk("data: 1\n") @@ -221,17 +235,17 @@ test_that("can join sse events across multiple reads", { Sys.sleep(0.1) out <- resp_stream_sse(resp1) } - expect_equal(out, list(type = "message", data = c("1", "2"), id = character())) + expect_equal(out, list(type = "message", data = "1\n2", id = "")) expect_equal(resp1$cache$push_back, charToRaw("data: 3\n\n")) out <- resp_stream_sse(resp1) - expect_equal(out, list(type = "message", data = "3", id = character())) + expect_equal(out, list(type = "message", data = "3", id = "")) # Blocking waits for a complete event resp2 <- req_perform_connection(req) withr::defer(close(resp2)) out <- resp_stream_sse(resp2) - expect_equal(out, list(type = "message", data = c("1", "2"), id = character())) + expect_equal(out, list(type = "message", data = "1\n2", id = "")) }) test_that("sse always interprets data as UTF-8", { @@ -252,7 +266,7 @@ test_that("sse always interprets data as UTF-8", { s <- "\xE3\x81\x82" Encoding(s) <- "UTF-8" - expect_equal(out, list(type = "message", data = s, id = character())) + expect_equal(out, list(type = "message", data = s, id = "")) expect_equal(Encoding(out$data), "UTF-8") expect_equal(resp1$cache$push_back, raw()) }) @@ -368,3 +382,20 @@ test_that("has a working find_event_boundary", { expect_null(find_event_boundary(charToRaw("12"))) expect_null(find_event_boundary(charToRaw("\r\n\r"))) }) + +# parse_event ---------------------------------------------------------------- + +test_that("event with no data returns NULL", { + expect_null(parse_event("")) + expect_null(parse_event(":comment")) + expect_null(parse_event("id: 1")) + + expect_equal(parse_event("data: ")$data, "") + expect_equal(parse_event("data")$data, "") +}) + +test_that("examples from spec work", { + event <- parse_event("data: YHOO\ndata: +2\ndata: 10") + expect_equal(event$type, "message") + expect_equal(event$data, "YHOO\n+2\n10") +}) From 4fd89fd2522372c302a6eaa59263fc76d093d63b Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Fri, 24 Jan 2025 07:53:11 -0600 Subject: [PATCH 2/2] Test raw event logging --- R/req-perform-connection.R | 4 ++-- R/resp-stream.R | 10 ++++----- man/req_perform_connection.Rd | 4 ++-- tests/testthat/_snaps/resp-stream.md | 33 +++++++++++++++++++++++++--- tests/testthat/test-resp-stream.R | 10 +++++++++ 5 files changed, 49 insertions(+), 12 deletions(-) diff --git a/R/req-perform-connection.R b/R/req-perform-connection.R index 066aa682..623243d7 100644 --- a/R/req-perform-connection.R +++ b/R/req-perform-connection.R @@ -23,8 +23,8 @@ #' * `0`: no output #' * `1`: show headers #' * `2`: show headers and bodies as they're streamed -#' * `3`: show headers, bodies, curl status messages, and stream buffer -#' management +#' * `3`: show headers, bodies, curl status messages, raw SSEs, and stream +#' buffer management #' #' Use [with_verbosity()] to control the verbosity of requests that #' you can't affect directly. diff --git a/R/resp-stream.R b/R/resp-stream.R index 52d7b734..bdbcb44f 100644 --- a/R/resp-stream.R +++ b/R/resp-stream.R @@ -90,7 +90,6 @@ resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) { if (resp_stream_show_body(resp)) { log_stream(lines_read) - cli::cat_line() } lines_read @@ -112,9 +111,9 @@ resp_stream_sse <- function(resp, max_size = Inf) { if (resp_stream_show_buffer(resp)) { log_stream( - "Raw server sent event: ----------------------\n", - charToRaw(event_bytes), - "-----------------------------\n", + cli::rule("Raw server sent event"), "\n", + rawToChar(event_bytes), + prefix = " * " ) } @@ -279,8 +278,9 @@ resp_boundary_pushback <- function(resp, max_size, boundary_func, include_traile resp$cache$push_back <- raw() if (resp_stream_show_buffer(resp)) { + log_stream(cli::rule("Buffer"), prefix = " * ") print_buffer <- function(buf, label) { - cli::cat_line(" * ", label, ": ", paste(as.character(buf), collapse = " ")) + log_stream(label, ": ", paste(as.character(buf), collapse = " "), prefix = " * ") } } else { print_buffer <- function(buf, label) {} diff --git a/man/req_perform_connection.Rd b/man/req_perform_connection.Rd index cc9961b3..03666c4f 100644 --- a/man/req_perform_connection.Rd +++ b/man/req_perform_connection.Rd @@ -19,8 +19,8 @@ around \code{\link[=req_verbose]{req_verbose()}} that uses an integer to control \item \code{0}: no output \item \code{1}: show headers \item \code{2}: show headers and bodies as they're streamed -\item \code{3}: show headers, bodies, curl status messages, and stream buffer -management +\item \code{3}: show headers, bodies, curl status messages, raw SSEs, and stream +buffer management } Use \code{\link[=with_verbosity]{with_verbosity()}} to control the verbosity of requests that diff --git a/tests/testthat/_snaps/resp-stream.md b/tests/testthat/_snaps/resp-stream.md index 11dfe947..9f6e2882 100644 --- a/tests/testthat/_snaps/resp-stream.md +++ b/tests/testthat/_snaps/resp-stream.md @@ -20,9 +20,7 @@ stream_all(req, resp_stream_lines, 1) Output << line 1 - << line 2 - Code stream_all(req, resp_stream_raw, 5 / 1024) Output @@ -40,6 +38,7 @@ resp_stream_lines(con, 1) } Output + * -- Buffer ---------------------------------------------------------------------- * Buffer to parse: * Received chunk: 6c 69 6e 65 20 31 0a 6c 69 6e 65 20 32 0a * Combined buffer: 6c 69 6e 65 20 31 0a 6c 69 6e 65 20 32 0a @@ -47,10 +46,38 @@ * Matched data: 6c 69 6e 65 20 31 0a * Remaining buffer: 6c 69 6e 65 20 32 0a << line 1 - + * -- Buffer ---------------------------------------------------------------------- * Buffer to parse: 6c 69 6e 65 20 32 0a * Matched data: 6c 69 6e 65 20 32 0a * Remaining buffer: << line 2 + +# verbosity = 3 shows raw sse events + + Code + . <- resp_stream_sse(resp) + Output + * -- Buffer ---------------------------------------------------------------------- + * Buffer to parse: + * Received chunk: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a 64 61 74 61 3a 20 31 0a 0a + * Combined buffer: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a 64 61 74 61 3a 20 31 0a 0a + * Buffer to parse: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a 64 61 74 61 3a 20 31 0a 0a + * Matched data: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a + * Remaining buffer: 64 61 74 61 3a 20 31 0a 0a + * -- Raw server sent event ------------------------------------------------------- + * : comment + * + * + * -- Buffer ---------------------------------------------------------------------- + * Buffer to parse: 64 61 74 61 3a 20 31 0a 0a + * Matched data: 64 61 74 61 3a 20 31 0a 0a + * Remaining buffer: + * -- Raw server sent event ------------------------------------------------------- + * data: 1 + * + * + << type: message + << data: 1 + << id: diff --git a/tests/testthat/test-resp-stream.R b/tests/testthat/test-resp-stream.R index 0fec2246..824a0a2f 100644 --- a/tests/testthat/test-resp-stream.R +++ b/tests/testthat/test-resp-stream.R @@ -341,6 +341,16 @@ test_that("verbosity = 3 shows buffer info", { ) }) +test_that("verbosity = 3 shows raw sse events", { + req <- local_app_request(function(req, res) { + res$send_chunk(": comment\n\n") + res$send_chunk("data: 1\n\n") + }) + + resp <- req_perform_connection(req, verbosity = 3) + withr::defer(close(resp)) + expect_snapshot(. <- resp_stream_sse(resp)) +}) test_that("has a working find_event_boundary", { boundary_test <- function(x, matched, remaining) {