Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle long messages in event parser #2

Merged
merged 2 commits into from
Jul 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions internal/parser/reader_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ import (
"io"
)

var splitFunc bufio.SplitFunc = func(data []byte, _ bool) (advance int, token []byte, err error) {
// splitFunc is a split function for a bufio.Scanner that splits a sequence of
// bytes into SSE events. Each event ends with two consecutive newline sequences,
// where a newline sequence is defined as either "\n", "\r", or "\r\n".
var splitFunc bufio.SplitFunc = func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if len(data) == 0 {
return
return 0, nil, nil
}

var start, index, endlineLen int
Expand All @@ -22,16 +25,22 @@ var splitFunc bufio.SplitFunc = func(data []byte, _ bool) (advance int, token []
}
}

if l := len(data); advance < l {
if l := len(data); advance == l && !atEOF {
// We have reached the end of the buffer but have not yet seen two consecutive
// newline sequences, so we request more data.
return 0, nil, nil
} else if advance < l {
// We have found a newline. Consume the end-of-line sequence.
advance++
// Consume one more character if end-of-line is \r\n.
if advance < l && data[advance-1] == '\r' && data[advance] == '\n' {
advance++
}
}

token = data[start:advance]

return
return advance, token, nil
}

// ReaderParser extracts fields from a reader. Reading is buffered using a bufio.Scanner.
Expand Down
10 changes: 10 additions & 0 deletions internal/parser/reader_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func TestReaderParser(t *testing.T) {
expected []parser.Field
}

longString := strings.Repeat("abcdefghijklmnopqrstuvwxyz", 193)

tests := []test{
{
name: "Valid input",
Expand Down Expand Up @@ -67,6 +69,14 @@ data: still, here's some data: you deserve it
newDataField(t, "still, here's some data: you deserve it"),
},
},
{
name: "Valid input with long string",
input: strings.NewReader("\nid:2\ndata:" + longString + "\n"),
expected: []parser.Field{
newIDField(t, "2"),
newDataField(t, longString),
},
},
{
name: "Error",
input: errReader{nil},
Expand Down
30 changes: 30 additions & 0 deletions internal/parser/split_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,33 @@ func TestSplitFunc(t *testing.T) {
t.Fatalf("wrong tokens:\nreceived: %#v\nexpected: %#v", tokens, expected)
}
}

// TestSplitFuncWithLongLine feeds splitFunc, via a bufio.Scanner, four long
// chunks that each contain single "\r", "\n" and "\r\n" line breaks. The first
// three chunks are terminated by a doubled newline sequence ("\n\n", "\r\r",
// "\r\n\r\n"); the last one is left unterminated. The scanner must hand back
// each chunk, terminator included, as exactly one token.
func TestSplitFuncWithLongLine(t *testing.T) {
	t.Parallel()

	// 30 bytes repeated 193 times = 5790 bytes per chunk.
	// NOTE(review): presumably chosen to exceed the scanner's initial read
	// buffer so that splitFunc has to request more data — confirm.
	chunk := strings.Repeat("abcdef\rghijklmn\nopqrstu\r\nvwxyz", 193)

	want := []string{
		chunk + "\n\n",
		chunk + "\r\r",
		chunk + "\r\n\r\n",
		chunk,
	}

	// The scanner input is simply the expected tokens laid end to end.
	scanner := bufio.NewScanner(strings.NewReader(strings.Join(want, "")))
	scanner.Split(splitFunc)

	got := make([]string, 0, len(want))
	for scanner.Scan() {
		got = append(got, scanner.Text())
	}

	if err := scanner.Err(); err != nil {
		t.Fatalf("an error occurred: %v", err)
	}

	if !reflect.DeepEqual(got, want) {
		t.Fatalf("wrong tokens:\nreceived: %#v\nexpected: %#v", got, want)
	}
}