Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine resp_stream_sse() #652

Merged
merged 3 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# httr2 (development version)

* `aws_v4_signature()` now works if url contains query parameters (@jeffreyzuber, #645).
* `resp_stream_sse()` now automatically retrieves the next event if the current event contains no data. The data is now returned as a single string (#650).
* `aws_v4_signature()` now works if url contains query parameters (@jeffreyzuber, #645).

# httr2 1.1.0

Expand Down
4 changes: 2 additions & 2 deletions R/req-perform-connection.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
#' * `0`: no output
#' * `1`: show headers
#' * `2`: show headers and bodies as they're streamed
#' * `3`: show headers, bodies, curl status messages, and stream buffer
#' management
#' * `3`: show headers, bodies, curl status messages, raw SSEs, and stream
#' buffer management
#'
#' Use [with_verbosity()] to control the verbosity of requests that
#' you can't affect directly.
Expand Down
107 changes: 88 additions & 19 deletions R/resp-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
#' @returns
#' * `resp_stream_raw()`: a raw vector.
#' * `resp_stream_lines()`: a character vector.
#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id`
#' * `resp_stream_sse()`: a list with components `type`, `data`, and `id`.
#' `type`, `data`, and `id` are always strings; `data` and `id` may be empty
#' strings.
#' * `resp_stream_aws()`: a list with components `headers` and `body`.
#' `body` will be automatically parsed if the event contains a `:content-type`
#' header with `application/json`.
Expand Down Expand Up @@ -88,7 +90,6 @@ resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {

if (resp_stream_show_body(resp)) {
log_stream(lines_read)
cli::cat_line()
}

lines_read
Expand All @@ -101,12 +102,26 @@ resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {
#' @rdname resp_stream_raw
#' @order 1
resp_stream_sse <- function(resp, max_size = Inf) {
event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE)
if (is.null(event_bytes)) {
return()

repeat {
event_bytes <- resp_boundary_pushback(resp, max_size, find_event_boundary, include_trailer = FALSE)
if (is.null(event_bytes)) {
return()
}

if (resp_stream_show_buffer(resp)) {
log_stream(
cli::rule("Raw server sent event"), "\n",
rawToChar(event_bytes),
prefix = " * "
)
}

event <- parse_event(event_bytes)
if (!is.null(event))
break
}

event <- parse_event(event_bytes)
if (resp_stream_show_body(resp)) {
for (key in names(event)) {
log_stream(cli::style_bold(key), ": ", pretty_json(event[[key]]))
Expand Down Expand Up @@ -263,8 +278,9 @@ resp_boundary_pushback <- function(resp, max_size, boundary_func, include_traile
resp$cache$push_back <- raw()

if (resp_stream_show_buffer(resp)) {
log_stream(cli::rule("Buffer"), prefix = " * ")
print_buffer <- function(buf, label) {
cli::cat_line(" * ", label, ": ", paste(as.character(buf), collapse = " "))
log_stream(label, ": ", paste(as.character(buf), collapse = " "), prefix = " * ")
}
} else {
print_buffer <- function(buf, label) {}
Expand Down Expand Up @@ -329,31 +345,84 @@ resp_boundary_pushback <- function(resp, max_size, boundary_func, include_traile
}
}

# Parse one server-sent event into its `type`, `data`, and `id` fields.
#
# Implements the event stream interpretation rules from
# https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
#
# @param event_data The content of a single event, either as a raw vector
#   (decoded as UTF-8) or as a string (convenient for testing).
# @return `NULL` if the event contains no `data` field (for example, if it
#   consists only of comments); otherwise a list with string components
#   `type`, `data`, and `id`. `type` defaults to "message"; `data` and `id`
#   may be empty strings.
parse_event <- function(event_data) {
  if (is.raw(event_data)) {
    # Streams must be decoded using the UTF-8 decode algorithm.
    str_data <- rawToChar(event_data)
    Encoding(str_data) <- "UTF-8"
  } else {
    # Strings are accepted as-is to make testing easier.
    str_data <- event_data
  }

  # CRLF, a lone LF, and a lone CR are all valid line terminators. CRLF comes
  # first in the alternation so that it is consumed as a single terminator.
  lines <- strsplit(str_data, "\r\n|\r|\n")[[1]]

  # Lines that start with a colon are comments and are ignored.
  lines <- lines[!grepl("^:", lines)]

  # Everything before the first colon is the field name and everything after
  # it is the value, minus one leading space if present. A line without a
  # colon is a field name with an empty value.
  m <- regexec("([^:]*)(: ?)?(.*)", lines)
  matches <- regmatches(lines, m)
  keys <- vapply(matches, function(x) x[2], character(1))
  values <- vapply(matches, function(x) x[4], character(1))

  # The data buffer stays empty only when there is no `data` field at all; in
  # that case no event is dispatched.
  data_values <- values[keys == "data"]
  if (length(data_values) == 0) {
    return(NULL)
  }

  # Each `data` value is appended to the buffer followed by LF, and the final
  # LF is then removed; that is the same as joining the values with LF.
  data <- paste(data_values, collapse = "\n")

  # `event` and `id` overwrite their buffers, so the last occurrence wins and
  # a missing field leaves the buffer as the empty string.
  last_or_empty <- function(x) {
    if (length(x) > 0) x[[length(x)]] else ""
  }

  type <- last_or_empty(values[keys == "event"])
  if (type == "") {
    # An empty event type buffer means the default event type.
    type <- "message"
  }

  list(
    type = type,
    data = data,
    id = last_or_empty(values[keys == "id"])
  )
}

Expand Down
4 changes: 2 additions & 2 deletions man/req_perform_connection.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion man/resp_stream_raw.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 30 additions & 3 deletions tests/testthat/_snaps/resp-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
stream_all(req, resp_stream_lines, 1)
Output
<< line 1

<< line 2

Code
stream_all(req, resp_stream_raw, 5 / 1024)
Output
Expand All @@ -40,17 +38,46 @@
resp_stream_lines(con, 1)
}
Output
* -- Buffer ----------------------------------------------------------------------
* Buffer to parse:
* Received chunk: 6c 69 6e 65 20 31 0a 6c 69 6e 65 20 32 0a
* Combined buffer: 6c 69 6e 65 20 31 0a 6c 69 6e 65 20 32 0a
* Buffer to parse: 6c 69 6e 65 20 31 0a 6c 69 6e 65 20 32 0a
* Matched data: 6c 69 6e 65 20 31 0a
* Remaining buffer: 6c 69 6e 65 20 32 0a
<< line 1

* -- Buffer ----------------------------------------------------------------------
* Buffer to parse: 6c 69 6e 65 20 32 0a
* Matched data: 6c 69 6e 65 20 32 0a
* Remaining buffer:
<< line 2

# verbosity = 3 shows raw sse events

Code
. <- resp_stream_sse(resp)
Output
* -- Buffer ----------------------------------------------------------------------
* Buffer to parse:
* Received chunk: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a 64 61 74 61 3a 20 31 0a 0a
* Combined buffer: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a 64 61 74 61 3a 20 31 0a 0a
* Buffer to parse: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a 64 61 74 61 3a 20 31 0a 0a
* Matched data: 3a 20 63 6f 6d 6d 65 6e 74 0a 0a
* Remaining buffer: 64 61 74 61 3a 20 31 0a 0a
* -- Raw server sent event -------------------------------------------------------
* : comment
*
*
* -- Buffer ----------------------------------------------------------------------
* Buffer to parse: 64 61 74 61 3a 20 31 0a 0a
* Matched data: 64 61 74 61 3a 20 31 0a 0a
* Remaining buffer:
* -- Raw server sent event -------------------------------------------------------
* data: 1
*
*
<< type: message
<< data: 1
<< id:


55 changes: 48 additions & 7 deletions tests/testthat/test-resp-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ test_that("can determine if incomplete data is complete", {
})

con <- req %>% req_perform_connection(blocking = TRUE)
expect_equal(resp_stream_sse(con, 10), list(type = "message", data = "1", id = character()))
expect_equal(resp_stream_sse(con, 10), list(type = "message", data = "1", id = ""))
expect_snapshot(expect_equal(resp_stream_sse(con), NULL))
expect_true(resp_stream_is_complete(con))
close(con)
Expand Down Expand Up @@ -188,17 +188,31 @@ test_that("can feed sse events one at a time", {

expect_equal(
resp_stream_sse(resp),
list(type = "message", data = "1", id = character())
list(type = "message", data = "1", id = "")
)
expect_equal(
resp_stream_sse(resp),
list(type = "message", data = "2", id = character())
list(type = "message", data = "2", id = "")
)
resp_stream_sse(resp)

expect_equal(resp_stream_sse(resp), NULL)
})

# A comment-only event carries no data, so the first value returned by
# resp_stream_sse() should be the event that follows it.
test_that("ignores events with no data", {
  app_req <- local_app_request(function(req, res) {
    res$send_chunk(": comment\n\n")
    res$send_chunk("data: 1\n\n")
  })
  con <- req_perform_connection(app_req)
  withr::defer(close(con))

  first_event <- resp_stream_sse(con)
  expected <- list(type = "message", data = "1", id = "")
  expect_equal(first_event, expected)
})

test_that("can join sse events across multiple reads", {
req <- local_app_request(function(req, res) {
res$send_chunk("data: 1\n")
Expand All @@ -221,17 +235,17 @@ test_that("can join sse events across multiple reads", {
Sys.sleep(0.1)
out <- resp_stream_sse(resp1)
}
expect_equal(out, list(type = "message", data = c("1", "2"), id = character()))
expect_equal(out, list(type = "message", data = "1\n2", id = ""))
expect_equal(resp1$cache$push_back, charToRaw("data: 3\n\n"))
out <- resp_stream_sse(resp1)
expect_equal(out, list(type = "message", data = "3", id = character()))
expect_equal(out, list(type = "message", data = "3", id = ""))

# Blocking waits for a complete event
resp2 <- req_perform_connection(req)
withr::defer(close(resp2))

out <- resp_stream_sse(resp2)
expect_equal(out, list(type = "message", data = c("1", "2"), id = character()))
expect_equal(out, list(type = "message", data = "1\n2", id = ""))
})

test_that("sse always interprets data as UTF-8", {
Expand All @@ -252,7 +266,7 @@ test_that("sse always interprets data as UTF-8", {

s <- "\xE3\x81\x82"
Encoding(s) <- "UTF-8"
expect_equal(out, list(type = "message", data = s, id = character()))
expect_equal(out, list(type = "message", data = s, id = ""))
expect_equal(Encoding(out$data), "UTF-8")
expect_equal(resp1$cache$push_back, raw())
})
Expand Down Expand Up @@ -327,6 +341,16 @@ test_that("verbosity = 3 shows buffer info", {
)
})

# Snapshot test for the verbosity = 3 logging of resp_stream_sse(): the app
# sends a comment-only event followed by an event that carries data.
test_that("verbosity = 3 shows raw sse events", {
req <- local_app_request(function(req, res) {
res$send_chunk(": comment\n\n")
res$send_chunk("data: 1\n\n")
})

resp <- req_perform_connection(req, verbosity = 3)
withr::defer(close(resp))
# NOTE(review): presumably the result is assigned to `.` so that only the
# logged output, not the returned event, is recorded in the snapshot. The
# expression text itself is part of the snapshot, so keep it unchanged.
expect_snapshot(. <- resp_stream_sse(resp))
})

test_that("has a working find_event_boundary", {
boundary_test <- function(x, matched, remaining) {
Expand Down Expand Up @@ -368,3 +392,20 @@ test_that("has a working find_event_boundary", {
expect_null(find_event_boundary(charToRaw("12")))
expect_null(find_event_boundary(charToRaw("\r\n\r")))
})

# parse_event ----------------------------------------------------------------

test_that("event with no data returns NULL", {
  # None of these inputs contains a `data` field, so nothing is dispatched.
  for (input in c("", ":comment", "id: 1")) {
    expect_null(parse_event(input))
  }

  # A `data` field with an empty value still produces an event.
  with_colon <- parse_event("data: ")
  expect_equal(with_colon[["data"]], "")

  without_colon <- parse_event("data")
  expect_equal(without_colon[["data"]], "")
})

test_that("examples from spec work", {
  # Three consecutive `data` lines form one event whose data is joined by LF.
  raw_event <- "data: YHOO\ndata: +2\ndata: 10"
  parsed <- parse_event(raw_event)

  expect_equal(parsed[["type"]], "message")
  expect_equal(parsed[["data"]], "YHOO\n+2\n10")
})
Loading