Skip to content

Commit

Permalink
intentresolver: fix testrace flake by extending timeouts
Browse files Browse the repository at this point in the history
The unit tests which failed assumed that all messages intents in a single call
to ResolveIntents could be queued within the batch timeout and idle window which
is by default rather short (10 and 5 ms respectively). The testing validation
logic is rather strict and assumes that the batch will fire due to size rather
than time constraints. The fix is to allow the test to increase these values to
make the test more robust to load. Before this change the flake was reproducible
running testrace within several hundred iterations. Running testrace now seems
to not provoke any failures.

All that being said, I can see an argument that the testing logic should be
made less rigid and should accept that the batches may be split up.

Fixes #35064.

Release note: None
  • Loading branch information
ajwerner committed Feb 20, 2019
1 parent 2e16b73 commit 7fdc815
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 28 deletions.
22 changes: 15 additions & 7 deletions pkg/storage/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ const (
// for other contending reads or writes. The chosen value was selected based
// on some light experimentation to ensure that performance does not degrade
// in the face of highly contended workloads.
intentResolutionBatchWait = 10 * time.Millisecond
defaultIntentResolutionBatchWait = 10 * time.Millisecond

// intentResolutionBatchIdle is similar to the above setting but is used when
// when no additional traffic hits the batch.
intentResolutionBatchIdle = 5 * time.Millisecond
defaultIntentResolutionBatchIdle = 5 * time.Millisecond
)

// Config contains the dependencies to construct an IntentResolver.
Expand All @@ -106,9 +106,11 @@ type Config struct {
TestingKnobs storagebase.IntentResolverTestingKnobs
RangeDescriptorCache kvbase.RangeDescriptorCache

TaskLimit int
MaxGCBatchWait time.Duration
MaxGCBatchIdle time.Duration
TaskLimit int
MaxGCBatchWait time.Duration
MaxGCBatchIdle time.Duration
MaxIntentResolutionBatchWait time.Duration
MaxIntentResolutionBatchIdle time.Duration
}

// IntentResolver manages the process of pushing transactions and
Expand Down Expand Up @@ -154,6 +156,12 @@ func setConfigDefaults(c *Config) {
if c.MaxGCBatchWait == 0 {
c.MaxGCBatchWait = defaultGCBatchWait
}
if c.MaxIntentResolutionBatchIdle == 0 {
c.MaxIntentResolutionBatchIdle = defaultIntentResolutionBatchIdle
}
if c.MaxIntentResolutionBatchWait == 0 {
c.MaxIntentResolutionBatchWait = defaultIntentResolutionBatchWait
}
if c.RangeDescriptorCache == nil {
c.RangeDescriptorCache = nopRangeDescriptorCache{}
}
Expand Down Expand Up @@ -200,8 +208,8 @@ func New(c Config) *IntentResolver {
ir.irBatcher = requestbatcher.New(requestbatcher.Config{
Name: "intent_resolver_ir_batcher",
MaxMsgsPerBatch: batchSize,
MaxWait: intentResolutionBatchWait,
MaxIdle: intentResolutionBatchIdle,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
Expand Down
58 changes: 37 additions & 21 deletions pkg/storage/intentresolver/intent_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)

cfg := Config{
Stopper: stopper,
Clock: clock,
}
type testCase struct {
txn *roachpb.Transaction
intents []roachpb.Intent
Expand Down Expand Up @@ -184,7 +187,7 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
for _, c := range cases {
t.Run("", func(t *testing.T) {
sf := &sendFuncs{sendFuncs: c.sendFuncs}
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
ir := newIntentResolverWithSendFuncs(cfg, sf)
var didPush, didSucceed bool
done := make(chan struct{})
onComplete := func(pushed, succeeded bool) {
Expand Down Expand Up @@ -429,12 +432,16 @@ func TestCleanupIntentsAsyncThrottled(t *testing.T) {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
cfg := Config{
Stopper: stopper,
Clock: clock,
}
txn := beginTransaction(t, clock, 1, roachpb.Key("a"), true /* putKey */)
sf := newSendFuncs(
pushTxnSendFunc(1),
resolveIntentsSendFunc,
)
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
ir := newIntentResolverWithSendFuncs(cfg, sf)
// Run defaultTaskLimit tasks which will block until blocker is closed.
blocker := make(chan struct{})
defer close(blocker)
Expand Down Expand Up @@ -507,7 +514,11 @@ func TestCleanupIntentsAsync(t *testing.T) {
t.Run("", func(t *testing.T) {
stopper := stop.NewStopper()
sf := newSendFuncs(c.sendFuncs...)
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
cfg := Config{
Stopper: stopper,
Clock: clock,
}
ir := newIntentResolverWithSendFuncs(cfg, sf)
err := ir.CleanupIntentsAsync(context.Background(), c.intents, true)
testutils.SucceedsSoon(t, func() error {
if l := sf.len(); l > 0 {
Expand Down Expand Up @@ -591,10 +602,14 @@ func TestCleanupTxnIntentsAsync(t *testing.T) {
t.Run("", func(t *testing.T) {
stopper := stop.NewStopper()
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
cfg := Config{
Stopper: stopper,
Clock: clock,
}
var sendFuncCalled int64
numSendFuncs := int64(len(c.sendFuncs))
sf := newSendFuncs(counterSendFuncs(&sendFuncCalled, c.sendFuncs)...)
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
ir := newIntentResolverWithSendFuncs(cfg, sf)
if c.before != nil {
defer c.before(&c, ir)()
}
Expand Down Expand Up @@ -630,9 +645,6 @@ func counterSendFunc(counter *int64, f sendFunc) sendFunc {
// and returns the appropriate errors.
func TestCleanupIntents(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Skip("https://github.com/cockroachdb/cockroach/pull/35085")

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
txn := beginTransaction(t, clock, roachpb.MinUserPriority, roachpb.Key("a"), true)
// Set txn.ID to a very small value so it's sorted deterministically first.
Expand All @@ -645,8 +657,8 @@ func TestCleanupIntents(t *testing.T) {
sendFuncs []sendFunc
expectedErr bool
expectedNum int
cfg Config
}

cases := []testCase{
{
intents: testIntents,
Expand Down Expand Up @@ -680,13 +692,23 @@ func TestCleanupIntents(t *testing.T) {
resolveIntentsSendFunc,
},
expectedNum: 3*intentResolverBatchSize + 3,
// Under stress sometimes it can take more than 10ms to even call send.
// The batch wait is disabled and batch idle increased during this test
// to eliminate flakiness and ensure that all requests make it to the
// batcher in a timely manner.
cfg: Config{
MaxIntentResolutionBatchWait: -1, // disabled
MaxIntentResolutionBatchIdle: 20 * time.Millisecond,
},
},
}
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
for _, c := range cases {
t.Run("", func(t *testing.T) {
ir := newIntentResolverWithSendFuncs(stopper, clock, newSendFuncs(c.sendFuncs...))
c.cfg.Stopper = stopper
c.cfg.Clock = clock
ir := newIntentResolverWithSendFuncs(c.cfg, newSendFuncs(c.sendFuncs...))
num, err := ir.CleanupIntents(context.Background(), c.intents, clock.Now(), roachpb.PUSH_ABORT)
assert.Equal(t, num, c.expectedNum, "number of resolved intents")
assert.Equal(t, err != nil, c.expectedErr, "error during CleanupIntents: %v", err)
Expand Down Expand Up @@ -773,23 +795,18 @@ func makeTxnIntents(t *testing.T, clock *hlc.Clock, numIntents int) []roachpb.In
// A library of useful sendFuncs are defined below.
type sendFunc func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)

func newIntentResolverWithSendFuncs(
stopper *stop.Stopper, clock *hlc.Clock, sf *sendFuncs,
) *IntentResolver {
func newIntentResolverWithSendFuncs(c Config, sf *sendFuncs) *IntentResolver {
txnSenderFactory := client.NonTransactionalFactoryFunc(
func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
f := sf.pop()
return f(ba)
})
db := client.NewDB(log.AmbientContext{
Tracer: tracing.NewTracer(),
}, txnSenderFactory, clock)
return New(Config{
Stopper: stopper,
DB: db,
Clock: clock,
MaxGCBatchWait: time.Nanosecond,
})
}, txnSenderFactory, c.Clock)
c.DB = db
c.MaxGCBatchWait = time.Nanosecond
return New(c)
}

var (
Expand All @@ -813,7 +830,6 @@ var (
return resp, nil
}
}

singlePushTxnSendFunc = pushTxnSendFunc(1)
resolveIntentsSendFunc sendFunc = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
resp := &roachpb.BatchResponse{}
Expand Down

0 comments on commit 7fdc815

Please sign in to comment.