diff --git a/src/aggregator/aggregator/options_test.go b/src/aggregator/aggregator/options_test.go index 3bb5d446d4..16675a8950 100644 --- a/src/aggregator/aggregator/options_test.go +++ b/src/aggregator/aggregator/options_test.go @@ -124,12 +124,9 @@ func TestSetStreamOptions(t *testing.T) { } func TestSetAdminClient(t *testing.T) { - c, err := client.NewClient(client.NewOptions()) - require.NoError(t, err) - value, ok := c.(client.AdminClient) - require.True(t, ok) - o := NewOptions().SetAdminClient(value) - require.True(t, value == o.AdminClient()) + var c client.AdminClient = &client.M3MsgClient{} + o := NewOptions().SetAdminClient(c) + require.True(t, c == o.AdminClient()) } func TestSetRuntimeOptionsManager(t *testing.T) { diff --git a/src/aggregator/client/client.go b/src/aggregator/client/client.go index 34293e664a..63e48e8464 100644 --- a/src/aggregator/client/client.go +++ b/src/aggregator/client/client.go @@ -23,26 +23,11 @@ package client import ( "errors" "fmt" - "math" - "sync" - "time" - "github.com/m3db/m3/src/aggregator/sharding" - "github.com/m3db/m3/src/cluster/placement" - "github.com/m3db/m3/src/cluster/shard" - "github.com/m3db/m3/src/metrics/generated/proto/metricpb" "github.com/m3db/m3/src/metrics/metadata" - "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/aggregated" - "github.com/m3db/m3/src/metrics/metric/id" "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/metrics/policy" - "github.com/m3db/m3/src/msg/producer" - "github.com/m3db/m3/src/x/clock" - xerrors "github.com/m3db/m3/src/x/errors" - "github.com/m3db/m3/src/x/instrument" - - "github.com/uber-go/tally" ) var ( @@ -111,589 +96,20 @@ type AdminClient interface { ) error } -type clientState int - -const ( - clientUninitialized clientState = iota - clientInitialized - clientClosed -) - -type clientMetrics struct { - writeUntimedCounter instrument.MethodMetrics - writeUntimedBatchTimer instrument.MethodMetrics - writeUntimedGauge instrument.MethodMetrics - writePassthrough instrument.MethodMetrics - writeForwarded instrument.MethodMetrics - flush instrument.MethodMetrics - shardNotOwned tally.Counter - shardNotWriteable tally.Counter -} - -func newClientMetrics( - scope tally.Scope, - opts instrument.TimerOptions, -) clientMetrics { - return clientMetrics{ - writeUntimedCounter: instrument.NewMethodMetrics(scope, "writeUntimedCounter", opts), - writeUntimedBatchTimer: instrument.NewMethodMetrics(scope, "writeUntimedBatchTimer", opts), - writeUntimedGauge: instrument.NewMethodMetrics(scope, "writeUntimedGauge", opts), - writePassthrough: instrument.NewMethodMetrics(scope, "writePassthrough", opts), - writeForwarded: instrument.NewMethodMetrics(scope, "writeForwarded", opts), - flush: instrument.NewMethodMetrics(scope, "flush", opts), - shardNotOwned: scope.Counter("shard-not-owned"), - shardNotWriteable: scope.Counter("shard-not-writeable"), - } -} - -// client partitions metrics and send them via different routes based on their partitions. 
-type client struct { - sync.RWMutex - - opts Options - aggregatorClientType AggregatorClientType - state clientState - - m3msg m3msgClient - - nowFn clock.NowFn - shardCutoverWarmupDuration time.Duration - shardCutoffLingerDuration time.Duration - writerMgr instanceWriterManager - shardFn sharding.ShardFn - placementWatcher placement.StagedPlacementWatcher - - metrics clientMetrics -} - -type m3msgClient struct { - producer producer.Producer - numShards uint32 - messagePool *messagePool -} - // NewClient creates a new client. func NewClient(opts Options) (Client, error) { if err := opts.Validate(); err != nil { return nil, err } - var ( - clientType = opts.AggregatorClientType() - instrumentOpts = opts.InstrumentOptions() - msgClient m3msgClient - writerMgr instanceWriterManager - placementWatcher placement.StagedPlacementWatcher - ) + clientType := opts.AggregatorClientType() switch clientType { case M3MsgAggregatorClient: - m3msgOpts := opts.M3MsgOptions() - if err := m3msgOpts.Validate(); err != nil { - return nil, err - } - - producer := m3msgOpts.Producer() - if err := producer.Init(); err != nil { - return nil, err - } - - msgClient = m3msgClient{ - producer: producer, - numShards: producer.NumShards(), - messagePool: newMessagePool(), - } - case LegacyAggregatorClient: - writerMgrScope := instrumentOpts.MetricsScope().SubScope("writer-manager") - writerMgrOpts := opts.SetInstrumentOptions(instrumentOpts.SetMetricsScope(writerMgrScope)) - writerMgr = newInstanceWriterManager(writerMgrOpts) - onPlacementsAddedFn := func(placements []placement.Placement) { - for _, placement := range placements { - writerMgr.AddInstances(placement.Instances()) // nolint: errcheck - } - } - onPlacementsRemovedFn := func(placements []placement.Placement) { - for _, placement := range placements { - writerMgr.RemoveInstances(placement.Instances()) // nolint: errcheck - } - } - activeStagedPlacementOpts := placement.NewActiveStagedPlacementOptions(). - SetClockOptions(opts.ClockOptions()). - SetOnPlacementsAddedFn(onPlacementsAddedFn). - SetOnPlacementsRemovedFn(onPlacementsRemovedFn) - placementWatcherOpts := opts.StagedPlacementWatcherOptions(). - SetActiveStagedPlacementOptions(activeStagedPlacementOpts) - placementWatcher = placement.NewStagedPlacementWatcher(placementWatcherOpts) - default: - return nil, fmt.Errorf("unrecognized client type: %v", clientType) - } - - return &client{ - aggregatorClientType: clientType, - m3msg: msgClient, - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - shardCutoverWarmupDuration: opts.ShardCutoverWarmupDuration(), - shardCutoffLingerDuration: opts.ShardCutoffLingerDuration(), - writerMgr: writerMgr, - shardFn: opts.ShardFn(), - placementWatcher: placementWatcher, - metrics: newClientMetrics(instrumentOpts.MetricsScope(), - instrumentOpts.TimerOptions()), - }, nil -} - -func (c *client) Init() error { - c.Lock() - defer c.Unlock() - - if c.state != clientUninitialized { - return errClientIsInitializedOrClosed - } - - switch c.aggregatorClientType { - case M3MsgAggregatorClient: - // Nothing more to do. 
- case LegacyAggregatorClient: - if err := c.placementWatcher.Watch(); err != nil { - return err - } - default: - return fmt.Errorf("unrecognized client type: %v", c.aggregatorClientType) - } - - c.state = clientInitialized - return nil -} - -func (c *client) WriteUntimedCounter( - counter unaggregated.Counter, - metadatas metadata.StagedMetadatas, -) error { - callStart := c.nowFn() - payload := payloadUnion{ - payloadType: untimedType, - untimed: untimedPayload{ - metric: counter.ToUnion(), - metadatas: metadatas, - }, - } - err := c.write(counter.ID, c.nowNanos(), payload) - c.metrics.writeUntimedCounter.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) - return err -} - -func (c *client) WriteUntimedBatchTimer( - batchTimer unaggregated.BatchTimer, - metadatas metadata.StagedMetadatas, -) error { - callStart := c.nowFn() - payload := payloadUnion{ - payloadType: untimedType, - untimed: untimedPayload{ - metric: batchTimer.ToUnion(), - metadatas: metadatas, - }, - } - err := c.write(batchTimer.ID, c.nowNanos(), payload) - c.metrics.writeUntimedBatchTimer.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) - return err -} - -func (c *client) WriteUntimedGauge( - gauge unaggregated.Gauge, - metadatas metadata.StagedMetadatas, -) error { - callStart := c.nowFn() - payload := payloadUnion{ - payloadType: untimedType, - untimed: untimedPayload{ - metric: gauge.ToUnion(), - metadatas: metadatas, - }, - } - err := c.write(gauge.ID, c.nowNanos(), payload) - c.metrics.writeUntimedGauge.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) - return err -} - -func (c *client) WriteTimed( - metric aggregated.Metric, - metadata metadata.TimedMetadata, -) error { - callStart := c.nowFn() - payload := payloadUnion{ - payloadType: timedType, - timed: timedPayload{ - metric: metric, - metadata: metadata, - }, - } - err := c.write(metric.ID, metric.TimeNanos, payload) - c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) - return err -} - -func (c *client) WritePassthrough( - metric aggregated.Metric, - storagePolicy policy.StoragePolicy, -) error { - callStart := c.nowFn() - payload := payloadUnion{ - payloadType: passthroughType, - passthrough: passthroughPayload{ - metric: metric, - storagePolicy: storagePolicy, - }, - } - err := c.write(metric.ID, metric.TimeNanos, payload) - c.metrics.writePassthrough.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) - return err -} - -func (c *client) WriteTimedWithStagedMetadatas( - metric aggregated.Metric, - metadatas metadata.StagedMetadatas, -) error { - callStart := c.nowFn() - payload := payloadUnion{ - payloadType: timedWithStagedMetadatasType, - timedWithStagedMetadatas: timedWithStagedMetadatas{ - metric: metric, - metadatas: metadatas, - }, - } - err := c.write(metric.ID, metric.TimeNanos, payload) - c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) - return err -} - -func (c *client) WriteForwarded( - metric aggregated.ForwardedMetric, - metadata metadata.ForwardMetadata, -) error { - callStart := c.nowFn() - payload := payloadUnion{ - payloadType: forwardedType, - forwarded: forwardedPayload{ - metric: metric, - metadata: metadata, - }, - } - err := c.write(metric.ID, metric.TimeNanos, payload) - c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) - return err -} - -func (c *client) Flush() error { - var ( - callStart = c.nowFn() - err error - ) - c.RLock() - defer c.RUnlock() - - if c.state != clientInitialized { - return errClientIsUninitializedOrClosed - } - - switch 
c.aggregatorClientType { - case LegacyAggregatorClient: - err = c.writerMgr.Flush() - c.metrics.flush.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) - } - - return err -} - -func (c *client) Close() error { - c.Lock() - defer c.Unlock() - - if c.state != clientInitialized { - return errClientIsUninitializedOrClosed - } - - var err error - switch c.aggregatorClientType { - case M3MsgAggregatorClient: - c.m3msg.producer.Close(producer.WaitForConsumption) - case LegacyAggregatorClient: - c.placementWatcher.Unwatch() // nolint: errcheck - err = c.writerMgr.Close() - default: - return fmt.Errorf("unrecognized client type: %v", c.aggregatorClientType) - } - - c.state = clientClosed - - return err -} - -func (c *client) write(metricID id.RawID, timeNanos int64, payload payloadUnion) error { - switch c.aggregatorClientType { + return NewM3MsgClient(opts) case LegacyAggregatorClient: - return c.writeLegacy(metricID, timeNanos, payload) - case M3MsgAggregatorClient: - return c.writeM3Msg(metricID, timeNanos, payload) - default: - return fmt.Errorf("unrecognized client type: %v", c.aggregatorClientType) - } -} - -func (c *client) writeLegacy(metricID id.RawID, timeNanos int64, payload payloadUnion) error { - c.RLock() - if c.state != clientInitialized { - c.RUnlock() - return errClientIsUninitializedOrClosed - } - stagedPlacement, onStagedPlacementDoneFn, err := c.placementWatcher.ActiveStagedPlacement() - if err != nil { - c.RUnlock() - return err - } - placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement() - if err != nil { - onStagedPlacementDoneFn() - c.RUnlock() - return err - } - var ( - shardID = c.shardFn(metricID, uint32(placement.NumShards())) - instances = placement.InstancesForShard(shardID) - multiErr = xerrors.NewMultiError() - ) - for _, instance := range instances { - // NB(xichen): the shard should technically always be found because the instances - // are computed from the placement, but protect against errors here regardless. - shard, ok := instance.Shards().Shard(shardID) - if !ok { - err = fmt.Errorf("instance %s does not own shard %d", instance.ID(), shardID) - multiErr = multiErr.Add(err) - c.metrics.shardNotOwned.Inc(1) - continue - } - if !c.shouldWriteForShard(timeNanos, shard) { - c.metrics.shardNotWriteable.Inc(1) - continue - } - if err = c.writerMgr.Write(instance, shardID, payload); err != nil { - multiErr = multiErr.Add(err) - } - } - - onPlacementDoneFn() - onStagedPlacementDoneFn() - c.RUnlock() - return multiErr.FinalError() -} - -func (c *client) writeM3Msg(metricID id.RawID, timeNanos int64, payload payloadUnion) error { - shard := c.shardFn(metricID, c.m3msg.numShards) - - msg := c.m3msg.messagePool.Get() - if err := msg.Encode(shard, payload); err != nil { - msg.Finalize(producer.Dropped) - return err + fallthrough // LegacyAggregatorClient is an alias + case TCPAggregatorClient: + return NewTCPClient(opts) } - - if err := c.m3msg.producer.Produce(msg); err != nil { - msg.Finalize(producer.Dropped) - return err - } - - return nil -} - -func (c *client) shouldWriteForShard(nowNanos int64, shard shard.Shard) bool { - writeEarliestNanos, writeLatestNanos := c.writeTimeRangeFor(shard) - return nowNanos >= writeEarliestNanos && nowNanos <= writeLatestNanos -} - -// writeTimeRangeFor returns the time range for writes going to a given shard. 
-func (c *client) writeTimeRangeFor(shard shard.Shard) (int64, int64) { - var ( - earliestNanos = int64(0) - latestNanos = int64(math.MaxInt64) - ) - if cutoverNanos := shard.CutoverNanos(); cutoverNanos >= int64(c.shardCutoverWarmupDuration) { - earliestNanos = cutoverNanos - int64(c.shardCutoverWarmupDuration) - } - if cutoffNanos := shard.CutoffNanos(); cutoffNanos <= math.MaxInt64-int64(c.shardCutoffLingerDuration) { - latestNanos = cutoffNanos + int64(c.shardCutoffLingerDuration) - } - return earliestNanos, latestNanos -} - -func (c *client) nowNanos() int64 { - return c.nowFn().UnixNano() -} - -type messagePool struct { - pool sync.Pool -} - -func newMessagePool() *messagePool { - p := &messagePool{} - p.pool.New = func() interface{} { - return newMessage(p) - } - return p -} - -func (m *messagePool) Get() *message { - return m.pool.Get().(*message) -} - -func (m *messagePool) Put(msg *message) { - m.pool.Put(msg) -} - -// Ensure message implements m3msg producer message interface. -var _ producer.Message = (*message)(nil) - -type message struct { - pool *messagePool - shard uint32 - - metric metricpb.MetricWithMetadatas - cm metricpb.CounterWithMetadatas - bm metricpb.BatchTimerWithMetadatas - gm metricpb.GaugeWithMetadatas - fm metricpb.ForwardedMetricWithMetadata - tm metricpb.TimedMetricWithMetadata - tms metricpb.TimedMetricWithMetadatas - - buf []byte -} - -func newMessage(pool *messagePool) *message { - return &message{ - pool: pool, - } -} - -func (m *message) Encode( - shard uint32, - payload payloadUnion, -) error { - m.shard = shard - - switch payload.payloadType { - case untimedType: - switch payload.untimed.metric.Type { - case metric.CounterType: - value := unaggregated.CounterWithMetadatas{ - Counter: payload.untimed.metric.Counter(), - StagedMetadatas: payload.untimed.metadatas, - } - if err := value.ToProto(&m.cm); err != nil { - return err - } - - m.metric = metricpb.MetricWithMetadatas{ - Type: metricpb.MetricWithMetadatas_COUNTER_WITH_METADATAS, - CounterWithMetadatas: &m.cm, - } - case metric.TimerType: - value := unaggregated.BatchTimerWithMetadatas{ - BatchTimer: payload.untimed.metric.BatchTimer(), - StagedMetadatas: payload.untimed.metadatas, - } - if err := value.ToProto(&m.bm); err != nil { - return err - } - - m.metric = metricpb.MetricWithMetadatas{ - Type: metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS, - BatchTimerWithMetadatas: &m.bm, - } - case metric.GaugeType: - value := unaggregated.GaugeWithMetadatas{ - Gauge: payload.untimed.metric.Gauge(), - StagedMetadatas: payload.untimed.metadatas, - } - if err := value.ToProto(&m.gm); err != nil { - return err - } - - m.metric = metricpb.MetricWithMetadatas{ - Type: metricpb.MetricWithMetadatas_GAUGE_WITH_METADATAS, - GaugeWithMetadatas: &m.gm, - } - default: - return fmt.Errorf("unrecognized metric type: %v", - payload.untimed.metric.Type) - } - case forwardedType: - value := aggregated.ForwardedMetricWithMetadata{ - ForwardedMetric: payload.forwarded.metric, - ForwardMetadata: payload.forwarded.metadata, - } - if err := value.ToProto(&m.fm); err != nil { - return err - } - - m.metric = metricpb.MetricWithMetadatas{ - Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA, - ForwardedMetricWithMetadata: &m.fm, - } - case timedType: - value := aggregated.TimedMetricWithMetadata{ - Metric: payload.timed.metric, - TimedMetadata: payload.timed.metadata, - } - if err := value.ToProto(&m.tm); err != nil { - return err - } - - m.metric = metricpb.MetricWithMetadatas{ - Type: 
metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA, - TimedMetricWithMetadata: &m.tm, - } - case timedWithStagedMetadatasType: - value := aggregated.TimedMetricWithMetadatas{ - Metric: payload.timedWithStagedMetadatas.metric, - StagedMetadatas: payload.timedWithStagedMetadatas.metadatas, - } - if err := value.ToProto(&m.tms); err != nil { - return err - } - - m.metric = metricpb.MetricWithMetadatas{ - Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATAS, - TimedMetricWithMetadatas: &m.tms, - } - default: - return fmt.Errorf("unrecognized payload type: %v", - payload.payloadType) - } - - size := m.metric.Size() - if size > cap(m.buf) { - const growthFactor = 2 - m.buf = make([]byte, int(growthFactor*float64(size))) - } - - // Resize buffer to exactly how long we need for marshalling. - m.buf = m.buf[:size] - - _, err := m.metric.MarshalTo(m.buf) - return err -} - -func (m *message) Shard() uint32 { - return m.shard -} - -func (m *message) Bytes() []byte { - return m.buf -} - -func (m *message) Size() int { - return len(m.buf) -} - -func (m *message) Finalize(reason producer.FinalizeReason) { - // Return to pool. - m.pool.Put(m) + return nil, fmt.Errorf("unrecognized client type: %v", clientType) } diff --git a/src/aggregator/client/m3msg_client.go b/src/aggregator/client/m3msg_client.go new file mode 100644 index 0000000000..01053d83d6 --- /dev/null +++ b/src/aggregator/client/m3msg_client.go @@ -0,0 +1,443 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package client + +import ( + "fmt" + "sync" + + "github.com/uber-go/tally" + + "github.com/m3db/m3/src/aggregator/sharding" + "github.com/m3db/m3/src/metrics/generated/proto/metricpb" + "github.com/m3db/m3/src/metrics/metadata" + "github.com/m3db/m3/src/metrics/metric" + "github.com/m3db/m3/src/metrics/metric/aggregated" + "github.com/m3db/m3/src/metrics/metric/id" + "github.com/m3db/m3/src/metrics/metric/unaggregated" + "github.com/m3db/m3/src/metrics/policy" + "github.com/m3db/m3/src/msg/producer" + "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/instrument" +) + +var _ AdminClient = (*M3MsgClient)(nil) + +// M3MsgClient sends metrics to M3 Aggregator over m3msg. 
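+// Writes are sharded by metric ID across the producer's shard set and handed
+// off to the m3msg producer, which owns buffering, routing and delivery.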
+type M3MsgClient struct {
+	m3msg   m3msgClient
+	nowFn   clock.NowFn
+	shardFn sharding.ShardFn
+	metrics m3msgClientMetrics
+}
+
+type m3msgClient struct {
+	producer    producer.Producer
+	numShards   uint32
+	messagePool *messagePool
+}
+
+// NewM3MsgClient creates a new M3 Aggregator client that uses M3Msg.
+func NewM3MsgClient(opts Options) (Client, error) {
+	if err := opts.Validate(); err != nil {
+		return nil, err
+	}
+
+	var (
+		instrumentOpts = opts.InstrumentOptions()
+		msgClient      m3msgClient
+	)
+	m3msgOpts := opts.M3MsgOptions()
+	if err := m3msgOpts.Validate(); err != nil {
+		return nil, err
+	}
+
+	producer := m3msgOpts.Producer()
+	if err := producer.Init(); err != nil {
+		return nil, err
+	}
+
+	msgClient = m3msgClient{
+		producer:    producer,
+		numShards:   producer.NumShards(),
+		messagePool: newMessagePool(),
+	}
+	return &M3MsgClient{
+		m3msg:   msgClient,
+		nowFn:   opts.ClockOptions().NowFn(),
+		shardFn: opts.ShardFn(),
+		metrics: newM3msgClientMetrics(instrumentOpts.MetricsScope(),
+			instrumentOpts.TimerOptions()),
+	}, nil
+}
+
+// Init satisfies the Client interface; the M3Msg client does not need explicit initialization.
+func (c *M3MsgClient) Init() error {
+	return nil
+}
+
+// WriteUntimedCounter writes untimed counter metrics.
+func (c *M3MsgClient) WriteUntimedCounter(
+	counter unaggregated.Counter,
+	metadatas metadata.StagedMetadatas,
+) error {
+	callStart := c.nowFn()
+	payload := payloadUnion{
+		payloadType: untimedType,
+		untimed: untimedPayload{
+			metric:    counter.ToUnion(),
+			metadatas: metadatas,
+		},
+	}
+	err := c.write(counter.ID, payload)
+	c.metrics.writeUntimedCounter.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
+	return err
+}
+
+// WriteUntimedBatchTimer writes untimed batch timer metrics.
+func (c *M3MsgClient) WriteUntimedBatchTimer(
+	batchTimer unaggregated.BatchTimer,
+	metadatas metadata.StagedMetadatas,
+) error {
+	callStart := c.nowFn()
+	payload := payloadUnion{
+		payloadType: untimedType,
+		untimed: untimedPayload{
+			metric:    batchTimer.ToUnion(),
+			metadatas: metadatas,
+		},
+	}
+	err := c.write(batchTimer.ID, payload)
+	c.metrics.writeUntimedBatchTimer.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
+	return err
+}
+
+// WriteUntimedGauge writes untimed gauge metrics.
+func (c *M3MsgClient) WriteUntimedGauge(
+	gauge unaggregated.Gauge,
+	metadatas metadata.StagedMetadatas,
+) error {
+	callStart := c.nowFn()
+	payload := payloadUnion{
+		payloadType: untimedType,
+		untimed: untimedPayload{
+			metric:    gauge.ToUnion(),
+			metadatas: metadatas,
+		},
+	}
+	err := c.write(gauge.ID, payload)
+	c.metrics.writeUntimedGauge.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
+	return err
+}
+
+// WriteTimed writes timed metrics.
+func (c *M3MsgClient) WriteTimed(
+	metric aggregated.Metric,
+	metadata metadata.TimedMetadata,
+) error {
+	callStart := c.nowFn()
+	payload := payloadUnion{
+		payloadType: timedType,
+		timed: timedPayload{
+			metric:   metric,
+			metadata: metadata,
+		},
+	}
+	err := c.write(metric.ID, payload)
+	c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart))
+	return err
+}
+
+// WritePassthrough writes passthrough metrics.
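+// Passthrough metrics carry an explicit storage policy and are relayed
+// without being aggregated.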
+func (c *M3MsgClient) WritePassthrough( + metric aggregated.Metric, + storagePolicy policy.StoragePolicy, +) error { + callStart := c.nowFn() + payload := payloadUnion{ + payloadType: passthroughType, + passthrough: passthroughPayload{ + metric: metric, + storagePolicy: storagePolicy, + }, + } + err := c.write(metric.ID, payload) + c.metrics.writePassthrough.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) + return err +} + +// WriteTimedWithStagedMetadatas writes timed metrics with staged metadatas. +func (c *M3MsgClient) WriteTimedWithStagedMetadatas( + metric aggregated.Metric, + metadatas metadata.StagedMetadatas, +) error { + callStart := c.nowFn() + payload := payloadUnion{ + payloadType: timedWithStagedMetadatasType, + timedWithStagedMetadatas: timedWithStagedMetadatas{ + metric: metric, + metadatas: metadatas, + }, + } + err := c.write(metric.ID, payload) + c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) + return err +} + +// WriteForwarded writes forwarded metrics. +func (c *M3MsgClient) WriteForwarded( + metric aggregated.ForwardedMetric, + metadata metadata.ForwardMetadata, +) error { + callStart := c.nowFn() + payload := payloadUnion{ + payloadType: forwardedType, + forwarded: forwardedPayload{ + metric: metric, + metadata: metadata, + }, + } + err := c.write(metric.ID, payload) + c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) + return err +} + +//nolint:gocritic +func (c *M3MsgClient) write(metricID id.RawID, payload payloadUnion) error { + shard := c.shardFn(metricID, c.m3msg.numShards) + + msg := c.m3msg.messagePool.Get() + if err := msg.Encode(shard, payload); err != nil { + msg.Finalize(producer.Dropped) + return err + } + + if err := c.m3msg.producer.Produce(msg); err != nil { + msg.Finalize(producer.Dropped) + return err + } + + return nil +} + +// Flush satisfies Client interface, as M3Msg client does not need explicit flushing. +func (c *M3MsgClient) Flush() error { + return nil +} + +// Close closes the client. +func (c *M3MsgClient) Close() error { + c.m3msg.producer.Close(producer.WaitForConsumption) + return nil +} + +type m3msgClientMetrics struct { + writeUntimedCounter instrument.MethodMetrics + writeUntimedBatchTimer instrument.MethodMetrics + writeUntimedGauge instrument.MethodMetrics + writePassthrough instrument.MethodMetrics + writeForwarded instrument.MethodMetrics +} + +func newM3msgClientMetrics( + scope tally.Scope, + opts instrument.TimerOptions, +) m3msgClientMetrics { + return m3msgClientMetrics{ + writeUntimedCounter: instrument.NewMethodMetrics(scope, "writeUntimedCounter", opts), + writeUntimedBatchTimer: instrument.NewMethodMetrics(scope, "writeUntimedBatchTimer", opts), + writeUntimedGauge: instrument.NewMethodMetrics(scope, "writeUntimedGauge", opts), + writePassthrough: instrument.NewMethodMetrics(scope, "writePassthrough", opts), + writeForwarded: instrument.NewMethodMetrics(scope, "writeForwarded", opts), + } +} + +type messagePool struct { + pool sync.Pool +} + +func newMessagePool() *messagePool { + p := &messagePool{} + p.pool.New = func() interface{} { + return newMessage(p) + } + return p +} + +func (m *messagePool) Get() *message { + return m.pool.Get().(*message) +} + +func (m *messagePool) Put(msg *message) { + m.pool.Put(msg) +} + +// Ensure message implements m3msg producer message interface. 
+var _ producer.Message = (*message)(nil) + +type message struct { + pool *messagePool + shard uint32 + + metric metricpb.MetricWithMetadatas + cm metricpb.CounterWithMetadatas + bm metricpb.BatchTimerWithMetadatas + gm metricpb.GaugeWithMetadatas + fm metricpb.ForwardedMetricWithMetadata + tm metricpb.TimedMetricWithMetadata + tms metricpb.TimedMetricWithMetadatas + + buf []byte +} + +func newMessage(pool *messagePool) *message { + return &message{ + pool: pool, + } +} + +// Encode encodes a m3msg payload +//nolint:gocyclo,gocritic +func (m *message) Encode( + shard uint32, + payload payloadUnion, +) error { + m.shard = shard + + switch payload.payloadType { + case untimedType: + switch payload.untimed.metric.Type { + case metric.CounterType: + value := unaggregated.CounterWithMetadatas{ + Counter: payload.untimed.metric.Counter(), + StagedMetadatas: payload.untimed.metadatas, + } + if err := value.ToProto(&m.cm); err != nil { + return err + } + + m.metric = metricpb.MetricWithMetadatas{ + Type: metricpb.MetricWithMetadatas_COUNTER_WITH_METADATAS, + CounterWithMetadatas: &m.cm, + } + case metric.TimerType: + value := unaggregated.BatchTimerWithMetadatas{ + BatchTimer: payload.untimed.metric.BatchTimer(), + StagedMetadatas: payload.untimed.metadatas, + } + if err := value.ToProto(&m.bm); err != nil { + return err + } + + m.metric = metricpb.MetricWithMetadatas{ + Type: metricpb.MetricWithMetadatas_BATCH_TIMER_WITH_METADATAS, + BatchTimerWithMetadatas: &m.bm, + } + case metric.GaugeType: + value := unaggregated.GaugeWithMetadatas{ + Gauge: payload.untimed.metric.Gauge(), + StagedMetadatas: payload.untimed.metadatas, + } + if err := value.ToProto(&m.gm); err != nil { + return err + } + + m.metric = metricpb.MetricWithMetadatas{ + Type: metricpb.MetricWithMetadatas_GAUGE_WITH_METADATAS, + GaugeWithMetadatas: &m.gm, + } + default: + return fmt.Errorf("unrecognized metric type: %v", + payload.untimed.metric.Type) + } + case forwardedType: + value := aggregated.ForwardedMetricWithMetadata{ + ForwardedMetric: payload.forwarded.metric, + ForwardMetadata: payload.forwarded.metadata, + } + if err := value.ToProto(&m.fm); err != nil { + return err + } + + m.metric = metricpb.MetricWithMetadatas{ + Type: metricpb.MetricWithMetadatas_FORWARDED_METRIC_WITH_METADATA, + ForwardedMetricWithMetadata: &m.fm, + } + case timedType: + value := aggregated.TimedMetricWithMetadata{ + Metric: payload.timed.metric, + TimedMetadata: payload.timed.metadata, + } + if err := value.ToProto(&m.tm); err != nil { + return err + } + + m.metric = metricpb.MetricWithMetadatas{ + Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATA, + TimedMetricWithMetadata: &m.tm, + } + case timedWithStagedMetadatasType: + value := aggregated.TimedMetricWithMetadatas{ + Metric: payload.timedWithStagedMetadatas.metric, + StagedMetadatas: payload.timedWithStagedMetadatas.metadatas, + } + if err := value.ToProto(&m.tms); err != nil { + return err + } + + m.metric = metricpb.MetricWithMetadatas{ + Type: metricpb.MetricWithMetadatas_TIMED_METRIC_WITH_METADATAS, + TimedMetricWithMetadatas: &m.tms, + } + default: + return fmt.Errorf("unrecognized payload type: %v", + payload.payloadType) + } + + size := m.metric.Size() + if size > cap(m.buf) { + const growthFactor = 2 + m.buf = make([]byte, int(growthFactor*float64(size))) + } + + // Resize buffer to exactly how long we need for marshaling. 
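+	// The backing array is retained when the message is returned to the pool,
+	// so later encodes of similar size reuse it without reallocating.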
+	m.buf = m.buf[:size]
+
+	_, err := m.metric.MarshalTo(m.buf)
+	return err
+}
+
+func (m *message) Shard() uint32 {
+	return m.shard
+}
+
+func (m *message) Bytes() []byte {
+	return m.buf
+}
+
+func (m *message) Size() int {
+	return len(m.buf)
+}
+
+func (m *message) Finalize(reason producer.FinalizeReason) {
+	// Return to pool.
+	m.pool.Put(m)
+}
diff --git a/src/aggregator/client/m3msg_client_test.go b/src/aggregator/client/m3msg_client_test.go
new file mode 100644
index 0000000000..9c76c8e84f
--- /dev/null
+++ b/src/aggregator/client/m3msg_client_test.go
@@ -0,0 +1,44 @@
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package client
+
+import (
+	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/m3db/m3/src/msg/producer"
+)
+
+func TestNewM3MsgClient(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	p := producer.NewMockProducer(ctrl)
+	p.EXPECT().Init()
+	p.EXPECT().NumShards().Return(uint32(1))
+
+	opts := NewM3MsgOptions().
+		SetProducer(p)
+
+	c, err := NewM3MsgClient(NewOptions().SetM3MsgOptions(opts))
+	assert.NotNil(t, c)
+	assert.NoError(t, err)
+}
diff --git a/src/aggregator/client/options.go b/src/aggregator/client/options.go
index 73d91d9ca1..6e65e57436 100644
--- a/src/aggregator/client/options.go
+++ b/src/aggregator/client/options.go
@@ -37,12 +37,14 @@ import (
 type AggregatorClientType int
 
 const (
-	// LegacyAggregatorClient is the legacy aggregator client type and uses it's own
-	// TCP negotation, load balancing and data transmission protocol.
+	// LegacyAggregatorClient is an alias for TCPAggregatorClient.
 	LegacyAggregatorClient AggregatorClientType = iota
 	// M3MsgAggregatorClient is the M3Msg aggregator client type that uses M3Msg to
 	// handle publishing to a M3Msg topic the aggregator consumes from.
 	M3MsgAggregatorClient
+	// TCPAggregatorClient is the TCP aggregator client type and uses its own
+	// TCP negotiation, load balancing and data transmission protocol.
+	TCPAggregatorClient
 
 	defaultAggregatorClient = LegacyAggregatorClient
 
@@ -79,11 +81,12 @@ var (
 	validAggregatorClientTypes = []AggregatorClientType{
 		LegacyAggregatorClient,
 		M3MsgAggregatorClient,
+		TCPAggregatorClient,
 	}
 
-	errLegacyClientNoWatcherOptions = errors.New("legacy client: no watcher options set")
-	errM3MsgClientNoOptions         = errors.New("m3msg aggregator client: no m3msg options set")
-	errNoRWOpts                     = errors.New("no rw opts set for aggregator")
+	errTCPClientNoWatcherOptions = errors.New("legacy client: no watcher options set")
+	errM3MsgClientNoOptions      = errors.New("m3msg aggregator client: no m3msg options set")
+	errNoRWOpts                  = errors.New("no rw opts set for aggregator")
 )
 
 func (t AggregatorClientType) String() string {
@@ -92,6 +95,8 @@ func (t AggregatorClientType) String() string {
 		return "legacy"
 	case M3MsgAggregatorClient:
 		return "m3msg"
+	case TCPAggregatorClient:
+		return "tcp"
 	}
 	return "unknown"
 }
@@ -279,8 +284,10 @@ func (o *options) Validate() error {
 		}
 		return opts.Validate()
 	case LegacyAggregatorClient:
+		fallthrough // intentional, LegacyAggregatorClient is an alias
+	case TCPAggregatorClient:
 		if o.watcherOpts == nil {
-			return errLegacyClientNoWatcherOptions
+			return errTCPClientNoWatcherOptions
 		}
 		return nil
 	default:
diff --git a/src/aggregator/client/tcp_client.go b/src/aggregator/client/tcp_client.go
new file mode 100644
index 0000000000..18d2fb2c00
--- /dev/null
+++ b/src/aggregator/client/tcp_client.go
@@ -0,0 +1,330 @@
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package client
+
+import (
+	"fmt"
+	"math"
+	"time"
+
+	"github.com/uber-go/tally"
+
+	"github.com/m3db/m3/src/aggregator/sharding"
+	"github.com/m3db/m3/src/cluster/placement"
+	"github.com/m3db/m3/src/cluster/shard"
+	"github.com/m3db/m3/src/metrics/metadata"
+	"github.com/m3db/m3/src/metrics/metric/aggregated"
+	"github.com/m3db/m3/src/metrics/metric/id"
+	"github.com/m3db/m3/src/metrics/metric/unaggregated"
+	"github.com/m3db/m3/src/metrics/policy"
+	"github.com/m3db/m3/src/x/clock"
+	xerrors "github.com/m3db/m3/src/x/errors"
+)
+
+var _ AdminClient = (*TCPClient)(nil)
+
+// TCPClient sends metrics to M3 Aggregator over a custom TCP protocol.
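+// It watches the staged placement to keep per-instance writers up to date and
+// fans each write out to every instance owning the metric's shard.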
+type TCPClient struct {
+	nowFn                      clock.NowFn
+	shardCutoverWarmupDuration time.Duration
+	shardCutoffLingerDuration  time.Duration
+	writerMgr                  instanceWriterManager
+	shardFn                    sharding.ShardFn
+	placementWatcher           placement.StagedPlacementWatcher
+	metrics                    tcpClientMetrics
+}
+
+// NewTCPClient returns a new Protobuf-over-TCP M3 Aggregator client.
+func NewTCPClient(opts Options) (*TCPClient, error) {
+	if err := opts.Validate(); err != nil {
+		return nil, err
+	}
+
+	var (
+		instrumentOpts   = opts.InstrumentOptions()
+		writerMgr        instanceWriterManager
+		placementWatcher placement.StagedPlacementWatcher
+	)
+
+	writerMgrScope := instrumentOpts.MetricsScope().SubScope("writer-manager")
+	writerMgrOpts := opts.SetInstrumentOptions(instrumentOpts.SetMetricsScope(writerMgrScope))
+	writerMgr = newInstanceWriterManager(writerMgrOpts)
+
+	onPlacementsAddedFn := func(placements []placement.Placement) {
+		for _, placement := range placements {
+			writerMgr.AddInstances(placement.Instances()) // nolint: errcheck
+		}
+	}
+
+	onPlacementsRemovedFn := func(placements []placement.Placement) {
+		for _, placement := range placements {
+			writerMgr.RemoveInstances(placement.Instances()) // nolint: errcheck
+		}
+	}
+
+	activeStagedPlacementOpts := placement.NewActiveStagedPlacementOptions().
+		SetClockOptions(opts.ClockOptions()).
+		SetOnPlacementsAddedFn(onPlacementsAddedFn).
+		SetOnPlacementsRemovedFn(onPlacementsRemovedFn)
+	placementWatcher = placement.NewStagedPlacementWatcher(opts.StagedPlacementWatcherOptions().
+		SetActiveStagedPlacementOptions(activeStagedPlacementOpts))
+
+	return &TCPClient{
+		nowFn:                      opts.ClockOptions().NowFn(),
+		shardCutoverWarmupDuration: opts.ShardCutoverWarmupDuration(),
+		shardCutoffLingerDuration:  opts.ShardCutoffLingerDuration(),
+		writerMgr:                  writerMgr,
+		shardFn:                    opts.ShardFn(),
+		placementWatcher:           placementWatcher,
+		metrics:                    newTCPClientMetrics(instrumentOpts.MetricsScope()),
+	}, nil
+}
+
+// Init initializes TCPClient.
+func (c *TCPClient) Init() error {
+	return c.placementWatcher.Watch()
+}
+
+// WriteUntimedCounter writes untimed counter metrics.
+func (c *TCPClient) WriteUntimedCounter(
+	counter unaggregated.Counter,
+	metadatas metadata.StagedMetadatas,
+) error {
+	payload := payloadUnion{
+		payloadType: untimedType,
+		untimed: untimedPayload{
+			metric:    counter.ToUnion(),
+			metadatas: metadatas,
+		},
+	}
+
+	c.metrics.writeUntimedCounter.Inc(1)
+	return c.write(counter.ID, c.nowFn().UnixNano(), payload)
+}
+
+// WriteUntimedBatchTimer writes untimed batch timer metrics.
+func (c *TCPClient) WriteUntimedBatchTimer(
+	batchTimer unaggregated.BatchTimer,
+	metadatas metadata.StagedMetadatas,
+) error {
+	payload := payloadUnion{
+		payloadType: untimedType,
+		untimed: untimedPayload{
+			metric:    batchTimer.ToUnion(),
+			metadatas: metadatas,
+		},
+	}
+
+	c.metrics.writeUntimedBatchTimer.Inc(1)
+	return c.write(batchTimer.ID, c.nowFn().UnixNano(), payload)
+}
+
+// WriteUntimedGauge writes untimed gauge metrics.
+func (c *TCPClient) WriteUntimedGauge(
+	gauge unaggregated.Gauge,
+	metadatas metadata.StagedMetadatas,
+) error {
+	payload := payloadUnion{
+		payloadType: untimedType,
+		untimed: untimedPayload{
+			metric:    gauge.ToUnion(),
+			metadatas: metadatas,
+		},
+	}
+
+	c.metrics.writeUntimedGauge.Inc(1)
+	return c.write(gauge.ID, c.nowFn().UnixNano(), payload)
+}
+
+// WriteTimed writes timed metrics.
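+// Unlike the untimed write methods above, which use the current wall clock for
+// the shard writeability check, timed writes use the metric's own TimeNanos.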
+func (c *TCPClient) WriteTimed( + metric aggregated.Metric, + metadata metadata.TimedMetadata, +) error { + payload := payloadUnion{ + payloadType: timedType, + timed: timedPayload{ + metric: metric, + metadata: metadata, + }, + } + + c.metrics.writeForwarded.Inc(1) + return c.write(metric.ID, metric.TimeNanos, payload) +} + +// WritePassthrough writes passthrough metrics. +func (c *TCPClient) WritePassthrough( + metric aggregated.Metric, + storagePolicy policy.StoragePolicy, +) error { + payload := payloadUnion{ + payloadType: passthroughType, + passthrough: passthroughPayload{ + metric: metric, + storagePolicy: storagePolicy, + }, + } + + c.metrics.writePassthrough.Inc(1) + return c.write(metric.ID, metric.TimeNanos, payload) +} + +// WriteTimedWithStagedMetadatas writes timed metrics with staged metadatas. +func (c *TCPClient) WriteTimedWithStagedMetadatas( + metric aggregated.Metric, + metadatas metadata.StagedMetadatas, +) error { + payload := payloadUnion{ + payloadType: timedWithStagedMetadatasType, + timedWithStagedMetadatas: timedWithStagedMetadatas{ + metric: metric, + metadatas: metadatas, + }, + } + + c.metrics.writeForwarded.Inc(1) + return c.write(metric.ID, metric.TimeNanos, payload) +} + +// WriteForwarded writes forwarded metrics. +func (c *TCPClient) WriteForwarded( + metric aggregated.ForwardedMetric, + metadata metadata.ForwardMetadata, +) error { + payload := payloadUnion{ + payloadType: forwardedType, + forwarded: forwardedPayload{ + metric: metric, + metadata: metadata, + }, + } + + c.metrics.writeForwarded.Inc(1) + return c.write(metric.ID, metric.TimeNanos, payload) +} + +// Flush flushes any remaining data buffered by the client. +func (c *TCPClient) Flush() error { + c.metrics.flush.Inc(1) + return c.writerMgr.Flush() +} + +// Close closes the client. +func (c *TCPClient) Close() error { + c.placementWatcher.Unwatch() // nolint: errcheck + // writerMgr errors out if trying to close twice + return c.writerMgr.Close() +} + +//nolint:gocritic +func (c *TCPClient) write( + metricID id.RawID, + timeNanos int64, + payload payloadUnion, +) error { + stagedPlacement, onStagedPlacementDoneFn, err := c.placementWatcher.ActiveStagedPlacement() + if err != nil { + return err + } + placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement() + if err != nil { + onStagedPlacementDoneFn() + return err + } + var ( + shardID = c.shardFn(metricID, uint32(placement.NumShards())) + instances = placement.InstancesForShard(shardID) + multiErr = xerrors.NewMultiError() + ) + for _, instance := range instances { + // NB(xichen): the shard should technically always be found because the instances + // are computed from the placement, but protect against errors here regardless. 
+ shard, ok := instance.Shards().Shard(shardID) + if !ok { + err = fmt.Errorf("instance %s does not own shard %d", instance.ID(), shardID) + multiErr = multiErr.Add(err) + c.metrics.shardNotOwned.Inc(1) + continue + } + if !c.shouldWriteForShard(timeNanos, shard) { + c.metrics.shardNotWriteable.Inc(1) + continue + } + if err = c.writerMgr.Write(instance, shardID, payload); err != nil { + multiErr = multiErr.Add(err) + } + } + + onPlacementDoneFn() + onStagedPlacementDoneFn() + return multiErr.FinalError() +} + +func (c *TCPClient) shouldWriteForShard(nowNanos int64, shard shard.Shard) bool { + writeEarliestNanos, writeLatestNanos := c.writeTimeRangeFor(shard) + return nowNanos >= writeEarliestNanos && nowNanos <= writeLatestNanos +} + +// writeTimeRangeFor returns the time range for writes going to a given shard. +func (c *TCPClient) writeTimeRangeFor(shard shard.Shard) (int64, int64) { + var ( + cutoverNanos = shard.CutoverNanos() + cutoffNanos = shard.CutoffNanos() + earliestNanos = int64(0) + latestNanos = int64(math.MaxInt64) + ) + + if cutoverNanos >= int64(c.shardCutoverWarmupDuration) { + earliestNanos = cutoverNanos - int64(c.shardCutoverWarmupDuration) + } + + if cutoffNanos <= math.MaxInt64-int64(c.shardCutoffLingerDuration) { + latestNanos = cutoffNanos + int64(c.shardCutoffLingerDuration) + } + return earliestNanos, latestNanos +} + +type tcpClientMetrics struct { + writeUntimedCounter tally.Counter + writeUntimedBatchTimer tally.Counter + writeUntimedGauge tally.Counter + writePassthrough tally.Counter + writeForwarded tally.Counter + flush tally.Counter + shardNotOwned tally.Counter + shardNotWriteable tally.Counter +} + +func newTCPClientMetrics( + scope tally.Scope, +) tcpClientMetrics { + return tcpClientMetrics{ + writeUntimedCounter: scope.Counter("writeUntimedCounter"), + writeUntimedBatchTimer: scope.Counter("writeUntimedBatchTimer"), + writeUntimedGauge: scope.Counter("writeUntimedGauge"), + writePassthrough: scope.Counter("writePassthrough"), + writeForwarded: scope.Counter("writeForwarded"), + flush: scope.Counter("flush"), + shardNotOwned: scope.Counter("shard-not-owned"), + shardNotWriteable: scope.Counter("shard-not-writeable"), + } +} diff --git a/src/aggregator/client/client_test.go b/src/aggregator/client/tcp_client_test.go similarity index 86% rename from src/aggregator/client/client_test.go rename to src/aggregator/client/tcp_client_test.go index e486742635..1b4327b003 100644 --- a/src/aggregator/client/client_test.go +++ b/src/aggregator/client/tcp_client_test.go @@ -18,6 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
+//nolint:dupl,exhaustive package client import ( @@ -27,6 +28,10 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/cluster/kv/mem" "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/metrics/aggregation" @@ -40,9 +45,6 @@ import ( "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/require" ) var ( @@ -195,51 +197,9 @@ var ( SetInstances(testPlacementInstances) ) -func mustNewTestClient(t *testing.T, opts Options) *client { - c, err := NewClient(opts) - require.NoError(t, err) - value, ok := c.(*client) - require.True(t, ok) - return value -} - -func TestClientInitUninitializedOrClosed(t *testing.T) { - c := mustNewTestClient(t, testOptions()) - - c.state = clientInitialized - require.Equal(t, errClientIsInitializedOrClosed, c.Init()) - - c.state = clientClosed - require.Equal(t, errClientIsInitializedOrClosed, c.Init()) -} - -func TestClientInitWatcherWatchError(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - errTestWatcherWatch := errors.New("error watching") - watcher := placement.NewMockStagedPlacementWatcher(ctrl) - watcher.EXPECT().Watch().Return(errTestWatcherWatch) - c := mustNewTestClient(t, testOptions()) - c.placementWatcher = watcher - require.Equal(t, errTestWatcherWatch, c.Init()) -} - -func TestClientInitSuccess(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - watcher := placement.NewMockStagedPlacementWatcher(ctrl) - watcher.EXPECT().Watch().Return(nil) - c := mustNewTestClient(t, testOptions()) - c.placementWatcher = watcher - require.NoError(t, c.Init()) - require.Equal(t, clientInitialized, c.state) -} - -func TestClientWriteUntimedMetricClosed(t *testing.T) { - c := mustNewTestClient(t, testOptions()) - c.state = clientUninitialized +func TestTCPClientWriteUntimedMetricClosed(t *testing.T) { + c := mustNewTestTCPClient(t, testOptions()) + require.NoError(t, c.Close()) for _, input := range []unaggregated.MetricUnion{testCounter, testBatchTimer, testGauge} { var err error switch input.Type { @@ -250,19 +210,20 @@ func TestClientWriteUntimedMetricClosed(t *testing.T) { case metric.GaugeType: err = c.WriteUntimedGauge(input.Gauge(), testStagedMetadatas) } - require.Equal(t, errClientIsUninitializedOrClosed, err) + require.Error(t, err) } } -func TestClientWriteUntimedMetricActiveStagedPlacementError(t *testing.T) { +func TestTCPClientWriteUntimedMetricActiveStagedPlacementError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() errActiveStagedPlacementError := errors.New("error active staged placement") watcher := placement.NewMockStagedPlacementWatcher(ctrl) - watcher.EXPECT().ActiveStagedPlacement().Return(nil, nil, errActiveStagedPlacementError).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + watcher.EXPECT().ActiveStagedPlacement(). + Return(nil, nil, errActiveStagedPlacementError). 
+ MinTimes(1) + c := mustNewTestTCPClient(t, testOptions()) c.placementWatcher = watcher for _, input := range []unaggregated.MetricUnion{testCounter, testBatchTimer, testGauge} { @@ -279,7 +240,7 @@ func TestClientWriteUntimedMetricActiveStagedPlacementError(t *testing.T) { } } -func TestClientWriteUntimedMetricActivePlacementError(t *testing.T) { +func TestTCPClientWriteUntimedMetricActivePlacementError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -288,8 +249,7 @@ func TestClientWriteUntimedMetricActivePlacementError(t *testing.T) { stagedPlacement.EXPECT().ActivePlacement().Return(nil, nil, errActivePlacementError).MinTimes(1) watcher := placement.NewMockStagedPlacementWatcher(ctrl) watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + c := mustNewTestTCPClient(t, testOptions()) c.placementWatcher = watcher for _, input := range []unaggregated.MetricUnion{testCounter, testBatchTimer, testGauge} { @@ -306,7 +266,7 @@ func TestClientWriteUntimedMetricActivePlacementError(t *testing.T) { } } -func TestClientWriteUntimedMetricSuccess(t *testing.T) { +func TestTCPClientWriteUntimedMetricSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -333,8 +293,7 @@ func TestClientWriteUntimedMetricSuccess(t *testing.T) { stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) watcher := placement.NewMockStagedPlacementWatcher(ctrl) watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + c := mustNewTestTCPClient(t, testOptions()) c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) } c.writerMgr = writerMgr c.placementWatcher = watcher @@ -368,7 +327,7 @@ func TestClientWriteUntimedMetricSuccess(t *testing.T) { } } -func TestClientWriteUntimedMetricPartialError(t *testing.T) { +func TestTCPClientWriteUntimedMetricPartialError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -399,8 +358,7 @@ func TestClientWriteUntimedMetricPartialError(t *testing.T) { stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) watcher := placement.NewMockStagedPlacementWatcher(ctrl) watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + c := mustNewTestTCPClient(t, testOptions()) c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) } c.writerMgr = writerMgr c.placementWatcher = watcher @@ -418,7 +376,7 @@ func TestClientWriteUntimedMetricPartialError(t *testing.T) { require.Equal(t, testStagedMetadatas, payloadRes.untimed.metadatas) } -func TestClientWriteUntimedMetricBeforeShardCutover(t *testing.T) { +func TestTCPClientWriteUntimedMetricBeforeShardCutover(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -427,9 +385,8 @@ func TestClientWriteUntimedMetricBeforeShardCutover(t *testing.T) { stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) watcher := placement.NewMockStagedPlacementWatcher(ctrl) watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) + c := mustNewTestTCPClient(t, testOptions()) c.shardCutoverWarmupDuration = time.Second - c.state = clientInitialized c.nowFn = func() 
time.Time { return time.Unix(0, testCutoverNanos-1).Add(-time.Second) } c.writerMgr = nil c.placementWatcher = watcher @@ -439,7 +396,7 @@ func TestClientWriteUntimedMetricBeforeShardCutover(t *testing.T) { require.Nil(t, instancesRes) } -func TestClientWriteUntimedMetricAfterShardCutoff(t *testing.T) { +func TestTCPClientWriteUntimedMetricAfterShardCutoff(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -448,9 +405,8 @@ func TestClientWriteUntimedMetricAfterShardCutoff(t *testing.T) { stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) watcher := placement.NewMockStagedPlacementWatcher(ctrl) watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) + c := mustNewTestTCPClient(t, testOptions()) c.shardCutoffLingerDuration = time.Second - c.state = clientInitialized c.nowFn = func() time.Time { return time.Unix(0, testCutoffNanos+1).Add(time.Second) } c.writerMgr = nil c.placementWatcher = watcher @@ -460,7 +416,7 @@ func TestClientWriteUntimedMetricAfterShardCutoff(t *testing.T) { require.Nil(t, instancesRes) } -func TestClientWriteTimedMetricSuccess(t *testing.T) { +func TestTCPClientWriteTimedMetricSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -487,8 +443,7 @@ func TestClientWriteTimedMetricSuccess(t *testing.T) { stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) watcher := placement.NewMockStagedPlacementWatcher(ctrl) watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + c := mustNewTestTCPClient(t, testOptions()) c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) } c.writerMgr = writerMgr c.placementWatcher = watcher @@ -508,7 +463,7 @@ func TestClientWriteTimedMetricSuccess(t *testing.T) { require.Equal(t, testTimedMetadata, payloadRes.timed.metadata) } -func TestClientWriteTimedMetricPartialError(t *testing.T) { +func TestTCPClientWriteTimedMetricPartialError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -539,8 +494,7 @@ func TestClientWriteTimedMetricPartialError(t *testing.T) { stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) watcher := placement.NewMockStagedPlacementWatcher(ctrl) watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + c := mustNewTestTCPClient(t, testOptions()) c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) } c.writerMgr = writerMgr c.placementWatcher = watcher @@ -560,7 +514,7 @@ func TestClientWriteTimedMetricPartialError(t *testing.T) { require.Equal(t, testTimedMetadata, payloadRes.timed.metadata) } -func TestClientWriteForwardedMetricSuccess(t *testing.T) { +func TestTCPClientWriteForwardedMetricSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -587,8 +541,7 @@ func TestClientWriteForwardedMetricSuccess(t *testing.T) { stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) watcher := placement.NewMockStagedPlacementWatcher(ctrl) watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + c := mustNewTestTCPClient(t, testOptions()) c.nowFn = func() time.Time 
{ return time.Unix(0, testNowNanos) } c.writerMgr = writerMgr c.placementWatcher = watcher @@ -608,7 +561,7 @@ func TestClientWriteForwardedMetricSuccess(t *testing.T) { require.Equal(t, testForwardMetadata, payloadRes.forwarded.metadata) } -func TestClientWriteForwardedMetricPartialError(t *testing.T) { +func TestTCPClientWriteForwardedMetricPartialError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -639,8 +592,7 @@ func TestClientWriteForwardedMetricPartialError(t *testing.T) { stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) watcher := placement.NewMockStagedPlacementWatcher(ctrl) watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + c := mustNewTestTCPClient(t, testOptions()) c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) } c.writerMgr = writerMgr c.placementWatcher = watcher @@ -660,7 +612,7 @@ func TestClientWriteForwardedMetricPartialError(t *testing.T) { require.Equal(t, testForwardMetadata, payloadRes.forwarded.metadata) } -func TestClientWritePassthroughMetricSuccess(t *testing.T) { +func TestTCPClientWritePassthroughMetricSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -687,8 +639,7 @@ func TestClientWritePassthroughMetricSuccess(t *testing.T) { stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) watcher := placement.NewMockStagedPlacementWatcher(ctrl) watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + c := mustNewTestTCPClient(t, testOptions()) c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) } c.writerMgr = writerMgr c.placementWatcher = watcher @@ -708,7 +659,7 @@ func TestClientWritePassthroughMetricSuccess(t *testing.T) { require.Equal(t, testPassthroughMetadata, payloadRes.passthrough.storagePolicy) } -func TestClientWritePassthroughMetricPartialError(t *testing.T) { +func TestTCPClientWritePassthroughMetricPartialError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -739,8 +690,7 @@ func TestClientWritePassthroughMetricPartialError(t *testing.T) { stagedPlacement.EXPECT().ActivePlacement().Return(testPlacement, func() {}, nil).MinTimes(1) watcher := placement.NewMockStagedPlacementWatcher(ctrl) watcher.EXPECT().ActiveStagedPlacement().Return(stagedPlacement, func() {}, nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + c := mustNewTestTCPClient(t, testOptions()) c.nowFn = func() time.Time { return time.Unix(0, testNowNanos) } c.writerMgr = writerMgr c.placementWatcher = watcher @@ -760,55 +710,49 @@ func TestClientWritePassthroughMetricPartialError(t *testing.T) { require.Equal(t, testPassthroughMetadata, payloadRes.passthrough.storagePolicy) } -func TestClientFlushClosed(t *testing.T) { - c := mustNewTestClient(t, testOptions()) - c.state = clientClosed - require.Equal(t, errClientIsUninitializedOrClosed, c.Flush()) +func TestTCPClientFlushClosed(t *testing.T) { + c := mustNewTestTCPClient(t, testOptions()) + require.NoError(t, c.Close()) + require.Equal(t, errInstanceWriterManagerClosed, c.Flush()) } -func TestClientFlushError(t *testing.T) { +func TestTCPClientFlushError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() errTestFlush := errors.New("test flush error") writerMgr := 
NewMockinstanceWriterManager(ctrl) writerMgr.EXPECT().Flush().Return(errTestFlush).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + c := mustNewTestTCPClient(t, testOptions()) c.writerMgr = writerMgr require.Equal(t, errTestFlush, c.Flush()) } -func TestClientFlushSuccess(t *testing.T) { +func TestTCPClientFlushSuccess(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() writerMgr := NewMockinstanceWriterManager(ctrl) writerMgr.EXPECT().Flush().Return(nil).MinTimes(1) - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized + c := mustNewTestTCPClient(t, testOptions()) c.writerMgr = writerMgr require.NoError(t, c.Flush()) } -func TestClientCloseUninitializedOrClosed(t *testing.T) { - c := mustNewTestClient(t, testOptions()) - - c.state = clientUninitialized - require.Equal(t, errClientIsUninitializedOrClosed, c.Close()) +func TestTCPClientClosed(t *testing.T) { + c := mustNewTestTCPClient(t, testOptions()) - c.state = clientClosed - require.Equal(t, errClientIsUninitializedOrClosed, c.Close()) + require.NoError(t, c.Close()) + require.Equal(t, errInstanceWriterManagerClosed, c.Close()) } -func TestClientCloseSuccess(t *testing.T) { - c := mustNewTestClient(t, testOptions()) - c.state = clientInitialized +func TestTCPClientCloseSuccess(t *testing.T) { + c := mustNewTestTCPClient(t, testOptions()) require.NoError(t, c.Close()) } -func TestClientWriteTimeRangeFor(t *testing.T) { - c := mustNewTestClient(t, testOptions()) +func TestTCPClientWriteTimeRangeFor(t *testing.T) { + c := mustNewTestTCPClient(t, testOptions()) testShard := shard.NewShard(0).SetState(shard.Initializing) for _, input := range []struct { cutoverNanos int64 @@ -842,7 +786,41 @@ func TestClientWriteTimeRangeFor(t *testing.T) { } } +func TestTCPClientInitAndClose(t *testing.T) { + c := mustNewTestTCPClient(t, testOptions()) + require.NoError(t, c.Init()) + require.NoError(t, c.Close()) +} + +func mustNewTestTCPClient(t *testing.T, opts Options) *TCPClient { + c, err := NewClient(opts) + require.NoError(t, err) + value, ok := c.(*TCPClient) + require.True(t, ok) + return value +} + +// TODO: clean this up as it's in use by other test files func testOptions() Options { + return testTCPClientOptions() +} + +func testTCPClientOptions() Options { + const placementKey = "placement" + pl, err := placement.NewPlacement().Proto() + if err != nil { + panic(err.Error()) + } + + store := mem.NewStore() + if _, err := store.Set(placementKey, pl); err != nil { + panic(err.Error()) + } + + plOpts := placement.NewStagedPlacementWatcherOptions(). + SetStagedPlacementStore(store). + SetStagedPlacementKey(placementKey). + SetInitWatchTimeout(time.Nanosecond) return NewOptions(). SetClockOptions(clock.NewOptions()). SetConnectionOptions(testConnectionOptions()). @@ -851,5 +829,7 @@ func testOptions() Options { SetInstanceQueueSize(10). SetMaxTimerBatchSize(140). SetShardCutoverWarmupDuration(time.Minute). - SetShardCutoffLingerDuration(10 * time.Minute) + SetShardCutoffLingerDuration(10 * time.Minute). + SetAggregatorClientType(TCPAggregatorClient). + SetStagedPlacementWatcherOptions(plOpts) }
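
Aside: the cutover/cutoff window logic in `writeTimeRangeFor` is the subtlest part of the TCP client, so a worked example may help. Below is a minimal, self-contained sketch of the same arithmetic; the names (`writeTimeRange`, the sample timestamps) are illustrative only and not part of the patch.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// writeTimeRange mirrors TCPClient.writeTimeRangeFor: writes to a shard are
// accepted from (cutover - warmup) through (cutoff + linger), with guards
// against int64 underflow and overflow.
func writeTimeRange(cutoverNanos, cutoffNanos int64, warmup, linger time.Duration) (int64, int64) {
	earliest, latest := int64(0), int64(math.MaxInt64)
	if cutoverNanos >= int64(warmup) {
		earliest = cutoverNanos - int64(warmup) // open the window early for warmup
	}
	if cutoffNanos <= math.MaxInt64-int64(linger) {
		latest = cutoffNanos + int64(linger) // keep the window open during linger
	}
	return earliest, latest
}

func main() {
	cutover := time.Unix(100, 0).UnixNano() // shard becomes owned at t=100s
	cutoff := time.Unix(200, 0).UnixNano()  // shard is given up at t=200s
	earliest, latest := writeTimeRange(cutover, cutoff, time.Minute, 10*time.Minute)
	fmt.Println(earliest == time.Unix(40, 0).UnixNano()) // true: 100s - 60s warmup
	fmt.Println(latest == time.Unix(800, 0).UnixNano())  // true: 200s + 600s linger
}
```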
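For reference, wiring up a TCP client outside the tests follows the same shape as `testTCPClientOptions` above. This is a hedged sketch, not a definitive recipe: it assumes the option defaults cover what the tests set explicitly (connection options, shard function), and it uses the in-memory KV store where a real deployment would point the watcher at a cluster store such as etcd.

```go
package main

import (
	"time"

	aggclient "github.com/m3db/m3/src/aggregator/client"
	"github.com/m3db/m3/src/cluster/kv/mem"
	"github.com/m3db/m3/src/cluster/placement"
)

func main() {
	// Seed a placement so the watcher has something to resolve; a real
	// deployment would use a shared cluster store instead of mem.NewStore().
	pl, err := placement.NewPlacement().Proto()
	if err != nil {
		panic(err)
	}
	store := mem.NewStore()
	if _, err := store.Set("placement", pl); err != nil {
		panic(err)
	}

	watcherOpts := placement.NewStagedPlacementWatcherOptions().
		SetStagedPlacementStore(store).
		SetStagedPlacementKey("placement").
		SetInitWatchTimeout(time.Second)

	opts := aggclient.NewOptions().
		SetAggregatorClientType(aggclient.TCPAggregatorClient).
		SetShardCutoverWarmupDuration(time.Minute).
		SetShardCutoffLingerDuration(10 * time.Minute).
		SetStagedPlacementWatcherOptions(watcherOpts)

	c, err := aggclient.NewClient(opts)
	if err != nil {
		panic(err)
	}
	defer c.Close() // nolint: errcheck
	if err := c.Init(); err != nil {
		panic(err)
	}
}
```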