From 1201c01a0b66749ff8004331f013df63988a786b Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 21 Mar 2023 17:49:26 -0400 Subject: [PATCH] kv/concurrency: compute contention event duration from (key,txn) wait start time Fixes #98104. This commit resolves the bug identified in #98104 where multiple contention events could be emitted with overlapping durations, such that when these durations were added together (e.g. by `GetCumulativeContentionTime`), their sum was larger than the runtime of the entire query. This was possible because as of 70ef6410, we were failing to reset the waiting start time on each new lock holder transaction for the same key. This commit fixes this by computing the contention event duration using `contentionTag.waitStart` instead of `waitingState.lockWaitStart`. It also cleans up some of this code and makes it harder to make such a mistake in the future. Release note: None --- pkg/kv/kvserver/concurrency/lock_table.go | 11 --- .../kvserver/concurrency/lock_table_waiter.go | 71 +++++++++--------- .../concurrency/lock_table_waiter_test.go | 74 ++++++++++++------- .../testdata/concurrency_manager/wait_self | 6 +- 4 files changed, 90 insertions(+), 72 deletions(-) diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index a6a5d870681f..30a0058f275b 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -97,11 +97,6 @@ type waitingState struct { // Represents the action that the request was trying to perform when // it hit the conflict. E.g. was it trying to read or write? guardAccess spanset.SpanAccess - - // lockWaitStart represents the timestamp when the request started waiting on - // the lock that this waitingState refers to. If multiple consecutive states - // refer to the same lock, they share the same lockWaitStart. - lockWaitStart time.Time } // String implements the fmt.Stringer interface. @@ -536,12 +531,6 @@ func (g *lockTableGuardImpl) CurState() waitingState { func (g *lockTableGuardImpl) updateStateLocked(newState waitingState) { g.mu.state = newState - switch newState.kind { - case waitFor, waitForDistinguished, waitSelf, waitElsewhere: - g.mu.state.lockWaitStart = g.mu.curLockWaitStart - default: - g.mu.state.lockWaitStart = time.Time{} - } } func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(spanSet *spanset.SpanSet) (ok bool) { diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index 07d23a0c2ede..7d1de8cc2f45 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -962,7 +962,7 @@ const tagContentionTracer = "locks" const tagWaitKey = "wait_key" // tagWaitStart is the tracing span tag indicating when the request started -// waiting on the lock it's currently waiting on. +// waiting on the lock (a unique key,txn pair) it's currently waiting on. const tagWaitStart = "wait_start" // tagLockHolderTxn is the tracing span tag indicating the ID of the txn holding @@ -1010,15 +1010,18 @@ type contentionTag struct { waiting bool // waitStart represents the timestamp when the request started waiting on - // locks in the current iteration of the contentionEventTracer. The wait - // time in previous iterations is accumulated in lockWait. When not waiting - // any more, timeutil.Since(waitStart) is added to lockWait. + // a lock, as defined by a unique (key,txn) pair, in the current iteration + // of the contentionEventTracer. The wait time in previous iterations is + // accumulated in lockWait. When not waiting anymore or when waiting on a + // new (key,txn), timeutil.Since(waitStart) is added to lockWait. waitStart time.Time - // curState is the current wait state, if any. It is overwritten every time - // the lock table notify()s the contentionEventTracer of a new state. It is - // not set if waiting is false. - curState waitingState + // curStateKey and curStateTxn are the current waitingState's key and txn, + // if any. They are overwritten every time the lock table notify()s the + // contentionEventTracer of a new state. They are not set if waiting is + // false. + curStateKey roachpb.Key + curStateTxn *enginepb.TxnMeta // numLocks counts the number of locks this contentionEventTracer has seen so // far, including the one we're currently waiting on (if any). @@ -1109,9 +1112,9 @@ func (tag *contentionTag) generateEventLocked() *kvpb.ContentionEvent { } return &kvpb.ContentionEvent{ - Key: tag.mu.curState.key, - TxnMeta: *tag.mu.curState.txn, - Duration: tag.clock.PhysicalTime().Sub(tag.mu.curState.lockWaitStart), + Key: tag.mu.curStateKey, + TxnMeta: *tag.mu.curStateTxn, + Duration: tag.clock.PhysicalTime().Sub(tag.mu.waitStart), } } @@ -1128,33 +1131,35 @@ func (tag *contentionTag) notify(ctx context.Context, s waitingState) *kvpb.Cont // If we're tracking an event and see a different txn/key, the event is // done and we initialize the new event tracking the new txn/key. // - // NB: we're guaranteed to have `s.{txn,key}` populated here. - var differentLock bool - if !tag.mu.waiting { - differentLock = true - } else { - curLockHolder, curKey := tag.mu.curState.txn.ID, tag.mu.curState.key - differentLock = !curLockHolder.Equal(s.txn.ID) || !curKey.Equal(s.key) - } - var res *kvpb.ContentionEvent - if differentLock { - res = tag.generateEventLocked() + // NB: we're guaranteed to have `curState{Txn,Key}` populated here. + if tag.mu.waiting { + curLockTxn, curLockKey := tag.mu.curStateTxn.ID, tag.mu.curStateKey + differentLock := !curLockTxn.Equal(s.txn.ID) || !curLockKey.Equal(s.key) + if !differentLock { + return nil + } } - tag.mu.curState = s + res := tag.generateEventLocked() tag.mu.waiting = true - if differentLock { - tag.mu.waitStart = tag.clock.PhysicalTime() - tag.mu.numLocks++ - return res + tag.mu.curStateKey = s.key + tag.mu.curStateTxn = s.txn + // Accumulate the wait time. + now := tag.clock.PhysicalTime() + if !tag.mu.waitStart.IsZero() { + tag.mu.lockWait += now.Sub(tag.mu.waitStart) } - return nil + tag.mu.waitStart = now + tag.mu.numLocks++ + return res case doneWaiting, waitQueueMaxLengthExceeded: // There will be no more state updates; we're done waiting. res := tag.generateEventLocked() tag.mu.waiting = false - tag.mu.curState = waitingState{} - tag.mu.lockWait += tag.clock.PhysicalTime().Sub(tag.mu.waitStart) + tag.mu.curStateKey = nil + tag.mu.curStateTxn = nil // Accumulate the wait time. + now := tag.clock.PhysicalTime() + tag.mu.lockWait += now.Sub(tag.mu.waitStart) tag.mu.waitStart = time.Time{} return res default: @@ -1191,15 +1196,15 @@ func (tag *contentionTag) Render() []attribute.KeyValue { if tag.mu.waiting { tags = append(tags, attribute.KeyValue{ Key: tagWaitKey, - Value: attribute.StringValue(tag.mu.curState.key.String()), + Value: attribute.StringValue(tag.mu.curStateKey.String()), }) tags = append(tags, attribute.KeyValue{ Key: tagLockHolderTxn, - Value: attribute.StringValue(tag.mu.curState.txn.ID.String()), + Value: attribute.StringValue(tag.mu.curStateTxn.ID.String()), }) tags = append(tags, attribute.KeyValue{ Key: tagWaitStart, - Value: attribute.StringValue(tag.mu.curState.lockWaitStart.Format("15:04:05.123")), + Value: attribute.StringValue(tag.mu.waitStart.Format("15:04:05.123")), }) } return tags diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index 14282c1b3429..1d2eb7afd6e9 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -942,7 +942,11 @@ func TestContentionEventTracer(t *testing.T) { tr := tracing.NewTracer() ctx, sp := tr.StartSpanCtx(context.Background(), "foo", tracing.WithRecording(tracingpb.RecordingVerbose)) defer sp.Finish() - clock := hlc.NewClockForTesting(nil) + manual := timeutil.NewManualTime(timeutil.Unix(0, 123)) + clock := hlc.NewClockForTesting(manual) + + txn1 := makeTxnProto("foo") + txn2 := makeTxnProto("bar") var events []*kvpb.ContentionEvent @@ -950,11 +954,10 @@ func TestContentionEventTracer(t *testing.T) { h.SetOnContentionEvent(func(ev *kvpb.ContentionEvent) { events = append(events, ev) }) - txn := makeTxnProto("foo") h.notify(ctx, waitingState{ kind: waitForDistinguished, key: roachpb.Key("a"), - txn: &txn.TxnMeta, + txn: &txn1.TxnMeta, }) require.Zero(t, h.tag.mu.lockWait) require.NotZero(t, h.tag.mu.waitStart) @@ -966,63 +969,84 @@ func TestContentionEventTracer(t *testing.T) { require.True(t, ok) require.Equal(t, "1", val) _, ok = lockTagGroup.FindTag(tagWaited) + require.False(t, ok) + val, ok = lockTagGroup.FindTag(tagWaitKey) require.True(t, ok) - _, ok = lockTagGroup.FindTag(tagWaitKey) - require.True(t, ok) - _, ok = lockTagGroup.FindTag(tagWaitStart) + require.Equal(t, "\"a\"", val) + val, ok = lockTagGroup.FindTag(tagWaitStart) require.True(t, ok) - _, ok = lockTagGroup.FindTag(tagLockHolderTxn) + require.Equal(t, "00:00:00.1112", val) + val, ok = lockTagGroup.FindTag(tagLockHolderTxn) require.True(t, ok) + require.Equal(t, txn1.ID.String(), val) // Another event for the same txn/key should not mutate - // or emitLocked an event. + // or emit an event. prevNumLocks := h.tag.mu.numLocks + manual.Advance(10) h.notify(ctx, waitingState{ kind: waitFor, key: roachpb.Key("a"), - txn: &txn.TxnMeta, + txn: &txn1.TxnMeta, }) require.Empty(t, events) require.Zero(t, h.tag.mu.lockWait) require.Equal(t, prevNumLocks, h.tag.mu.numLocks) + // Another event for the same key but different txn should + // emitLocked an event. + manual.Advance(11) + h.notify(ctx, waitingState{ + kind: waitFor, + key: roachpb.Key("a"), + txn: &txn2.TxnMeta, + }) + require.Equal(t, time.Duration(21) /* 10+11 */, h.tag.mu.lockWait) + require.Len(t, events, 1) + ev := events[0] + require.Equal(t, txn1.TxnMeta, ev.TxnMeta) + require.Equal(t, roachpb.Key("a"), ev.Key) + require.Equal(t, time.Duration(21) /* 10+11 */, ev.Duration) + + // Another event for the same txn but different key should + // emit an event. + manual.Advance(12) h.notify(ctx, waitingState{ kind: waitForDistinguished, key: roachpb.Key("b"), - txn: &txn.TxnMeta, + txn: &txn2.TxnMeta, }) - require.Zero(t, h.tag.mu.lockWait) - require.Len(t, events, 1) - require.Equal(t, txn.TxnMeta, events[0].TxnMeta) - require.Equal(t, roachpb.Key("a"), events[0].Key) - require.NotZero(t, events[0].Duration) + require.Equal(t, time.Duration(33) /* 10+11+12 */, h.tag.mu.lockWait) + require.Len(t, events, 2) + ev = events[1] + require.Equal(t, txn2.TxnMeta, ev.TxnMeta) + require.Equal(t, roachpb.Key("a"), ev.Key) + require.Equal(t, time.Duration(12), ev.Duration) + manual.Advance(13) h.notify(ctx, waitingState{kind: doneWaiting}) - require.NotZero(t, h.tag.mu.lockWait) - require.Len(t, events, 2) + require.Equal(t, time.Duration(46) /* 10+11+12+13 */, h.tag.mu.lockWait) + require.Len(t, events, 3) - lockWaitBefore := h.tag.mu.lockWait h.notify(ctx, waitingState{ kind: waitFor, key: roachpb.Key("b"), - txn: &txn.TxnMeta, + txn: &txn1.TxnMeta, }) + manual.Advance(14) h.notify(ctx, waitingState{ kind: doneWaiting, }) - require.Len(t, events, 3) - require.Less(t, lockWaitBefore, h.tag.mu.lockWait) + require.Equal(t, time.Duration(60) /* 10+11+12+13+14 */, h.tag.mu.lockWait) + require.Len(t, events, 4) rec = sp.GetRecording(tracingpb.RecordingVerbose) lockTagGroup = rec[0].FindTagGroup(tagContentionTracer) require.NotNil(t, lockTagGroup) - val, ok = lockTagGroup.FindTag(tagNumLocks) require.True(t, ok) - require.Equal(t, "3", val) - + require.Equal(t, "4", val) _, ok = lockTagGroup.FindTag(tagWaited) require.True(t, ok) - _, ok = lockTagGroup.FindTag(tagWaitKey) require.False(t, ok) _, ok = lockTagGroup.FindTag(tagWaitStart) diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self index 7a90b9125e54..3aa835923c69 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self @@ -118,12 +118,12 @@ finish req=reqTxn1 ---- [-] finish reqTxn1: finishing request [3] sequence reqTxnMiddle: lock wait-queue event: done waiting -[3] sequence reqTxnMiddle: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 123.000s +[3] sequence reqTxnMiddle: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s [3] sequence reqTxnMiddle: acquiring latches [3] sequence reqTxnMiddle: scanning lock table for conflicting locks [3] sequence reqTxnMiddle: sequencing complete, returned guard [4] sequence reqTxn2: lock wait-queue event: wait for (distinguished) txn 00000003 running request @ key "k" (queuedWriters: 1, queuedReaders: 0) -[4] sequence reqTxn2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 123.000s +[4] sequence reqTxn2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s [4] sequence reqTxn2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence reqTxn2: pushing txn 00000003 to detect request deadlock [4] sequence reqTxn2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -132,7 +132,7 @@ finish req=reqTxnMiddle ---- [-] finish reqTxnMiddle: finishing request [4] sequence reqTxn2: lock wait-queue event: done waiting -[4] sequence reqTxn2: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 123.000s +[4] sequence reqTxn2: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 0.000s [4] sequence reqTxn2: acquiring latches [4] sequence reqTxn2: scanning lock table for conflicting locks [4] sequence reqTxn2: sequencing complete, returned guard