Skip to content

Commit

Permalink
Report context cancel cause in DoBatchWithOptions instead of ctx.Err().
Browse files Browse the repository at this point in the history
Closes grafana#576

Signed-off-by: Daniel Strobusch <1847260+dastrobu@users.noreply.github.com>
  • Loading branch information
dastrobu committed Aug 26, 2024
1 parent 5353c68 commit 129206f
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Changelog

* [CHANGE] `ring.DoBatchWithOptions` (and `ring.DoBatch`) reports the cancellation cause when the context is cancelled instead of `context.Cancelled`.
* [CHANGE] Multierror: Implement `Unwrap() []error`. This allows to use `multierror.MultiError` with both `errors.Is()` and `errors.As()`. Previously implemented `Is(error) bool` has been removed. #522
* [CHANGE] Add a new `grpc_server_recv_buffer_pools_enabled` option that enables recv buffer pools in the gRPC server (assuming `grpc_server_stats_tracking_enabled` is disabled). #510
* [CHANGE] Add a new `grpc_server_stats_tracking_enabled` option that allows us to disable stats tracking and potentially improve server memory reuse. #507
Expand Down
6 changes: 3 additions & 3 deletions ring/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys [
// Get call below takes ~1 microsecond for ~500 instances.
// Checking every 10K calls would be every 10ms.
if i%10e3 == 0 {
if err := ctx.Err(); err != nil {
if err := context.Cause(ctx); err != nil {
o.Cleanup()
return err
}
Expand Down Expand Up @@ -161,7 +161,7 @@ func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys [
}

// One last check before calling the callbacks: it doesn't make sense if context is canceled.
if err := ctx.Err(); err != nil {
if err := context.Cause(ctx); err != nil {
o.Cleanup()
return err
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func DoBatchWithOptions(ctx context.Context, op Operation, r DoBatchRing, keys [
case <-tracker.done:
return nil
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
}
}

Expand Down
144 changes: 114 additions & 30 deletions ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ring
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -212,41 +213,124 @@ func TestDoBatchWithOptionsContextCancellation(t *testing.T) {
numInstances = 100
numZones = 3
)
keys := make([]uint32, numKeys)
generateKeys(rand.New(rand.NewSource(0)), numKeys, keys)

callback := func(InstanceDesc, []int) error { return nil }
desc := &Desc{Ingesters: generateRingInstances(NewRandomTokenGeneratorWithSeed(0), numInstances, numZones, numTokens)}
r := newRingForTesting(Config{
HeartbeatTimeout: time.Hour,
ZoneAwarenessEnabled: true,
SubringCacheDisabled: true,
ReplicationFactor: numZones,
}, true)
r.setRingStateFromDesc(desc, false, false, false)
// Measure how long does it take for a call to succeed.
t0 := time.Now()
err := DoBatchWithOptions(context.Background(), Write, r, keys, callback, DoBatchOptions{})
duration := time.Since(t0)
require.NoError(t, err)
t.Logf("Call took %s", duration)
cancelCause := errors.New("cancel cause")

// Make a second call cancelling after a hundredth of duration of the first one.
// For a 4s first call, this is 40ms: should be enough for this test to not be flaky.
ctx, cancel := context.WithTimeout(context.Background(), duration/100)
defer cancel()
measureDuration := func(r *Ring, keys []uint32) time.Duration {
callback := func(InstanceDesc, []int) error { return nil }
t0 := time.Now()
err := DoBatchWithOptions(context.Background(), Write, r, keys, callback, DoBatchOptions{})
duration := time.Since(t0)
require.NoError(t, err)
t.Logf("Call took %s", duration)
return duration
}

wg := sync.WaitGroup{}
wg.Add(1)
err = DoBatchWithOptions(ctx, Write, r, keys, func(_ InstanceDesc, _ []int) error {
type callbackFunc = func(InstanceDesc, []int) error
never := func(_ InstanceDesc, _ []int) error {
t.Errorf("should not be called.")
return nil
}, DoBatchOptions{Cleanup: wg.Done})
require.Error(t, err)
require.ErrorIs(t, err, context.DeadlineExceeded)
}
tests := []struct {
name string
setup func(*Ring, []uint32) (context.Context, callbackFunc)
expectedErr error
}{
{
name: "context deadline exceeded",
setup: func(r *Ring, keys []uint32) (context.Context, callbackFunc) {
duration := measureDuration(r, keys)

// Make a second call cancelling after a hundredth of duration of the first one.
// For a 4s first call, this is 40ms: should be enough for this test to not be flaky.
ctx, _ := context.WithTimeout(context.Background(), duration/100)
return ctx, never
},
expectedErr: context.DeadlineExceeded,
},
{
name: "context deadline exceeded with cause",
setup: func(r *Ring, keys []uint32) (context.Context, callbackFunc) {
duration := measureDuration(r, keys)

// Make a second call cancelling after a hundredth of duration of the first one.
// For a 4s first call, this is 40ms: should be enough for this test to not be flaky.
ctx, _ := context.WithTimeoutCause(context.Background(), duration/100, cancelCause)
return ctx, never
},
expectedErr: cancelCause,
},
{
name: "context initially cancelled without cause",
setup: func(r *Ring, keys []uint32) (context.Context, callbackFunc) {
ctx, cancelFunc := context.WithCancel(context.Background())
// start batch with cancelled context
cancelFunc()
return ctx, never
},
expectedErr: context.Canceled,
},
{
name: "context initially cancelled with cause",
setup: func(r *Ring, keys []uint32) (context.Context, callbackFunc) {
ctx, cancelFunc := context.WithCancelCause(context.Background())
// start batch with cancelled context
cancelFunc(cancelCause)
return ctx, never
},
expectedErr: cancelCause,
},
{
name: "context cancelled during batch processing",
setup: func(r *Ring, keys []uint32) (context.Context, callbackFunc) {
ctx, cancel := context.WithCancelCause(context.Background())

wg := sync.WaitGroup{}
wg.Add(numInstances)

callback := func(_ InstanceDesc, _ []int) error {
wg.Done()
// let the call to the instance hang until context is cancelled
time.Sleep(5 * time.Minute)
t.Errorf("Context should have been cancelled before.")
return nil
}
go func() {
// wait until all instances hang, then cancel the context
wg.Wait()
cancel(cancelCause)
}()
return ctx, callback
},
expectedErr: cancelCause,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
keys := make([]uint32, numKeys)
generateKeys(rand.New(rand.NewSource(0)), numKeys, keys)

// Wait until cleanup to make sure that callback was never called.
wg.Wait()
desc := &Desc{Ingesters: generateRingInstances(NewRandomTokenGeneratorWithSeed(0), numInstances, numZones, numTokens)}
r := newRingForTesting(Config{
HeartbeatTimeout: time.Hour,
ZoneAwarenessEnabled: true,
SubringCacheDisabled: true,
ReplicationFactor: numZones,
}, true)
r.setRingStateFromDesc(desc, false, false, false)

ctx, callback := tt.setup(r, keys)

wg := sync.WaitGroup{}
wg.Add(1)
err := DoBatchWithOptions(ctx, Write, r, keys, callback, DoBatchOptions{Cleanup: wg.Done})
require.Error(t, err)
require.ErrorIs(t, err, tt.expectedErr)

// Wait until cleanup to make sure that callback was never called.
wg.Wait()
})
}
}

func TestDoBatch_QuorumError(t *testing.T) {
Expand Down

0 comments on commit 129206f

Please sign in to comment.