Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

intentresolver: fix testrace flake by extending timeouts #35085

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions pkg/storage/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ const (
// for other contending reads or writes. The chosen value was selected based
// on some light experimentation to ensure that performance does not degrade
// in the face of highly contended workloads.
intentResolutionBatchWait = 10 * time.Millisecond
defaultIntentResolutionBatchWait = 10 * time.Millisecond

// intentResolutionBatchIdle is similar to the above setting but is used when
// when no additional traffic hits the batch.
intentResolutionBatchIdle = 5 * time.Millisecond
defaultIntentResolutionBatchIdle = 5 * time.Millisecond
)

// Config contains the dependencies to construct an IntentResolver.
Expand All @@ -106,9 +106,11 @@ type Config struct {
TestingKnobs storagebase.IntentResolverTestingKnobs
RangeDescriptorCache kvbase.RangeDescriptorCache

TaskLimit int
MaxGCBatchWait time.Duration
MaxGCBatchIdle time.Duration
TaskLimit int
MaxGCBatchWait time.Duration
MaxGCBatchIdle time.Duration
MaxIntentResolutionBatchWait time.Duration
MaxIntentResolutionBatchIdle time.Duration
}

// IntentResolver manages the process of pushing transactions and
Expand Down Expand Up @@ -154,6 +156,12 @@ func setConfigDefaults(c *Config) {
if c.MaxGCBatchWait == 0 {
c.MaxGCBatchWait = defaultGCBatchWait
}
if c.MaxIntentResolutionBatchIdle == 0 {
c.MaxIntentResolutionBatchIdle = defaultIntentResolutionBatchIdle
}
if c.MaxIntentResolutionBatchWait == 0 {
c.MaxIntentResolutionBatchWait = defaultIntentResolutionBatchWait
}
if c.RangeDescriptorCache == nil {
c.RangeDescriptorCache = nopRangeDescriptorCache{}
}
Expand Down Expand Up @@ -200,8 +208,8 @@ func New(c Config) *IntentResolver {
ir.irBatcher = requestbatcher.New(requestbatcher.Config{
Name: "intent_resolver_ir_batcher",
MaxMsgsPerBatch: batchSize,
MaxWait: intentResolutionBatchWait,
MaxIdle: intentResolutionBatchIdle,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
Expand Down
58 changes: 37 additions & 21 deletions pkg/storage/intentresolver/intent_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)

cfg := Config{
Stopper: stopper,
Clock: clock,
}
type testCase struct {
txn *roachpb.Transaction
intents []roachpb.Intent
Expand Down Expand Up @@ -184,7 +187,7 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
for _, c := range cases {
t.Run("", func(t *testing.T) {
sf := &sendFuncs{sendFuncs: c.sendFuncs}
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
ir := newIntentResolverWithSendFuncs(cfg, sf)
var didPush, didSucceed bool
done := make(chan struct{})
onComplete := func(pushed, succeeded bool) {
Expand Down Expand Up @@ -429,12 +432,16 @@ func TestCleanupIntentsAsyncThrottled(t *testing.T) {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
cfg := Config{
Stopper: stopper,
Clock: clock,
}
txn := beginTransaction(t, clock, 1, roachpb.Key("a"), true /* putKey */)
sf := newSendFuncs(
pushTxnSendFunc(1),
resolveIntentsSendFunc,
)
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
ir := newIntentResolverWithSendFuncs(cfg, sf)
// Run defaultTaskLimit tasks which will block until blocker is closed.
blocker := make(chan struct{})
defer close(blocker)
Expand Down Expand Up @@ -507,7 +514,11 @@ func TestCleanupIntentsAsync(t *testing.T) {
t.Run("", func(t *testing.T) {
stopper := stop.NewStopper()
sf := newSendFuncs(c.sendFuncs...)
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
cfg := Config{
Stopper: stopper,
Clock: clock,
}
ir := newIntentResolverWithSendFuncs(cfg, sf)
err := ir.CleanupIntentsAsync(context.Background(), c.intents, true)
testutils.SucceedsSoon(t, func() error {
if l := sf.len(); l > 0 {
Expand Down Expand Up @@ -591,10 +602,14 @@ func TestCleanupTxnIntentsAsync(t *testing.T) {
t.Run("", func(t *testing.T) {
stopper := stop.NewStopper()
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
cfg := Config{
Stopper: stopper,
Clock: clock,
}
var sendFuncCalled int64
numSendFuncs := int64(len(c.sendFuncs))
sf := newSendFuncs(counterSendFuncs(&sendFuncCalled, c.sendFuncs)...)
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
ir := newIntentResolverWithSendFuncs(cfg, sf)
if c.before != nil {
defer c.before(&c, ir)()
}
Expand Down Expand Up @@ -630,9 +645,6 @@ func counterSendFunc(counter *int64, f sendFunc) sendFunc {
// and returns the appropriate errors.
func TestCleanupIntents(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Skip("https://github.com/cockroachdb/cockroach/pull/35085")

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
txn := beginTransaction(t, clock, roachpb.MinUserPriority, roachpb.Key("a"), true)
// Set txn.ID to a very small value so it's sorted deterministically first.
Expand All @@ -645,8 +657,8 @@ func TestCleanupIntents(t *testing.T) {
sendFuncs []sendFunc
expectedErr bool
expectedNum int
cfg Config
}

cases := []testCase{
{
intents: testIntents,
Expand Down Expand Up @@ -680,13 +692,23 @@ func TestCleanupIntents(t *testing.T) {
resolveIntentsSendFunc,
},
expectedNum: 3*intentResolverBatchSize + 3,
// Under stress sometimes it can take more than 10ms to even call send.
// The batch wait is disabled and batch idle increased during this test
// to eliminate flakiness and ensure that all requests make it to the
// batcher in a timely manner.
cfg: Config{
MaxIntentResolutionBatchWait: -1, // disabled
MaxIntentResolutionBatchIdle: 20 * time.Millisecond,
},
},
}
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
for _, c := range cases {
t.Run("", func(t *testing.T) {
ir := newIntentResolverWithSendFuncs(stopper, clock, newSendFuncs(c.sendFuncs...))
c.cfg.Stopper = stopper
c.cfg.Clock = clock
ir := newIntentResolverWithSendFuncs(c.cfg, newSendFuncs(c.sendFuncs...))
num, err := ir.CleanupIntents(context.Background(), c.intents, clock.Now(), roachpb.PUSH_ABORT)
assert.Equal(t, num, c.expectedNum, "number of resolved intents")
assert.Equal(t, err != nil, c.expectedErr, "error during CleanupIntents: %v", err)
Expand Down Expand Up @@ -773,23 +795,18 @@ func makeTxnIntents(t *testing.T, clock *hlc.Clock, numIntents int) []roachpb.In
// A library of useful sendFuncs are defined below.
type sendFunc func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)

func newIntentResolverWithSendFuncs(
stopper *stop.Stopper, clock *hlc.Clock, sf *sendFuncs,
) *IntentResolver {
func newIntentResolverWithSendFuncs(c Config, sf *sendFuncs) *IntentResolver {
txnSenderFactory := client.NonTransactionalFactoryFunc(
func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
f := sf.pop()
return f(ba)
})
db := client.NewDB(log.AmbientContext{
Tracer: tracing.NewTracer(),
}, txnSenderFactory, clock)
return New(Config{
Stopper: stopper,
DB: db,
Clock: clock,
MaxGCBatchWait: time.Nanosecond,
})
}, txnSenderFactory, c.Clock)
c.DB = db
c.MaxGCBatchWait = time.Nanosecond
return New(c)
}

var (
Expand All @@ -813,7 +830,6 @@ var (
return resp, nil
}
}

singlePushTxnSendFunc = pushTxnSendFunc(1)
resolveIntentsSendFunc sendFunc = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
resp := &roachpb.BatchResponse{}
Expand Down