Skip to content

Commit

Permalink
events: fix slow client connection to empty event stream (#10637)
Browse files Browse the repository at this point in the history
* events: fix slow client connection to empty event stream

* doc: fix changelog of event stream connection init
  • Loading branch information
h3yduck authored and Mahmood Ali committed Jun 9, 2021
1 parent a3a7507 commit 8ed26e9
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
4 changes: 3 additions & 1 deletion nomad/stream/ndjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ type JsonStream struct {

// NewJsonStream creates a new json stream that will output Json structs
// to the passed output channel. The constructor starts a goroutine
// to begin heartbeating on its set interval.
// to begin heartbeating on its set interval and also sends an initial heartbeat
// to notify the client about the successful connection initialization.
func NewJsonStream(ctx context.Context, heartbeat time.Duration) *JsonStream {
s := &JsonStream{
ctx: ctx,
outCh: make(chan *structs.EventJson, 10),
heartbeatTick: time.NewTicker(heartbeat),
}

s.outCh <- JsonHeartbeat
go s.heartbeat()

return s
Expand Down
17 changes: 6 additions & 11 deletions nomad/stream/ndjson_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package stream

import (
"bytes"
"context"
"testing"
"time"
Expand All @@ -24,12 +23,12 @@ func TestJsonStream(t *testing.T) {

require.NoError(t, s.Send(testObj{Name: "test"}))

out1 := <-out
initialHeartbeat := <-out
require.Equal(t, []byte(`{}`), initialHeartbeat.Data)

var expected bytes.Buffer
expected.Write([]byte(`{"name":"test"}`))
testMessage1 := <-out
require.Equal(t, []byte(`{"name":"test"}`), testMessage1.Data)

require.Equal(t, expected.Bytes(), out1.Data)
select {
case msg := <-out:
require.Failf(t, "Did not expect another message", "%#v", msg)
Expand All @@ -38,12 +37,8 @@ func TestJsonStream(t *testing.T) {

require.NoError(t, s.Send(testObj{Name: "test2"}))

out2 := <-out
expected.Reset()

expected.Write([]byte(`{"name":"test2"}`))
require.Equal(t, expected.Bytes(), out2.Data)

testMessage2 := <-out
require.Equal(t, []byte(`{"name":"test2"}`), testMessage2.Data)
}

func TestJson_Send_After_Stop(t *testing.T) {
Expand Down

0 comments on commit 8ed26e9

Please sign in to comment.