Skip to content

Commit

Permalink
Merge #99166
Browse files Browse the repository at this point in the history
99166: kv/concurrency: compute contention event duration from (key,txn) wait start time r=andreimatei a=nvanbenschoten

Fixes #98104.

This commit resolves the bug identified in #98104, where multiple contention events could be emitted with overlapping durations. When these durations were added together (e.g. by `GetCumulativeContentionTime`), their sum could be larger than the runtime of the entire query. This was possible because, as of 70ef641, we were failing to reset the wait start time each time a new transaction became the lock holder for the same key.

This commit fixes this by computing the contention event duration using `contentionTag.waitStart` instead of `waitingState.lockWaitStart`. It also cleans up some of this code and makes it harder to make such a mistake in the future.

Release note: None

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed Mar 29, 2023
2 parents bb5c97b + 1201c01 commit 61d1dce
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 72 deletions.
11 changes: 0 additions & 11 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ type waitingState struct {
// Represents the action that the request was trying to perform when
// it hit the conflict. E.g. was it trying to read or write?
guardAccess spanset.SpanAccess

// lockWaitStart represents the timestamp when the request started waiting on
// the lock that this waitingState refers to. If multiple consecutive states
// refer to the same lock, they share the same lockWaitStart.
lockWaitStart time.Time
}

// String implements the fmt.Stringer interface.
Expand Down Expand Up @@ -536,12 +531,6 @@ func (g *lockTableGuardImpl) CurState() waitingState {

func (g *lockTableGuardImpl) updateStateLocked(newState waitingState) {
g.mu.state = newState
switch newState.kind {
case waitFor, waitForDistinguished, waitSelf, waitElsewhere:
g.mu.state.lockWaitStart = g.mu.curLockWaitStart
default:
g.mu.state.lockWaitStart = time.Time{}
}
}

func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(spanSet *spanset.SpanSet) (ok bool) {
Expand Down
71 changes: 38 additions & 33 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ const tagContentionTracer = "locks"
const tagWaitKey = "wait_key"

// tagWaitStart is the tracing span tag indicating when the request started
// waiting on the lock it's currently waiting on.
// waiting on the lock (a unique key,txn pair) it's currently waiting on.
const tagWaitStart = "wait_start"

// tagLockHolderTxn is the tracing span tag indicating the ID of the txn holding
Expand Down Expand Up @@ -1010,15 +1010,18 @@ type contentionTag struct {
waiting bool

// waitStart represents the timestamp when the request started waiting on
// locks in the current iteration of the contentionEventTracer. The wait
// time in previous iterations is accumulated in lockWait. When not waiting
// any more, timeutil.Since(waitStart) is added to lockWait.
// a lock, as defined by a unique (key,txn) pair, in the current iteration
// of the contentionEventTracer. The wait time in previous iterations is
// accumulated in lockWait. When not waiting anymore or when waiting on a
// new (key,txn), timeutil.Since(waitStart) is added to lockWait.
waitStart time.Time

// curState is the current wait state, if any. It is overwritten every time
// the lock table notify()s the contentionEventTracer of a new state. It is
// not set if waiting is false.
curState waitingState
// curStateKey and curStateTxn are the current waitingState's key and txn,
// if any. They are overwritten every time the lock table notify()s the
// contentionEventTracer of a new state. They are not set if waiting is
// false.
curStateKey roachpb.Key
curStateTxn *enginepb.TxnMeta

// numLocks counts the number of locks this contentionEventTracer has seen so
// far, including the one we're currently waiting on (if any).
Expand Down Expand Up @@ -1109,9 +1112,9 @@ func (tag *contentionTag) generateEventLocked() *kvpb.ContentionEvent {
}

return &kvpb.ContentionEvent{
Key: tag.mu.curState.key,
TxnMeta: *tag.mu.curState.txn,
Duration: tag.clock.PhysicalTime().Sub(tag.mu.curState.lockWaitStart),
Key: tag.mu.curStateKey,
TxnMeta: *tag.mu.curStateTxn,
Duration: tag.clock.PhysicalTime().Sub(tag.mu.waitStart),
}
}

Expand All @@ -1128,33 +1131,35 @@ func (tag *contentionTag) notify(ctx context.Context, s waitingState) *kvpb.Cont
// If we're tracking an event and see a different txn/key, the event is
// done and we initialize the new event tracking the new txn/key.
//
// NB: we're guaranteed to have `s.{txn,key}` populated here.
var differentLock bool
if !tag.mu.waiting {
differentLock = true
} else {
curLockHolder, curKey := tag.mu.curState.txn.ID, tag.mu.curState.key
differentLock = !curLockHolder.Equal(s.txn.ID) || !curKey.Equal(s.key)
}
var res *kvpb.ContentionEvent
if differentLock {
res = tag.generateEventLocked()
// NB: we're guaranteed to have `curState{Txn,Key}` populated here.
if tag.mu.waiting {
curLockTxn, curLockKey := tag.mu.curStateTxn.ID, tag.mu.curStateKey
differentLock := !curLockTxn.Equal(s.txn.ID) || !curLockKey.Equal(s.key)
if !differentLock {
return nil
}
}
tag.mu.curState = s
res := tag.generateEventLocked()
tag.mu.waiting = true
if differentLock {
tag.mu.waitStart = tag.clock.PhysicalTime()
tag.mu.numLocks++
return res
tag.mu.curStateKey = s.key
tag.mu.curStateTxn = s.txn
// Accumulate the wait time.
now := tag.clock.PhysicalTime()
if !tag.mu.waitStart.IsZero() {
tag.mu.lockWait += now.Sub(tag.mu.waitStart)
}
return nil
tag.mu.waitStart = now
tag.mu.numLocks++
return res
case doneWaiting, waitQueueMaxLengthExceeded:
// There will be no more state updates; we're done waiting.
res := tag.generateEventLocked()
tag.mu.waiting = false
tag.mu.curState = waitingState{}
tag.mu.lockWait += tag.clock.PhysicalTime().Sub(tag.mu.waitStart)
tag.mu.curStateKey = nil
tag.mu.curStateTxn = nil
// Accumulate the wait time.
now := tag.clock.PhysicalTime()
tag.mu.lockWait += now.Sub(tag.mu.waitStart)
tag.mu.waitStart = time.Time{}
return res
default:
Expand Down Expand Up @@ -1191,15 +1196,15 @@ func (tag *contentionTag) Render() []attribute.KeyValue {
if tag.mu.waiting {
tags = append(tags, attribute.KeyValue{
Key: tagWaitKey,
Value: attribute.StringValue(tag.mu.curState.key.String()),
Value: attribute.StringValue(tag.mu.curStateKey.String()),
})
tags = append(tags, attribute.KeyValue{
Key: tagLockHolderTxn,
Value: attribute.StringValue(tag.mu.curState.txn.ID.String()),
Value: attribute.StringValue(tag.mu.curStateTxn.ID.String()),
})
tags = append(tags, attribute.KeyValue{
Key: tagWaitStart,
Value: attribute.StringValue(tag.mu.curState.lockWaitStart.Format("15:04:05.123")),
Value: attribute.StringValue(tag.mu.waitStart.Format("15:04:05.123")),
})
}
return tags
Expand Down
74 changes: 49 additions & 25 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,19 +942,22 @@ func TestContentionEventTracer(t *testing.T) {
tr := tracing.NewTracer()
ctx, sp := tr.StartSpanCtx(context.Background(), "foo", tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()
clock := hlc.NewClockForTesting(nil)
manual := timeutil.NewManualTime(timeutil.Unix(0, 123))
clock := hlc.NewClockForTesting(manual)

txn1 := makeTxnProto("foo")
txn2 := makeTxnProto("bar")

var events []*kvpb.ContentionEvent

h := newContentionEventTracer(sp, clock)
h.SetOnContentionEvent(func(ev *kvpb.ContentionEvent) {
events = append(events, ev)
})
txn := makeTxnProto("foo")
h.notify(ctx, waitingState{
kind: waitForDistinguished,
key: roachpb.Key("a"),
txn: &txn.TxnMeta,
txn: &txn1.TxnMeta,
})
require.Zero(t, h.tag.mu.lockWait)
require.NotZero(t, h.tag.mu.waitStart)
Expand All @@ -966,63 +969,84 @@ func TestContentionEventTracer(t *testing.T) {
require.True(t, ok)
require.Equal(t, "1", val)
_, ok = lockTagGroup.FindTag(tagWaited)
require.False(t, ok)
val, ok = lockTagGroup.FindTag(tagWaitKey)
require.True(t, ok)
_, ok = lockTagGroup.FindTag(tagWaitKey)
require.True(t, ok)
_, ok = lockTagGroup.FindTag(tagWaitStart)
require.Equal(t, "\"a\"", val)
val, ok = lockTagGroup.FindTag(tagWaitStart)
require.True(t, ok)
_, ok = lockTagGroup.FindTag(tagLockHolderTxn)
require.Equal(t, "00:00:00.1112", val)
val, ok = lockTagGroup.FindTag(tagLockHolderTxn)
require.True(t, ok)
require.Equal(t, txn1.ID.String(), val)

// Another event for the same txn/key should not mutate
// or emitLocked an event.
// or emit an event.
prevNumLocks := h.tag.mu.numLocks
manual.Advance(10)
h.notify(ctx, waitingState{
kind: waitFor,
key: roachpb.Key("a"),
txn: &txn.TxnMeta,
txn: &txn1.TxnMeta,
})
require.Empty(t, events)
require.Zero(t, h.tag.mu.lockWait)
require.Equal(t, prevNumLocks, h.tag.mu.numLocks)

// Another event for the same key but different txn should
// emit an event.
manual.Advance(11)
h.notify(ctx, waitingState{
kind: waitFor,
key: roachpb.Key("a"),
txn: &txn2.TxnMeta,
})
require.Equal(t, time.Duration(21) /* 10+11 */, h.tag.mu.lockWait)
require.Len(t, events, 1)
ev := events[0]
require.Equal(t, txn1.TxnMeta, ev.TxnMeta)
require.Equal(t, roachpb.Key("a"), ev.Key)
require.Equal(t, time.Duration(21) /* 10+11 */, ev.Duration)

// Another event for the same txn but different key should
// emit an event.
manual.Advance(12)
h.notify(ctx, waitingState{
kind: waitForDistinguished,
key: roachpb.Key("b"),
txn: &txn.TxnMeta,
txn: &txn2.TxnMeta,
})
require.Zero(t, h.tag.mu.lockWait)
require.Len(t, events, 1)
require.Equal(t, txn.TxnMeta, events[0].TxnMeta)
require.Equal(t, roachpb.Key("a"), events[0].Key)
require.NotZero(t, events[0].Duration)
require.Equal(t, time.Duration(33) /* 10+11+12 */, h.tag.mu.lockWait)
require.Len(t, events, 2)
ev = events[1]
require.Equal(t, txn2.TxnMeta, ev.TxnMeta)
require.Equal(t, roachpb.Key("a"), ev.Key)
require.Equal(t, time.Duration(12), ev.Duration)

manual.Advance(13)
h.notify(ctx, waitingState{kind: doneWaiting})
require.NotZero(t, h.tag.mu.lockWait)
require.Len(t, events, 2)
require.Equal(t, time.Duration(46) /* 10+11+12+13 */, h.tag.mu.lockWait)
require.Len(t, events, 3)

lockWaitBefore := h.tag.mu.lockWait
h.notify(ctx, waitingState{
kind: waitFor,
key: roachpb.Key("b"),
txn: &txn.TxnMeta,
txn: &txn1.TxnMeta,
})
manual.Advance(14)
h.notify(ctx, waitingState{
kind: doneWaiting,
})
require.Len(t, events, 3)
require.Less(t, lockWaitBefore, h.tag.mu.lockWait)
require.Equal(t, time.Duration(60) /* 10+11+12+13+14 */, h.tag.mu.lockWait)
require.Len(t, events, 4)
rec = sp.GetRecording(tracingpb.RecordingVerbose)
lockTagGroup = rec[0].FindTagGroup(tagContentionTracer)
require.NotNil(t, lockTagGroup)

val, ok = lockTagGroup.FindTag(tagNumLocks)
require.True(t, ok)
require.Equal(t, "3", val)

require.Equal(t, "4", val)
_, ok = lockTagGroup.FindTag(tagWaited)
require.True(t, ok)

_, ok = lockTagGroup.FindTag(tagWaitKey)
require.False(t, ok)
_, ok = lockTagGroup.FindTag(tagWaitStart)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ finish req=reqTxn1
----
[-] finish reqTxn1: finishing request
[3] sequence reqTxnMiddle: lock wait-queue event: done waiting
[3] sequence reqTxnMiddle: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 123.000s
[3] sequence reqTxnMiddle: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[3] sequence reqTxnMiddle: acquiring latches
[3] sequence reqTxnMiddle: scanning lock table for conflicting locks
[3] sequence reqTxnMiddle: sequencing complete, returned guard
[4] sequence reqTxn2: lock wait-queue event: wait for (distinguished) txn 00000003 running request @ key "k" (queuedWriters: 1, queuedReaders: 0)
[4] sequence reqTxn2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 123.000s
[4] sequence reqTxn2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[4] sequence reqTxn2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false
[4] sequence reqTxn2: pushing txn 00000003 to detect request deadlock
[4] sequence reqTxn2: blocked on select in concurrency_test.(*cluster).PushTransaction
Expand All @@ -132,7 +132,7 @@ finish req=reqTxnMiddle
----
[-] finish reqTxnMiddle: finishing request
[4] sequence reqTxn2: lock wait-queue event: done waiting
[4] sequence reqTxn2: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 123.000s
[4] sequence reqTxn2: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 0.000s
[4] sequence reqTxn2: acquiring latches
[4] sequence reqTxn2: scanning lock table for conflicting locks
[4] sequence reqTxn2: sequencing complete, returned guard
Expand Down

0 comments on commit 61d1dce

Please sign in to comment.