Skip to content

Commit

Permalink
kv: detect context cancellation in limitBulkIOWrite, avoid log spam
Browse files Browse the repository at this point in the history
This commit adds logic to propagate context cancellation in `limitBulkIOWrite`.
This function is used in two places, 1) when ingesting ssts, and 2) when
receiving a snapshot. The first case uses the Raft scheduler goroutine's
context, so it never gets cancelled. The second case uses the context of the
sender of a Raft snapshot, so it can get cancelled.

In customer clusters, we were seeing Raft snapshots hit their deadline and begin
spamming `error rate limiting bulk io write: context deadline exceeded` errors
messages. This was bad for two reasons. First, it was very noisy. Second, it
meant that a Raft snapshot that was no longer going to succeed was still writing
out full SSTs while holding on to the `snapshotApplySem`. This contributed to
the snapshot starvation we saw in the issue.

With this commit, `limitBulkIOWrite` will eagerly detect context cancellation
and will propagate the cancellation up to the caller, allowing the caller to
quickly release resources.

Release notes (bug fix): Raft snapshots now detect timeouts earlier and avoid
spamming the logs with `context deadline exceeded` errors.
  • Loading branch information
nvanbenschoten committed Nov 29, 2021
1 parent 2c014c4 commit a3fd4fb
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 11 deletions.
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/replica_sst_snapshot_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ func (f *SSTSnapshotStorageFile) Write(contents []byte) (int, error) {
if err := f.ensureFile(); err != nil {
return 0, err
}
limitBulkIOWrite(f.ctx, f.scratch.storage.limiter, len(contents))
if err := limitBulkIOWrite(f.ctx, f.scratch.storage.limiter, len(contents)); err != nil {
return 0, err
}
return f.file.Write(contents)
}

Expand Down
34 changes: 34 additions & 0 deletions pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,40 @@ func TestSSTSnapshotStorage(t *testing.T) {
}
}

// TestSSTSnapshotStorageContextCancellation verifies that writing to an
// SSTSnapshotStorage is reactive to context cancellation.
func TestSSTSnapshotStorageContextCancellation(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testRangeID := roachpb.RangeID(1)
testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890")))
testLimiter := rate.NewLimiter(rate.Inf, 0)

cleanup, eng := newOnDiskEngine(t)
defer cleanup()
defer eng.Close()

sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter)
scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID)

ctx, cancel := context.WithCancel(context.Background())
f, err := scratch.NewFile(ctx, 0)
require.NoError(t, err)
defer func() {
require.NoError(t, f.Close())
}()

// Before context cancellation.
_, err = f.Write([]byte("foo"))
require.NoError(t, err)

// After context cancellation.
cancel()
_, err = f.Write([]byte("bar"))
require.ErrorIs(t, err, context.Canceled)
}

// TestMultiSSTWriterInitSST tests that multiSSTWriter initializes each of the
// SST files associated with the replicated key ranges by writing a range
// deletion tombstone that spans the entire range of each respectively.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
return noSnap, errors.Wrap(err, "failed to decode mvcc key")
}
if err := msstw.Put(ctx, key, batchReader.Value()); err != nil {
return noSnap, err
return noSnap, errors.Wrapf(err, "writing sst for raft snapshot")
}
}
}
Expand All @@ -276,7 +276,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
// we must still construct SSTs with range deletion tombstones to remove
// the data.
if err := msstw.Finish(ctx); err != nil {
return noSnap, err
return noSnap, errors.Wrapf(err, "finishing sst for raft snapshot")
}

msstw.Close()
Expand Down
23 changes: 15 additions & 8 deletions pkg/kv/kvserver/syncing_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"golang.org/x/time/rate"
)

Expand All @@ -32,7 +33,10 @@ const bulkIOWriteBurst = 512 << 10 // 512 KB

const bulkIOWriteLimiterLongWait = 500 * time.Millisecond

func limitBulkIOWrite(ctx context.Context, limiter *rate.Limiter, cost int) {
// limitBulkIOWrite blocks until the provided limiter permits the specified cost
// to happen. It returns an error if the Context is canceled or the expected
// wait time exceeds the Context's Deadline.
func limitBulkIOWrite(ctx context.Context, limiter *rate.Limiter, cost int) error {
// The limiter disallows anything greater than its burst (set to
// BulkIOWriteLimiterBurst), so cap the batch size if it would overflow.
//
Expand All @@ -47,13 +51,14 @@ func limitBulkIOWrite(ctx context.Context, limiter *rate.Limiter, cost int) {

begin := timeutil.Now()
if err := limiter.WaitN(ctx, cost); err != nil {
log.Errorf(ctx, "error rate limiting bulk io write: %+v", err)
return errors.Wrapf(err, "error rate limiting bulk io write")
}

if d := timeutil.Since(begin); d > bulkIOWriteLimiterLongWait {
log.Warningf(ctx, "bulk io write limiter took %s (>%s):\n%s",
d, bulkIOWriteLimiterLongWait, debug.Stack())
}
return nil
}

// sstWriteSyncRate wraps "kv.bulk_sst.sync_size". 0 disables syncing.
Expand Down Expand Up @@ -101,15 +106,17 @@ func writeFileSyncing(
}
chunk := data[i:end]

// rate limit
limitBulkIOWrite(ctx, limiter, len(chunk))
_, err = f.Write(chunk)
if err == nil && sync {
err = f.Sync()
if err = limitBulkIOWrite(ctx, limiter, len(chunk)); err != nil {
break
}
if err != nil {
if _, err = f.Write(chunk); err != nil {
break
}
if sync {
if err = f.Sync(); err != nil {
break
}
}
}

closeErr := f.Close()
Expand Down

0 comments on commit a3fd4fb

Please sign in to comment.