diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index a6a5d870681f..30a0058f275b 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -97,11 +97,6 @@ type waitingState struct { // Represents the action that the request was trying to perform when // it hit the conflict. E.g. was it trying to read or write? guardAccess spanset.SpanAccess - - // lockWaitStart represents the timestamp when the request started waiting on - // the lock that this waitingState refers to. If multiple consecutive states - // refer to the same lock, they share the same lockWaitStart. - lockWaitStart time.Time } // String implements the fmt.Stringer interface. @@ -536,12 +531,6 @@ func (g *lockTableGuardImpl) CurState() waitingState { func (g *lockTableGuardImpl) updateStateLocked(newState waitingState) { g.mu.state = newState - switch newState.kind { - case waitFor, waitForDistinguished, waitSelf, waitElsewhere: - g.mu.state.lockWaitStart = g.mu.curLockWaitStart - default: - g.mu.state.lockWaitStart = time.Time{} - } } func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(spanSet *spanset.SpanSet) (ok bool) { diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index 07d23a0c2ede..7d1de8cc2f45 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -962,7 +962,7 @@ const tagContentionTracer = "locks" const tagWaitKey = "wait_key" // tagWaitStart is the tracing span tag indicating when the request started -// waiting on the lock it's currently waiting on. +// waiting on the lock (a unique key,txn pair) it's currently waiting on. const tagWaitStart = "wait_start" // tagLockHolderTxn is the tracing span tag indicating the ID of the txn holding @@ -1010,15 +1010,18 @@ type contentionTag struct { waiting bool // waitStart represents the timestamp when the request started waiting on - // locks in the current iteration of the contentionEventTracer. The wait - // time in previous iterations is accumulated in lockWait. When not waiting - // any more, timeutil.Since(waitStart) is added to lockWait. + // a lock, as defined by a unique (key,txn) pair, in the current iteration + // of the contentionEventTracer. The wait time in previous iterations is + // accumulated in lockWait. When not waiting anymore or when waiting on a + // new (key,txn), timeutil.Since(waitStart) is added to lockWait. waitStart time.Time - // curState is the current wait state, if any. It is overwritten every time - // the lock table notify()s the contentionEventTracer of a new state. It is - // not set if waiting is false. - curState waitingState + // curStateKey and curStateTxn are the current waitingState's key and txn, + // if any. They are overwritten every time the lock table notify()s the + // contentionEventTracer of a new state. They are not set if waiting is + // false. + curStateKey roachpb.Key + curStateTxn *enginepb.TxnMeta // numLocks counts the number of locks this contentionEventTracer has seen so // far, including the one we're currently waiting on (if any). @@ -1109,9 +1112,9 @@ func (tag *contentionTag) generateEventLocked() *kvpb.ContentionEvent { } return &kvpb.ContentionEvent{ - Key: tag.mu.curState.key, - TxnMeta: *tag.mu.curState.txn, - Duration: tag.clock.PhysicalTime().Sub(tag.mu.curState.lockWaitStart), + Key: tag.mu.curStateKey, + TxnMeta: *tag.mu.curStateTxn, + Duration: tag.clock.PhysicalTime().Sub(tag.mu.waitStart), } } @@ -1128,33 +1131,35 @@ func (tag *contentionTag) notify(ctx context.Context, s waitingState) *kvpb.Cont // If we're tracking an event and see a different txn/key, the event is // done and we initialize the new event tracking the new txn/key. // - // NB: we're guaranteed to have `s.{txn,key}` populated here. - var differentLock bool - if !tag.mu.waiting { - differentLock = true - } else { - curLockHolder, curKey := tag.mu.curState.txn.ID, tag.mu.curState.key - differentLock = !curLockHolder.Equal(s.txn.ID) || !curKey.Equal(s.key) - } - var res *kvpb.ContentionEvent - if differentLock { - res = tag.generateEventLocked() + // NB: we're guaranteed to have `curState{Txn,Key}` populated here. + if tag.mu.waiting { + curLockTxn, curLockKey := tag.mu.curStateTxn.ID, tag.mu.curStateKey + differentLock := !curLockTxn.Equal(s.txn.ID) || !curLockKey.Equal(s.key) + if !differentLock { + return nil + } } - tag.mu.curState = s + res := tag.generateEventLocked() tag.mu.waiting = true - if differentLock { - tag.mu.waitStart = tag.clock.PhysicalTime() - tag.mu.numLocks++ - return res + tag.mu.curStateKey = s.key + tag.mu.curStateTxn = s.txn + // Accumulate the wait time. + now := tag.clock.PhysicalTime() + if !tag.mu.waitStart.IsZero() { + tag.mu.lockWait += now.Sub(tag.mu.waitStart) } - return nil + tag.mu.waitStart = now + tag.mu.numLocks++ + return res case doneWaiting, waitQueueMaxLengthExceeded: // There will be no more state updates; we're done waiting. res := tag.generateEventLocked() tag.mu.waiting = false - tag.mu.curState = waitingState{} - tag.mu.lockWait += tag.clock.PhysicalTime().Sub(tag.mu.waitStart) + tag.mu.curStateKey = nil + tag.mu.curStateTxn = nil // Accumulate the wait time. + now := tag.clock.PhysicalTime() + tag.mu.lockWait += now.Sub(tag.mu.waitStart) tag.mu.waitStart = time.Time{} return res default: @@ -1191,15 +1196,15 @@ func (tag *contentionTag) Render() []attribute.KeyValue { if tag.mu.waiting { tags = append(tags, attribute.KeyValue{ Key: tagWaitKey, - Value: attribute.StringValue(tag.mu.curState.key.String()), + Value: attribute.StringValue(tag.mu.curStateKey.String()), }) tags = append(tags, attribute.KeyValue{ Key: tagLockHolderTxn, - Value: attribute.StringValue(tag.mu.curState.txn.ID.String()), + Value: attribute.StringValue(tag.mu.curStateTxn.ID.String()), }) tags = append(tags, attribute.KeyValue{ Key: tagWaitStart, - Value: attribute.StringValue(tag.mu.curState.lockWaitStart.Format("15:04:05.123")), + Value: attribute.StringValue(tag.mu.waitStart.Format("15:04:05.123")), }) } return tags diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index 14282c1b3429..1d2eb7afd6e9 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -942,7 +942,11 @@ func TestContentionEventTracer(t *testing.T) { tr := tracing.NewTracer() ctx, sp := tr.StartSpanCtx(context.Background(), "foo", tracing.WithRecording(tracingpb.RecordingVerbose)) defer sp.Finish() - clock := hlc.NewClockForTesting(nil) + manual := timeutil.NewManualTime(timeutil.Unix(0, 123)) + clock := hlc.NewClockForTesting(manual) + + txn1 := makeTxnProto("foo") + txn2 := makeTxnProto("bar") var events []*kvpb.ContentionEvent @@ -950,11 +954,10 @@ func TestContentionEventTracer(t *testing.T) { h.SetOnContentionEvent(func(ev *kvpb.ContentionEvent) { events = append(events, ev) }) - txn := makeTxnProto("foo") h.notify(ctx, waitingState{ kind: waitForDistinguished, key: roachpb.Key("a"), - txn: &txn.TxnMeta, + txn: &txn1.TxnMeta, }) require.Zero(t, h.tag.mu.lockWait) require.NotZero(t, h.tag.mu.waitStart) @@ -966,63 +969,84 @@ func TestContentionEventTracer(t *testing.T) { require.True(t, ok) require.Equal(t, "1", val) _, ok = lockTagGroup.FindTag(tagWaited) + require.False(t, ok) + val, ok = lockTagGroup.FindTag(tagWaitKey) require.True(t, ok) - _, ok = lockTagGroup.FindTag(tagWaitKey) - require.True(t, ok) - _, ok = lockTagGroup.FindTag(tagWaitStart) + require.Equal(t, "\"a\"", val) + val, ok = lockTagGroup.FindTag(tagWaitStart) require.True(t, ok) - _, ok = lockTagGroup.FindTag(tagLockHolderTxn) + require.Equal(t, "00:00:00.1112", val) + val, ok = lockTagGroup.FindTag(tagLockHolderTxn) require.True(t, ok) + require.Equal(t, txn1.ID.String(), val) // Another event for the same txn/key should not mutate - // or emitLocked an event. + // or emit an event. prevNumLocks := h.tag.mu.numLocks + manual.Advance(10) h.notify(ctx, waitingState{ kind: waitFor, key: roachpb.Key("a"), - txn: &txn.TxnMeta, + txn: &txn1.TxnMeta, }) require.Empty(t, events) require.Zero(t, h.tag.mu.lockWait) require.Equal(t, prevNumLocks, h.tag.mu.numLocks) + // Another event for the same key but different txn should + // emitLocked an event. + manual.Advance(11) + h.notify(ctx, waitingState{ + kind: waitFor, + key: roachpb.Key("a"), + txn: &txn2.TxnMeta, + }) + require.Equal(t, time.Duration(21) /* 10+11 */, h.tag.mu.lockWait) + require.Len(t, events, 1) + ev := events[0] + require.Equal(t, txn1.TxnMeta, ev.TxnMeta) + require.Equal(t, roachpb.Key("a"), ev.Key) + require.Equal(t, time.Duration(21) /* 10+11 */, ev.Duration) + + // Another event for the same txn but different key should + // emit an event. + manual.Advance(12) h.notify(ctx, waitingState{ kind: waitForDistinguished, key: roachpb.Key("b"), - txn: &txn.TxnMeta, + txn: &txn2.TxnMeta, }) - require.Zero(t, h.tag.mu.lockWait) - require.Len(t, events, 1) - require.Equal(t, txn.TxnMeta, events[0].TxnMeta) - require.Equal(t, roachpb.Key("a"), events[0].Key) - require.NotZero(t, events[0].Duration) + require.Equal(t, time.Duration(33) /* 10+11+12 */, h.tag.mu.lockWait) + require.Len(t, events, 2) + ev = events[1] + require.Equal(t, txn2.TxnMeta, ev.TxnMeta) + require.Equal(t, roachpb.Key("a"), ev.Key) + require.Equal(t, time.Duration(12), ev.Duration) + manual.Advance(13) h.notify(ctx, waitingState{kind: doneWaiting}) - require.NotZero(t, h.tag.mu.lockWait) - require.Len(t, events, 2) + require.Equal(t, time.Duration(46) /* 10+11+12+13 */, h.tag.mu.lockWait) + require.Len(t, events, 3) - lockWaitBefore := h.tag.mu.lockWait h.notify(ctx, waitingState{ kind: waitFor, key: roachpb.Key("b"), - txn: &txn.TxnMeta, + txn: &txn1.TxnMeta, }) + manual.Advance(14) h.notify(ctx, waitingState{ kind: doneWaiting, }) - require.Len(t, events, 3) - require.Less(t, lockWaitBefore, h.tag.mu.lockWait) + require.Equal(t, time.Duration(60) /* 10+11+12+13+14 */, h.tag.mu.lockWait) + require.Len(t, events, 4) rec = sp.GetRecording(tracingpb.RecordingVerbose) lockTagGroup = rec[0].FindTagGroup(tagContentionTracer) require.NotNil(t, lockTagGroup) - val, ok = lockTagGroup.FindTag(tagNumLocks) require.True(t, ok) - require.Equal(t, "3", val) - + require.Equal(t, "4", val) _, ok = lockTagGroup.FindTag(tagWaited) require.True(t, ok) - _, ok = lockTagGroup.FindTag(tagWaitKey) require.False(t, ok) _, ok = lockTagGroup.FindTag(tagWaitStart) diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self index 7a90b9125e54..3aa835923c69 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self @@ -118,12 +118,12 @@ finish req=reqTxn1 ---- [-] finish reqTxn1: finishing request [3] sequence reqTxnMiddle: lock wait-queue event: done waiting -[3] sequence reqTxnMiddle: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 123.000s +[3] sequence reqTxnMiddle: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s [3] sequence reqTxnMiddle: acquiring latches [3] sequence reqTxnMiddle: scanning lock table for conflicting locks [3] sequence reqTxnMiddle: sequencing complete, returned guard [4] sequence reqTxn2: lock wait-queue event: wait for (distinguished) txn 00000003 running request @ key "k" (queuedWriters: 1, queuedReaders: 0) -[4] sequence reqTxn2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 123.000s +[4] sequence reqTxn2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s [4] sequence reqTxn2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence reqTxn2: pushing txn 00000003 to detect request deadlock [4] sequence reqTxn2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -132,7 +132,7 @@ finish req=reqTxnMiddle ---- [-] finish reqTxnMiddle: finishing request [4] sequence reqTxn2: lock wait-queue event: done waiting -[4] sequence reqTxn2: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 123.000s +[4] sequence reqTxn2: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 0.000s [4] sequence reqTxn2: acquiring latches [4] sequence reqTxn2: scanning lock table for conflicting locks [4] sequence reqTxn2: sequencing complete, returned guard