diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fee8384c8..e8ae5a0dd7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ * [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731 * [FEATURE] Tracing: Add `tracing.otel.round-robin` flag to use `round_robin` gRPC client side LB policy for sending OTLP traces. #5731 * [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766 +* [FEATURE] Distributor Queryable: Experimental: Add config `zone_results_quorum_metadata`. When querying ingesters using metadata APIs such as label names, values and series, only results from a quorum number of zones will be included and merged. #5779 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638 * [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683 * [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684 diff --git a/integration/zone_aware_test.go b/integration/zone_aware_test.go index c4d7937478..938be900b0 100644 --- a/integration/zone_aware_test.go +++ b/integration/zone_aware_test.go @@ -151,3 +151,95 @@ func TestZoneAwareReplication(t *testing.T) { require.Equal(t, 500, res.StatusCode) } + +func TestZoneResultsQuorum(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + flags := BlocksStorageFlags() + flags["-distributor.shard-by-all-labels"] = "true" + flags["-distributor.replication-factor"] = "3" + flags["-distributor.zone-awareness-enabled"] = "true" + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components.
+ ingesterFlags := func(zone string) map[string]string { + return mergeFlags(flags, map[string]string{ + "-ingester.availability-zone": zone, + }) + } + + ingester1 := e2ecortex.NewIngesterWithConfigFile("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "") + ingester2 := e2ecortex.NewIngesterWithConfigFile("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "") + ingester3 := e2ecortex.NewIngesterWithConfigFile("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "") + ingester4 := e2ecortex.NewIngesterWithConfigFile("ingester-4", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "") + ingester5 := e2ecortex.NewIngesterWithConfigFile("ingester-5", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "") + ingester6 := e2ecortex.NewIngesterWithConfigFile("ingester-6", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "") + require.NoError(t, s.StartAndWaitReady(ingester1, ingester2, ingester3, ingester4, ingester5, ingester6)) + + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + flagsZoneResultsQuorum := mergeFlags(flags, map[string]string{ + "-distributor.zone-results-quorum-metadata": "true", + }) + querierZoneResultsQuorum := e2ecortex.NewQuerier("querier-zrq", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsZoneResultsQuorum, "") + require.NoError(t, s.StartAndWaitReady(distributor, querier, querierZoneResultsQuorum)) + + // Wait until distributor and queriers have updated the ring. 
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + require.NoError(t, querierZoneResultsQuorum.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID) + require.NoError(t, err) + clientZoneResultsQuorum, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querierZoneResultsQuorum.HTTPEndpoint(), "", "", userID) + require.NoError(t, err) + + // Push some series + now := time.Now() + numSeries := 100 + expectedVectors := map[string]model.Vector{} + + for i := 1; i <= numSeries; i++ { + metricName := fmt.Sprintf("series_%d", i) + series, expectedVector := generateSeries(metricName, now) + res, err := client.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + expectedVectors[metricName] = expectedVector + } + + start := now.Add(-time.Hour) + end := now.Add(time.Hour) + res1, err := client.LabelNames(start, end) + require.NoError(t, err) + res2, err := clientZoneResultsQuorum.LabelNames(start, end) + require.NoError(t, err) + assert.Equal(t, res1, res2) + + values1, err := client.LabelValues(labels.MetricName, start, end, nil) + require.NoError(t, err) + values2, err := clientZoneResultsQuorum.LabelValues(labels.MetricName, start, end, nil) + require.NoError(t, err) + assert.Equal(t, values1, values2) + + series1, err := client.Series([]string{`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`}, start, end) + require.NoError(t, err) + series2, err := clientZoneResultsQuorum.Series([]string{`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`}, start, end) + require.NoError(t, err) + assert.Equal(t, series1, series2) +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 4b4eea0bf9..2fce10cf31 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -145,6 +145,11 @@ type Config struct { // This config is dynamically injected because defined in the querier config. ShuffleShardingLookbackPeriod time.Duration `yaml:"-"` + // ZoneResultsQuorumMetadata enables zone results quorum when querying the ingester replication set + // with metadata APIs (label names, values and series). When zone awareness is enabled, only results + // from a quorum number of zones will be included to reduce the amount of data merged and improve performance. + ZoneResultsQuorumMetadata bool `yaml:"zone_results_quorum_metadata" doc:"hidden"` + // Limits for distributor InstanceLimits InstanceLimits `yaml:"instance_limits"` } @@ -167,6 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.") f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. 
Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.") + f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "Experimental, this flag may change in the future. If both zone awareness and this flag are enabled, when querying metadata APIs (label names, values and series), only results from a quorum number of zones will be included.") f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.") f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.") @@ -924,8 +930,8 @@ func getErrorStatus(err error) string { } // ForReplicationSet runs f, in parallel, for all ingesters in the input replication set. -func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) { - return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { +func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, zoneResultsQuorum bool, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) { + return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, zoneResultsQuorum, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { client, err := d.ingesterPool.GetClientFor(ing.Addr) if err != nil { return nil, err @@ -981,7 +987,7 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t // LabelValuesForLabelName returns all the label values that are associated with a given label name. func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) { return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) { - return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { resp, err := client.LabelValues(ctx, req) if err != nil { return nil, err @@ -994,7 +1000,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode // LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) { return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) { - return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { stream, err := client.LabelValuesStream(ctx, req) if err != nil { return nil, err @@ -1059,7 +1065,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) { return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) { - return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { stream, err := client.LabelNamesStream(ctx, req) if err != nil { return nil, err @@ -1085,7 +1091,7 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) // LabelNames returns all the label names. func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) { return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) { - return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { resp, err := client.LabelNames(ctx, req) if err != nil { return nil, err @@ -1098,7 +1104,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st // MetricsForLabelMatchers gets the metrics that match said matchers func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) { return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error { - _, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + _, err := d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { resp, err := client.MetricsForLabelMatchers(ctx, req) if err != nil { return nil, err @@ -1127,7 +1133,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) { return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req 
*ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error { - _, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + _, err := d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { stream, err := client.MetricsForLabelMatchersStream(ctx, req) if err != nil { return nil, err @@ -1205,7 +1211,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad req := &ingester_client.MetricsMetadataRequest{} // TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled. - resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { return client.MetricsMetadata(ctx, req) }) if err != nil { @@ -1247,7 +1253,7 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) { replicationSet.MaxErrors = 0 req := &ingester_client.UserStatsRequest{} - resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + resps, err := d.ForReplicationSet(ctx, replicationSet, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { return client.UserStats(ctx, req) }) if err != nil { diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 8013e16a84..3048b1c471 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -161,7 +161,7 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (model.Matrix, error) { // Fetch samples from multiple ingesters in parallel, using the replicationSet // to deal with consistency. - results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { + results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { client, err := d.ingesterPool.GetClientFor(ing.Addr) if err != nil { return nil, err @@ -232,7 +232,7 @@ func mergeExemplarSets(a, b []cortexpb.Exemplar) []cortexpb.Exemplar { func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.ExemplarQueryRequest) (*ingester_client.ExemplarQueryResponse, error) { // Fetch exemplars from multiple ingesters in parallel, using the replicationSet // to deal with consistency. 
- results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { + results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { client, err := d.ingesterPool.GetClientFor(ing.Addr) if err != nil { return nil, err @@ -293,7 +293,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri ) // Fetch samples from multiple ingesters - results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { + results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { client, err := d.ingesterPool.GetClientFor(ing.Addr) if err != nil { return nil, err diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index 67630bf53c..0182207fd7 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -21,18 +21,19 @@ type ReplicationSet struct { } // Do function f in parallel for all replicas in the set, erroring is we exceed -// MaxErrors and returning early otherwise. -func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) { +// MaxErrors and returning early otherwise. zoneResultsQuorum allows including only +// results from zones that have already reached quorum, to improve performance. +func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResultsQuorum bool, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) { type instanceResult struct { res interface{} err error instance *InstanceDesc } - // Initialise the result tracker, which is use to keep track of successes and failures. + // Initialise the result tracker, which is used to keep track of successes and failures. var tracker replicationSetResultTracker if r.MaxUnavailableZones > 0 { - tracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones) + tracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones, zoneResultsQuorum) } else { tracker = newDefaultResultTracker(r.Instances, r.MaxErrors) } @@ -67,12 +68,10 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont }(i, &r.Instances[i]) } - results := make([]interface{}, 0, len(r.Instances)) - for !tracker.succeeded() { select { case res := <-ch: - tracker.done(res.instance, res.err) + tracker.done(res.instance, res.res, res.err) if res.err != nil { if tracker.failed() { return nil, res.err @@ -82,8 +81,6 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont if delay > 0 && r.MaxUnavailableZones == 0 { forceStart <- struct{}{} } - } else { - results = append(results, res.res) } case <-ctx.Done(): @@ -91,7 +88,7 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont } } - return results, nil + return tracker.getResults(), nil } // Includes returns whether the replication set includes the replica with the provided addr.
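To make the effect of the new `zoneResultsQuorum` parameter concrete: with zone awareness enabled, `Do` keeps collecting responses until a quorum of zones (total zones minus `MaxUnavailableZones`) has fully answered, and when the parameter is true the tracker then merges only results from those fully-answered zones instead of including partial results from the rest. Below is a minimal, self-contained sketch of that selection step (illustrative only; `mergeResults` and the toy types are not part of the Cortex code), using the same 3-zone, 6-ingester layout as the tests in this change, where enabling quorum yields 4 merged results instead of 5.

```go
package main

import "fmt"

// mergeResults mimics the idea behind zoneAwareResultTracker.getResults: with
// zoneResultsQuorum enabled, only zones with no outstanding instances
// contribute results; otherwise every result received so far is merged.
func mergeResults(resultsPerZone map[string][]int, waitingByZone map[string]int, zoneResultsQuorum bool) []int {
	var merged []int
	for zone, results := range resultsPerZone {
		if zoneResultsQuorum && waitingByZone[zone] > 0 {
			// Skip zones that have not fully responded yet.
			continue
		}
		merged = append(merged, results...)
	}
	return merged
}

func main() {
	// 3 zones x 2 ingesters, max unavailable zones = 1: the call succeeds once
	// 2 zones have fully responded, while zone-c still has one reply pending.
	resultsPerZone := map[string][]int{
		"zone-a": {1, 1},
		"zone-b": {1, 1},
		"zone-c": {1}, // partial: one instance has not answered yet
	}
	waitingByZone := map[string]int{"zone-a": 0, "zone-b": 0, "zone-c": 1}

	fmt.Println(len(mergeResults(resultsPerZone, waitingByZone, false))) // 5: partial zone included
	fmt.Println(len(mergeResults(resultsPerZone, waitingByZone, true)))  // 4: only quorum zones merged
}
```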
diff --git a/pkg/ring/replication_set_test.go b/pkg/ring/replication_set_test.go index 0e63184170..f4fb7449c5 100644 --- a/pkg/ring/replication_set_test.go +++ b/pkg/ring/replication_set_test.go @@ -120,6 +120,7 @@ func TestReplicationSet_Do(t *testing.T) { cancelContextDelay time.Duration want []interface{} expectedError error + zoneResultsQuorum bool }{ { name: "max errors = 0, no errors no delay", @@ -211,6 +212,26 @@ func TestReplicationSet_Do(t *testing.T) { maxUnavailableZones: 2, want: []interface{}{1, 1, 1, 1, 1, 1}, }, + { + name: "max unavailable zones = 1, zoneResultsQuorum = false, should contain 5 results (2 from zone1, 2 from zone2 and 1 from zone3)", + instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}, {Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, + f: func(c context.Context, id *InstanceDesc) (interface{}, error) { + return 1, nil + }, + maxUnavailableZones: 1, + want: []interface{}{1, 1, 1, 1, 1}, + zoneResultsQuorum: false, + }, + { + name: "max unavailable zones = 1, zoneResultsQuorum = true, should contain 4 results (2 from zone1, 2 from zone2)", + instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}, {Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, + f: func(c context.Context, id *InstanceDesc) (interface{}, error) { + return 1, nil + }, + maxUnavailableZones: 1, + want: []interface{}{1, 1, 1, 1}, + zoneResultsQuorum: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -231,7 +252,7 @@ func TestReplicationSet_Do(t *testing.T) { cancel() }) } - got, err := r.Do(ctx, tt.delay, tt.f) + got, err := r.Do(ctx, tt.delay, tt.zoneResultsQuorum, tt.f) if tt.expectedError != nil { assert.Equal(t, tt.expectedError, err) } else { diff --git a/pkg/ring/replication_set_tracker.go b/pkg/ring/replication_set_tracker.go index fcdf5441dd..dd22909747 100644 --- a/pkg/ring/replication_set_tracker.go +++ b/pkg/ring/replication_set_tracker.go @@ -2,14 +2,18 @@ package ring type replicationSetResultTracker interface { // Signals an instance has done the execution, either successful (no error) - // or failed (with error). - done(instance *InstanceDesc, err error) + // or failed (with error). If successful, result will be recorded and can + // be accessed via getResults. + done(instance *InstanceDesc, result interface{}, err error) // Returns true if the minimum number of successful results have been received. succeeded() bool // Returns true if the maximum number of failed executions have been reached. failed() bool + + // Returns recorded results. 
+ getResults() []interface{} } type defaultResultTracker struct { @@ -17,6 +21,7 @@ type defaultResultTracker struct { numSucceeded int numErrors int maxErrors int + results []interface{} } func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultResultTracker { @@ -25,12 +30,14 @@ func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultRe numSucceeded: 0, numErrors: 0, maxErrors: maxErrors, + results: make([]interface{}, 0, len(instances)), } } -func (t *defaultResultTracker) done(_ *InstanceDesc, err error) { +func (t *defaultResultTracker) done(_ *InstanceDesc, result interface{}, err error) { if err == nil { t.numSucceeded++ + t.results = append(t.results, result) } else { t.numErrors++ } @@ -44,6 +51,10 @@ func (t *defaultResultTracker) failed() bool { return t.numErrors > t.maxErrors } +func (t *defaultResultTracker) getResults() []interface{} { + return t.results +} + // zoneAwareResultTracker tracks the results per zone. // All instances in a zone must succeed in order for the zone to succeed. type zoneAwareResultTracker struct { @@ -51,29 +62,42 @@ type zoneAwareResultTracker struct { failuresByZone map[string]int minSuccessfulZones int maxUnavailableZones int + resultsPerZone map[string][]interface{} + numInstances int + zoneResultsQuorum bool } -func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int) *zoneAwareResultTracker { +func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int, zoneResultsQuorum bool) *zoneAwareResultTracker { t := &zoneAwareResultTracker{ waitingByZone: make(map[string]int), failuresByZone: make(map[string]int), maxUnavailableZones: maxUnavailableZones, + numInstances: len(instances), + zoneResultsQuorum: zoneResultsQuorum, } for _, instance := range instances { t.waitingByZone[instance.Zone]++ } t.minSuccessfulZones = len(t.waitingByZone) - maxUnavailableZones + t.resultsPerZone = make(map[string][]interface{}, len(t.waitingByZone)) return t } -func (t *zoneAwareResultTracker) done(instance *InstanceDesc, err error) { - t.waitingByZone[instance.Zone]-- - +func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}, err error) { if err != nil { t.failuresByZone[instance.Zone]++ + } else { + if _, ok := t.resultsPerZone[instance.Zone]; !ok { + // On the first successful result for this zone, use the number of + // instances still waiting in the zone as the slice capacity. + t.resultsPerZone[instance.Zone] = make([]interface{}, 0, t.waitingByZone[instance.Zone]) + } + t.resultsPerZone[instance.Zone] = append(t.resultsPerZone[instance.Zone], result) } + + t.waitingByZone[instance.Zone]-- } func (t *zoneAwareResultTracker) succeeded() bool { @@ -94,3 +118,21 @@ func (t *zoneAwareResultTracker) failed() bool { failedZones := len(t.failuresByZone) return failedZones > t.maxUnavailableZones } + +func (t *zoneAwareResultTracker) getResults() []interface{} { + results := make([]interface{}, 0, t.numInstances) + if t.zoneResultsQuorum { + for zone, waiting := range t.waitingByZone { + // No need to check failuresByZone since the tracker + // should have already succeeded before reaching here. + if waiting == 0 { + results = append(results, t.resultsPerZone[zone]...) + } + } + } else { + for zone := range t.resultsPerZone { + results = append(results, t.resultsPerZone[zone]...)
+ } + } + return results +} diff --git a/pkg/ring/replication_set_tracker_test.go b/pkg/ring/replication_set_tracker_test.go index f24d23c00a..a0d04f1279 100644 --- a/pkg/ring/replication_set_tracker_test.go +++ b/pkg/ring/replication_set_tracker_test.go @@ -33,19 +33,19 @@ func TestDefaultResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, nil) + tracker.done(&instance2, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance3, nil) + tracker.done(&instance3, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance4, nil) + tracker.done(&instance4, nil, nil) assert.True(t, tracker.succeeded()) assert.False(t, tracker.failed()) }, @@ -57,11 +57,11 @@ func TestDefaultResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, errors.New("test")) + tracker.done(&instance2, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) }, @@ -73,15 +73,15 @@ func TestDefaultResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, errors.New("test")) + tracker.done(&instance2, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance3, errors.New("test")) + tracker.done(&instance3, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) }, @@ -93,23 +93,67 @@ func TestDefaultResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, errors.New("test")) + tracker.done(&instance2, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance3, errors.New("test")) + tracker.done(&instance3, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance4, errors.New("test")) + tracker.done(&instance4, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) }, }, + "record and getResults": { + instances: []InstanceDesc{instance1, instance2, instance3, instance4}, + maxErrors: 1, + run: func(t *testing.T, tracker *defaultResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, 2, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, 3, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{1, 2, 3}, tracker.getResults()) + }, + }, + "record and getResults2": { + instances: []InstanceDesc{instance1, 
instance2, instance3, instance4}, + maxErrors: 1, + run: func(t *testing.T, tracker *defaultResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, []int{1, 1, 1}, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, []int{2, 2, 2}, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, []int{3, 3, 3}, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{[]int{1, 1, 1}, []int{2, 2, 2}, []int{3, 3, 3}}, tracker.getResults()) + }, + }, } for testName, testCase := range tests { @@ -130,6 +174,7 @@ func TestZoneAwareResultTracker(t *testing.T) { tests := map[string]struct { instances []InstanceDesc maxUnavailableZones int + zoneResultsQuorum bool run func(t *testing.T, tracker *zoneAwareResultTracker) }{ "should succeed on no instances to track": { @@ -147,17 +192,115 @@ func TestZoneAwareResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, 1, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, nil) + tracker.done(&instance2, 1, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance3, nil) + tracker.done(&instance3, 1, nil) assert.True(t, tracker.succeeded()) assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{1, 1, 1}, tracker.getResults()) + }, + }, + "should succeed once all 6 instances succeed on max unavailable zones = 0": { + instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 0, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance4, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance5, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance6, 1, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{1, 1, 1, 1, 1, 1}, tracker.getResults()) + }, + }, + "should succeed once all 5 instances succeed on max unavailable zones = 1, zone results quorum disabled": { + instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 1, + zoneResultsQuorum: false, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance5, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, 
tracker.failed()) + + tracker.done(&instance4, 1, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{1, 1, 1, 1, 1}, tracker.getResults()) + }, + }, + "should succeed once all 5 instances succeed on max unavailable zones = 1, zone results quorum enabled": { + instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 1, + zoneResultsQuorum: true, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance5, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance4, 1, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{1, 1, 1, 1}, tracker.getResults()) }, }, "should fail on 1st failing instance on max unavailable zones = 0": { @@ -167,11 +310,11 @@ func TestZoneAwareResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, errors.New("test")) + tracker.done(&instance2, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) }, @@ -182,19 +325,19 @@ func TestZoneAwareResultTracker(t *testing.T) { run: func(t *testing.T, tracker *zoneAwareResultTracker) { // Track failing instances. for _, instance := range []InstanceDesc{instance1, instance2} { - tracker.done(&instance, errors.New("test")) + tracker.done(&instance, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) } // Track successful instances. for _, instance := range []InstanceDesc{instance3, instance4, instance5} { - tracker.done(&instance, nil) + tracker.done(&instance, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) } - tracker.done(&instance6, nil) + tracker.done(&instance6, nil, nil) assert.True(t, tracker.succeeded()) assert.False(t, tracker.failed()) }, @@ -205,12 +348,12 @@ func TestZoneAwareResultTracker(t *testing.T) { run: func(t *testing.T, tracker *zoneAwareResultTracker) { // Track successful instances. for _, instance := range []InstanceDesc{instance1, instance2, instance3} { - tracker.done(&instance, nil) + tracker.done(&instance, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) } - tracker.done(&instance4, nil) + tracker.done(&instance4, nil, nil) assert.True(t, tracker.succeeded()) assert.False(t, tracker.failed()) }, @@ -221,17 +364,17 @@ func TestZoneAwareResultTracker(t *testing.T) { run: func(t *testing.T, tracker *zoneAwareResultTracker) { // Track failing instances. for _, instance := range []InstanceDesc{instance1, instance2, instance3, instance4} { - tracker.done(&instance, errors.New("test")) + tracker.done(&instance, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) } // Track successful instances. 
- tracker.done(&instance5, nil) + tracker.done(&instance5, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance6, nil) + tracker.done(&instance6, nil, nil) assert.True(t, tracker.succeeded()) assert.False(t, tracker.failed()) }, @@ -241,17 +384,17 @@ func TestZoneAwareResultTracker(t *testing.T) { maxUnavailableZones: 2, run: func(t *testing.T, tracker *zoneAwareResultTracker) { // Zone-a - tracker.done(&instance1, nil) + tracker.done(&instance1, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) // Zone-b - tracker.done(&instance3, nil) + tracker.done(&instance3, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) // Zone-a - tracker.done(&instance2, nil) + tracker.done(&instance2, nil, nil) assert.True(t, tracker.succeeded()) assert.False(t, tracker.failed()) }, @@ -260,7 +403,7 @@ func TestZoneAwareResultTracker(t *testing.T) { for testName, testCase := range tests { t.Run(testName, func(t *testing.T) { - testCase.run(t, newZoneAwareResultTracker(testCase.instances, testCase.maxUnavailableZones)) + testCase.run(t, newZoneAwareResultTracker(testCase.instances, testCase.maxUnavailableZones, testCase.zoneResultsQuorum)) }) } }
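Taken together, the change is opt-in: the tracker only filters results when the new boolean reaches it, and only the metadata paths in the distributor pass `d.cfg.ZoneResultsQuorumMetadata` through (query, exemplar, and user-stats paths pass `false`). A hedged configuration sketch for queriers follows; the key and flag names are taken from this diff, while the placement under the top-level `distributor` block is inferred from the yaml tag on the distributor `Config` and otherwise assumed.

```yaml
# CLI equivalent: -distributor.zone-results-quorum-metadata=true
# Only has an effect when zone awareness is enabled for the ingester ring
# (the integration test above also sets -distributor.zone-awareness-enabled=true).
distributor:
  zone_results_quorum_metadata: true   # experimental, hidden from generated docs, default false
```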