[v14] reduce flake during backend interruptions & slow auth cache init #44696

Merged · 1 commit · Jul 26, 2024
28 changes: 21 additions & 7 deletions lib/backend/buffer.go
@@ -34,9 +34,10 @@ import (
)

type bufferConfig struct {
gracePeriod time.Duration
capacity int
clock clockwork.Clock
gracePeriod time.Duration
creationGracePeriod time.Duration
capacity int
clock clockwork.Clock
}

type BufferOption func(*bufferConfig)
@@ -59,6 +60,16 @@ func BacklogGracePeriod(d time.Duration) BufferOption {
}
}

// CreationGracePeriod sets the delay after watcher creation before the watcher
// can be considered for removal due to backlog.
func CreationGracePeriod(d time.Duration) BufferOption {
return func(cfg *bufferConfig) {
if d > 0 {
cfg.creationGracePeriod = d
}
}
}

// BufferClock sets a custom clock for the buffer (used in tests).
func BufferClock(c clockwork.Clock) BufferOption {
return func(cfg *bufferConfig) {
@@ -81,9 +92,10 @@ type CircularBuffer struct {
// NewCircularBuffer returns a new uninitialized instance of circular buffer.
func NewCircularBuffer(opts ...BufferOption) *CircularBuffer {
cfg := bufferConfig{
gracePeriod: DefaultBacklogGracePeriod,
capacity: DefaultBufferCapacity,
clock: clockwork.NewRealClock(),
gracePeriod: DefaultBacklogGracePeriod,
creationGracePeriod: DefaultCreationGracePeriod,
capacity: DefaultBufferCapacity,
clock: clockwork.NewRealClock(),
}
for _, opt := range opts {
opt(&cfg)
@@ -255,6 +267,7 @@ func (c *CircularBuffer) NewWatcher(ctx context.Context, watch Watch) (Watcher,
buffer: c,
Watch: watch,
eventsC: make(chan Event, watch.QueueSize),
created: c.cfg.clock.Now(),
ctx: closeCtx,
cancel: cancel,
capacity: watch.QueueSize,
@@ -294,6 +307,7 @@ type BufferWatcher struct {
bmu sync.Mutex
backlog []Event
backlogSince time.Time
created time.Time

ctx context.Context
cancel context.CancelFunc
@@ -349,7 +363,7 @@ func (w *BufferWatcher) emit(e Event) (ok bool) {
defer w.bmu.Unlock()

if !w.flushBacklog() {
if w.buffer.cfg.clock.Now().After(w.backlogSince.Add(w.buffer.cfg.gracePeriod)) {
if now := w.buffer.cfg.clock.Now(); now.After(w.backlogSince.Add(w.buffer.cfg.gracePeriod)) && now.After(w.created.Add(w.buffer.cfg.creationGracePeriod)) {
// backlog has existed for longer than grace period,
// this watcher needs to be removed.
return false
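For context, this is how the two grace periods combine from a caller's point of view: a watcher only becomes eligible for backlog-based removal once both the backlog grace period (measured from when its backlog began) and the creation grace period (measured from when the watcher was created) have elapsed. The sketch below is a minimal, self-contained illustration using only the exported API visible in this diff and its tests; the durations, queue size, and key are arbitrary illustration values, and importing lib/backend directly is done purely for demonstration.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gravitational/teleport/api/types"
	"github.com/gravitational/teleport/lib/backend"
)

func main() {
	// Tolerate a 10s backlog on established watchers, but never evict a watcher
	// for backlog during its first 30s of life (values are illustrative only).
	b := backend.NewCircularBuffer(
		backend.BufferCapacity(64),
		backend.BacklogGracePeriod(10*time.Second),
		backend.CreationGracePeriod(30*time.Second),
	)
	defer b.Close()
	b.SetInit()

	w, err := b.NewWatcher(context.Background(), backend.Watch{QueueSize: 8})
	if err != nil {
		panic(err)
	}
	defer w.Close()

	// The first event is always OpInit, delivered as soon as the watcher is created.
	fmt.Println("got init:", (<-w.Events()).Type == types.OpInit)

	// Events emitted into the buffer are fanned out to each watcher's queue.
	b.Emit(backend.Event{Item: backend.Item{Key: []byte("/example")}})

	select {
	case e := <-w.Events():
		fmt.Println("event key:", string(e.Item.Key))
	case <-w.Done():
		// The buffer only closes a watcher here once its backlog has outlived
		// BOTH the backlog grace period and the creation grace period.
		fmt.Println("watcher closed")
	}
}
```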
66 changes: 66 additions & 0 deletions lib/backend/buffer_test.go
@@ -80,6 +80,7 @@ func TestWatcherCapacity(t *testing.T) {
BufferCapacity(1),
BufferClock(clock),
BacklogGracePeriod(gracePeriod),
CreationGracePeriod(time.Nanosecond),
)
defer b.Close()
b.SetInit()
@@ -142,6 +143,71 @@ }
}
}

func TestWatcherCreationGracePeriod(t *testing.T) {
const backlogGracePeriod = time.Second
const creationGracePeriod = backlogGracePeriod * 3
const queueSize = 1
clock := clockwork.NewFakeClock()

ctx := context.Background()
b := NewCircularBuffer(
BufferCapacity(1),
BufferClock(clock),
BacklogGracePeriod(backlogGracePeriod),
CreationGracePeriod(creationGracePeriod),
)
defer b.Close()
b.SetInit()

w, err := b.NewWatcher(ctx, Watch{
QueueSize: queueSize,
})
require.NoError(t, err)
defer w.Close()

select {
case e := <-w.Events():
require.Equal(t, types.OpInit, e.Type)
default:
t.Fatalf("Expected immediate OpInit.")
}

// emit enough events to create a backlog
for i := 0; i < queueSize*2; i++ {
b.Emit(Event{Item: Item{Key: []byte{Separator}}})
}

select {
case <-w.Done():
t.Fatal("watcher closed unexpectedly")
default:
}

// sanity-check
require.Greater(t, creationGracePeriod, backlogGracePeriod*2)

// advance well past the backlog grace period, but not past the creation grace period
clock.Advance(backlogGracePeriod * 2)

b.Emit(Event{Item: Item{Key: []byte{Separator}}})

select {
case <-w.Done():
t.Fatal("watcher closed unexpectedly")
default:
}

// advance well past creation grace period
clock.Advance(creationGracePeriod)

b.Emit(Event{Item: Item{Key: []byte{Separator}}})
select {
case <-w.Done():
default:
t.Fatal("watcher did not close after creation grace period exceeded")
}
}

// TestWatcherClose makes sure that closed watcher
// will be removed
func TestWatcherClose(t *testing.T) {
6 changes: 6 additions & 0 deletions lib/backend/defaults.go
@@ -28,6 +28,12 @@ const (
// (e.g. heartbeats) are created. If a watcher can't catch up in under a minute,
// it probably won't catch up.
DefaultBacklogGracePeriod = time.Second * 59
// DefaultCreationGracePeriod is the default amount of time that the circular buffer
// will wait before enforcing the backlog grace period. This is intended to give downstream
// caches time to initialize before they start receiving events. Without this, large caches
// may be unable to successfully initialize even if they would otherwise be able to keep up
// with the event stream once established.
DefaultCreationGracePeriod = DefaultBacklogGracePeriod * 3
// DefaultPollStreamPeriod is a default event poll stream period
DefaultPollStreamPeriod = time.Second
// DefaultEventsTTL is a default events TTL period
19 changes: 17 additions & 2 deletions lib/backend/report.go
@@ -29,6 +29,7 @@ import (
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/time/rate"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
@@ -82,6 +83,8 @@ type Reporter struct {
// This will keep an upper limit on our memory usage while still always
// reporting the most active keys.
topRequestsCache *lru.Cache[topRequestsCacheKey, struct{}]

slowRangeLogLimiter *rate.Limiter
}

// NewReporter returns a new Reporter.
@@ -103,8 +106,9 @@ func NewReporter(cfg ReporterConfig) (*Reporter, error) {
return nil, trace.Wrap(err)
}
r := &Reporter{
ReporterConfig: cfg,
topRequestsCache: cache,
ReporterConfig: cfg,
topRequestsCache: cache,
slowRangeLogLimiter: rate.NewLimiter(rate.Every(time.Minute), 12),
}
return r, nil
}
@@ -134,6 +138,17 @@ func (s *Reporter) GetRange(ctx context.Context, startKey []byte, endKey []byte,
batchReadRequestsFailed.WithLabelValues(s.Component).Inc()
}
s.trackRequest(types.OpGet, startKey, endKey)
end := s.Clock().Now()
if d := end.Sub(start); d > time.Second*3 {
if s.slowRangeLogLimiter.AllowN(end, 1) {
log.WithFields(log.Fields{
"start_key": string(startKey),
"end_key": string(endKey),
"limit": limit,
"duration": d.String(),
}).Warn("slow GetRange request")
}
}
return res, err
}

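The new slow-range warning is deliberately rate limited: rate.NewLimiter(rate.Every(time.Minute), 12) allows a burst of twelve warnings and then at most one per minute, so a sustained backend slowdown cannot flood the logs. Below is a standalone sketch of the same pattern outside the Reporter; the slowOpLogger type and its names are hypothetical, and only the limiter settings and the three-second threshold mirror this diff.

```go
package main

import (
	"log"
	"time"

	"golang.org/x/time/rate"
)

// slowOpLogger (hypothetical name) warns about slow backend operations, but rate
// limits its own output so a prolonged slowdown cannot flood the logs.
type slowOpLogger struct {
	threshold time.Duration
	limiter   *rate.Limiter
}

func newSlowOpLogger(threshold time.Duration) *slowOpLogger {
	return &slowOpLogger{
		threshold: threshold,
		// Mirrors the reporter's settings: refill one token per minute, burst of 12.
		limiter: rate.NewLimiter(rate.Every(time.Minute), 12),
	}
}

// observe logs a warning if the operation that started at `start` exceeded the
// threshold and a rate-limit token is still available.
func (l *slowOpLogger) observe(op string, start time.Time) {
	end := time.Now()
	if d := end.Sub(start); d > l.threshold && l.limiter.AllowN(end, 1) {
		log.Printf("slow %s request (duration=%s)", op, d)
	}
}

func main() {
	l := newSlowOpLogger(3 * time.Second)

	// Simulate a call that took ~5s; this exceeds the threshold and consumes
	// one rate-limit token, so it is logged.
	l.observe("GetRange", time.Now().Add(-5*time.Second))

	// A fast call is never logged, regardless of remaining tokens.
	l.observe("GetRange", time.Now())
}
```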
23 changes: 22 additions & 1 deletion lib/cache/cache.go
@@ -1247,6 +1247,8 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
return trace.ConnectionProblem(nil, "timeout waiting for watcher init")
}

fetchAndApplyStart := time.Now()

confirmedKindsMap := make(map[resourceKind]types.WatchKind, len(confirmedKinds))
for _, kind := range confirmedKinds {
confirmedKindsMap[resourceKind{kind: kind.Kind, subkind: kind.SubKind}] = kind
@@ -1310,6 +1312,19 @@

c.notify(c.ctx, Event{Type: WatcherStarted})

fetchAndApplyDuration := time.Since(fetchAndApplyStart)
if fetchAndApplyDuration > time.Second*20 {
c.Logger.WithFields(log.Fields{
"cache_target": c.Config.target,
"duration": fetchAndApplyDuration.String(),
}).Warn("slow fetch and apply")
} else {
c.Logger.WithFields(log.Fields{
"cache_target": c.Config.target,
"duration": fetchAndApplyDuration.String(),
}).Debug("fetch and apply")
}

var lastStalenessWarning time.Time
var staleEventCount int
for {
@@ -1394,7 +1409,13 @@ func (c *Cache) fetchAndWatch(ctx context.Context, retry retryutils.Retry, timer
// cannot run concurrently with event processing.
func (c *Cache) performRelativeNodeExpiry(ctx context.Context) error {
// TODO(fspmarshall): Start using dynamic value once it is implemented.
gracePeriod := apidefaults.ServerAnnounceTTL

// Because event streams are not necessarily ordered across keys, expiring on the
// server announce TTL alone may sometimes generate false positives. Using the watcher
// creation grace period as our safety buffer is mostly an arbitrary choice, but
// since it approximates our expected worst-case staleness of the event stream it's
// a fairly reasonable one.
gracePeriod := apidefaults.ServerAnnounceTTL + backend.DefaultCreationGracePeriod

// latestExp will be the value that we choose to consider the most recent "expired"
// timestamp. This will either end up being the most recently seen node expiry, or
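The change to performRelativeNodeExpiry is just additive padding on the existing announce-TTL cutoff. The sketch below shows the arithmetic under the assumption that apidefaults.ServerAnnounceTTL is ten minutes (that value is not part of this diff); the 59s backlog grace period and the 3x relationship come from lib/backend/defaults.go above.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed stand-ins; the real constants are apidefaults.ServerAnnounceTTL and
// backend.DefaultBacklogGracePeriod / DefaultCreationGracePeriod.
const (
	serverAnnounceTTL   = 10 * time.Minute      // assumption, not taken from this diff
	backlogGracePeriod  = 59 * time.Second      // DefaultBacklogGracePeriod per defaults.go
	creationGracePeriod = backlogGracePeriod * 3 // DefaultCreationGracePeriod per defaults.go
)

func main() {
	now := time.Now()

	// Previous behavior: a node counted as expired once it had gone one full
	// announce TTL without a refresh being observed in the event stream.
	oldCutoff := now.Add(-serverAnnounceTTL)

	// New behavior: pad the cutoff by the creation grace period to absorb
	// worst-case event-stream staleness and avoid false-positive expiry.
	newCutoff := now.Add(-(serverAnnounceTTL + creationGracePeriod))

	fmt.Println("old expiry cutoff:", oldCutoff.Format(time.RFC3339))
	fmt.Println("new expiry cutoff:", newCutoff.Format(time.RFC3339))
	fmt.Println("extra safety margin:", creationGracePeriod)
}
```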
2 changes: 1 addition & 1 deletion lib/cache/cache_test.go
@@ -2618,7 +2618,7 @@ func TestRelativeExpiryLimit(t *testing.T) {
require.Len(t, nodes, nodeCount)

clock.Advance(time.Hour * 24)
for expired := nodeCount - expiryLimit; expired > 0; expired -= expiryLimit {
for expired := nodeCount - expiryLimit; expired > expiryLimit; expired -= expiryLimit {
// get rid of events that were emitted before clock advanced
drainEvents(p.eventsC)
// wait for next relative expiry check to run