From 6b083c20c8e7f0c57b5d8a8d1dd85d8688ba1649 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Szabolcs=20Gelencs=C3=A9r?= Date: Fri, 21 May 2021 19:17:07 +0200 Subject: [PATCH] events: fix slow client connection to empty event stream (#10637) * events: fix slow client connection to empty event stream * doc: fix changelog of event stream connection init --- CHANGELOG.md | 1 + nomad/stream/ndjson.go | 4 +++- nomad/stream/ndjson_test.go | 17 ++++++----------- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9dcc6c85707..737116c7d711 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 1.1.1 (Unreleased) BUG FIXES: +* api: Fixed event stream connection initialization when there are no events to send [[GH-10637](https://github.com/hashicorp/nomad/issues/10637)] * cli: Fixed a bug where `quota status` and `namespace status` commands may panic if the CLI targets a pre-1.1.0 cluster ## 1.1.0 (May 18, 2021) diff --git a/nomad/stream/ndjson.go b/nomad/stream/ndjson.go index ec69a6c1cb99..c3ea3e7ddb00 100644 --- a/nomad/stream/ndjson.go +++ b/nomad/stream/ndjson.go @@ -33,7 +33,8 @@ type JsonStream struct { // NewJsonStream creates a new json stream that will output Json structs // to the passed output channel. The constructor starts a goroutine -// to begin heartbeating on its set interval. +// to begin heartbeating on its set interval and also sends an initial heartbeat +// to notify the client about the successful connection initialization. func NewJsonStream(ctx context.Context, heartbeat time.Duration) *JsonStream { s := &JsonStream{ ctx: ctx, @@ -41,6 +42,7 @@ func NewJsonStream(ctx context.Context, heartbeat time.Duration) *JsonStream { heartbeatTick: time.NewTicker(heartbeat), } + s.outCh <- JsonHeartbeat go s.heartbeat() return s diff --git a/nomad/stream/ndjson_test.go b/nomad/stream/ndjson_test.go index 0c7c4de787dd..95bc2b23a045 100644 --- a/nomad/stream/ndjson_test.go +++ b/nomad/stream/ndjson_test.go @@ -1,7 +1,6 @@ package stream import ( - "bytes" "context" "testing" "time" @@ -24,12 +23,12 @@ func TestJsonStream(t *testing.T) { require.NoError(t, s.Send(testObj{Name: "test"})) - out1 := <-out + initialHeartbeat := <-out + require.Equal(t, []byte(`{}`), initialHeartbeat.Data) - var expected bytes.Buffer - expected.Write([]byte(`{"name":"test"}`)) + testMessage1 := <-out + require.Equal(t, []byte(`{"name":"test"}`), testMessage1.Data) - require.Equal(t, expected.Bytes(), out1.Data) select { case msg := <-out: require.Failf(t, "Did not expect another message", "%#v", msg) @@ -38,12 +37,8 @@ func TestJsonStream(t *testing.T) { require.NoError(t, s.Send(testObj{Name: "test2"})) - out2 := <-out - expected.Reset() - - expected.Write([]byte(`{"name":"test2"}`)) - require.Equal(t, expected.Bytes(), out2.Data) - + testMessage2 := <-out + require.Equal(t, []byte(`{"name":"test2"}`), testMessage2.Data) } func TestJson_Send_After_Stop(t *testing.T) {