Skip to content

Commit

Permalink
Fix usage of split_inclusive in event parser (#31)
Browse files Browse the repository at this point in the history
* Use cimg/rust 1.59

* Fix usage of split_inclusive in event parser logic

It was relying on old behavior where Some(empty slice) would be returned
if there were no matches. This was updated in more recent rust versions to return None,
but CircleCI was using a much older version of rust than I have locally.

* Add a property-test to exercise event parsing logic
  • Loading branch information
cwaldren-ld authored Mar 30, 2022
1 parent 94b9742 commit 08a3496
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 37 deletions.
14 changes: 5 additions & 9 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@ version: 2
jobs:
build:
docker:
- image: circleci/rust:latest

- image: cimg/rust:1.59.0
steps:
- checkout

- run:
name: Install clippy
command: rustup component add clippy
- run:
name: Install rustfmt
command: rustup component add rustfmt

name: Check Version
command: |
cargo --version
rustc --version
- run:
name: Check consistent formatting
command: cargo fmt && git diff --exit-code
Expand Down
2 changes: 2 additions & 0 deletions eventsource-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ maplit = "1.0.1"
simplelog = "0.5.3"
tokio = { version = "1.2.0", features = ["macros", "rt-multi-thread"] }
test-case = "1.2.3"
proptest = "1.0.0"


[features]
default = ["rustls"]
Expand Down
63 changes: 35 additions & 28 deletions eventsource-client/src/event_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,42 +281,39 @@ impl EventParser {
// incomplete lines from previous chunks.
fn decode_and_buffer_lines(&mut self, chunk: Bytes) {
let mut lines = chunk.split_inclusive(|&b| b == b'\n' || b == b'\r');

// The first and last elements in this split are special. The spec requires lines to be
// terminated. But lines may span chunks, so:
// * the last line, if non-empty (i.e. if chunk didn't end with a line terminator),
// should be buffered as an incomplete line
// * the first line should be appended to the incomplete line, if any

if let Some(incomplete_line) = self.incomplete_line.as_mut() {
let line = lines
.next()
// split always returns at least one item
.unwrap();
trace!(
"extending line from previous chunk: {:?}+{:?}",
logify(incomplete_line),
logify(line)
);
if let Some(line) = lines.next() {
trace!(
"extending line from previous chunk: {:?}+{:?}",
logify(incomplete_line),
logify(line)
);

self.last_char_was_cr = false;
if !line.is_empty() {
// Checking the last character handles lines where the last character is a
// terminator, but also where the entire line is a terminator.
match line.last().unwrap() {
b'\r' => {
incomplete_line.extend_from_slice(&line[..line.len() - 1]);
let il = self.incomplete_line.take();
self.complete_lines.push_back(il.unwrap());
self.last_char_was_cr = true;
}
b'\n' => {
incomplete_line.extend_from_slice(&line[..line.len() - 1]);
let il = self.incomplete_line.take();
self.complete_lines.push_back(il.unwrap());
}
_ => incomplete_line.extend_from_slice(line),
};
self.last_char_was_cr = false;
if !line.is_empty() {
// Checking the last character handles lines where the last character is a
// terminator, but also where the entire line is a terminator.
match line.last().unwrap() {
b'\r' => {
incomplete_line.extend_from_slice(&line[..line.len() - 1]);
let il = self.incomplete_line.take();
self.complete_lines.push_back(il.unwrap());
self.last_char_was_cr = true;
}
b'\n' => {
incomplete_line.extend_from_slice(&line[..line.len() - 1]);
let il = self.incomplete_line.take();
self.complete_lines.push_back(il.unwrap());
}
_ => incomplete_line.extend_from_slice(line),
};
}
}
}

Expand Down Expand Up @@ -375,6 +372,7 @@ impl EventParser {
#[cfg(test)]
mod tests {
use super::{Error::*, *};
use proptest::proptest;
use test_case::test_case;

fn field<'a>(key: &'a str, value: &'a str) -> Result<Option<(&'a str, &'a str)>> {
Expand Down Expand Up @@ -671,4 +669,13 @@ mod tests {
std::fs::read(format!("test-data/{}", name))
.unwrap_or_else(|_| panic!("couldn't read {}", name))
}

proptest! {
#[test]
fn test_decode_and_buffer_lines_does_not_crash(next in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)", previous in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)") {
let mut parser = EventParser::new();
parser.incomplete_line = Some(previous.as_bytes().to_vec());
parser.decode_and_buffer_lines(Bytes::from(next));
}
}
}

0 comments on commit 08a3496

Please sign in to comment.