Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv/concurrency: compute contention event duration from (key,txn) wait start time #99166

Merged
merged 1 commit into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ type waitingState struct {
// Represents the action that the request was trying to perform when
// it hit the conflict. E.g. was it trying to read or write?
guardAccess spanset.SpanAccess

// lockWaitStart represents the timestamp when the request started waiting on
// the lock that this waitingState refers to. If multiple consecutive states
// refer to the same lock, they share the same lockWaitStart.
lockWaitStart time.Time
}

// String implements the fmt.Stringer interface.
Expand Down Expand Up @@ -536,12 +531,6 @@ func (g *lockTableGuardImpl) CurState() waitingState {

func (g *lockTableGuardImpl) updateStateLocked(newState waitingState) {
g.mu.state = newState
switch newState.kind {
case waitFor, waitForDistinguished, waitSelf, waitElsewhere:
g.mu.state.lockWaitStart = g.mu.curLockWaitStart
default:
g.mu.state.lockWaitStart = time.Time{}
}
}

func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(spanSet *spanset.SpanSet) (ok bool) {
Expand Down
71 changes: 38 additions & 33 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ const tagContentionTracer = "locks"
const tagWaitKey = "wait_key"

// tagWaitStart is the tracing span tag indicating when the request started
// waiting on the lock it's currently waiting on.
// waiting on the lock (a unique key,txn pair) it's currently waiting on.
const tagWaitStart = "wait_start"

// tagLockHolderTxn is the tracing span tag indicating the ID of the txn holding
Expand Down Expand Up @@ -1010,15 +1010,18 @@ type contentionTag struct {
waiting bool

// waitStart represents the timestamp when the request started waiting on
// locks in the current iteration of the contentionEventTracer. The wait
// time in previous iterations is accumulated in lockWait. When not waiting
// any more, timeutil.Since(waitStart) is added to lockWait.
// a lock, as defined by a unique (key,txn) pair, in the current iteration
// of the contentionEventTracer. The wait time in previous iterations is
// accumulated in lockWait. When not waiting anymore or when waiting on a
// new (key,txn), timeutil.Since(waitStart) is added to lockWait.
waitStart time.Time

// curState is the current wait state, if any. It is overwritten every time
// the lock table notify()s the contentionEventTracer of a new state. It is
// not set if waiting is false.
curState waitingState
// curStateKey and curStateTxn are the current waitingState's key and txn,
// if any. They are overwritten every time the lock table notify()s the
// contentionEventTracer of a new state. They are not set if waiting is
// false.
curStateKey roachpb.Key
curStateTxn *enginepb.TxnMeta

// numLocks counts the number of locks this contentionEventTracer has seen so
// far, including the one we're currently waiting on (if any).
Expand Down Expand Up @@ -1109,9 +1112,9 @@ func (tag *contentionTag) generateEventLocked() *kvpb.ContentionEvent {
}

return &kvpb.ContentionEvent{
Key: tag.mu.curState.key,
TxnMeta: *tag.mu.curState.txn,
Duration: tag.clock.PhysicalTime().Sub(tag.mu.curState.lockWaitStart),
Key: tag.mu.curStateKey,
TxnMeta: *tag.mu.curStateTxn,
Duration: tag.clock.PhysicalTime().Sub(tag.mu.waitStart),
}
}

Expand All @@ -1128,33 +1131,35 @@ func (tag *contentionTag) notify(ctx context.Context, s waitingState) *kvpb.Cont
// If we're tracking an event and see a different txn/key, the event is
// done and we initialize the new event tracking the new txn/key.
//
// NB: we're guaranteed to have `s.{txn,key}` populated here.
var differentLock bool
if !tag.mu.waiting {
differentLock = true
} else {
curLockHolder, curKey := tag.mu.curState.txn.ID, tag.mu.curState.key
differentLock = !curLockHolder.Equal(s.txn.ID) || !curKey.Equal(s.key)
}
var res *kvpb.ContentionEvent
if differentLock {
res = tag.generateEventLocked()
// NB: we're guaranteed to have `curState{Txn,Key}` populated here.
if tag.mu.waiting {
curLockTxn, curLockKey := tag.mu.curStateTxn.ID, tag.mu.curStateKey
differentLock := !curLockTxn.Equal(s.txn.ID) || !curLockKey.Equal(s.key)
if !differentLock {
return nil
}
}
tag.mu.curState = s
res := tag.generateEventLocked()
tag.mu.waiting = true
if differentLock {
tag.mu.waitStart = tag.clock.PhysicalTime()
tag.mu.numLocks++
return res
tag.mu.curStateKey = s.key
tag.mu.curStateTxn = s.txn
// Accumulate the wait time.
now := tag.clock.PhysicalTime()
if !tag.mu.waitStart.IsZero() {
tag.mu.lockWait += now.Sub(tag.mu.waitStart)
}
return nil
tag.mu.waitStart = now
tag.mu.numLocks++
return res
case doneWaiting, waitQueueMaxLengthExceeded:
// There will be no more state updates; we're done waiting.
res := tag.generateEventLocked()
tag.mu.waiting = false
tag.mu.curState = waitingState{}
tag.mu.lockWait += tag.clock.PhysicalTime().Sub(tag.mu.waitStart)
tag.mu.curStateKey = nil
tag.mu.curStateTxn = nil
// Accumulate the wait time.
now := tag.clock.PhysicalTime()
tag.mu.lockWait += now.Sub(tag.mu.waitStart)
tag.mu.waitStart = time.Time{}
return res
default:
Expand Down Expand Up @@ -1191,15 +1196,15 @@ func (tag *contentionTag) Render() []attribute.KeyValue {
if tag.mu.waiting {
tags = append(tags, attribute.KeyValue{
Key: tagWaitKey,
Value: attribute.StringValue(tag.mu.curState.key.String()),
Value: attribute.StringValue(tag.mu.curStateKey.String()),
})
tags = append(tags, attribute.KeyValue{
Key: tagLockHolderTxn,
Value: attribute.StringValue(tag.mu.curState.txn.ID.String()),
Value: attribute.StringValue(tag.mu.curStateTxn.ID.String()),
})
tags = append(tags, attribute.KeyValue{
Key: tagWaitStart,
Value: attribute.StringValue(tag.mu.curState.lockWaitStart.Format("15:04:05.123")),
Value: attribute.StringValue(tag.mu.waitStart.Format("15:04:05.123")),
})
}
return tags
Expand Down
74 changes: 49 additions & 25 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,19 +942,22 @@ func TestContentionEventTracer(t *testing.T) {
tr := tracing.NewTracer()
ctx, sp := tr.StartSpanCtx(context.Background(), "foo", tracing.WithRecording(tracingpb.RecordingVerbose))
defer sp.Finish()
clock := hlc.NewClockForTesting(nil)
manual := timeutil.NewManualTime(timeutil.Unix(0, 123))
clock := hlc.NewClockForTesting(manual)

txn1 := makeTxnProto("foo")
txn2 := makeTxnProto("bar")

var events []*kvpb.ContentionEvent

h := newContentionEventTracer(sp, clock)
h.SetOnContentionEvent(func(ev *kvpb.ContentionEvent) {
events = append(events, ev)
})
txn := makeTxnProto("foo")
h.notify(ctx, waitingState{
kind: waitForDistinguished,
key: roachpb.Key("a"),
txn: &txn.TxnMeta,
txn: &txn1.TxnMeta,
})
require.Zero(t, h.tag.mu.lockWait)
require.NotZero(t, h.tag.mu.waitStart)
Expand All @@ -966,63 +969,84 @@ func TestContentionEventTracer(t *testing.T) {
require.True(t, ok)
require.Equal(t, "1", val)
_, ok = lockTagGroup.FindTag(tagWaited)
require.False(t, ok)
val, ok = lockTagGroup.FindTag(tagWaitKey)
require.True(t, ok)
_, ok = lockTagGroup.FindTag(tagWaitKey)
require.True(t, ok)
_, ok = lockTagGroup.FindTag(tagWaitStart)
require.Equal(t, "\"a\"", val)
val, ok = lockTagGroup.FindTag(tagWaitStart)
require.True(t, ok)
_, ok = lockTagGroup.FindTag(tagLockHolderTxn)
require.Equal(t, "00:00:00.1112", val)
val, ok = lockTagGroup.FindTag(tagLockHolderTxn)
require.True(t, ok)
require.Equal(t, txn1.ID.String(), val)

// Another event for the same txn/key should not mutate
// or emitLocked an event.
// or emit an event.
prevNumLocks := h.tag.mu.numLocks
manual.Advance(10)
h.notify(ctx, waitingState{
kind: waitFor,
key: roachpb.Key("a"),
txn: &txn.TxnMeta,
txn: &txn1.TxnMeta,
})
require.Empty(t, events)
require.Zero(t, h.tag.mu.lockWait)
require.Equal(t, prevNumLocks, h.tag.mu.numLocks)

// Another event for the same key but different txn should
// emitLocked an event.
manual.Advance(11)
h.notify(ctx, waitingState{
kind: waitFor,
key: roachpb.Key("a"),
txn: &txn2.TxnMeta,
})
require.Equal(t, time.Duration(21) /* 10+11 */, h.tag.mu.lockWait)
require.Len(t, events, 1)
ev := events[0]
require.Equal(t, txn1.TxnMeta, ev.TxnMeta)
require.Equal(t, roachpb.Key("a"), ev.Key)
require.Equal(t, time.Duration(21) /* 10+11 */, ev.Duration)

// Another event for the same txn but different key should
// emit an event.
manual.Advance(12)
h.notify(ctx, waitingState{
kind: waitForDistinguished,
key: roachpb.Key("b"),
txn: &txn.TxnMeta,
txn: &txn2.TxnMeta,
})
require.Zero(t, h.tag.mu.lockWait)
require.Len(t, events, 1)
require.Equal(t, txn.TxnMeta, events[0].TxnMeta)
require.Equal(t, roachpb.Key("a"), events[0].Key)
require.NotZero(t, events[0].Duration)
require.Equal(t, time.Duration(33) /* 10+11+12 */, h.tag.mu.lockWait)
require.Len(t, events, 2)
ev = events[1]
require.Equal(t, txn2.TxnMeta, ev.TxnMeta)
require.Equal(t, roachpb.Key("a"), ev.Key)
require.Equal(t, time.Duration(12), ev.Duration)

manual.Advance(13)
h.notify(ctx, waitingState{kind: doneWaiting})
require.NotZero(t, h.tag.mu.lockWait)
require.Len(t, events, 2)
require.Equal(t, time.Duration(46) /* 10+11+12+13 */, h.tag.mu.lockWait)
require.Len(t, events, 3)

lockWaitBefore := h.tag.mu.lockWait
h.notify(ctx, waitingState{
kind: waitFor,
key: roachpb.Key("b"),
txn: &txn.TxnMeta,
txn: &txn1.TxnMeta,
})
manual.Advance(14)
h.notify(ctx, waitingState{
kind: doneWaiting,
})
require.Len(t, events, 3)
require.Less(t, lockWaitBefore, h.tag.mu.lockWait)
require.Equal(t, time.Duration(60) /* 10+11+12+13+14 */, h.tag.mu.lockWait)
require.Len(t, events, 4)
rec = sp.GetRecording(tracingpb.RecordingVerbose)
lockTagGroup = rec[0].FindTagGroup(tagContentionTracer)
require.NotNil(t, lockTagGroup)

val, ok = lockTagGroup.FindTag(tagNumLocks)
require.True(t, ok)
require.Equal(t, "3", val)

require.Equal(t, "4", val)
_, ok = lockTagGroup.FindTag(tagWaited)
require.True(t, ok)

_, ok = lockTagGroup.FindTag(tagWaitKey)
require.False(t, ok)
_, ok = lockTagGroup.FindTag(tagWaitStart)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ finish req=reqTxn1
----
[-] finish reqTxn1: finishing request
[3] sequence reqTxnMiddle: lock wait-queue event: done waiting
[3] sequence reqTxnMiddle: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 123.000s
[3] sequence reqTxnMiddle: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[3] sequence reqTxnMiddle: acquiring latches
[3] sequence reqTxnMiddle: scanning lock table for conflicting locks
[3] sequence reqTxnMiddle: sequencing complete, returned guard
[4] sequence reqTxn2: lock wait-queue event: wait for (distinguished) txn 00000003 running request @ key "k" (queuedWriters: 1, queuedReaders: 0)
[4] sequence reqTxn2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 123.000s
[4] sequence reqTxn2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s
[4] sequence reqTxn2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false
[4] sequence reqTxn2: pushing txn 00000003 to detect request deadlock
[4] sequence reqTxn2: blocked on select in concurrency_test.(*cluster).PushTransaction
Expand All @@ -132,7 +132,7 @@ finish req=reqTxnMiddle
----
[-] finish reqTxnMiddle: finishing request
[4] sequence reqTxn2: lock wait-queue event: done waiting
[4] sequence reqTxn2: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 123.000s
[4] sequence reqTxn2: conflicted with 00000003-0000-0000-0000-000000000000 on "k" for 0.000s
[4] sequence reqTxn2: acquiring latches
[4] sequence reqTxn2: scanning lock table for conflicting locks
[4] sequence reqTxn2: sequencing complete, returned guard
Expand Down