diff --git a/go.mod b/go.mod
index 8e46e640e9..ba2642757f 100644
--- a/go.mod
+++ b/go.mod
@@ -119,6 +119,7 @@ require (
 	go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b
 	go.uber.org/atomic v1.6.0
 	go.uber.org/config v1.4.0
+	go.uber.org/goleak v1.1.10
 	go.uber.org/zap v1.13.0
 	golang.org/x/net v0.0.0-20200822124328-c89045814202
 	golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
diff --git a/go.sum b/go.sum
index 2971417542..7e5b6d654e 100644
--- a/go.sum
+++ b/go.sum
@@ -922,6 +922,8 @@ go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
 go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 go.uber.org/config v1.4.0 h1:upnMPpMm6WlbZtXoasNkK4f0FhxwS+W4Iqz5oNznehQ=
 go.uber.org/config v1.4.0/go.mod h1:aCyrMHmUAc/s2h9sv1koP84M9ZF/4K+g2oleyESO/Ig=
+go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
+go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
 go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
 go.uber.org/multierr v1.4.0 h1:f3WCSC2KzAcBXGATIxAB1E2XuCpNU255wNKZ505qi3E=
@@ -1069,6 +1071,7 @@ golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtn
 golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191104232314-dc038396d1f0/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191203134012-c197fd4bf371/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
diff --git a/src/cluster/kv/etcd/store_test.go b/src/cluster/kv/etcd/store_test.go
index 18a7fc44c7..531f9f3fb9 100644
--- a/src/cluster/kv/etcd/store_test.go
+++ b/src/cluster/kv/etcd/store_test.go
@@ -35,12 +35,13 @@ import (
 	"github.com/m3db/m3/src/cluster/kv"
 	"github.com/m3db/m3/src/cluster/mocks"
 	xclock "github.com/m3db/m3/src/x/clock"
+	"github.com/m3db/m3/src/x/retry"
 
-	"go.etcd.io/etcd/clientv3"
-	"go.etcd.io/etcd/integration"
 	"github.com/coreos/pkg/capnslog"
 	"github.com/golang/protobuf/proto"
 	"github.com/stretchr/testify/require"
+	"go.etcd.io/etcd/clientv3"
+	"go.etcd.io/etcd/integration"
 	"golang.org/x/net/context"
 )
 
@@ -89,6 +90,8 @@ func TestGetAndSet(t *testing.T) {
 }
 
 func TestNoCache(t *testing.T) {
+	t.Parallel()
+
 	ec, opts, closeFn := testStore(t)
 
 	store, err := NewStore(ec, ec, opts)
@@ -155,6 +158,8 @@ func TestCacheDirCreation(t *testing.T) {
 }
 
 func TestCache(t *testing.T) {
+	t.Parallel()
+
 	ec, opts, closeFn := testStore(t)
 
 	f, err := ioutil.TempFile("", "")
@@ -207,6 +212,8 @@ func TestCache(t *testing.T) {
 }
 
 func TestSetIfNotExist(t *testing.T) {
+	t.Parallel()
+
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
@@ -226,6 +233,8 @@ func TestSetIfNotExist(t *testing.T) {
 }
 
 func TestCheckAndSet(t *testing.T) {
+	t.Parallel()
+
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
@@ -252,6 +261,8 @@ func TestCheckAndSet(t *testing.T) {
 }
 
 func TestWatchClose(t *testing.T) {
+	t.Parallel()
+
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
@@ -280,6 +291,7 @@ func TestWatchClose(t *testing.T) {
 		if !ok {
 			break
 		}
+		time.Sleep(1 * time.Millisecond)
 	}
 
 	// getting a new watch will create a new watchale and thread to watch for updates
@@ -300,6 +312,8 @@ func TestWatchClose(t *testing.T) {
 }
 
 func TestWatchLastVersion(t *testing.T) {
+	t.Parallel()
+
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
@@ -311,7 +325,7 @@ func TestWatchLastVersion(t *testing.T) {
 	require.Nil(t, w.Get())
 
 	var errs int32
-	lastVersion := 100
+	lastVersion := 50
 	go func() {
 		for i := 1; i <= lastVersion; i++ {
 			_, err := store.Set("foo", genProto(fmt.Sprintf("bar%d", i)))
@@ -334,6 +348,8 @@
 }
 
 func TestWatchFromExist(t *testing.T) {
+	t.Parallel()
+
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
@@ -371,6 +387,8 @@
 }
 
 func TestWatchFromNotExist(t *testing.T) {
+	t.Parallel()
+
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
@@ -414,6 +432,8 @@ func TestGetFromKvNotFound(t *testing.T) {
 }
 
 func TestMultipleWatchesFromExist(t *testing.T) {
+	t.Parallel()
+
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
@@ -464,6 +484,8 @@ func TestMultipleWatchesFromExist(t *testing.T) {
 }
 
 func TestMultipleWatchesFromNotExist(t *testing.T) {
+	t.Parallel()
+
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
@@ -506,6 +528,8 @@ func TestMultipleWatchesFromNotExist(t *testing.T) {
 }
 
 func TestWatchNonBlocking(t *testing.T) {
+	t.Parallel()
+
 	ec, opts, closeFn := testStore(t)
 	defer closeFn()
 
@@ -1093,7 +1117,7 @@ func TestWatchWithStartRevision(t *testing.T) {
 	store, err := NewStore(ec, ec, opts)
 	require.NoError(t, err)
 
-	for i := 1; i <= 100; i++ {
+	for i := 1; i <= 50; i++ {
 		_, err = store.Set("foo", genProto(fmt.Sprintf("bar-%d", i)))
 		require.NoError(t, err)
 	}
@@ -1110,7 +1134,7 @@ func TestWatchWithStartRevision(t *testing.T) {
 		w1, err := store.Watch("foo")
 		require.NoError(t, err)
 		<-w1.C()
-		verifyValue(t, w1.Get(), "bar-100", 100)
+		verifyValue(t, w1.Get(), "bar-50", 50)
 	})
 }
 
@@ -1137,7 +1161,11 @@ func testStore(t *testing.T) (*clientv3.Client, Options, func()) {
 	}
 
 	opts := NewOptions().
-		SetWatchChanCheckInterval(10 * time.Millisecond).
+		SetWatchChanCheckInterval(50 * time.Millisecond).
+		SetWatchChanResetInterval(150 * time.Millisecond).
+		SetWatchChanInitTimeout(150 * time.Millisecond).
+		SetRequestTimeout(100 * time.Millisecond).
+		SetRetryOptions(retry.NewOptions().SetMaxRetries(1).SetMaxBackoff(0)).
 		SetPrefix("test")
 
 	return ec, opts, closer
diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go
index 4b9f4aac64..ddcfff7d23 100644
--- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go
+++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go
@@ -76,6 +76,8 @@
 )
 
 func TestDownsamplerAggregationWithAutoMappingRulesFromNamespacesWatcher(t *testing.T) {
+	t.Parallel()
+
 	ctrl := xtest.NewController(t)
 	defer ctrl.Finish()
 
@@ -123,6 +125,8 @@ func TestDownsamplerAggregationWithAutoMappingRulesFromNamespacesWatcher(t *test
 }
 
 func TestDownsamplerAggregationToggleEnabled(t *testing.T) {
+	t.Parallel()
+
 	ctrl := xtest.NewController(t)
 	defer ctrl.Finish()
 
@@ -158,6 +162,8 @@ func TestDownsamplerAggregationToggleEnabled(t *testing.T) {
 }
 
 func TestDownsamplerAggregationWithRulesStore(t *testing.T) {
+	t.Parallel()
+
 	testDownsampler := newTestDownsampler(t, testDownsamplerOptions{})
 	rulesStore := testDownsampler.rulesStore
 
@@ -207,6 +213,8 @@ func TestDownsamplerAggregationWithRulesStore(t *testing.T) {
 }
 
 func TestDownsamplerAggregationWithRulesConfigMappingRules(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			nameTag: "foo_metric",
@@ -224,7 +232,7 @@
 				Aggregations: []aggregation.Type{aggregation.Max},
 				StoragePolicies: []StoragePolicyConfiguration{
 					{
-						Resolution: 5 * time.Second,
+						Resolution: 1 * time.Second,
 						Retention: 30 * 24 * time.Hour,
 					},
 				},
@@ -241,7 +249,7 @@
 			values: []expectedValue{{value: 30}},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
-				Resolution: 5 * time.Second,
+				Resolution: 1 * time.Second,
 				Retention: 30 * 24 * time.Hour,
 			},
 		},
@@ -254,6 +262,8 @@ func TestDownsamplerAggregationWithRulesConfigMappingRules(t *testing.T) {
 }
 
 func TestDownsamplerAggregationWithRulesConfigMappingRulesPartialReplaceAutoMappingRuleFromNamespacesWatcher(t *testing.T) {
+	t.Parallel()
+
 	ctrl := xtest.NewController(t)
 	defer ctrl.Finish()
 
@@ -263,7 +273,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesPartialReplaceAutoMapp
 			"app": "nginx_edge",
 		},
 		timedSamples: []testGaugeMetricTimedSample{
-			{value: 15}, {value: 10}, {value: 30}, {value: 5}, {value: 0},
+			{value: 15}, {value: 10}, {value: 30}, {value: 5}, {value: 0, offset: 1 * time.Millisecond},
 		},
 	}
 	testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
@@ -337,6 +347,8 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesPartialReplaceAutoMapp
 }
 
 func TestDownsamplerAggregationWithRulesConfigMappingRulesReplaceAutoMappingRuleFromNamespacesWatcher(t *testing.T) {
+	t.Parallel()
+
 	ctrl := xtest.NewController(t)
 	defer ctrl.Finish()
 
@@ -401,6 +413,8 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesReplaceAutoMappingRule
 }
 
 func TestDownsamplerAggregationWithRulesConfigMappingRulesNoNameTag(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			"app": "nginx_edge",
@@ -419,7 +433,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesNoNameTag(t *testing.T
 				Aggregations: []aggregation.Type{aggregation.Max},
 				StoragePolicies: []StoragePolicyConfiguration{
 					{
-						Resolution: 5 * time.Second,
+						Resolution: 1 * time.Second,
 						Retention: 30 * 24 * time.Hour,
 					},
 				},
@@ -436,7 +450,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesNoNameTag(t *testing.T
 			values: []expectedValue{{value: 30}},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
-				Resolution: 5 * time.Second,
+				Resolution: 1 * time.Second,
 				Retention: 30 * 24 * time.Hour,
 			},
 		},
@@ -449,6 +463,8 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesNoNameTag(t *testing.T
 }
 
 func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilter(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			"app": "nginx_edge",
@@ -467,7 +483,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilter(t *testing.
 				Aggregations: []aggregation.Type{aggregation.Max},
 				StoragePolicies: []StoragePolicyConfiguration{
 					{
-						Resolution: 5 * time.Second,
+						Resolution: 1 * time.Second,
 						Retention: 30 * 24 * time.Hour,
 					},
 				},
@@ -490,7 +506,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilter(t *testing.
 			values: []expectedValue{{value: 30}},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
-				Resolution: 5 * time.Second,
+				Resolution: 1 * time.Second,
 				Retention: 30 * 24 * time.Hour,
 			},
 		},
@@ -503,6 +519,8 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilter(t *testing.
 }
 
 func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilterNoMatch(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			"app": "nginx_edge",
@@ -521,7 +539,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilterNoMatch(t *t
 				Aggregations: []aggregation.Type{aggregation.Max},
 				StoragePolicies: []StoragePolicyConfiguration{
 					{
-						Resolution: 5 * time.Second,
+						Resolution: 1 * time.Second,
 						Retention: 30 * 24 * time.Hour,
 					},
 				},
@@ -544,6 +562,8 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilterNoMatch(t *t
 }
 
 func TestDownsamplerAggregationWithRulesConfigMappingRulesAggregationType(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			"__g0__": "nginx_edge",
@@ -564,7 +584,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesAggregationType(t *tes
 				Aggregations: []aggregation.Type{aggregation.Max},
 				StoragePolicies: []StoragePolicyConfiguration{
 					{
-						Resolution: 5 * time.Second,
+						Resolution: 1 * time.Second,
 						Retention: 30 * 24 * time.Hour,
 					},
 				},
@@ -586,7 +606,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesAggregationType(t *tes
 			values: []expectedValue{{value: 30}},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
-				Resolution: 5 * time.Second,
+				Resolution: 1 * time.Second,
 				Retention: 30 * 24 * time.Hour,
 			},
 		},
@@ -599,6 +619,8 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesAggregationType(t *tes
 }
 
 func TestDownsamplerAggregationWithRulesConfigMappingRulesMultipleAggregationType(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			"__g0__": "nginx_edge",
@@ -618,7 +640,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesMultipleAggregationTyp
 				Aggregations: []aggregation.Type{aggregation.Max},
 				StoragePolicies: []StoragePolicyConfiguration{
 					{
-						Resolution: 5 * time.Second,
+						Resolution: 1 * time.Second,
 						Retention: 30 * 24 * time.Hour,
 					},
 				},
@@ -629,7 +651,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesMultipleAggregationTyp
 				Aggregations: []aggregation.Type{aggregation.Sum},
 				StoragePolicies: []StoragePolicyConfiguration{
 					{
-						Resolution: 5 * time.Second,
+						Resolution: 1 * time.Second,
 						Retention: 30 * 24 * time.Hour,
 					},
 				},
@@ -651,7 +673,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesMultipleAggregationTyp
 			values: []expectedValue{{value: 30}},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
-				Resolution: 5 * time.Second,
+				Resolution: 1 * time.Second,
 				Retention: 30 * 24 * time.Hour,
 			},
 		},
@@ -664,7 +686,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesMultipleAggregationTyp
 			values: []expectedValue{{value: 60}},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
-				Resolution: 5 * time.Second,
+				Resolution: 1 * time.Second,
 				Retention: 30 * 24 * time.Hour,
 			},
 		},
@@ -677,6 +699,8 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesMultipleAggregationTyp
 }
 
 func TestDownsamplerAggregationWithRulesConfigMappingRulesGraphitePrefixAndAggregationTags(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			"__g0__": "nginx_edge",
@@ -699,7 +723,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesGraphitePrefixAndAggre
 				Aggregations: []aggregation.Type{aggregation.Max},
 				StoragePolicies: []StoragePolicyConfiguration{
 					{
-						Resolution: 5 * time.Second,
+						Resolution: 1 * time.Second,
 						Retention: 30 * 24 * time.Hour,
 					},
 				},
@@ -723,7 +747,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesGraphitePrefixAndAggre
 			values: []expectedValue{{value: 30}},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
-				Resolution: 5 * time.Second,
+				Resolution: 1 * time.Second,
 				Retention: 30 * 24 * time.Hour,
 			},
 		},
@@ -736,6 +760,8 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesGraphitePrefixAndAggre
 }
 
 func TestDownsamplerAggregationWithRulesConfigMappingRulesGraphitePrefixTag(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			"__g0__": "nginx_edge",
@@ -757,7 +783,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesGraphitePrefixTag(t *t
 				Aggregations: []aggregation.Type{aggregation.Max},
 				StoragePolicies: []StoragePolicyConfiguration{
 					{
-						Resolution: 5 * time.Second,
+						Resolution: 1 * time.Second,
 						Retention: 30 * 24 * time.Hour,
 					},
 				},
@@ -780,7 +806,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesGraphitePrefixTag(t *t
 			values: []expectedValue{{value: 30}},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
-				Resolution: 5 * time.Second,
+				Resolution: 1 * time.Second,
 				Retention: 30 * 24 * time.Hour,
 			},
 		},
@@ -793,6 +819,8 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesGraphitePrefixTag(t *t
 }
 
 func TestDownsamplerAggregationWithRulesConfigMappingRulesAugmentTag(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			"app": "nginx_edge",
@@ -814,7 +842,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesAugmentTag(t *testing.
 				Aggregations: []aggregation.Type{aggregation.Max},
 				StoragePolicies: []StoragePolicyConfiguration{
 					{
-						Resolution: 5 * time.Second,
+						Resolution: 1 * time.Second,
 						Retention: 30 * 24 * time.Hour,
 					},
 				},
@@ -836,7 +864,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesAugmentTag(t *testing.
 			values: []expectedValue{{value: 30}},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
-				Resolution: 5 * time.Second,
+				Resolution: 1 * time.Second,
 				Retention: 30 * 24 * time.Hour,
 			},
 		},
@@ -849,6 +877,8 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesAugmentTag(t *testing.
 }
 
 func TestDownsamplerAggregationWithRulesConfigRollupRulesNoNameTag(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			"app": "nginx_edge",
@@ -858,10 +888,10 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesNoNameTag(t *testing.T)
 		},
 		timedSamples: []testGaugeMetricTimedSample{
 			{value: 42},
-			{value: 64, offset: 5 * time.Second},
+			{value: 64, offset: 1 * time.Second},
 		},
 	}
-	res := 5 * time.Second
+	res := 1 * time.Second
 	ret := 30 * 24 * time.Hour
 	testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
 		identTag: "endpoint",
@@ -907,6 +937,8 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesNoNameTag(t *testing.T)
 }
 
 func TestDownsamplerAggregationWithRulesConfigRollupRulesPerSecondSum(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			nameTag: "http_requests",
@@ -917,10 +949,10 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesPerSecondSum(t *testing
 		},
 		timedSamples: []testGaugeMetricTimedSample{
 			{value: 42},
-			{value: 64, offset: 5 * time.Second},
+			{value: 64, offset: 1 * time.Second},
 		},
 	}
-	res := 5 * time.Second
+	res := 1 * time.Second
 	ret := 30 * 24 * time.Hour
 	testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
 		rulesConfig: &RulesConfiguration{
@@ -965,7 +997,7 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesPerSecondSum(t *testing
 				"status_code": "500",
 				"endpoint": "/foo/bar",
 			},
-			values: []expectedValue{{value: 4.4}},
+			values: []expectedValue{{value: 22}},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
 				Resolution: res,
@@ -981,6 +1013,8 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesPerSecondSum(t *testing
 }
 
 func TestDownsamplerAggregationWithRulesConfigRollupRulesIncreaseAdd(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetrics := []testGaugeMetric{
 		testGaugeMetric{
 			tags: map[string]string{
@@ -991,10 +1025,10 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesIncreaseAdd(t *testing.
 				"not_rolled_up": "not_rolled_up_value_1",
 			},
 			timedSamples: []testGaugeMetricTimedSample{
-				{value: 42, offset: 5 * time.Second}, // +42 (should not be accounted since is a reset)
+				{value: 42, offset: 1 * time.Second}, // +42 (should not be accounted since is a reset)
 				// Explicit no value.
-				{value: 12, offset: 15 * time.Second}, // +12 - simulate a reset (should not be accounted)
-				{value: 33, offset: 20 * time.Second}, // +21
+				{value: 12, offset: 2 * time.Second}, // +12 - simulate a reset (should not be accounted)
+				{value: 33, offset: 3 * time.Second}, // +21
 			},
 		},
 		testGaugeMetric{
@@ -1006,14 +1040,14 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesIncreaseAdd(t *testing.
 				"not_rolled_up": "not_rolled_up_value_2",
 			},
 			timedSamples: []testGaugeMetricTimedSample{
-				{value: 13, offset: 5 * time.Second}, // +13 (should not be accounted since is a reset)
-				{value: 27, offset: 10 * time.Second}, // +14
+				{value: 13, offset: 1 * time.Second}, // +13 (should not be accounted since is a reset)
+				{value: 27, offset: 2 * time.Second}, // +14
 				// Explicit no value.
-				{value: 42, offset: 20 * time.Second}, // +15
+				{value: 42, offset: 3 * time.Second}, // +15
 			},
 		},
 	}
-	res := 5 * time.Second
+	res := 1 * time.Second
 	ret := 30 * 24 * time.Hour
 	testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
 		rulesConfig: &RulesConfiguration{
@@ -1065,7 +1099,7 @@ func TestDownsamplerAggregationWithRulesConfigRollupRulesIncreaseAdd(t *testing.
 			},
 			values: []expectedValue{
 				{value: 14},
-				{value: 50, offset: 10 * time.Second},
+				{value: 50, offset: 1 * time.Second},
 			},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
@@ -1082,6 +1116,8 @@
 }
 
 func TestDownsamplerAggregationWithRulesConfigRollupRuleAndDropPolicy(t *testing.T) {
+	t.Parallel()
+
 	gaugeMetric := testGaugeMetric{
 		tags: map[string]string{
 			nameTag: "http_requests",
@@ -1092,11 +1128,11 @@ func TestDownsamplerAggregationWithRulesConfigRollupRuleAndDropPolicy(t *testing
 		},
 		timedSamples: []testGaugeMetricTimedSample{
 			{value: 42},
-			{value: 64, offset: 5 * time.Second},
+			{value: 64, offset: 1 * time.Second},
 		},
 		expectDropPolicyApplied: true,
 	}
-	res := 5 * time.Second
+	res := 1 * time.Second
 	ret := 30 * 24 * time.Hour
 	filter := fmt.Sprintf("%s:http_requests app:* status_code:* endpoint:*", nameTag)
 	testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
@@ -1146,7 +1182,7 @@ func TestDownsamplerAggregationWithRulesConfigRollupRuleAndDropPolicy(t *testing
 				"status_code": "500",
 				"endpoint": "/foo/bar",
 			},
-			values: []expectedValue{{value: 4.4}},
+			values: []expectedValue{{value: 22}},
 			attributes: &storagemetadata.Attributes{
 				MetricsType: storagemetadata.AggregatedMetricsType,
 				Resolution: res,
@@ -1462,7 +1498,9 @@ func testGaugeMetrics(opts testGaugeMetricsOptions) ([]testGaugeMetric, []testEx
 	if opts.timedSamples {
 		metric.samples = nil
 		metric.timedSamples = []testGaugeMetricTimedSample{
-			{value: 4}, {value: 5}, {value: 6},
+			{value: 4},
+			{value: 5},
+			{value: 6, offset: 1 * time.Nanosecond},
 		}
 	}
 	write := testExpectedWrite{
@@ -1730,6 +1768,8 @@ func testDownsamplerAggregationIngest(
 	if testOpts.sampleAppenderOpts != nil {
 		opts = *testOpts.sampleAppenderOpts
 	}
+	// make the current timestamp predictable:
+	now := time.Now().Truncate(time.Microsecond)
 	for _, metric := range testCounterMetrics {
 		appender.NextMetric()
 
@@ -1749,7 +1789,7 @@ func testDownsamplerAggregationIngest(
 		}
 		for _, sample := range metric.timedSamples {
 			if sample.time.IsZero() {
-				sample.time = time.Now() // Allow empty time to mean "now"
+				sample.time = now // Allow empty time to mean "now"
 			}
 			if sample.offset > 0 {
 				sample.time = sample.time.Add(sample.offset)
@@ -1777,7 +1817,7 @@ func testDownsamplerAggregationIngest(
 		}
 		for _, sample := range metric.timedSamples {
 			if sample.time.IsZero() {
-				sample.time = time.Now() // Allow empty time to mean "now"
+				sample.time = now // Allow empty time to mean "now"
 			}
 			if sample.offset > 0 {
 				sample.time = sample.time.Add(sample.offset)
@@ -1892,7 +1932,11 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
 			SetMetricsScope(instrumentOpts.MetricsScope().
 				SubScope("metrics-appender-pool")))
 
-	var cfg Configuration
+	cfg := Configuration{
+		BufferPastLimits: []BufferPastLimitConfiguration{
+			{Resolution: 0, BufferPast: 500 * time.Millisecond},
+		},
+	}
 	if opts.remoteClientMock != nil {
 		// Optionally set an override to use remote aggregation
 		// with a mock client
diff --git a/src/msg/integration/integration_test.go b/src/msg/integration/integration_test.go
index 0c9cdf2092..6955b1d12b 100644
--- a/src/msg/integration/integration_test.go
+++ b/src/msg/integration/integration_test.go
@@ -23,12 +23,12 @@ package integration
 import (
 	"testing"
 
-	"github.com/m3db/m3/src/msg/topic"
-	"github.com/m3db/m3/src/x/test"
-
-	"github.com/fortytw2/leaktest"
 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/goleak"
+
+	"github.com/m3db/m3/src/msg/topic"
+	"github.com/m3db/m3/src/x/test"
 )
 
 const (
@@ -37,12 +37,12 @@
 )
 
 func TestSharedConsumer(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -58,12 +58,12 @@ func TestSharedConsumer(t *testing.T) {
 }
 
 func TestReplicatedConsumer(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -78,12 +78,12 @@ func TestReplicatedConsumer(t *testing.T) {
 }
 
 func TestSharedAndReplicatedConsumers(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -102,12 +102,12 @@ func TestSharedAndReplicatedConsumers(t *testing.T) {
 }
 
 func TestSharedConsumerWithDeadInstance(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -136,12 +136,12 @@ func TestSharedConsumerWithDeadInstance(t *testing.T) {
 }
 
 func TestSharedConsumerWithDeadConnection(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -165,12 +165,12 @@ func TestSharedConsumerWithDeadConnection(t *testing.T) {
 }
 
 func TestReplicatedConsumerWithDeadConnection(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -193,12 +193,12 @@ func TestReplicatedConsumerWithDeadConnection(t *testing.T) {
 }
 
 func TestSharedAndReplicatedConsumerWithDeadConnection(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -233,12 +233,12 @@ func TestSharedAndReplicatedConsumerWithDeadConnection(t *testing.T) {
 }
 
 func TestSharedConsumerAddInstances(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -262,12 +262,12 @@ func TestSharedConsumerAddInstances(t *testing.T) {
 }
 
 func TestReplicatedConsumerAddInstances(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -290,12 +290,12 @@ func TestReplicatedConsumerAddInstances(t *testing.T) {
 }
 
 func TestSharedAndReplicatedConsumerAddInstances(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -330,12 +330,12 @@ func TestSharedAndReplicatedConsumerAddInstances(t *testing.T) {
 }
 
 func TestSharedConsumerRemoveInstances(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -359,12 +359,12 @@ func TestSharedConsumerRemoveInstances(t *testing.T) {
 }
 
 func TestReplicatedConsumerRemoveInstances(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -387,12 +387,12 @@ func TestReplicatedConsumerRemoveInstances(t *testing.T) {
 }
 
 func TestSharedAndReplicatedConsumerRemoveInstances(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -427,12 +427,12 @@ func TestSharedAndReplicatedConsumerRemoveInstances(t *testing.T) {
 }
 
 func TestSharedConsumerReplaceInstances(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -456,12 +456,12 @@ func TestSharedConsumerReplaceInstances(t *testing.T) {
 }
 
 func TestReplicatedConsumerReplaceInstances(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -484,12 +484,12 @@ func TestReplicatedConsumerReplaceInstances(t *testing.T) {
 }
 
 func TestSharedAndReplicatedConsumerReplaceInstances(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -524,12 +524,12 @@ func TestSharedAndReplicatedConsumerReplaceInstances(t *testing.T) {
 }
 
 func TestRemoveConsumerService(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -552,12 +552,12 @@ func TestRemoveConsumerService(t *testing.T) {
 }
 
 func TestAddConsumerService(t *testing.T) {
+	t.Parallel()
+
 	if testing.Short() {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 
-	defer leaktest.Check(t)()
-
 	ctrl := gomock.NewController(test.Reporter{t})
 	defer ctrl.Finish()
 
@@ -579,3 +579,7 @@
 		require.True(t, s.consumerServices[2].numConsumed() <= s.ExpectedNumMessages()*80/100)
 	}
 }
+
+func TestMain(m *testing.M) {
+	goleak.VerifyTestMain(m, goleak.IgnoreCurrent())
+}
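
Note, not part of the patch: the diff swaps the per-test leaktest checks for a single package-level goleak hook so the tests can safely run with t.Parallel(). A minimal sketch of how the two goleak entry points relate, assuming go.uber.org/goleak v1.1.10 as pinned above; TestLeakFree is a hypothetical example, not a test in this package:

    package integration

    import (
    	"testing"

    	"go.uber.org/goleak"
    )

    // Package-level check: VerifyTestMain runs all tests (including parallel
    // ones) via m.Run and reports leaked goroutines once, after everything
    // has finished. IgnoreCurrent snapshots the goroutines alive at startup
    // so pre-existing background goroutines are not flagged as leaks.
    func TestMain(m *testing.M) {
    	goleak.VerifyTestMain(m, goleak.IgnoreCurrent())
    }

    // Per-test check, the role leaktest.Check(t)() played before this change.
    // Deferring goleak.VerifyNone only works reliably when no sibling test is
    // spawning goroutines concurrently, which is why the patch moves the
    // verification to TestMain instead of keeping it inside each test.
    func TestLeakFree(t *testing.T) {
    	defer goleak.VerifyNone(t)
    	// ... exercise code that must not leak goroutines ...
    }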