Skip to content

Commit

Permalink
kv/concurrency: compute contention event duration from (key,txn) wait…
Browse files Browse the repository at this point in the history
… start time

Fixes #98104.

This commit resolves the bug identified in #98104 where multiple contention
events could be emitted with overlapping durations, such that when these
durations were added together (e.g. by `GetCumulativeContentionTime`), their sum
was larger than the runtime of the entire query. This was possible because as of
70ef641, we were failing to reset the waiting start time on each new lock
holder transaction for the same key.

This commit fixes this by computing the contention event duration using
`contentionTag.waitStart` instead of `waitingState.lockWaitStart`. It also
cleans up some of this code and makes it harder to make such a mistake in the
future.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 21, 2023
1 parent 16c984e commit fb5074c
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 72 deletions.
11 changes: 0 additions & 11 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ type waitingState struct {
// Represents the action that the request was trying to perform when
// it hit the conflict. E.g. was it trying to read or write?
guardAccess spanset.SpanAccess

// lockWaitStart represents the timestamp when the request started waiting on
// the lock that this waitingState refers to. If multiple consecutive states
// refer to the same lock, they share the same lockWaitStart.
lockWaitStart time.Time
}

// String implements the fmt.Stringer interface.
Expand Down Expand Up @@ -536,12 +531,6 @@ func (g *lockTableGuardImpl) CurState() waitingState {

func (g *lockTableGuardImpl) updateStateLocked(newState waitingState) {
g.mu.state = newState
switch newState.kind {
case waitFor, waitForDistinguished, waitSelf, waitElsewhere:
g.mu.state.lockWaitStart = g.mu.curLockWaitStart
default:
g.mu.state.lockWaitStart = time.Time{}
}
}

func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(spanSet *spanset.SpanSet) (ok bool) {
Expand Down
71 changes: 38 additions & 33 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ const tagContentionTracer = "locks"
const tagWaitKey = "wait_key"

// tagWaitStart is the tracing span tag indicating when the request started
// waiting on the lock it's currently waiting on.
// waiting on the lock (a unique key,txn pair) it's currently waiting on.
const tagWaitStart = "wait_start"

// tagLockHolderTxn is the tracing span tag indicating the ID of the txn holding
Expand Down Expand Up @@ -1010,15 +1010,18 @@ type contentionTag struct {
waiting bool

// waitStart represents the timestamp when the request started waiting on
// locks in the current iteration of the contentionEventTracer. The wait
// time in previous iterations is accumulated in lockWait. When not waiting
// any more, timeutil.Since(waitStart) is added to lockWait.
// a lock, as defined by a unique (key,txn) pair, in the current iteration
// of the contentionEventTracer. The wait time in previous iterations is
// accumulated in lockWait. When not waiting anymore or when waiting on a
// new (key,txn), timeutil.Since(waitStart) is added to lockWait.
waitStart time.Time

// curState is the current wait state, if any. It is overwritten every time
// the lock table notify()s the contentionEventTracer of a new state. It is
// not set if waiting is false.
curState waitingState
// curStateKey and curStateTxn are the current waitingState's key and txn,
// if any. They are overwritten every time the lock table notify()s the
// contentionEventTracer of a new state. They are not set if waiting is
// false.
curStateKey roachpb.Key
curStateTxn *enginepb.TxnMeta

// numLocks counts the number of locks this contentionEventTracer has seen so
// far, including the one we're currently waiting on (if any).
Expand Down Expand Up @@ -1109,9 +1112,9 @@ func (tag *contentionTag) generateEventLocked() *kvpb.ContentionEvent {
}

return &kvpb.ContentionEvent{
Key: tag.mu.curState.key,
TxnMeta: *tag.mu.curState.txn,
Duration: tag.clock.PhysicalTime().Sub(tag.mu.curState.lockWaitStart),
Key: tag.mu.curStateKey,
TxnMeta: *tag.mu.curStateTxn,
Duration: tag.clock.PhysicalTime().Sub(tag.mu.waitStart),
}
}

Expand All @@ -1128,33 +1131,35 @@ func (tag *contentionTag) notify(ctx context.Context, s waitingState) *kvpb.Cont
// If we're tracking an event and see a different txn/key, the event is
// done and we initialize the new event tracking the new txn/key.
//
// NB: we're guaranteed to have `s.{txn,key}` populated here.
var differentLock bool
if !tag.mu.waiting {
differentLock = true
} else {
curLockHolder, curKey := tag.mu.curState.txn.ID, tag.mu.curState.key
differentLock = !curLockHolder.Equal(s.txn.ID) || !curKey.Equal(s.key)
}
var res *kvpb.ContentionEvent
if differentLock {
res = tag.generateEventLocked()
// NB: we're guaranteed to have `curState{Txn,Key}` populated here.
if tag.mu.waiting {
curLockTxn, curLockKey := tag.mu.curStateTxn.ID, tag.mu.curStateKey
differentLock := !curLockTxn.Equal(s.txn.ID) || !curLockKey.Equal(s.key)
if !differentLock {
return nil
}
}
tag.mu.curState = s
res := tag.generateEventLocked()
tag.mu.waiting = true
if differentLock {
tag.mu.waitStart = tag.clock.PhysicalTime()
tag.mu.numLocks++
return res
tag.mu.curStateKey = s.key
tag.mu.curStateTxn = s.txn
// Accumulate the wait time.
now := tag.clock.PhysicalTime()
if !tag.mu.waitStart.IsZero() {
tag.mu.lockWait += now.Sub(tag.mu.waitStart)
}
return nil
tag.mu.waitStart = now
tag.mu.numLocks++
return res
case doneWaiting, waitQueueMaxLengthExceeded:
// There will be no more state updates; we're done waiting.
res := tag.generateEventLocked()
tag.mu.waiting = false
tag.mu.curState = waitingState{}
tag.mu.lockWait += tag.clock.PhysicalTime().Sub(tag.mu.waitStart)
tag.mu.curStateKey = nil
tag.mu.curStateTxn = nil
// Accumulate the wait time.
now := tag.clock.PhysicalTime()
tag.mu.lockWait += now.Sub(tag.mu.waitStart)
tag.mu.waitStart = time.Time{}
return res
default:
Expand Down Expand Up @@ -1191,15 +1196,15 @@ func (tag *contentionTag) Render() []attribute.KeyValue {
if tag.mu.waiting {
tags = append(tags, attribute.KeyValue{
Key: tagWaitKey,
Value: attribute.StringValue(tag.mu.curState.key.String()),
Value: attribute.StringValue(tag.mu.curStateKey.String()),
})
tags = append(tags, attribute.KeyValue{
Key: tagLockHolderTxn,
Value: attribute.StringValue(tag.mu.curState.txn.ID.String()),
Value: attribute.StringValue(tag.mu.curStateTxn.ID.String()),
})
tags = append(tags, attribute.KeyValue{
Key: tagWaitStart,
Value: attribute.StringValue(tag.mu.curState.lockWaitStart.Format("15:04:05.123")),
Value: attribute.StringValue(tag.mu.waitStart.Format("15:04:05.123")),
})
}
return tags
Expand Down
74 changes: 49 additions & 25 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,19 +942,22 @@ func TestContentionEventTracer(t *testing.T) {
tr := tracing.NewTracer()
ctx, sp := tr.StartSpanCtx(context.Background(), "foo", tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()
clock := hlc.NewClockForTesting(nil)
manual := timeutil.NewManualTime(timeutil.Unix(0, 123))
clock := hlc.NewClockForTesting(manual)

txn1 := makeTxnProto("foo")
txn2 := makeTxnProto("bar")

var events []*kvpb.ContentionEvent

h := newContentionEventTracer(sp, clock)
h.SetOnContentionEvent(func(ev *kvpb.ContentionEvent) {
events = append(events, ev)
})
txn := makeTxnProto("foo")
h.notify(ctx, waitingState{
kind: waitForDistinguished,
key: roachpb.Key("a"),
txn: &txn.TxnMeta,
txn: &txn1.TxnMeta,
})
require.Zero(t, h.tag.mu.lockWait)
require.NotZero(t, h.tag.mu.waitStart)
Expand All @@ -966,63 +969,84 @@ func TestContentionEventTracer(t *testing.T) {
require.True(t, ok)
require.Equal(t, "1", val)
_, ok = lockTagGroup.FindTag(tagWaited)
require.False(t, ok)
val, ok = lockTagGroup.FindTag(tagWaitKey)
require.True(t, ok)
_, ok = lockTagGroup.FindTag(tagWaitKey)
require.True(t, ok)
_, ok = lockTagGroup.FindTag(tagWaitStart)
require.Equal(t, "\"a\"", val)
val, ok = lockTagGroup.FindTag(tagWaitStart)
require.True(t, ok)
_, ok = lockTagGroup.FindTag(tagLockHolderTxn)
require.Equal(t, "00:00:00.1112", val)
val, ok = lockTagGroup.FindTag(tagLockHolderTxn)
require.True(t, ok)
require.Equal(t, txn1.ID.String(), val)

// Another event for the same txn/key should not mutate
// or emitLocked an event.
// or emit an event.
prevNumLocks := h.tag.mu.numLocks
manual.Advance(10)
h.notify(ctx, waitingState{
kind: waitFor,
key: roachpb.Key("a"),
txn: &txn.TxnMeta,
txn: &txn1.TxnMeta,
})
require.Empty(t, events)
require.Zero(t, h.tag.mu.lockWait)
require.Equal(t, prevNumLocks, h.tag.mu.numLocks)

// Another event for the same key but different txn should
// emitLocked an event.
manual.Advance(11)
h.notify(ctx, waitingState{
kind: waitFor,
key: roachpb.Key("a"),
txn: &txn2.TxnMeta,
})
require.Equal(t, time.Duration(21) /* 10+11 */, h.tag.mu.lockWait)
require.Len(t, events, 1)
ev := events[0]
require.Equal(t, txn1.TxnMeta, ev.TxnMeta)
require.Equal(t, roachpb.Key("a"), ev.Key)
require.Equal(t, time.Duration(21) /* 10+11 */, ev.Duration)

// Another event for the same txn but different key should
// emit an event.
manual.Advance(12)
h.notify(ctx, waitingState{
kind: waitForDistinguished,
key: roachpb.Key("b"),
txn: &txn.TxnMeta,
txn: &txn2.TxnMeta,
})
require.Zero(t, h.tag.mu.lockWait)
require.Len(t, events, 1)
require.Equal(t, txn.TxnMeta, events[0].TxnMeta)
require.Equal(t, roachpb.Key("a"), events[0].Key)
require.NotZero(t, events[0].Duration)
require.Equal(t, time.Duration(33) /* 10+11+12 */, h.tag.mu.lockWait)
require.Len(t, events, 2)
ev = events[1]
require.Equal(t, txn2.TxnMeta, ev.TxnMeta)
require.Equal(t, roachpb.Key("a"), ev.Key)
require.Equal(t, time.Duration(12), ev.Duration)

manual.Advance(13)
h.notify(ctx, waitingState{kind: doneWaiting})
require.NotZero(t, h.tag.mu.lockWait)
require.Len(t, events, 2)
require.Equal(t, time.Duration(46) /* 10+11+12+13 */, h.tag.mu.lockWait)
require.Len(t, events, 3)

lockWaitBefore := h.tag.mu.lockWait
h.notify(ctx, waitingState{
kind: waitFor,
key: roachpb.Key("b"),
txn: &txn.TxnMeta,
txn: &txn1.TxnMeta,
})
manual.Advance(14)
h.notify(ctx, waitingState{
kind: doneWaiting,
})
require.Len(t, events, 3)
require.Less(t, lockWaitBefore, h.tag.mu.lockWait)
require.Equal(t, time.Duration(60) /* 10+11+12+13+14 */, h.tag.mu.lockWait)
require.Len(t, events, 4)
rec = sp.GetRecording(tracingpb.RecordingVerbose)
lockTagGroup = rec[0].FindTagGroup(tagContentionTracer)
require.NotNil(t, lockTagGroup)

val, ok = lockTagGroup.FindTag(tagNumLocks)
require.True(t, ok)
require.Equal(t, "3", val)

require.Equal(t, "4", val)
_, ok = lockTagGroup.FindTag(tagWaited)
require.True(t, ok)

_, ok = lockTagGroup.FindTag(tagWaitKey)
require.False(t, ok)
_, ok = lockTagGroup.FindTag(tagWaitStart)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ finish req=reqTxn1
----
[-] finish reqTxn1: finishing request
[3] sequence reqTxnMiddle: lock wait-queue event: done waiting
[3] sequence reqTxnMiddle: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 123.000s
[3] sequence reqTxnMiddle: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[3] sequence reqTxnMiddle: acquiring latches
[3] sequence reqTxnMiddle: scanning lock table for conflicting locks
[3] sequence reqTxnMiddle: sequencing complete, returned guard
[4] sequence reqTxn2: lock wait-queue event: wait for (distinguished) txn 00000003 running request @ key "k" (queuedWriters: 1, queuedReaders: 0)
[4] sequence reqTxn2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 123.000s
[4] sequence reqTxn2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[4] sequence reqTxn2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false
[4] sequence reqTxn2: pushing txn 00000003 to detect request deadlock
[4] sequence reqTxn2: blocked on select in concurrency_test.(*cluster).PushTransaction
Expand All @@ -132,7 +132,7 @@ finish req=reqTxnMiddle
----
[-] finish reqTxnMiddle: finishing request
[4] sequence reqTxn2: lock wait-queue event: done waiting
[4] sequence reqTxn2: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 123.000s
[4] sequence reqTxn2: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 0.000s
[4] sequence reqTxn2: acquiring latches
[4] sequence reqTxn2: scanning lock table for conflicting locks
[4] sequence reqTxn2: sequencing complete, returned guard
Expand Down

0 comments on commit fb5074c

Please sign in to comment.