From fb68785f62885157d34ad41e004c23761dbb7188 Mon Sep 17 00:00:00 2001 From: Eric Bannatyne Date: Thu, 6 Jul 2023 22:56:53 -0700 Subject: [PATCH 1/2] Handle long messages in event parser --- internal/parser/reader_parser.go | 8 ++++++- internal/parser/reader_parser_test.go | 10 +++++++++ internal/parser/split_func_test.go | 30 +++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/internal/parser/reader_parser.go b/internal/parser/reader_parser.go index 4713859..40177ab 100644 --- a/internal/parser/reader_parser.go +++ b/internal/parser/reader_parser.go @@ -5,7 +5,7 @@ import ( "io" ) -var splitFunc bufio.SplitFunc = func(data []byte, _ bool) (advance int, token []byte, err error) { +var splitFunc bufio.SplitFunc = func(data []byte, atEOF bool) (advance int, token []byte, err error) { if len(data) == 0 { return } @@ -22,6 +22,12 @@ var splitFunc bufio.SplitFunc = func(data []byte, _ bool) (advance int, token [] } } + if advance == len(data) && !atEOF { + // We have reached the end of the buffer but have not yet seen two consecutive + // newline sequences, so we request more data. + return 0, nil, nil + } + if l := len(data); advance < l { advance++ if advance < l && data[advance-1] == '\r' && data[advance] == '\n' { diff --git a/internal/parser/reader_parser_test.go b/internal/parser/reader_parser_test.go index 4227828..6d47968 100644 --- a/internal/parser/reader_parser_test.go +++ b/internal/parser/reader_parser_test.go @@ -30,6 +30,8 @@ func TestReaderParser(t *testing.T) { expected []parser.Field } + longString := strings.Repeat("abcdefghijklmnopqrstuvwxyz", 193) + tests := []test{ { name: "Valid input", @@ -67,6 +69,14 @@ data: still, here's some data: you deserve it newDataField(t, "still, here's some data: you deserve it"), }, }, + { + name: "Valid input with long string", + input: strings.NewReader("\nid:2\ndata:" + longString + "\n"), + expected: []parser.Field{ + newIDField(t, "2"), + newDataField(t, longString), + }, + }, { name: "Error", input: errReader{nil}, diff --git a/internal/parser/split_func_test.go b/internal/parser/split_func_test.go index 4958d5f..c0d8eb1 100644 --- a/internal/parser/split_func_test.go +++ b/internal/parser/split_func_test.go @@ -34,3 +34,33 @@ func TestSplitFunc(t *testing.T) { t.Fatalf("wrong tokens:\nreceived: %#v\nexpected: %#v", tokens, expected) } } + +func TestSplitFuncWithLongLine(t *testing.T) { + t.Parallel() + + longString := strings.Repeat("abcdef\rghijklmn\nopqrstu\r\nvwxyz", 193) + text := longString + "\n\n" + longString + "\r\r" + longString + "\r\n\r\n" + longString + r := strings.NewReader(text) + s := bufio.NewScanner(r) + s.Split(splitFunc) + + expected := []string{ + longString + "\n\n", + longString + "\r\r", + longString + "\r\n\r\n", + longString, + } + tokens := make([]string, 0, len(expected)) + + for s.Scan() { + tokens = append(tokens, s.Text()) + } + + if s.Err() != nil { + t.Fatalf("an error occurred: %v", s.Err()) + } + + if !reflect.DeepEqual(tokens, expected) { + t.Fatalf("wrong tokens:\nreceived: %#v\nexpected: %#v", tokens, expected) + } +} From c235a393dfa3cf6d2c3a84cd00893c13fee6db9c Mon Sep 17 00:00:00 2001 From: Eric Bannatyne Date: Fri, 7 Jul 2023 19:35:12 -0700 Subject: [PATCH 2/2] Address review comments, add documentation to split function --- internal/parser/reader_parser.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/parser/reader_parser.go b/internal/parser/reader_parser.go index 40177ab..3d702c8 100644 --- a/internal/parser/reader_parser.go +++ b/internal/parser/reader_parser.go @@ -5,9 +5,12 @@ import ( "io" ) +// splitFunc is a split function for a bufio.Scanner that splits a sequence of +// bytes into SSE events. Each event ends with two consecutive newline sequences, +// where a newline sequence is defined as either "\n", "\r", or "\r\n". var splitFunc bufio.SplitFunc = func(data []byte, atEOF bool) (advance int, token []byte, err error) { if len(data) == 0 { - return + return 0, nil, nil } var start, index, endlineLen int @@ -22,14 +25,14 @@ var splitFunc bufio.SplitFunc = func(data []byte, atEOF bool) (advance int, toke } } - if advance == len(data) && !atEOF { + if l := len(data); advance == l && !atEOF { // We have reached the end of the buffer but have not yet seen two consecutive // newline sequences, so we request more data. return 0, nil, nil - } - - if l := len(data); advance < l { + } else if advance < l { + // We have found a newline. Consume the end-of-line sequence. advance++ + // Consume one more character if end-of-line is \r\n. if advance < l && data[advance-1] == '\r' && data[advance] == '\n' { advance++ } @@ -37,7 +40,7 @@ var splitFunc bufio.SplitFunc = func(data []byte, atEOF bool) (advance int, toke token = data[start:advance] - return + return advance, token, nil } // ReaderParser extracts fields from a reader. Reading is buffered using a bufio.Scanner.