Skip to content

Commit

Permalink
Merge #35085
Browse files Browse the repository at this point in the history
35085: intentresolver: fix testrace flake by extending timeouts r=ajwerner a=ajwerner

The unit tests which failed assumed that all messages intents in a single call
to ResolveIntents could be queued within the batch timeout and idle window which
is by default rather short (10 and 5 ms respectively). The testing validation
logic is rather strict and assumes that the batch will fire due to size rather
than time constraints. The fix is to allow the test to increase these values to
make the test more robust to load. Before this change the flake was reproducible
running testrace within several hundred iterations. After the test it has not
failed after running over 20k test race iterations from two concurrently running
executions.

Fixes #35064.

Release note: None

Co-authored-by: Andrew Werner <ajwerner@cockroachlabs.com>
  • Loading branch information
craig[bot] and ajwerner committed Feb 21, 2019
2 parents 22bb753 + 7fdc815 commit 98172ca
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 28 deletions.
22 changes: 15 additions & 7 deletions pkg/storage/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ const (
// for other contending reads or writes. The chosen value was selected based
// on some light experimentation to ensure that performance does not degrade
// in the face of highly contended workloads.
intentResolutionBatchWait = 10 * time.Millisecond
defaultIntentResolutionBatchWait = 10 * time.Millisecond

// intentResolutionBatchIdle is similar to the above setting but is used when
// when no additional traffic hits the batch.
intentResolutionBatchIdle = 5 * time.Millisecond
defaultIntentResolutionBatchIdle = 5 * time.Millisecond
)

// Config contains the dependencies to construct an IntentResolver.
Expand All @@ -106,9 +106,11 @@ type Config struct {
TestingKnobs storagebase.IntentResolverTestingKnobs
RangeDescriptorCache kvbase.RangeDescriptorCache

TaskLimit int
MaxGCBatchWait time.Duration
MaxGCBatchIdle time.Duration
TaskLimit int
MaxGCBatchWait time.Duration
MaxGCBatchIdle time.Duration
MaxIntentResolutionBatchWait time.Duration
MaxIntentResolutionBatchIdle time.Duration
}

// IntentResolver manages the process of pushing transactions and
Expand Down Expand Up @@ -154,6 +156,12 @@ func setConfigDefaults(c *Config) {
if c.MaxGCBatchWait == 0 {
c.MaxGCBatchWait = defaultGCBatchWait
}
if c.MaxIntentResolutionBatchIdle == 0 {
c.MaxIntentResolutionBatchIdle = defaultIntentResolutionBatchIdle
}
if c.MaxIntentResolutionBatchWait == 0 {
c.MaxIntentResolutionBatchWait = defaultIntentResolutionBatchWait
}
if c.RangeDescriptorCache == nil {
c.RangeDescriptorCache = nopRangeDescriptorCache{}
}
Expand Down Expand Up @@ -200,8 +208,8 @@ func New(c Config) *IntentResolver {
ir.irBatcher = requestbatcher.New(requestbatcher.Config{
Name: "intent_resolver_ir_batcher",
MaxMsgsPerBatch: batchSize,
MaxWait: intentResolutionBatchWait,
MaxIdle: intentResolutionBatchIdle,
MaxWait: c.MaxIntentResolutionBatchWait,
MaxIdle: c.MaxIntentResolutionBatchIdle,
Stopper: c.Stopper,
Sender: c.DB.NonTransactionalSender(),
})
Expand Down
58 changes: 37 additions & 21 deletions pkg/storage/intentresolver/intent_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,10 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)

cfg := Config{
Stopper: stopper,
Clock: clock,
}
type testCase struct {
txn *roachpb.Transaction
intents []roachpb.Intent
Expand Down Expand Up @@ -184,7 +187,7 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
for _, c := range cases {
t.Run("", func(t *testing.T) {
sf := &sendFuncs{sendFuncs: c.sendFuncs}
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
ir := newIntentResolverWithSendFuncs(cfg, sf)
var didPush, didSucceed bool
done := make(chan struct{})
onComplete := func(pushed, succeeded bool) {
Expand Down Expand Up @@ -429,12 +432,16 @@ func TestCleanupIntentsAsyncThrottled(t *testing.T) {
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
cfg := Config{
Stopper: stopper,
Clock: clock,
}
txn := beginTransaction(t, clock, 1, roachpb.Key("a"), true /* putKey */)
sf := newSendFuncs(
pushTxnSendFunc(1),
resolveIntentsSendFunc,
)
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
ir := newIntentResolverWithSendFuncs(cfg, sf)
// Run defaultTaskLimit tasks which will block until blocker is closed.
blocker := make(chan struct{})
defer close(blocker)
Expand Down Expand Up @@ -507,7 +514,11 @@ func TestCleanupIntentsAsync(t *testing.T) {
t.Run("", func(t *testing.T) {
stopper := stop.NewStopper()
sf := newSendFuncs(c.sendFuncs...)
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
cfg := Config{
Stopper: stopper,
Clock: clock,
}
ir := newIntentResolverWithSendFuncs(cfg, sf)
err := ir.CleanupIntentsAsync(context.Background(), c.intents, true)
testutils.SucceedsSoon(t, func() error {
if l := sf.len(); l > 0 {
Expand Down Expand Up @@ -591,10 +602,14 @@ func TestCleanupTxnIntentsAsync(t *testing.T) {
t.Run("", func(t *testing.T) {
stopper := stop.NewStopper()
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
cfg := Config{
Stopper: stopper,
Clock: clock,
}
var sendFuncCalled int64
numSendFuncs := int64(len(c.sendFuncs))
sf := newSendFuncs(counterSendFuncs(&sendFuncCalled, c.sendFuncs)...)
ir := newIntentResolverWithSendFuncs(stopper, clock, sf)
ir := newIntentResolverWithSendFuncs(cfg, sf)
if c.before != nil {
defer c.before(&c, ir)()
}
Expand Down Expand Up @@ -630,9 +645,6 @@ func counterSendFunc(counter *int64, f sendFunc) sendFunc {
// and returns the appropriate errors.
func TestCleanupIntents(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Skip("https://github.com/cockroachdb/cockroach/pull/35085")

clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
txn := beginTransaction(t, clock, roachpb.MinUserPriority, roachpb.Key("a"), true)
// Set txn.ID to a very small value so it's sorted deterministically first.
Expand All @@ -645,8 +657,8 @@ func TestCleanupIntents(t *testing.T) {
sendFuncs []sendFunc
expectedErr bool
expectedNum int
cfg Config
}

cases := []testCase{
{
intents: testIntents,
Expand Down Expand Up @@ -680,13 +692,23 @@ func TestCleanupIntents(t *testing.T) {
resolveIntentsSendFunc,
},
expectedNum: 3*intentResolverBatchSize + 3,
// Under stress sometimes it can take more than 10ms to even call send.
// The batch wait is disabled and batch idle increased during this test
// to eliminate flakiness and ensure that all requests make it to the
// batcher in a timely manner.
cfg: Config{
MaxIntentResolutionBatchWait: -1, // disabled
MaxIntentResolutionBatchIdle: 20 * time.Millisecond,
},
},
}
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
for _, c := range cases {
t.Run("", func(t *testing.T) {
ir := newIntentResolverWithSendFuncs(stopper, clock, newSendFuncs(c.sendFuncs...))
c.cfg.Stopper = stopper
c.cfg.Clock = clock
ir := newIntentResolverWithSendFuncs(c.cfg, newSendFuncs(c.sendFuncs...))
num, err := ir.CleanupIntents(context.Background(), c.intents, clock.Now(), roachpb.PUSH_ABORT)
assert.Equal(t, num, c.expectedNum, "number of resolved intents")
assert.Equal(t, err != nil, c.expectedErr, "error during CleanupIntents: %v", err)
Expand Down Expand Up @@ -773,23 +795,18 @@ func makeTxnIntents(t *testing.T, clock *hlc.Clock, numIntents int) []roachpb.In
// A library of useful sendFuncs are defined below.
type sendFunc func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)

func newIntentResolverWithSendFuncs(
stopper *stop.Stopper, clock *hlc.Clock, sf *sendFuncs,
) *IntentResolver {
func newIntentResolverWithSendFuncs(c Config, sf *sendFuncs) *IntentResolver {
txnSenderFactory := client.NonTransactionalFactoryFunc(
func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
f := sf.pop()
return f(ba)
})
db := client.NewDB(log.AmbientContext{
Tracer: tracing.NewTracer(),
}, txnSenderFactory, clock)
return New(Config{
Stopper: stopper,
DB: db,
Clock: clock,
MaxGCBatchWait: time.Nanosecond,
})
}, txnSenderFactory, c.Clock)
c.DB = db
c.MaxGCBatchWait = time.Nanosecond
return New(c)
}

var (
Expand All @@ -813,7 +830,6 @@ var (
return resp, nil
}
}

singlePushTxnSendFunc = pushTxnSendFunc(1)
resolveIntentsSendFunc sendFunc = func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
resp := &roachpb.BatchResponse{}
Expand Down

0 comments on commit 98172ca

Please sign in to comment.