Skip to content

Commit

Permalink
Revert "Refactor PeerProvider & hashring interaction (cadence-workflo…
Browse files Browse the repository at this point in the history
…w#6296)"

This reverts commit 9807d5d.
  • Loading branch information
Shaddoll committed Oct 15, 2024
1 parent a32159c commit 31a46d5
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 336 deletions.
133 changes: 54 additions & 79 deletions common/membership/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package membership

import (
"fmt"
"slices"
"sort"
"strings"
"sync"
Expand All @@ -45,6 +44,7 @@ import (

// ErrInsufficientHosts is thrown when there are not enough hosts to serve the request
var ErrInsufficientHosts = &types.InternalServiceError{Message: "Not enough hosts to serve the request"}
var emptyEvent = &ChangedEvent{}

const (
minRefreshInternal = time.Second
Expand All @@ -59,14 +59,14 @@ type PeerProvider interface {
GetMembers(service string) ([]HostInfo, error)
WhoAmI() (HostInfo, error)
SelfEvict() error
Subscribe(name string, handler func(ChangedEvent)) error
Subscribe(name string, notifyChannel chan<- *ChangedEvent) error
}

type ring struct {
status int32
service string
peerProvider PeerProvider
refreshChan chan struct{}
refreshChan chan *ChangedEvent
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
timeSource clock.TimeSource
Expand Down Expand Up @@ -99,7 +99,7 @@ func newHashring(
service: service,
peerProvider: provider,
shutdownCh: make(chan struct{}),
refreshChan: make(chan struct{}, 1),
refreshChan: make(chan *ChangedEvent),
timeSource: timeSource,
logger: logger,
scope: scope,
Expand All @@ -125,11 +125,11 @@ func (r *ring) Start() {
) {
return
}
if err := r.peerProvider.Subscribe(r.service, r.handleUpdates); err != nil {
if err := r.peerProvider.Subscribe(r.service, r.refreshChan); err != nil {
r.logger.Fatal("subscribing to peer provider", tag.Error(err))
}

if err := r.refresh(); err != nil {
if _, err := r.refresh(); err != nil {
r.logger.Fatal("failed to start service resolver", tag.Error(err))
}

Expand All @@ -151,8 +151,8 @@ func (r *ring) Stop() {
r.value.Store(emptyHashring())

r.subscribers.Lock()
defer r.subscribers.Unlock()
r.subscribers.keys = make(map[string]chan<- *ChangedEvent)
r.subscribers.Unlock()
close(r.shutdownCh)

if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success {
Expand All @@ -166,10 +166,12 @@ func (r *ring) Lookup(
) (HostInfo, error) {
addr, found := r.ring().Lookup(key)
if !found {
r.signalSelf()
select {
case r.refreshChan <- emptyEvent:
default:
}
return HostInfo{}, ErrInsufficientHosts
}

r.members.RLock()
defer r.members.RUnlock()
host, ok := r.members.keys[addr]
Expand All @@ -193,25 +195,12 @@ func (r *ring) Subscribe(watcher string, notifyChannel chan<- *ChangedEvent) err
return nil
}

func (r *ring) handleUpdates(event ChangedEvent) {
r.signalSelf()
}

func (r *ring) signalSelf() {
var event struct{}
select {
case r.refreshChan <- event:
default: // channel already has an event, don't block
}
}

func (r *ring) notifySubscribers(msg ChangedEvent) {
func (r *ring) notifySubscribers(msg *ChangedEvent) {
r.subscribers.Lock()
defer r.subscribers.Unlock()

for name, ch := range r.subscribers.keys {
select {
case ch <- &msg:
case ch <- msg:
default:
r.logger.Error("subscriber notification failed", tag.Name(name))
}
Expand Down Expand Up @@ -251,61 +240,59 @@ func (r *ring) Members() []HostInfo {
return hosts
}

func (r *ring) refresh() error {
func (r *ring) refresh() (refreshed bool, err error) {
if r.members.refreshed.After(r.timeSource.Now().Add(-minRefreshInternal)) {
// refreshed too frequently
return nil
return false, nil
}

members, err := r.peerProvider.GetMembers(r.service)
if err != nil {
return fmt.Errorf("getting members from peer provider: %w", err)
return false, fmt.Errorf("getting members from peer provider: %w", err)
}

newMembersMap := r.makeMembersMap(members)

diff := r.diffMembers(newMembersMap)
if diff.Empty() {
return nil
r.members.Lock()
defer r.members.Unlock()
newMembersMap, changed := r.compareMembers(members)
if !changed {
return false, nil
}

ring := emptyHashring()
ring.AddMembers(castToMembers(members)...)

r.members.keys = newMembersMap
r.members.refreshed = r.timeSource.Now()
r.value.Store(ring)
r.logger.Info("refreshed ring members", tag.Value(members))

r.updateMembersMap(newMembersMap)

r.emitHashIdentifier()
r.notifySubscribers(diff)

return nil
return true, nil
}

func (r *ring) updateMembersMap(newMembers map[string]HostInfo) {
r.members.Lock()
defer r.members.Unlock()

r.members.keys = newMembers
r.members.refreshed = r.timeSource.Now()
func (r *ring) refreshAndNotifySubscribers(event *ChangedEvent) {
refreshed, err := r.refresh()
if err != nil {
r.logger.Error("refreshing ring", tag.Error(err))
}
if refreshed {
r.notifySubscribers(event)
}
}

func (r *ring) refreshRingWorker() {
defer r.shutdownWG.Done()

refreshTicker := r.timeSource.NewTicker(defaultRefreshInterval)
defer refreshTicker.Stop()

for {
select {
case <-r.shutdownCh:
return
case <-r.refreshChan: // local signal or signal from provider
if err := r.refresh(); err != nil {
r.logger.Error("failed to refresh ring", tag.Error(err))
}
case <-refreshTicker.Chan(): // periodically force refreshing membership
r.signalSelf()
case event := <-r.refreshChan: // local signal or signal from provider
r.refreshAndNotifySubscribers(event)
case <-refreshTicker.Chan(): // periodically refresh membership
r.emitHashIdentifier()
r.refreshAndNotifySubscribers(emptyEvent)
}
}
}
Expand All @@ -315,7 +302,11 @@ func (r *ring) ring() *hashring.HashRing {
}

func (r *ring) emitHashIdentifier() float64 {
members := r.Members()
members, err := r.peerProvider.GetMembers(r.service)
if err != nil {
r.logger.Error("Observed a problem getting peer members while emitting hash identifier metrics", tag.Error(err))
return -1
}
self, err := r.peerProvider.WhoAmI()
if err != nil {
r.logger.Error("Observed a problem looking up self from the membership provider while emitting hash identifier metrics", tag.Error(err))
Expand Down Expand Up @@ -352,38 +343,22 @@ func (r *ring) emitHashIdentifier() float64 {
return trimmedForMetric
}

func (r *ring) makeMembersMap(members []HostInfo) map[string]HostInfo {
membersMap := make(map[string]HostInfo, len(members))
for _, m := range members {
membersMap[m.GetAddress()] = m
}
return membersMap
}

func (r *ring) diffMembers(newMembers map[string]HostInfo) ChangedEvent {
r.members.RLock()
defer r.members.RUnlock()

var combinedChange ChangedEvent

// find newly added hosts
for addr := range newMembers {
if _, found := r.members.keys[addr]; !found {
combinedChange.HostsAdded = append(combinedChange.HostsAdded, addr)
func (r *ring) compareMembers(members []HostInfo) (map[string]HostInfo, bool) {
changed := false
newMembersMap := make(map[string]HostInfo, len(members))
for _, member := range members {
newMembersMap[member.GetAddress()] = member
if _, ok := r.members.keys[member.GetAddress()]; !ok {
changed = true
}
}
// find removed hosts
for addr := range r.members.keys {
if _, found := newMembers[addr]; !found {
combinedChange.HostsRemoved = append(combinedChange.HostsRemoved, addr)
if _, ok := newMembersMap[addr]; !ok {
changed = true
break
}
}

// order since it will most probably used in logs
slices.Sort(combinedChange.HostsAdded)
slices.Sort(combinedChange.HostsUpdated)
slices.Sort(combinedChange.HostsRemoved)
return combinedChange
return newMembersMap, changed
}

func castToMembers[T membership.Member](members []T) []membership.Member {
Expand Down
Loading

0 comments on commit 31a46d5

Please sign in to comment.