diff --git a/pkg/storage/intentresolver/intent_resolver.go b/pkg/storage/intentresolver/intent_resolver.go index 15e27c5d9927..5663f40ce029 100644 --- a/pkg/storage/intentresolver/intent_resolver.go +++ b/pkg/storage/intentresolver/intent_resolver.go @@ -90,11 +90,11 @@ const ( // for other contending reads or writes. The chosen value was selected based // on some light experimentation to ensure that performance does not degrade // in the face of highly contended workloads. - intentResolutionBatchWait = 10 * time.Millisecond + defaultIntentResolutionBatchWait = 10 * time.Millisecond // intentResolutionBatchIdle is similar to the above setting but is used when // when no additional traffic hits the batch. - intentResolutionBatchIdle = 5 * time.Millisecond + defaultIntentResolutionBatchIdle = 5 * time.Millisecond ) // Config contains the dependencies to construct an IntentResolver. @@ -106,9 +106,11 @@ type Config struct { TestingKnobs storagebase.IntentResolverTestingKnobs RangeDescriptorCache kvbase.RangeDescriptorCache - TaskLimit int - MaxGCBatchWait time.Duration - MaxGCBatchIdle time.Duration + TaskLimit int + MaxGCBatchWait time.Duration + MaxGCBatchIdle time.Duration + MaxIntentResolutionBatchWait time.Duration + MaxIntentResolutionBatchIdle time.Duration } // IntentResolver manages the process of pushing transactions and @@ -154,6 +156,12 @@ func setConfigDefaults(c *Config) { if c.MaxGCBatchWait == 0 { c.MaxGCBatchWait = defaultGCBatchWait } + if c.MaxIntentResolutionBatchIdle == 0 { + c.MaxIntentResolutionBatchIdle = defaultIntentResolutionBatchIdle + } + if c.MaxIntentResolutionBatchWait == 0 { + c.MaxIntentResolutionBatchWait = defaultIntentResolutionBatchWait + } if c.RangeDescriptorCache == nil { c.RangeDescriptorCache = nopRangeDescriptorCache{} } @@ -200,8 +208,8 @@ func New(c Config) *IntentResolver { ir.irBatcher = requestbatcher.New(requestbatcher.Config{ Name: "intent_resolver_ir_batcher", MaxMsgsPerBatch: batchSize, - MaxWait: intentResolutionBatchWait, - MaxIdle: intentResolutionBatchIdle, + MaxWait: c.MaxIntentResolutionBatchWait, + MaxIdle: c.MaxIntentResolutionBatchIdle, Stopper: c.Stopper, Sender: c.DB.NonTransactionalSender(), }) diff --git a/pkg/storage/intentresolver/intent_resolver_test.go b/pkg/storage/intentresolver/intent_resolver_test.go index 2b690248242e..c8e02ef323fe 100644 --- a/pkg/storage/intentresolver/intent_resolver_test.go +++ b/pkg/storage/intentresolver/intent_resolver_test.go @@ -87,7 +87,10 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) { stopper := stop.NewStopper() defer stopper.Stop(ctx) clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) - + cfg := Config{ + Stopper: stopper, + Clock: clock, + } type testCase struct { txn *roachpb.Transaction intents []roachpb.Intent @@ -184,7 +187,7 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) { for _, c := range cases { t.Run("", func(t *testing.T) { sf := &sendFuncs{sendFuncs: c.sendFuncs} - ir := newIntentResolverWithSendFuncs(stopper, clock, sf) + ir := newIntentResolverWithSendFuncs(cfg, sf) var didPush, didSucceed bool done := make(chan struct{}) onComplete := func(pushed, succeeded bool) { @@ -429,12 +432,16 @@ func TestCleanupIntentsAsyncThrottled(t *testing.T) { clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) stopper := stop.NewStopper() defer stopper.Stop(context.Background()) + cfg := Config{ + Stopper: stopper, + Clock: clock, + } txn := beginTransaction(t, clock, 1, roachpb.Key("a"), true /* putKey */) sf := newSendFuncs( pushTxnSendFunc(1), resolveIntentsSendFunc, ) - ir := newIntentResolverWithSendFuncs(stopper, clock, sf) + ir := newIntentResolverWithSendFuncs(cfg, sf) // Run defaultTaskLimit tasks which will block until blocker is closed. blocker := make(chan struct{}) defer close(blocker) @@ -507,7 +514,11 @@ func TestCleanupIntentsAsync(t *testing.T) { t.Run("", func(t *testing.T) { stopper := stop.NewStopper() sf := newSendFuncs(c.sendFuncs...) - ir := newIntentResolverWithSendFuncs(stopper, clock, sf) + cfg := Config{ + Stopper: stopper, + Clock: clock, + } + ir := newIntentResolverWithSendFuncs(cfg, sf) err := ir.CleanupIntentsAsync(context.Background(), c.intents, true) testutils.SucceedsSoon(t, func() error { if l := sf.len(); l > 0 { @@ -591,10 +602,14 @@ func TestCleanupTxnIntentsAsync(t *testing.T) { t.Run("", func(t *testing.T) { stopper := stop.NewStopper() clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + cfg := Config{ + Stopper: stopper, + Clock: clock, + } var sendFuncCalled int64 numSendFuncs := int64(len(c.sendFuncs)) sf := newSendFuncs(counterSendFuncs(&sendFuncCalled, c.sendFuncs)...) - ir := newIntentResolverWithSendFuncs(stopper, clock, sf) + ir := newIntentResolverWithSendFuncs(cfg, sf) if c.before != nil { defer c.before(&c, ir)() } @@ -630,9 +645,6 @@ func counterSendFunc(counter *int64, f sendFunc) sendFunc { // and returns the appropriate errors. func TestCleanupIntents(t *testing.T) { defer leaktest.AfterTest(t)() - - t.Skip("https://github.com/cockroachdb/cockroach/pull/35085") - clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) txn := beginTransaction(t, clock, roachpb.MinUserPriority, roachpb.Key("a"), true) // Set txn.ID to a very small value so it's sorted deterministically first. @@ -645,8 +657,8 @@ func TestCleanupIntents(t *testing.T) { sendFuncs []sendFunc expectedErr bool expectedNum int + cfg Config } - cases := []testCase{ { intents: testIntents, @@ -680,13 +692,23 @@ func TestCleanupIntents(t *testing.T) { resolveIntentsSendFunc, }, expectedNum: 3*intentResolverBatchSize + 3, + // Under stress sometimes it can take more than 10ms to even call send. + // The batch wait is disabled and batch idle increased during this test + // to eliminate flakiness and ensure that all requests make it to the + // batcher in a timely manner. + cfg: Config{ + MaxIntentResolutionBatchWait: -1, // disabled + MaxIntentResolutionBatchIdle: 20 * time.Millisecond, + }, }, } stopper := stop.NewStopper() defer stopper.Stop(context.Background()) for _, c := range cases { t.Run("", func(t *testing.T) { - ir := newIntentResolverWithSendFuncs(stopper, clock, newSendFuncs(c.sendFuncs...)) + c.cfg.Stopper = stopper + c.cfg.Clock = clock + ir := newIntentResolverWithSendFuncs(c.cfg, newSendFuncs(c.sendFuncs...)) num, err := ir.CleanupIntents(context.Background(), c.intents, clock.Now(), roachpb.PUSH_ABORT) assert.Equal(t, num, c.expectedNum, "number of resolved intents") assert.Equal(t, err != nil, c.expectedErr, "error during CleanupIntents: %v", err) @@ -773,9 +795,7 @@ func makeTxnIntents(t *testing.T, clock *hlc.Clock, numIntents int) []roachpb.In // A library of useful sendFuncs are defined below. type sendFunc func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) -func newIntentResolverWithSendFuncs( - stopper *stop.Stopper, clock *hlc.Clock, sf *sendFuncs, -) *IntentResolver { +func newIntentResolverWithSendFuncs(c Config, sf *sendFuncs) *IntentResolver { txnSenderFactory := client.NonTransactionalFactoryFunc( func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { f := sf.pop() @@ -783,13 +803,10 @@ func newIntentResolverWithSendFuncs( }) db := client.NewDB(log.AmbientContext{ Tracer: tracing.NewTracer(), - }, txnSenderFactory, clock) - return New(Config{ - Stopper: stopper, - DB: db, - Clock: clock, - MaxGCBatchWait: time.Nanosecond, - }) + }, txnSenderFactory, c.Clock) + c.DB = db + c.MaxGCBatchWait = time.Nanosecond + return New(c) } var ( @@ -813,7 +830,6 @@ var ( return resp, nil } } - singlePushTxnSendFunc = pushTxnSendFunc(1) resolveIntentsSendFunc sendFunc = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { resp := &roachpb.BatchResponse{}