
Refactor PeerProvider & hashring interaction #6296

Merged: 2 commits, Sep 27, 2024
133 changes: 79 additions & 54 deletions common/membership/hashring.go
@@ -22,6 +22,7 @@ package membership

import (
"fmt"
"slices"
"sort"
"strings"
"sync"
@@ -44,7 +45,6 @@ import (

// ErrInsufficientHosts is thrown when there are not enough hosts to serve the request
var ErrInsufficientHosts = &types.InternalServiceError{Message: "Not enough hosts to serve the request"}
var emptyEvent = &ChangedEvent{}

const (
minRefreshInternal = time.Second * 4
@@ -59,14 +59,14 @@ type PeerProvider interface {
GetMembers(service string) ([]HostInfo, error)
WhoAmI() (HostInfo, error)
SelfEvict() error
Subscribe(name string, notifyChannel chan<- *ChangedEvent) error
Subscribe(name string, handler func(ChangedEvent)) error
Member: this is probably a better API, yeah.
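For illustration only, a hypothetical, simplified provider showing the shape of the callback-based Subscribe (these types are stand-ins, not the real ones): the provider just invokes the handler, and the subscriber decides how to buffer or coalesce the signal.

```go
package main

import "fmt"

// Simplified stand-ins for the real types, only to show the shape of the new API.
type ChangedEvent struct{ HostsAdded []string }

type PeerProvider interface {
	Subscribe(name string, handler func(ChangedEvent)) error
}

// fakeProvider records handlers and calls them synchronously on every change.
type fakeProvider struct {
	handlers []func(ChangedEvent)
}

func (p *fakeProvider) Subscribe(name string, handler func(ChangedEvent)) error {
	p.handlers = append(p.handlers, handler)
	return nil
}

func (p *fakeProvider) notify(e ChangedEvent) {
	for _, h := range p.handlers {
		h(e) // no channel to fill up; the subscriber owns any buffering
	}
}

func main() {
	p := &fakeProvider{}
	_ = p.Subscribe("hashring", func(e ChangedEvent) {
		fmt.Println("membership changed:", e.HostsAdded)
	})
	p.notify(ChangedEvent{HostsAdded: []string{"host-a"}})
}
```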

}

type ring struct {
status int32
service string
peerProvider PeerProvider
refreshChan chan *ChangedEvent
refreshChan chan struct{}
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
timeSource clock.TimeSource
@@ -99,7 +99,7 @@ func newHashring(
service: service,
peerProvider: provider,
shutdownCh: make(chan struct{}),
refreshChan: make(chan *ChangedEvent),
refreshChan: make(chan struct{}, 1),
Member: I know the log you're referring to in the description, but I'm slightly nervous about changing this: I'd want to be sure it doesn't suddenly cause some upstream process to block a lot more.

Have we tested it in practice? I'm guessing the result would be either dropping a lot more events, or blocking a lot more?

Given that we don't really care about the individual changes, only the whole state, would a debounce (on an event, wait 100 ms and then process only the last event, dropping the rest) make more sense?
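Roughly the shape of the debounce described above (a hypothetical sketch, not a proposed implementation; ChangedEvent here is a simplified stand-in for membership.ChangedEvent):

```go
package main

import (
	"fmt"
	"time"
)

// ChangedEvent is a simplified stand-in for membership.ChangedEvent.
type ChangedEvent struct {
	HostsAdded, HostsUpdated, HostsRemoved []string
}

// debounce drains events from in and calls process with only the most recent
// event once no new event has arrived for the quiet interval.
func debounce(in <-chan ChangedEvent, quiet time.Duration, process func(ChangedEvent)) {
	var (
		last    ChangedEvent
		pending bool
		timer   <-chan time.Time
	)
	for {
		select {
		case e, ok := <-in:
			if !ok {
				return
			}
			last, pending = e, true
			timer = time.After(quiet) // restart the quiet period on every event
		case <-timer:
			if pending {
				process(last)
				pending = false
			}
		}
	}
}

func main() {
	events := make(chan ChangedEvent)
	go debounce(events, 100*time.Millisecond, func(e ChangedEvent) {
		fmt.Println("refresh once for burst ending with", e.HostsAdded)
	})

	// A burst of three changes collapses into a single refresh.
	for _, h := range []string{"host-a", "host-b", "host-c"} {
		events <- ChangedEvent{HostsAdded: []string{h}}
	}
	time.Sleep(300 * time.Millisecond)
}
```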

Member: oh, I think there's already a debounce (line 255).

I'm not sure I understand why we should try to make this blocking then? Not opposed to the change, but apart from the log noise, I'm not sure it presents a problem?

Member Author: Let me explain, since this is a really tricky part. What we currently have in hashring.go is an unbuffered channel. Writes from the PeerProvider (ringpop/uns) already happen in a non-blocking way (via select), so a write only succeeds if the receiver is reading the channel at that exact moment. If it is doing something else (not ready yet, or inside refresh(), which of course takes some time), the write fails. That's why we see a huge percentage of writes failing: another membership change arrives while we are in refresh(), updating, or notifying subscribers.

I made this playground to illustrate the issue: https://go.dev/play/p/2Os6fkarH8W
Having a channel of size 1 guarantees we never miss the fact that we were notified (note that PeerProvider now calls an update function).

What happened before (with the unbuffered channel):

  1. PeerProvider writes a ChangedEvent.
  2. We receive the ChangedEvent and call refresh() etc.
  3. While we are in refresh(), PeerProvider gets another event and tries to write it to the channel. Since we are not reading it right now (we are in refresh()), it cannot notify us and logs "channel is full".
  4. We finish refresh(), but there is nothing in the channel, so we wait until the next refresh() or defaultRefreshInterval=10s to pick up the update.

What happens now (with buffer=1; see the sketch after this list):

  1. PeerProvider calls the handler.
  2. The handler adds an event to the channel to trigger refresh().
  3. We read refreshChan; the channel is now empty again; we call refresh() etc.
  4. While we are in refresh(), PeerProvider gets another event and calls the handler again. The handler adds a notification to refreshChan.
  5. refresh() and the rest finish, and we go back to reading refreshChan. There is a message to read, which means we did not lose the fact that we were notified.
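A minimal standalone sketch of this coalescing pattern (illustrative only; not the playground code, and the names are simplified):

```go
package main

import "fmt"

// signal does a non-blocking send into a capacity-1 channel.
// If a signal is already pending, the new one is coalesced with it;
// the fact that a refresh is needed is never lost.
func signal(refreshChan chan struct{}) {
	select {
	case refreshChan <- struct{}{}:
	default: // a signal is already buffered, nothing to do
	}
}

func main() {
	refreshChan := make(chan struct{}, 1)

	// Two membership changes arrive while we are "busy in refresh()".
	signal(refreshChan) // change #1: buffered
	signal(refreshChan) // change #2: coalesced with #1, not dropped

	// Back from refresh(): a pending signal is still there,
	// so we know to run refresh() again.
	select {
	case <-refreshChan:
		fmt.Println("notified: run refresh() again")
	default:
		fmt.Println("no pending notification")
	}
}
```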

Member Author (@dkrotx, Sep 26, 2024):

> why we should try to make this blocking then?

I don't quite understand what you mean by that?
Member: my comment was wrong; I follow your point after discussing it: we risk losing the last event if the events come one right after the other and this keeps falling through to the default path. 🙏 for the explanation

timeSource: timeSource,
logger: logger,
scope: scope,
@@ -125,11 +125,11 @@ func (r *ring) Start() {
) {
return
}
if err := r.peerProvider.Subscribe(r.service, r.refreshChan); err != nil {
if err := r.peerProvider.Subscribe(r.service, r.handleUpdates); err != nil {
r.logger.Fatal("subscribing to peer provider", tag.Error(err))
}

if _, err := r.refresh(); err != nil {
if err := r.refresh(); err != nil {
r.logger.Fatal("failed to start service resolver", tag.Error(err))
}

@@ -151,8 +151,8 @@ func (r *ring) Stop() {
r.value.Store(emptyHashring())

r.subscribers.Lock()
defer r.subscribers.Unlock()
r.subscribers.keys = make(map[string]chan<- *ChangedEvent)
r.subscribers.Unlock()
close(r.shutdownCh)

if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success {
@@ -166,12 +166,10 @@ func (r *ring) Lookup(
) (HostInfo, error) {
addr, found := r.ring().Lookup(key)
if !found {
select {
case r.refreshChan <- emptyEvent:
default:
}
r.signalSelf()
return HostInfo{}, ErrInsufficientHosts
}

r.members.RLock()
defer r.members.RUnlock()
host, ok := r.members.keys[addr]
@@ -195,12 +193,25 @@ func (r *ring) Subscribe(watcher string, notifyChannel chan<- *ChangedEvent) err
return nil
}

func (r *ring) notifySubscribers(msg *ChangedEvent) {
func (r *ring) handleUpdates(event ChangedEvent) {
r.signalSelf()
}

func (r *ring) signalSelf() {
var event struct{}
select {
case r.refreshChan <- event:
default: // channel already has an event, don't block
}
}

func (r *ring) notifySubscribers(msg ChangedEvent) {
r.subscribers.Lock()
defer r.subscribers.Unlock()

for name, ch := range r.subscribers.keys {
select {
case ch <- msg:
case ch <- &msg:
default:
r.logger.Error("subscriber notification failed", tag.Name(name))
}
@@ -240,59 +251,61 @@ func (r *ring) Members() []HostInfo {
return hosts
}

func (r *ring) refresh() (refreshed bool, err error) {
func (r *ring) refresh() error {
if r.members.refreshed.After(r.timeSource.Now().Add(-minRefreshInternal)) {
// refreshed too frequently
return false, nil
return nil
}

members, err := r.peerProvider.GetMembers(r.service)
if err != nil {
return false, fmt.Errorf("getting members from peer provider: %w", err)
return fmt.Errorf("getting members from peer provider: %w", err)
}

r.members.Lock()
defer r.members.Unlock()
newMembersMap, changed := r.compareMembers(members)
if !changed {
return false, nil
newMembersMap := r.makeMembersMap(members)

diff := r.diffMembers(newMembersMap)
if diff.Empty() {
return nil
}

ring := emptyHashring()
ring.AddMembers(castToMembers(members)...)

r.members.keys = newMembersMap
r.members.refreshed = r.timeSource.Now()
r.value.Store(ring)
r.logger.Info("refreshed ring members", tag.Value(members))

return true, nil
r.updateMembersMap(newMembersMap)

r.emitHashIdentifier()
r.notifySubscribers(diff)

return nil
}

func (r *ring) refreshAndNotifySubscribers(event *ChangedEvent) {
refreshed, err := r.refresh()
if err != nil {
r.logger.Error("refreshing ring", tag.Error(err))
}
if refreshed {
r.notifySubscribers(event)
}
func (r *ring) updateMembersMap(newMembers map[string]HostInfo) {
r.members.Lock()
defer r.members.Unlock()

r.members.keys = newMembers
r.members.refreshed = r.timeSource.Now()
}

func (r *ring) refreshRingWorker() {
defer r.shutdownWG.Done()

refreshTicker := r.timeSource.NewTicker(defaultRefreshInterval)
defer refreshTicker.Stop()

for {
select {
case <-r.shutdownCh:
return
case event := <-r.refreshChan: // local signal or signal from provider
r.refreshAndNotifySubscribers(event)
case <-refreshTicker.Chan(): // periodically refresh membership
r.emitHashIdentifier()
r.refreshAndNotifySubscribers(emptyEvent)
case <-r.refreshChan: // local signal or signal from provider
if err := r.refresh(); err != nil {
r.logger.Error("failed to refresh ring", tag.Error(err))
}
case <-refreshTicker.Chan(): // periodically force refreshing membership
r.signalSelf()
}
}
}
@@ -302,11 +315,7 @@ func (r *ring) ring() *hashring.HashRing {
}

func (r *ring) emitHashIdentifier() float64 {
members, err := r.peerProvider.GetMembers(r.service)
if err != nil {
r.logger.Error("Observed a problem getting peer members while emitting hash identifier metrics", tag.Error(err))
return -1
}
members := r.Members()
self, err := r.peerProvider.WhoAmI()
if err != nil {
r.logger.Error("Observed a problem looking up self from the membership provider while emitting hash identifier metrics", tag.Error(err))
@@ -343,22 +352,38 @@ func (r *ring) emitHashIdentifier() float64 {
return trimmedForMetric
}

func (r *ring) compareMembers(members []HostInfo) (map[string]HostInfo, bool) {
changed := false
newMembersMap := make(map[string]HostInfo, len(members))
for _, member := range members {
newMembersMap[member.GetAddress()] = member
if _, ok := r.members.keys[member.GetAddress()]; !ok {
changed = true
func (r *ring) makeMembersMap(members []HostInfo) map[string]HostInfo {
membersMap := make(map[string]HostInfo, len(members))
for _, m := range members {
membersMap[m.GetAddress()] = m
}
return membersMap
}

func (r *ring) diffMembers(newMembers map[string]HostInfo) ChangedEvent {
r.members.RLock()
defer r.members.RUnlock()

var combinedChange ChangedEvent

// find newly added hosts
for addr := range newMembers {
if _, found := r.members.keys[addr]; !found {
combinedChange.HostsAdded = append(combinedChange.HostsAdded, addr)
}
}
// find removed hosts
for addr := range r.members.keys {
if _, ok := newMembersMap[addr]; !ok {
changed = true
break
if _, found := newMembers[addr]; !found {
combinedChange.HostsRemoved = append(combinedChange.HostsRemoved, addr)
}
}
return newMembersMap, changed

// order since it will most probably be used in logs
slices.Sort(combinedChange.HostsAdded)
slices.Sort(combinedChange.HostsUpdated)
slices.Sort(combinedChange.HostsRemoved)
return combinedChange
}

func castToMembers[T membership.Member](members []T) []membership.Member {