Refactor PeerProvider & hashring interaction (cadence-workflow#6296)
* Refactor PeerProvider & hashring interaction

As a result:
 - the hashring never misses updates (no "channel is full" situation is
   possible)
 - subscribers of MultiringResolver (history, matching) always get a sane
   ChangedEvent which reflects the REAL changes (see details below)

Previously there were several problems:
 - hashring subscribed to PeerProvider (ringpop/uns) with a non-buffered
   channel, which led to write failures every time `ring` was doing anything
   other than reading the channel (this happened 60% of the time, based on
   error logs). Switched to calling handlers instead, which implement a
   schedule-an-update approach via a channel with cap=1 (see `signalSelf`;
   a minimal sketch of the pattern follows this list). This approach never
   skips updates.
 - PeerProvider supplies a ChangedEvent to `ring`, but in reality we do not
   use it - we refresh everything from scratch. This makes it very
   misleading to rely on the ChangedEvent at all. Basically, we might be
   triggered by some event (host "c" appeared), but during refresh() we
   realise there are more changes (host "a" removed, host "c" added as
   well, etc.), and we would notify our subscribers with absolutely
   irrelevant data.
 - The same misleading behaviour took place in other methods like
   `emitHashIdentifier`. It retrieved the list of members from PeerProvider
   **independently**, which could lead to emitting a hash of a different
   state than the members we had just retrieved in refresh().
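
A minimal, self-contained sketch of that cap=1 coalescing pattern (the
`ring` type here is a stand-in for illustration, not the real one from
`common/membership`):

```go
package main

import "fmt"

type ring struct {
	refreshChan chan struct{} // cap=1: at most one refresh can be pending
}

// signalSelf schedules a refresh without ever blocking the caller.
// If a refresh is already pending, the new signal coalesces with it,
// so no update is ever lost and no "channel is full" error can occur.
func (r *ring) signalSelf() {
	select {
	case r.refreshChan <- struct{}{}:
	default: // a refresh is already scheduled; coalesce
	}
}

func main() {
	r := &ring{refreshChan: make(chan struct{}, 1)}
	for i := 0; i < 3; i++ {
		r.signalSelf() // three rapid signals collapse into one pending refresh
	}
	<-r.refreshChan // the worker would refresh from scratch here
	fmt.Println("pending signals left:", len(r.refreshChan)) // prints 0
}
```

Coalescing is safe precisely because refresh() rebuilds everything from
scratch: one pending signal is enough to pick up any number of changes.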

Also fixed a race condition: we were waiting for the ring to stop, but in
order to read the stop-channel it sometimes first had to finish notifying
subscribers, which took the same lock. We need to be careful with
lock scope; the sketch below shows the shape of the fix.
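
A minimal, runnable sketch of the deadlock and its fix (the real Stop()
also resets the subscriber map and uses AwaitWaitGroup with a timeout;
this stand-in simplifies both):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type ring struct {
	subscribers sync.Mutex // guards the subscriber map in the real code
	shutdownCh  chan struct{}
	shutdownWG  sync.WaitGroup
}

func (r *ring) notifySubscribers() {
	r.subscribers.Lock() // blocks while Stop() holds the lock
	defer r.subscribers.Unlock()
	time.Sleep(10 * time.Millisecond) // pretend to fan out events
}

func (r *ring) worker() {
	defer r.shutdownWG.Done()
	for {
		r.notifySubscribers()
		select {
		case <-r.shutdownCh:
			return
		default:
		}
	}
}

func (r *ring) Stop() {
	r.subscribers.Lock()
	// reset subscriber state here ...
	r.subscribers.Unlock() // release BEFORE waiting: a deferred unlock
	// would hold the lock while the worker still needs it to finish its
	// current notifySubscribers() call and reach the shutdown select.

	close(r.shutdownCh)
	r.shutdownWG.Wait()
	fmt.Println("stopped cleanly")
}

func main() {
	r := &ring{shutdownCh: make(chan struct{})}
	r.shutdownWG.Add(1)
	go r.worker()
	time.Sleep(5 * time.Millisecond)
	r.Stop()
}
```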

All in all, the methods are now better synchronised and called more
predictably (`compareMembers` was split into `makeMembersMap` and
`diffMembers`, so comparing no longer builds a map as a side effect),
and notifying subscribers is **inseparable** from ring::refresh(), as
it should be.
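
From a subscriber's point of view, the contract after this change looks
roughly like the sketch below. The Subscribe signature and the
ChangedEvent fields mirror the diff that follows; the `Resolver`
interface, `watch` helper, and watcher name are illustrative stand-ins.

```go
package main

import "fmt"

// ChangedEvent mirrors the fields populated by diffMembers in the diff.
type ChangedEvent struct {
	HostsAdded   []string
	HostsUpdated []string
	HostsRemoved []string
}

type Resolver interface {
	Subscribe(watcher string, notifyChannel chan<- *ChangedEvent) error
}

func watch(resolver Resolver) error {
	// Use a buffered channel: notifySubscribers sends non-blocking and
	// logs an error if the channel is full, so leave room for an event.
	ch := make(chan *ChangedEvent, 1)
	if err := resolver.Subscribe("my-watcher", ch); err != nil {
		return err
	}
	go func() {
		for event := range ch {
			// The event reflects the real diff computed inside refresh().
			fmt.Printf("added=%v removed=%v\n", event.HostsAdded, event.HostsRemoved)
		}
	}()
	return nil
}

func main() {}
```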
dkrotx authored Sep 27, 2024
1 parent d19937f commit 9807d5d
Showing 7 changed files with 336 additions and 141 deletions.
133 changes: 79 additions & 54 deletions common/membership/hashring.go
@@ -22,6 +22,7 @@ package membership

import (
"fmt"
"slices"
"sort"
"strings"
"sync"
@@ -44,7 +45,6 @@ import (

// ErrInsufficientHosts is thrown when there are not enough hosts to serve the request
var ErrInsufficientHosts = &types.InternalServiceError{Message: "Not enough hosts to serve the request"}
var emptyEvent = &ChangedEvent{}

const (
minRefreshInternal = time.Second * 4
@@ -59,14 +59,14 @@ type PeerProvider interface {
GetMembers(service string) ([]HostInfo, error)
WhoAmI() (HostInfo, error)
SelfEvict() error
Subscribe(name string, notifyChannel chan<- *ChangedEvent) error
Subscribe(name string, handler func(ChangedEvent)) error
}

type ring struct {
status int32
service string
peerProvider PeerProvider
refreshChan chan *ChangedEvent
refreshChan chan struct{}
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
timeSource clock.TimeSource
@@ -99,7 +99,7 @@ func newHashring(
service: service,
peerProvider: provider,
shutdownCh: make(chan struct{}),
refreshChan: make(chan *ChangedEvent),
refreshChan: make(chan struct{}, 1),
timeSource: timeSource,
logger: logger,
scope: scope,
@@ -125,11 +125,11 @@ func (r *ring) Start() {
) {
return
}
if err := r.peerProvider.Subscribe(r.service, r.refreshChan); err != nil {
if err := r.peerProvider.Subscribe(r.service, r.handleUpdates); err != nil {
r.logger.Fatal("subscribing to peer provider", tag.Error(err))
}

if _, err := r.refresh(); err != nil {
if err := r.refresh(); err != nil {
r.logger.Fatal("failed to start service resolver", tag.Error(err))
}

@@ -151,8 +151,8 @@ func (r *ring) Stop() {
r.value.Store(emptyHashring())

r.subscribers.Lock()
defer r.subscribers.Unlock()
r.subscribers.keys = make(map[string]chan<- *ChangedEvent)
r.subscribers.Unlock()
close(r.shutdownCh)

if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success {
@@ -166,12 +166,10 @@ func (r *ring) Lookup(
) (HostInfo, error) {
addr, found := r.ring().Lookup(key)
if !found {
select {
case r.refreshChan <- emptyEvent:
default:
}
r.signalSelf()
return HostInfo{}, ErrInsufficientHosts
}

r.members.RLock()
defer r.members.RUnlock()
host, ok := r.members.keys[addr]
@@ -195,12 +193,25 @@ func (r *ring) Subscribe(watcher string, notifyChannel chan<- *ChangedEvent) err
return nil
}

func (r *ring) notifySubscribers(msg *ChangedEvent) {
func (r *ring) handleUpdates(event ChangedEvent) {
r.signalSelf()
}

func (r *ring) signalSelf() {
var event struct{}
select {
case r.refreshChan <- event:
default: // channel already has an event, don't block
}
}

func (r *ring) notifySubscribers(msg ChangedEvent) {
r.subscribers.Lock()
defer r.subscribers.Unlock()

for name, ch := range r.subscribers.keys {
select {
case ch <- msg:
case ch <- &msg:
default:
r.logger.Error("subscriber notification failed", tag.Name(name))
}
@@ -240,59 +251,61 @@ func (r *ring) Members() []HostInfo {
return hosts
}

func (r *ring) refresh() (refreshed bool, err error) {
func (r *ring) refresh() error {
if r.members.refreshed.After(r.timeSource.Now().Add(-minRefreshInternal)) {
// refreshed too frequently
return false, nil
return nil
}

members, err := r.peerProvider.GetMembers(r.service)
if err != nil {
return false, fmt.Errorf("getting members from peer provider: %w", err)
return fmt.Errorf("getting members from peer provider: %w", err)
}

r.members.Lock()
defer r.members.Unlock()
newMembersMap, changed := r.compareMembers(members)
if !changed {
return false, nil
newMembersMap := r.makeMembersMap(members)

diff := r.diffMembers(newMembersMap)
if diff.Empty() {
return nil
}

ring := emptyHashring()
ring.AddMembers(castToMembers(members)...)

r.members.keys = newMembersMap
r.members.refreshed = r.timeSource.Now()
r.value.Store(ring)
r.logger.Info("refreshed ring members", tag.Value(members))

return true, nil
r.updateMembersMap(newMembersMap)

r.emitHashIdentifier()
r.notifySubscribers(diff)

return nil
}

func (r *ring) refreshAndNotifySubscribers(event *ChangedEvent) {
refreshed, err := r.refresh()
if err != nil {
r.logger.Error("refreshing ring", tag.Error(err))
}
if refreshed {
r.notifySubscribers(event)
}
func (r *ring) updateMembersMap(newMembers map[string]HostInfo) {
r.members.Lock()
defer r.members.Unlock()

r.members.keys = newMembers
r.members.refreshed = r.timeSource.Now()
}

func (r *ring) refreshRingWorker() {
defer r.shutdownWG.Done()

refreshTicker := r.timeSource.NewTicker(defaultRefreshInterval)
defer refreshTicker.Stop()

for {
select {
case <-r.shutdownCh:
return
case event := <-r.refreshChan: // local signal or signal from provider
r.refreshAndNotifySubscribers(event)
case <-refreshTicker.Chan(): // periodically refresh membership
r.emitHashIdentifier()
r.refreshAndNotifySubscribers(emptyEvent)
case <-r.refreshChan: // local signal or signal from provider
if err := r.refresh(); err != nil {
r.logger.Error("failed to refresh ring", tag.Error(err))
}
case <-refreshTicker.Chan(): // periodically force refreshing membership
r.signalSelf()
}
}
}
@@ -302,11 +315,7 @@ func (r *ring) ring() *hashring.HashRing {
}

func (r *ring) emitHashIdentifier() float64 {
members, err := r.peerProvider.GetMembers(r.service)
if err != nil {
r.logger.Error("Observed a problem getting peer members while emitting hash identifier metrics", tag.Error(err))
return -1
}
members := r.Members()
self, err := r.peerProvider.WhoAmI()
if err != nil {
r.logger.Error("Observed a problem looking up self from the membership provider while emitting hash identifier metrics", tag.Error(err))
@@ -343,22 +352,38 @@ func (r *ring) emitHashIdentifier() float64 {
return trimmedForMetric
}

func (r *ring) compareMembers(members []HostInfo) (map[string]HostInfo, bool) {
changed := false
newMembersMap := make(map[string]HostInfo, len(members))
for _, member := range members {
newMembersMap[member.GetAddress()] = member
if _, ok := r.members.keys[member.GetAddress()]; !ok {
changed = true
func (r *ring) makeMembersMap(members []HostInfo) map[string]HostInfo {
membersMap := make(map[string]HostInfo, len(members))
for _, m := range members {
membersMap[m.GetAddress()] = m
}
return membersMap
}

func (r *ring) diffMembers(newMembers map[string]HostInfo) ChangedEvent {
r.members.RLock()
defer r.members.RUnlock()

var combinedChange ChangedEvent

// find newly added hosts
for addr := range newMembers {
if _, found := r.members.keys[addr]; !found {
combinedChange.HostsAdded = append(combinedChange.HostsAdded, addr)
}
}
// find removed hosts
for addr := range r.members.keys {
if _, ok := newMembersMap[addr]; !ok {
changed = true
break
if _, found := newMembers[addr]; !found {
combinedChange.HostsRemoved = append(combinedChange.HostsRemoved, addr)
}
}
return newMembersMap, changed

// order, since it will most probably be used in logs
slices.Sort(combinedChange.HostsAdded)
slices.Sort(combinedChange.HostsUpdated)
slices.Sort(combinedChange.HostsRemoved)
return combinedChange
}

func castToMembers[T membership.Member](members []T) []membership.Member {