maint: Add jitter to peer traffic, fix startup (#1227)
## Which problem is this PR solving?

- Add jitter to peer refresh traffic so peers don't all hit Redis at once and cause thundering herds (a sketch of the jitter idea follows below)
- Inject the right dependencies (a metrics object for FilePeers and a real clock) for proper startup
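
Not part of the commit itself, but as a minimal sketch of the jitter technique the diff below applies (the `jittered` helper and the values used here are illustrative only):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jittered stretches a base interval by a random amount of up to 20%, so that
// peers started at the same moment don't all refresh Redis in lockstep.
func jittered(base time.Duration) time.Duration {
	return base + time.Duration(rand.Int63n(int64(base/5)))
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(jittered(3 * time.Second)) // e.g. 3.21s, 3.498s, 3.05s -- varies per run
	}
}
```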
kentquirk authored Jul 17, 2024
1 parent 8f61ee8 commit 876d8c5
Showing 2 changed files with 19 additions and 14 deletions.
10 changes: 7 additions & 3 deletions cmd/refinery/main.go
@@ -19,6 +19,7 @@ import (
 	"github.com/facebookgo/startstop"
 	libhoney "github.com/honeycombio/libhoney-go"
 	"github.com/honeycombio/libhoney-go/transmission"
+	"github.com/jonboulle/clockwork"
 	"github.com/sirupsen/logrus"
 
 	"github.com/honeycombio/refinery/app"
@@ -121,8 +122,9 @@ func main() {
 	}
 	switch ptype {
 	case "file":
-		peers = &peer.FilePeers{Cfg: c}
-		// we know FilePeers doesn't need to be Started, so we can ask it how many peers we have.
+		// we want to use the file peers object to ask if we have only one peer, so we need to instantiate it
+		// with a dummy metrics object. we'll replace it with the real one later.
+		peers = &peer.FilePeers{Cfg: c, Metrics: &metrics.NullMetrics{}}
 		// if we only have one, we can use the local pubsub implementation.
 		peerList, err := peers.GetPeers()
 		if err != nil {
@@ -133,6 +135,8 @@
 		} else {
 			pubsubber = &pubsub.GoRedisPubSub{}
 		}
+		// now erase the peers Metrics object so that it will get injected with the right one later
+		peers.(*peer.FilePeers).Metrics = nil
 	case "redis":
 		pubsubber = &pubsub.GoRedisPubSub{}
 		peers = &peer.RedisPubsubPeers{}
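
As an aside, a rough sketch of the startup pattern used above, with made-up stand-in types rather than the real `peer.FilePeers` and `metrics` packages: give the object a no-op placeholder so it can be used before dependency injection runs, then clear the field so the injector fills it with the real implementation later.

```go
package main

import "fmt"

// Hypothetical stand-ins to illustrate the pattern only.
type Metrics interface{ Count(name string, n int) }

type NullMetrics struct{} // no-op placeholder metrics

func (NullMetrics) Count(string, int) {}

type FilePeers struct {
	Metrics Metrics // normally populated by the injection framework
	peers   []string
}

func (f *FilePeers) GetPeers() ([]string, error) { return f.peers, nil }

func main() {
	f := &FilePeers{peers: []string{"127.0.0.1:8081"}, Metrics: NullMetrics{}}

	// Safe to call before injection because the placeholder satisfies the interface.
	peers, _ := f.GetPeers()
	fmt.Println("single peer?", len(peers) == 1)

	// Clear the field so a later injection pass sees nil and supplies the real metrics.
	f.Metrics = nil
}
```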
@@ -252,7 +256,7 @@ func main() {
 		{Value: promMetrics, Name: "promMetrics"},
 		{Value: oTelMetrics, Name: "otelMetrics"},
 		{Value: tracer, Name: "tracer"},
-
+		{Value: clockwork.NewRealClock()},
 		{Value: metricsSingleton, Name: "metrics"},
 		{Value: genericMetricsRecorder, Name: "genericMetrics"},
 		{Value: upstreamMetricsRecorder, Name: "upstreamMetrics"},
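
The `{Value: ..., Name: ...}` entries above are dependency-injection objects; below is a small sketch of how an unnamed `clockwork.NewRealClock()` value can satisfy a `clockwork.Clock` field, assuming the facebookgo/inject style of wiring that Refinery's imports suggest (the `Reporter` type is made up for illustration):

```go
package main

import (
	"fmt"

	"github.com/facebookgo/inject"
	"github.com/jonboulle/clockwork"
)

// Reporter is a hypothetical component that wants a clock injected by type.
type Reporter struct {
	Clock clockwork.Clock `inject:""`
}

func main() {
	var g inject.Graph
	r := &Reporter{}

	// An unnamed object is matched by assignable type, so the real clock
	// ends up in any `inject:""` field of type clockwork.Clock.
	if err := g.Provide(
		&inject.Object{Value: clockwork.NewRealClock()},
		&inject.Object{Value: r},
	); err != nil {
		panic(err)
	}
	if err := g.Populate(); err != nil {
		panic(err)
	}
	fmt.Println("injected clock says:", r.Clock.Now())
}
```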
23 changes: 12 additions & 11 deletions internal/peer/pubsub_redis.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math/rand"
 	"net"
 	"os"
 	"strings"
@@ -20,18 +21,14 @@ import (
 
 const (
 	// refreshCacheInterval is how frequently this host will re-register itself
-	// with Redis. This should happen about 3x during each timeout phase in order
-	// to allow multiple timeouts to fail and yet still keep the host in the mix.
-	// Falling out of Redis will result in re-hashing the host-trace affinity and
-	// will cause broken traces for those that fall on both sides of the rehashing.
-	// This is why it's important to ensure hosts stay in the pool.
+	// by publishing their address. This should happen about 3x during each
+	// timeout phase in order to allow multiple timeouts to fail and yet still
+	// keep the host in the mix.
 	refreshCacheInterval = 3 * time.Second
 
-	// peerEntryTimeout is how long redis will wait before expiring a peer that
-	// doesn't check in. The ratio of refresh to peer timeout should be 1/3. Redis
-	// timeouts are in seconds and entries can last up to 2 seconds longer than
-	// their expected timeout (in my load testing), so the lower bound for this
-	// timer should be ... 5sec?
+	// peerEntryTimeout is how long we will wait before expiring a peer that
+	// doesn't check in. The ratio of refresh to peer timeout should be about
+	// 1/3; we overshoot because we add a jitter to the refresh interval.
 	peerEntryTimeout = 10 * time.Second
 )
 
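
A quick sanity check of the ratio described in the new comment, not part of the diff: with up to 20% jitter the refresh interval tops out at 3.6s, which still fits roughly two to three refreshes inside the 10s peer entry timeout.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	refresh := 3 * time.Second
	timeout := 10 * time.Second

	worst := refresh + refresh/5 // maximum jittered interval: 3.6s
	fmt.Printf("worst-case refresh interval: %v\n", worst)
	fmt.Printf("refreshes per timeout window: %.1f\n", float64(timeout)/float64(worst)) // ~2.8
}
```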
@@ -135,7 +132,11 @@ func (p *RedisPubsubPeers) Start() error {
 
 	// periodically refresh our presence in the list of peers, and update peers as they come in
 	go func() {
-		ticker := p.Clock.NewTicker(refreshCacheInterval)
+		// we want our refresh cache interval to vary from peer to peer so they
+		// don't always hit redis at the same time, so we add a random jitter of up
+		// to 20% of the interval
+		interval := refreshCacheInterval + time.Duration(rand.Int63n(int64(refreshCacheInterval/5)))
+		ticker := p.Clock.NewTicker(interval)
 		for {
 			select {
 			case <-p.done: