From 9dd9d260e7cc85ec44859a023da0c8adfa0b7d02 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Sun, 6 Dec 2020 22:44:42 -0500 Subject: [PATCH 1/5] Use a single goroutine to emit metrics for all instance queues. --- src/aggregator/client/queue.go | 41 +++----------- src/aggregator/client/queue_mock.go | 16 +++++- src/aggregator/client/tcp_client_test.go | 3 +- src/aggregator/client/writer.go | 7 +++ .../client/writer_benchmark_test.go | 1 + src/aggregator/client/writer_mgr.go | 56 ++++++++++++++++++- src/aggregator/client/writer_mgr_test.go | 20 +++++-- src/aggregator/client/writer_mock.go | 16 +++++- 8 files changed, 115 insertions(+), 45 deletions(-) diff --git a/src/aggregator/client/queue.go b/src/aggregator/client/queue.go index b3aadeb2b3..1cda1e45e7 100644 --- a/src/aggregator/client/queue.go +++ b/src/aggregator/client/queue.go @@ -34,12 +34,6 @@ import ( "go.uber.org/zap" ) -const ( - // By default we use 6 buckets for the queue size histogram metrics - // to achieve a good balance between metric granularity and overhead. - defaultQueueSizeNumBuckets = 6 -) - var ( errInstanceQueueClosed = errors.New("instance queue is closed") errWriterQueueFull = errors.New("writer queue is full") @@ -101,6 +95,9 @@ type instanceQueue interface { // Enqueue enqueues a data buffer. Enqueue(buf protobuf.Buffer) error + // Size returns the number of times in the queue. + Size() int + // Close closes the queue, it blocks until the queue is drained. Close() error } @@ -154,9 +151,8 @@ func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue { } q.writeFn = q.conn.Write - q.wg.Add(2) + q.wg.Add(1) go q.drain() - go q.reportQueueSize(iOpts.ReportInterval()) return q } @@ -199,15 +195,15 @@ func (q *queue) Enqueue(buf protobuf.Buffer) error { func (q *queue) Close() error { q.Lock() if q.closed { - q.Unlock() return errInstanceQueueClosed } q.closed = true - close(q.doneCh) - close(q.bufCh) q.Unlock() + close(q.doneCh) q.wg.Wait() + + close(q.bufCh) return nil } @@ -271,24 +267,11 @@ func (q *queue) drain() { } } -func (q *queue) reportQueueSize(reportInterval time.Duration) { - defer q.wg.Done() - - ticker := time.NewTicker(reportInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - q.metrics.queueLen.RecordValue(float64(len(q.bufCh))) - case <-q.doneCh: - return - } - } +func (q *queue) Size() int { + return len(q.bufCh) } type queueMetrics struct { - queueLen tally.Histogram enqueueSuccesses tally.Counter enqueueOldestDropped tally.Counter enqueueCurrentDropped tally.Counter @@ -298,15 +281,9 @@ type queueMetrics struct { } func newQueueMetrics(s tally.Scope, queueSize int) queueMetrics { - numBuckets := defaultQueueSizeNumBuckets - if queueSize < numBuckets { - numBuckets = queueSize - } - buckets := tally.MustMakeLinearValueBuckets(0, float64(queueSize/numBuckets), numBuckets) enqueueScope := s.Tagged(map[string]string{"action": "enqueue"}) connWriteScope := s.Tagged(map[string]string{"action": "conn-write"}) return queueMetrics{ - queueLen: s.Histogram("queue-length", buckets), enqueueSuccesses: enqueueScope.Counter("successes"), enqueueOldestDropped: enqueueScope.Tagged(map[string]string{"drop-type": "oldest"}). Counter("dropped"), diff --git a/src/aggregator/client/queue_mock.go b/src/aggregator/client/queue_mock.go index 906b90625a..b40fbeb653 100644 --- a/src/aggregator/client/queue_mock.go +++ b/src/aggregator/client/queue_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/aggregator/client/queue.go -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -69,6 +69,20 @@ func (mr *MockinstanceQueueMockRecorder) Enqueue(buf interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enqueue", reflect.TypeOf((*MockinstanceQueue)(nil).Enqueue), buf) } +// Size mocks base method +func (m *MockinstanceQueue) Size() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Size") + ret0, _ := ret[0].(int) + return ret0 +} + +// Size indicates an expected call of Size +func (mr *MockinstanceQueueMockRecorder) Size() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockinstanceQueue)(nil).Size)) +} + // Close mocks base method func (m *MockinstanceQueue) Close() error { m.ctrl.T.Helper() diff --git a/src/aggregator/client/tcp_client_test.go b/src/aggregator/client/tcp_client_test.go index a36a5c6020..9907e09b35 100644 --- a/src/aggregator/client/tcp_client_test.go +++ b/src/aggregator/client/tcp_client_test.go @@ -744,7 +744,6 @@ func TestTCPClientClosed(t *testing.T) { c := mustNewTestTCPClient(t, testOptions()) require.NoError(t, c.Close()) - require.Equal(t, errInstanceWriterManagerClosed, c.Close()) } func TestTCPClientCloseSuccess(t *testing.T) { @@ -845,7 +844,7 @@ func testTCPClientOptions() Options { plOpts := placement.NewStagedPlacementWatcherOptions(). SetStagedPlacementStore(store). SetStagedPlacementKey(placementKey). - SetInitWatchTimeout(time.Nanosecond) + SetInitWatchTimeout(time.Millisecond) return NewOptions(). SetClockOptions(clock.NewOptions()). SetConnectionOptions(testConnectionOptions()). diff --git a/src/aggregator/client/writer.go b/src/aggregator/client/writer.go index 4d94a69319..4db58ae8ea 100644 --- a/src/aggregator/client/writer.go +++ b/src/aggregator/client/writer.go @@ -51,6 +51,9 @@ type instanceWriter interface { // Flush flushes any buffered metrics. Flush() error + // QueueSize returns the size of the instance queue. + QueueSize() int + // Close closes the writer. Close() error } @@ -154,6 +157,10 @@ func (w *writer) Close() error { return w.queue.Close() } +func (w *writer) QueueSize() int { + return w.queue.Size() +} + func (w *writer) encodeWithLock( encoder *lockedEncoder, payload payloadUnion, diff --git a/src/aggregator/client/writer_benchmark_test.go b/src/aggregator/client/writer_benchmark_test.go index 37f5194e26..0a44d192b7 100644 --- a/src/aggregator/client/writer_benchmark_test.go +++ b/src/aggregator/client/writer_benchmark_test.go @@ -150,6 +150,7 @@ type testNoOpQueue struct{} func (q testNoOpQueue) Enqueue(protobuf.Buffer) error { return nil } func (q testNoOpQueue) Close() error { return nil } +func (q testNoOpQueue) Size() int { return 0 } type testSerialWriter struct { *writer diff --git a/src/aggregator/client/writer_mgr.go b/src/aggregator/client/writer_mgr.go index 7be59aeedc..99b4d2f2f9 100644 --- a/src/aggregator/client/writer_mgr.go +++ b/src/aggregator/client/writer_mgr.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/m3db/m3/src/cluster/placement" xerrors "github.com/m3db/m3/src/x/errors" @@ -35,6 +36,12 @@ var ( errInstanceWriterManagerClosed = errors.New("instance writer manager closed") ) +const ( + _queueMetricReportInterval = 10 * time.Second + _queueMetricBuckets = 8 + _queueMetricBucketStart = 64 +) + // instanceWriterManager manages instance writers. type instanceWriterManager interface { // AddInstances adds instances. @@ -60,9 +67,15 @@ type instanceWriterManager interface { type writerManagerMetrics struct { instancesAdded tally.Counter instancesRemoved tally.Counter + queueLen tally.Histogram } func newWriterManagerMetrics(scope tally.Scope) writerManagerMetrics { + buckets := append( + tally.ValueBuckets{0}, + tally.MustMakeExponentialValueBuckets(_queueMetricBucketStart, 2, _queueMetricBuckets)..., + ) + return writerManagerMetrics{ instancesAdded: scope.Tagged(map[string]string{ "action": "add", @@ -70,12 +83,14 @@ func newWriterManagerMetrics(scope tally.Scope) writerManagerMetrics { instancesRemoved: scope.Tagged(map[string]string{ "action": "remove", }).Counter("instances"), + queueLen: scope.Histogram("queue-length", buckets), } } type writerManager struct { sync.RWMutex - + wg sync.WaitGroup + doneCh chan struct{} opts Options writers map[string]*refCountedWriter closed bool @@ -83,11 +98,15 @@ type writerManager struct { } func newInstanceWriterManager(opts Options) instanceWriterManager { - return &writerManager{ + wm := &writerManager{ opts: opts, writers: make(map[string]*refCountedWriter), metrics: newWriterManagerMetrics(opts.InstrumentOptions().MetricsScope()), + doneCh: make(chan struct{}), } + wm.wg.Add(1) + go wm.reportMetricsLoop() + return wm } func (mgr *writerManager) AddInstances(instances []placement.Instance) error { @@ -173,14 +192,45 @@ func (mgr *writerManager) Flush() error { func (mgr *writerManager) Close() error { mgr.Lock() - defer mgr.Unlock() + close(mgr.doneCh) if mgr.closed { + mgr.Unlock() return errInstanceWriterManagerClosed } + mgr.closed = true for _, writer := range mgr.writers { writer.Close() } + + mgr.Unlock() + mgr.wg.Wait() + return nil } + +func (mgr *writerManager) reportMetricsLoop() { + defer mgr.wg.Done() + + ticker := time.NewTicker(_queueMetricReportInterval) + defer ticker.Stop() + + for { + select { + case <-mgr.doneCh: + return + case <-ticker.C: + mgr.reportMetrics() + } + } +} + +func (mgr *writerManager) reportMetrics() { + mgr.RLock() + defer mgr.RUnlock() + + for _, writer := range mgr.writers { + mgr.metrics.queueLen.RecordValue(float64(writer.QueueSize())) + } +} diff --git a/src/aggregator/client/writer_mgr_test.go b/src/aggregator/client/writer_mgr_test.go index 8cfe5a1a88..32cc5d1fef 100644 --- a/src/aggregator/client/writer_mgr_test.go +++ b/src/aggregator/client/writer_mgr_test.go @@ -26,11 +26,12 @@ import ( "testing" "time" - "github.com/m3db/m3/src/cluster/placement" - "github.com/m3db/m3/src/x/clock" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/m3db/m3/src/cluster/placement" + "github.com/m3db/m3/src/x/clock" ) var ( @@ -145,6 +146,7 @@ func TestWriterManagerWriteUntimedNoInstances(t *testing.T) { mgr.closed = false err := mgr.Write(testPlacementInstance, 0, payload) require.Error(t, err) + require.NoError(t, mgr.Close()) } func TestWriterManagerWriteUntimedSuccess(t *testing.T) { @@ -249,6 +251,9 @@ func TestWriterManagerCloseAlreadyClosed(t *testing.T) { } func TestWriterManagerCloseSuccess(t *testing.T) { + opts := goleak.IgnoreCurrent() // TODO: other tests don't clean up properly + defer goleak.VerifyNone(t, opts) + mgr := newInstanceWriterManager(testOptions()).(*writerManager) // Add instance list and close. @@ -258,9 +263,12 @@ func TestWriterManagerCloseSuccess(t *testing.T) { require.True(t, clock.WaitUntil(func() bool { for _, w := range mgr.writers { wr := w.instanceWriter.(*writer) - wr.Lock() - defer wr.Unlock() - if !wr.closed { + closed := func() bool { + wr.Lock() + defer wr.Unlock() + return wr.closed + } + if !closed() { return false } } diff --git a/src/aggregator/client/writer_mock.go b/src/aggregator/client/writer_mock.go index 23fd8363d8..7165e24778 100644 --- a/src/aggregator/client/writer_mock.go +++ b/src/aggregator/client/writer_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/aggregator/client/writer.go -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -81,6 +81,20 @@ func (mr *MockinstanceWriterMockRecorder) Flush() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockinstanceWriter)(nil).Flush)) } +// QueueSize mocks base method +func (m *MockinstanceWriter) QueueSize() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueueSize") + ret0, _ := ret[0].(int) + return ret0 +} + +// QueueSize indicates an expected call of QueueSize +func (mr *MockinstanceWriterMockRecorder) QueueSize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueSize", reflect.TypeOf((*MockinstanceWriter)(nil).QueueSize)) +} + // Close mocks base method func (m *MockinstanceWriter) Close() error { m.ctrl.T.Helper() From 15a43f24bb687e3d1b9852ea719b7aec1d5ec9c7 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Mon, 7 Dec 2020 11:13:54 -0500 Subject: [PATCH 2/5] fix --- src/aggregator/client/tcp_client_test.go | 1 + src/aggregator/client/writer_mgr.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/aggregator/client/tcp_client_test.go b/src/aggregator/client/tcp_client_test.go index 9907e09b35..b1e5db411d 100644 --- a/src/aggregator/client/tcp_client_test.go +++ b/src/aggregator/client/tcp_client_test.go @@ -744,6 +744,7 @@ func TestTCPClientClosed(t *testing.T) { c := mustNewTestTCPClient(t, testOptions()) require.NoError(t, c.Close()) + require.Equal(t, errInstanceWriterManagerClosed, c.Close()) } func TestTCPClientCloseSuccess(t *testing.T) { diff --git a/src/aggregator/client/writer_mgr.go b/src/aggregator/client/writer_mgr.go index 99b4d2f2f9..249236902a 100644 --- a/src/aggregator/client/writer_mgr.go +++ b/src/aggregator/client/writer_mgr.go @@ -192,7 +192,6 @@ func (mgr *writerManager) Flush() error { func (mgr *writerManager) Close() error { mgr.Lock() - close(mgr.doneCh) if mgr.closed { mgr.Unlock() @@ -204,6 +203,7 @@ func (mgr *writerManager) Close() error { writer.Close() } + close(mgr.doneCh) mgr.Unlock() mgr.wg.Wait() From 2b7f4dec072df99552c7421da1d82d46dc183541 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Mon, 7 Dec 2020 11:38:48 -0500 Subject: [PATCH 3/5] fix test --- src/aggregator/client/writer_mgr_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/aggregator/client/writer_mgr_test.go b/src/aggregator/client/writer_mgr_test.go index 32cc5d1fef..67bdaa5f68 100644 --- a/src/aggregator/client/writer_mgr_test.go +++ b/src/aggregator/client/writer_mgr_test.go @@ -164,6 +164,7 @@ func TestWriterManagerWriteUntimedSuccess(t *testing.T) { payloadRes payloadUnion ) writer := NewMockinstanceWriter(ctrl) + writer.EXPECT().QueueSize().AnyTimes() writer.EXPECT(). Write(gomock.Any(), gomock.Any()). DoAndReturn(func( @@ -216,6 +217,7 @@ func TestWriterManagerFlushPartialError(t *testing.T) { ) writer1 := NewMockinstanceWriter(ctrl) + writer1.EXPECT().QueueSize().AnyTimes() writer1.EXPECT(). Flush(). DoAndReturn(func() error { @@ -224,6 +226,7 @@ func TestWriterManagerFlushPartialError(t *testing.T) { }) errTestFlush := errors.New("test flush error") writer2 := NewMockinstanceWriter(ctrl) + writer2.EXPECT().QueueSize().AnyTimes() writer2.EXPECT(). Flush(). DoAndReturn(func() error { From 12dbdcb431d8a8cb1a9e9ef9eaab4899a5620875 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Mon, 7 Dec 2020 14:31:44 -0500 Subject: [PATCH 4/5] feedback --- src/aggregator/client/queue.go | 6 ++++-- src/aggregator/client/writer_mgr_test.go | 11 +++++------ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/aggregator/client/queue.go b/src/aggregator/client/queue.go index 1cda1e45e7..07c81d57d7 100644 --- a/src/aggregator/client/queue.go +++ b/src/aggregator/client/queue.go @@ -152,7 +152,10 @@ func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue { q.writeFn = q.conn.Write q.wg.Add(1) - go q.drain() + go func() { + defer q.wg.Done() + q.drain() + }() return q } @@ -226,7 +229,6 @@ func (q *queue) writeAndReset() { } func (q *queue) drain() { - defer q.wg.Done() defer q.conn.Close() timer := time.NewTimer(q.batchFlushDeadline) lastDrain := time.Now() diff --git a/src/aggregator/client/writer_mgr_test.go b/src/aggregator/client/writer_mgr_test.go index 67bdaa5f68..17d962ef6d 100644 --- a/src/aggregator/client/writer_mgr_test.go +++ b/src/aggregator/client/writer_mgr_test.go @@ -266,12 +266,11 @@ func TestWriterManagerCloseSuccess(t *testing.T) { require.True(t, clock.WaitUntil(func() bool { for _, w := range mgr.writers { wr := w.instanceWriter.(*writer) - closed := func() bool { - wr.Lock() - defer wr.Unlock() - return wr.closed - } - if !closed() { + wr.Lock() + closed := wr.closed + wr.Unlock() + + if !closed { return false } } From d267ea9caa627062ae8877605cf8316df7d2dcd1 Mon Sep 17 00:00:00 2001 From: Vytenis Darulis Date: Mon, 7 Dec 2020 14:32:45 -0500 Subject: [PATCH 5/5] typofix --- src/aggregator/client/queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aggregator/client/queue.go b/src/aggregator/client/queue.go index 07c81d57d7..d1dbcb88b4 100644 --- a/src/aggregator/client/queue.go +++ b/src/aggregator/client/queue.go @@ -95,7 +95,7 @@ type instanceQueue interface { // Enqueue enqueues a data buffer. Enqueue(buf protobuf.Buffer) error - // Size returns the number of times in the queue. + // Size returns the number of items in the queue. Size() int // Close closes the queue, it blocks until the queue is drained.