Skip to content

Commit

Permalink
Memberlist: fix "forget" (#3603)
Browse files Browse the repository at this point in the history
* When deleting entry in the ring and using gossip, update timestamp to deletion time.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
  • Loading branch information
pstibrany committed Dec 16, 2020
1 parent ddfbbf2 commit b9f9410
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* [ENHANCEMENT] Memberlist: client can now keep a size-bounded buffer with sent and received messages and display them in the admin UI (/memberlist) for troubleshooting. #3581
* [BUGFIX] Allow `-querier.max-query-lookback` use `y|w|d` suffix like deprecated `-store.max-look-back-period`. #3598
* [BUGFIX] Query-Frontend: `cortex_query_seconds_total` now return seconds not nanoseconds. #3589
* [BUGFIX] Memberlist: Entry in the ring should now not appear again after using "Forget" feature (unless it's still heartbeating). #3603

## 1.6.0-rc.0 in progress

Expand Down
18 changes: 9 additions & 9 deletions pkg/ring/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,30 +356,30 @@ func TestMergeRemoveMissing(t *testing.T) {
Ingesters: map[string]IngesterDesc{
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now, State: LEFT},
"Ing 3": {Addr: "addr3", Timestamp: now + 3, State: LEFT}, // When deleting, time depends on value passed to merge function.
},
}
}

{
our, ch := mergeLocalCAS(firstRing(), secondRing())
our, ch := mergeLocalCAS(firstRing(), secondRing(), now+3)
assert.Equal(t, expectedFirstSecondMerge(), our)
assert.Equal(t, &Desc{
Ingesters: map[string]IngesterDesc{
"Ing 2": {Addr: "addr2", Timestamp: now + 5, State: ACTIVE, Tokens: []uint32{5, 10, 20, 100, 200}},
"Ing 3": {Addr: "addr3", Timestamp: now, State: LEFT},
"Ing 3": {Addr: "addr3", Timestamp: now + 3, State: LEFT}, // When deleting, time depends on value passed to merge function.
},
}, ch) // entire second ring is new
}

{ // idempotency: (no change after applying same ring again)
our, ch := mergeLocalCAS(expectedFirstSecondMerge(), secondRing())
{ // idempotency: (no change after applying same ring again, even if time has advanced)
our, ch := mergeLocalCAS(expectedFirstSecondMerge(), secondRing(), now+10)
assert.Equal(t, expectedFirstSecondMerge(), our)
assert.Equal(t, (*Desc)(nil), ch)
}

{ // commutativity is broken when deleting missing entries. But let's make sure we get reasonable results at least.
our, ch := mergeLocalCAS(secondRing(), firstRing())
our, ch := mergeLocalCAS(secondRing(), firstRing(), now+3)
assert.Equal(t, &Desc{
Ingesters: map[string]IngesterDesc{
"Ing 1": {Addr: "addr1", Timestamp: now, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestMergeMissingIntoLeft(t *testing.T) {
}

{
our, ch := mergeLocalCAS(ring1(), ring2())
our, ch := mergeLocalCAS(ring1(), ring2(), now+10)
assert.Equal(t, &Desc{
Ingesters: map[string]IngesterDesc{
"Ing 1": {Addr: "addr1", Timestamp: now + 10, State: ACTIVE, Tokens: []uint32{30, 40, 50}},
Expand All @@ -438,8 +438,8 @@ func TestMergeMissingIntoLeft(t *testing.T) {
}
}

func mergeLocalCAS(ring1, ring2 *Desc) (*Desc, *Desc) {
change, err := ring1.Merge(ring2, true)
func mergeLocalCAS(ring1, ring2 *Desc, nowUnixTime int64) (*Desc, *Desc) {
change, err := ring1.mergeWithTime(ring2, true, time.Unix(nowUnixTime, 0))
if err != nil {
panic(err)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration) b
//
// This method is part of memberlist.Mergeable interface, and is only used by gossiping ring.
func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error) {
return d.mergeWithTime(mergeable, localCAS, time.Now())
}

func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now time.Time) (memberlist.Mergeable, error) {
if mergeable == nil {
return nil, nil
}
Expand Down Expand Up @@ -229,6 +233,10 @@ func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.
// missing, let's mark our ingester as LEFT
ting.State = LEFT
ting.Tokens = nil
// We are deleting entry "now", and should not keep old timestamp, because there may already be pending
// message in the gossip network with newer timestamp (but still older than "now").
// Such message would "resurrect" this deleted entry.
ting.Timestamp = now.Unix()
thisIngesterMap[name] = ting

updated = append(updated, name)
Expand Down

0 comments on commit b9f9410

Please sign in to comment.