Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reset redistribution delay on peer membership change #1403

Merged
merged 8 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 0 additions & 103 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand/v2"
"os"
"runtime"
"sort"
Expand Down Expand Up @@ -1298,107 +1296,6 @@ func (i *InMemCollector) sendTraces() {
}
}

type redistributeNotifier struct {
clock clockwork.Clock
logger logger.Logger
initialDelay time.Duration
maxAttempts int
maxDelay time.Duration
metrics metrics.Metrics

reset chan struct{}
done chan struct{}
triggered chan struct{}
once sync.Once
}

func newRedistributeNotifier(logger logger.Logger, met metrics.Metrics, clock clockwork.Clock) *redistributeNotifier {
r := &redistributeNotifier{
initialDelay: 3 * time.Second,
maxDelay: 30 * time.Second,
maxAttempts: 5,
done: make(chan struct{}),
clock: clock,
logger: logger,
metrics: met,
triggered: make(chan struct{}),
reset: make(chan struct{}),
}

return r
}

func (r *redistributeNotifier) Notify() <-chan struct{} {
return r.triggered
}

func (r *redistributeNotifier) Reset() {
var started bool
r.once.Do(func() {
go r.run()
started = true
})

if started {
return
}

select {
case r.reset <- struct{}{}:
case <-r.done:
return
default:
r.logger.Debug().Logf("A trace redistribution is ongoing. Ignoring reset.")
}
}

func (r *redistributeNotifier) Stop() {
close(r.done)
}

func (r *redistributeNotifier) run() {
var attempts int
lastBackoff := r.initialDelay
for {
// if we've reached the max attempts, reset the backoff and attempts
// only when the reset signal is received.
if attempts >= r.maxAttempts {
r.metrics.Gauge("trace_redistribution_count", 0)
<-r.reset
lastBackoff = r.initialDelay
attempts = 0
}
select {
case <-r.done:
return
case r.triggered <- struct{}{}:
}

attempts++
r.metrics.Gauge("trace_redistribution_count", attempts)

// Calculate the backoff interval using exponential backoff with a base time.
backoff := time.Duration(math.Min(float64(lastBackoff)*2, float64(r.maxDelay)))
// Add jitter to the backoff to avoid retry collisions.
jitter := time.Duration(rand.Float64() * float64(backoff) * 0.5)
nextBackoff := backoff + jitter
lastBackoff = nextBackoff

timer := r.clock.NewTimer(nextBackoff)
select {
case <-timer.Chan():
timer.Stop()
case <-r.reset:
lastBackoff = r.initialDelay
attempts = 0
timer.Stop()
case <-r.done:
timer.Stop()
return
}
}
}

func (i *InMemCollector) signalKeptTraceDecisions(ctx context.Context, msg string) {
if len(msg) == 0 {
return
Expand Down
22 changes: 18 additions & 4 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission, pe
Metrics: s,
}
localPubSub.Start()
redistributeNotifier := newRedistributeNotifier(&logger.NullLogger{}, &metrics.NullMetrics{}, clock)
redistributeNotifier.initialDelay = 2 * time.Millisecond
redistributeNotifier.maxDelay = 10 * time.Millisecond

c := &InMemCollector{
Config: conf,
Expand Down Expand Up @@ -90,7 +93,7 @@ func newTestCollector(conf config.Config, transmission transmit.Transmission, pe
TraceIDs: peerTraceIDs,
},
},
redistributeTimer: newRedistributeNotifier(&logger.NullLogger{}, &metrics.NullMetrics{}, clock),
redistributeTimer: redistributeNotifier,
}

if !conf.GetCollectionConfig().EnableTraceLocality {
Expand Down Expand Up @@ -1748,9 +1751,20 @@ func TestRedistributeTraces(t *testing.T) {
}

coll.Sharder = s
coll.incoming = make(chan *types.Span, 5)
coll.fromPeer = make(chan *types.Span, 5)
coll.outgoingTraces = make(chan sendableTrace, 5)
coll.datasetSamplers = make(map[string]sample.Sampler)

c := cache.NewInMemCache(3, &metrics.NullMetrics{}, &logger.NullLogger{})
coll.cache = c
stc, err := newCache()
assert.NoError(t, err, "lru cache should start")
coll.sampleTraceCache = stc

go coll.collect()
go coll.sendTraces()

err := coll.Start()
assert.NoError(t, err)
defer coll.Stop()

dataset := "aoeu"
Expand Down Expand Up @@ -1803,7 +1817,7 @@ func TestRedistributeTraces(t *testing.T) {
coll.mutex.Lock()
coll.cache.Set(trace)
coll.mutex.Unlock()
coll.Peers.RegisterUpdatedPeersCallback(coll.redistributeTimer.Reset)
coll.redistributeTimer.Reset()

peerEvents := peerTransmission.GetBlock(1)
assert.Len(t, peerEvents, 1)
Expand Down
153 changes: 153 additions & 0 deletions collect/trace_redistributer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package collect

import (
"math"
"math/rand/v2"
"sync"
"time"

"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/jonboulle/clockwork"
)

type redistributeNotifier struct {
clock clockwork.Clock
logger logger.Logger
initialDelay time.Duration
maxAttempts int
maxDelay time.Duration
VinozzZ marked this conversation as resolved.
Show resolved Hide resolved
metrics metrics.Metrics

reset chan struct{}
done chan struct{}
triggered chan struct{}
once sync.Once
}

func newRedistributeNotifier(logger logger.Logger, met metrics.Metrics, clock clockwork.Clock) *redistributeNotifier {
r := &redistributeNotifier{
initialDelay: 3 * time.Second,
maxDelay: 30 * time.Second,
maxAttempts: 5,
done: make(chan struct{}),
clock: clock,
logger: logger,
metrics: met,
triggered: make(chan struct{}),
reset: make(chan struct{}),
}

return r
}

func (r *redistributeNotifier) Notify() <-chan struct{} {
return r.triggered
}

func (r *redistributeNotifier) Reset() {
var started bool
r.once.Do(func() {
go r.run()
started = true
})

if started {
return
}

select {
case r.reset <- struct{}{}:
case <-r.done:
return
default:
r.logger.Debug().Logf("A trace redistribution is ongoing. Ignoring reset.")
}
}

func (r *redistributeNotifier) Stop() {
close(r.done)
}

// run runs the redistribution notifier loop.
// It will notify the trigger channel when it's time to redistribute traces, which we want
// to happen when the number of peers changes. But we don't want to do it immediately,
// because peer membership changes often happen in bunches, so we wait a while
// before triggering the redistribution.
//
// The redistribution is run 3 times, with increasing durations, so that we properly redistribute
VinozzZ marked this conversation as resolved.
Show resolved Hide resolved
// anything that was arriving near the same time as the peer change.
// A notification will be sent every time the backoff timer expires.
// The backoff timer is reset when a reset signal is received.
func (r *redistributeNotifier) run() {
var attempts int
currentBackOff := r.initialDelay
lastBackoff := currentBackOff

// start a back off timer with the initial delay
timer := r.clock.NewTimer(currentBackOff)
for {

// only reset the timer if we have received
// a reset signal or we are in the middle of
// a redistribution cycle.
if currentBackOff != lastBackoff {
if !timer.Stop() {
// drain the timer channel
select {
case <-timer.Chan():
default:
}
}
timer.Reset(currentBackOff)
lastBackoff = currentBackOff
}

select {
case <-r.done:
timer.Stop()
return
case <-r.reset:
// reset the backoff timer and attempts
// if we receive a reset signal.
currentBackOff = r.initialDelay
attempts = 0
case <-timer.Chan():
if attempts >= r.maxAttempts {
// if we've reached the max attempts,
// we will block the goroutine here until
// we receive a reset signal or refinery starts to shutdown.
r.metrics.Gauge("trace_redistribution_count", 0)
select {
case <-r.done:
return
case <-r.reset:
}
currentBackOff = r.initialDelay
attempts = 0
currentBackOff = r.calculateBackoff(currentBackOff)
continue
}

select {
case <-r.done:
return
case r.triggered <- struct{}{}:
}

attempts++
r.metrics.Gauge("trace_redistribution_count", attempts)
currentBackOff = r.calculateBackoff(currentBackOff)
}
}
}

// calculateBackoff calculates the backoff interval for the next redistribution cycle.
// It uses exponential backoff with a base time and adds jitter to avoid retry collisions.
func (r *redistributeNotifier) calculateBackoff(lastBackoff time.Duration) time.Duration {
// Calculate the backoff interval using exponential backoff with a base time.
backoff := time.Duration(math.Min(float64(lastBackoff)*2, float64(r.maxDelay)))
// Add jitter to the backoff to avoid retry collisions.
jitter := time.Duration(rand.Float64() * float64(backoff) * 0.5)
return backoff + jitter
}
68 changes: 68 additions & 0 deletions collect/trace_redistributer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package collect

import (
"testing"
"time"

"github.com/honeycombio/refinery/metrics"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
)

// TestRedistributeNotifier tests the timer logic in redistributeNotifier
func TestRedistributeNotifier(t *testing.T) {
// Set up the notifier with a mock clock

clock := clockwork.NewFakeClock()
r := &redistributeNotifier{
clock: clock,
initialDelay: 50 * time.Millisecond, // Set the initial delay
maxAttempts: 3,
metrics: &metrics.NullMetrics{},
maxDelay: 200 * time.Millisecond,
reset: make(chan struct{}),
done: make(chan struct{}),
triggered: make(chan struct{}, 4), // Buffered to allow easier testing
}

defer r.Stop()

go r.run()

// Test that the notifier is not triggered before the initial delay
clock.BlockUntil(1)
clock.Advance(20 * time.Millisecond)

assert.Eventually(t, func() bool {
VinozzZ marked this conversation as resolved.
Show resolved Hide resolved
return len(r.triggered) == 0
}, 200*time.Millisecond, 10*time.Millisecond)

// Test that the notifier is triggered after the initial delay
currentBackOff := r.initialDelay
for i := 0; i < r.maxAttempts; i++ {
clock.BlockUntil(1)
currentBackOff = r.calculateBackoff(currentBackOff)
clock.Advance(currentBackOff + 100*time.Millisecond) // Advance the clock by the backoff time plus a little extra

// Check that the notifier has been triggered
assert.Eventually(t, func() bool {
return len(r.triggered) == i+1
}, 200*time.Millisecond, 10*time.Millisecond, "Expected to be triggered %d times", i+1)
}

// Make sure once we hit maxAttempts, we stop trying to trigger
clock.BlockUntil(1)
clock.Advance(500 * time.Millisecond)
assert.Eventually(t, func() bool {
VinozzZ marked this conversation as resolved.
Show resolved Hide resolved
return len(r.triggered) == r.maxAttempts
}, 200*time.Millisecond, 10*time.Millisecond, "Expected to be triggered 3 times")

// Once we receive another reset signal, the timer should start again
r.Reset()
clock.BlockUntil(1)
clock.Advance(500 * time.Millisecond)
assert.Eventually(t, func() bool {
return len(r.triggered) == r.maxAttempts+1
}, 200*time.Millisecond, 10*time.Millisecond, "Expected to be triggered 4 times")

}
Loading