// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package contention

import (
	"bytes"
	"fmt"
	"sort"
	"strings"
	"time"
	"unsafe"

	"github.com/biogo/store/llrb"
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
	"github.com/cockroachdb/cockroach/pkg/util/cache"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// Registry is an object that keeps track of aggregated contention information.
// It can be thought of as three maps:
// 1) The top-level map is a mapping from a tableID/indexID which uniquely
//    describes an index to that index's contention information.
// 2) The contention information is itself a map from keys in that index
//    (maintained in sorted order for better observability) to a list of
//    transactions that caused contention events on the keys.
// 3) These transactions map the transaction ID to the number of times that
//    transaction was observed (i.e. how many contention events that txn
//    generated).
// It also tracks the information about contention on non-SQL keys separately.
//
// The datadriven test contains string representations of this struct which make
// it easier to visualize.
type Registry struct {
	// globalLock is a coarse-grained lock over the registry which allows for
	// concurrent calls to AddContentionEvent. Note that this is not optimal since
	// all calls to AddContentionEvent will need to acquire this global lock.
	// This can be but is not optimized since this mutex only has the potential to
	// be a bottleneck if there is a lot of contention. In this case, there are
	// probably bigger problems to worry about.
	globalLock syncutil.Mutex
	// indexMap is an LRU cache that keeps track of up to indexMapMaxSize
	// contended indexes.
	indexMap *indexMap
	// nonSQLKeysMap is an LRU cache that keeps track of up to
	// orderedKeyMapMaxSize non-SQL contended keys.
	nonSQLKeysMap *nonSQLKeysMap
}
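
// For illustration only (hypothetical table, key, and txn values, not data
// from this package), the SQL side of a Registry can be pictured roughly as:
//
//	tableID=53, indexID=1 -> {
//	    /Table/53/1/42 -> {txn A: 3 events, txn B: 1 event},
//	    /Table/53/1/77 -> {txn A: 2 events},
//	}
//
// i.e. index -> contended key (sorted) -> contending txn ID -> event count.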

var (
	// indexMapMaxSize specifies the maximum number of indexes a Registry should
	// keep track of contention events for.
	indexMapMaxSize = 50
	// orderedKeyMapMaxSize specifies the maximum number of keys in a given table
	// to keep track of contention events for.
	orderedKeyMapMaxSize = 50
	// maxNumTxns specifies the maximum number of txns that caused contention
	// events to keep track of.
	maxNumTxns = 10
)

// TODO(asubiotto): Remove once used.
var _ = GetRegistryEstimatedMaxMemoryFootprintInBytes

// GetRegistryEstimatedMaxMemoryFootprintInBytes returns the estimated memory
// footprint of a contention.Registry given an average key size of
// estimatedAverageKeySize bytes.
// Serves to reserve a reasonable amount of memory for this object.
// Around ~4.5MB at the time of writing.
func GetRegistryEstimatedMaxMemoryFootprintInBytes() int {
	const estimatedAverageKeySize = 64
	txnsMapSize := maxNumTxns * int(unsafe.Sizeof(uuid.UUID{})+unsafe.Sizeof(int(0)))
	orderedKeyMapSize := orderedKeyMapMaxSize * (int(unsafe.Sizeof(comparableKey{})+estimatedAverageKeySize) + txnsMapSize)
	sqlInfoSize := indexMapMaxSize * (int(unsafe.Sizeof(indexMapKey{})+unsafe.Sizeof(indexMapValue{})) + orderedKeyMapSize)
	nonSQLInfoSize := orderedKeyMapSize
	return sqlInfoSize + nonSQLInfoSize
}
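
// A caller could use the estimate above to pre-reserve memory before creating
// a Registry. A minimal sketch (hypothetical wiring; acc and ctx are assumed
// to be a *mon.BoundAccount and a context.Context provided by the caller):
//
//	if err := acc.Grow(ctx, int64(GetRegistryEstimatedMaxMemoryFootprintInBytes())); err != nil {
//		return err
//	}
//	registry := NewRegistry()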

var orderedKeyMapCfg = cache.Config{
	Policy: cache.CacheLRU,
	ShouldEvict: func(size int, _, _ interface{}) bool {
		return size > orderedKeyMapMaxSize
	},
}

var txnCacheCfg = cache.Config{
	Policy: cache.CacheLRU,
	ShouldEvict: func(size int, _, _ interface{}) bool {
		return size > maxNumTxns
	},
}

type comparableKey roachpb.Key

// Compare implements llrb.Comparable.
func (c comparableKey) Compare(other llrb.Comparable) int {
	return roachpb.Key(c).Compare(roachpb.Key(other.(comparableKey)))
}

type indexMapKey struct {
	tableID descpb.ID
	indexID descpb.IndexID
}

// indexMapValue is the value associated with an indexMapKey. It contains
// contention information about the tableID/indexID pair.
type indexMapValue struct {
	// numContentionEvents is the number of contention events that have happened.
	numContentionEvents uint64
	// cumulativeContentionTime is the total duration that transactions touching
	// this index have spent contended.
	cumulativeContentionTime time.Duration
	// orderedKeyMap is an LRU cache that keeps track of up to
	// orderedKeyMapMaxSize keys that were the subject of contention in the given
	// index. The keys are roachpb.Keys and the values are *cache.UnorderedCaches,
	// which in turn are caches of txn IDs of txns that caused contention events
	// and the number of times that each txn ID was observed.
	orderedKeyMap *cache.OrderedCache
}

// newIndexMapValue creates a new indexMapValue for a contention event
// initialized with that event's data.
func newIndexMapValue(c roachpb.ContentionEvent) *indexMapValue {
	txnCache := cache.NewUnorderedCache(txnCacheCfg)
	txnCache.Add(c.TxnMeta.ID, uint64(1))
	keyMap := cache.NewOrderedCache(orderedKeyMapCfg)
	keyMap.Add(comparableKey(c.Key), txnCache)
	return &indexMapValue{
		numContentionEvents:      1,
		cumulativeContentionTime: c.Duration,
		orderedKeyMap:            keyMap,
	}
}

// addContentionEvent adds the given contention event to previously aggregated
// contention data. It assumes that c.Key is a SQL key.
func (v *indexMapValue) addContentionEvent(c roachpb.ContentionEvent) {
	v.numContentionEvents++
	v.cumulativeContentionTime += c.Duration
	var numTimesThisTxnWasEncountered uint64
	txnCache, ok := v.orderedKeyMap.Get(comparableKey(c.Key))
	if ok {
		if txnVal, ok := txnCache.(*cache.UnorderedCache).Get(c.TxnMeta.ID); ok {
			numTimesThisTxnWasEncountered = txnVal.(uint64)
		}
	} else {
		// This key was not found in the map. Create a new txn cache for this key.
		txnCache = cache.NewUnorderedCache(txnCacheCfg)
		v.orderedKeyMap.Add(comparableKey(c.Key), txnCache)
	}
	txnCache.(*cache.UnorderedCache).Add(c.TxnMeta.ID, numTimesThisTxnWasEncountered+1)
}

// indexMap is a helper struct that wraps an LRU cache sized up to
// indexMapMaxSize and maps tableID/indexID pairs to indexMapValues.
type indexMap struct {
	internalCache *cache.UnorderedCache
	scratchKey    indexMapKey
}

func newIndexMap() *indexMap {
	return &indexMap{
		internalCache: cache.NewUnorderedCache(cache.Config{
			Policy: cache.CacheLRU,
			ShouldEvict: func(size int, _, _ interface{}) bool {
				return size > indexMapMaxSize
			},
		}),
	}
}

// get returns the indexMapValue keyed by the tableID/indexID pair and true if
// it exists, or nil and false if not. This counts as a cache access.
func (m *indexMap) get(tableID descpb.ID, indexID descpb.IndexID) (*indexMapValue, bool) {
	m.scratchKey.tableID = tableID
	m.scratchKey.indexID = indexID
	v, ok := m.internalCache.Get(m.scratchKey)
	if !ok {
		return nil, false
	}
	return v.(*indexMapValue), true
}

// add adds the indexMapValue keyed by the tableID/indexID pair. This counts as
// a cache access.
func (m *indexMap) add(tableID descpb.ID, indexID descpb.IndexID, v *indexMapValue) {
	m.scratchKey.tableID = tableID
	m.scratchKey.indexID = indexID
	m.internalCache.Add(m.scratchKey, v)
}

// nonSQLKeyMapValue is the value associated with a non-SQL key. It contains
// contention information about that key.
type nonSQLKeyMapValue struct {
	// numContentionEvents is the number of contention events that have
	// happened on the key.
	numContentionEvents uint64
	// cumulativeContentionTime is the total duration that transactions touching
	// the key have spent contended.
	cumulativeContentionTime time.Duration
	// txnCache is an LRU cache of txn IDs of txns that caused contention events
	// and the number of times that each txn ID was observed.
	txnCache *cache.UnorderedCache
}

// newNonSQLKeyMapValue creates a new nonSQLKeyMapValue for a contention event
// initialized with that event's data.
func newNonSQLKeyMapValue(c roachpb.ContentionEvent) *nonSQLKeyMapValue {
	txnCache := cache.NewUnorderedCache(txnCacheCfg)
	txnCache.Add(c.TxnMeta.ID, uint64(1))
	return &nonSQLKeyMapValue{
		numContentionEvents:      1,
		cumulativeContentionTime: c.Duration,
		txnCache:                 txnCache,
	}
}

// addContentionEvent adds the given contention event to previously aggregated
// contention data. It assumes that c.Key is a non-SQL key.
func (v *nonSQLKeyMapValue) addContentionEvent(c roachpb.ContentionEvent) {
	v.numContentionEvents++
	v.cumulativeContentionTime += c.Duration
	var numTimesThisTxnWasEncountered uint64
	if numTimes, ok := v.txnCache.Get(c.TxnMeta.ID); ok {
		numTimesThisTxnWasEncountered = numTimes.(uint64)
	}
	v.txnCache.Add(c.TxnMeta.ID, numTimesThisTxnWasEncountered+1)
}

// nonSQLKeysMap is a helper struct that wraps an LRU cache sized up to
// orderedKeyMapMaxSize mapping non-SQL keys to nonSQLKeyMapValues.
type nonSQLKeysMap struct {
	*cache.OrderedCache
}

func newNonSQLKeysMap() *nonSQLKeysMap {
	return &nonSQLKeysMap{
		OrderedCache: cache.NewOrderedCache(orderedKeyMapCfg),
	}
}

// NewRegistry creates a new Registry.
func NewRegistry() *Registry {
	return &Registry{
		indexMap:      newIndexMap(),
		nonSQLKeysMap: newNonSQLKeysMap(),
	}
}

// AddContentionEvent adds a new ContentionEvent to the Registry.
func (r *Registry) AddContentionEvent(c roachpb.ContentionEvent) {
	// Acquire the global lock up front so that both the SQL and the non-SQL
	// caches are only mutated while it is held (Serialize reads both under the
	// same lock).
	r.globalLock.Lock()
	defer r.globalLock.Unlock()
	_, rawTableID, rawIndexID, err := keys.DecodeTableIDIndexID(c.Key)
	if err != nil {
		// The key is not a valid SQL key, so we store it in a separate cache.
		if v, ok := r.nonSQLKeysMap.Get(comparableKey(c.Key)); !ok {
			// This is the first contention event seen for this key.
			r.nonSQLKeysMap.Add(comparableKey(c.Key), newNonSQLKeyMapValue(c))
		} else {
			v.(*nonSQLKeyMapValue).addContentionEvent(c)
		}
		return
	}
	tableID := descpb.ID(rawTableID)
	indexID := descpb.IndexID(rawIndexID)
	if v, ok := r.indexMap.get(tableID, indexID); !ok {
		// This is the first contention event seen for the given tableID/indexID
		// pair.
		r.indexMap.add(tableID, indexID, newIndexMapValue(c))
	} else {
		v.addContentionEvent(c)
	}
}
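
// A minimal usage sketch (illustrative only; the event values shown are
// assumptions about a typical contention event, and txnMeta is a hypothetical
// enginepb.TxnMeta of the contending transaction):
//
//	r := NewRegistry()
//	r.AddContentionEvent(roachpb.ContentionEvent{
//		Key:      keys.SystemSQLCodec.IndexPrefix(53 /* tableID */, 1 /* indexID */),
//		TxnMeta:  txnMeta,
//		Duration: 10 * time.Millisecond,
//	})
//	fmt.Print(r.String())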

func serializeTxnCache(txnCache *cache.UnorderedCache) []contentionpb.SingleTxnContention {
	txns := make([]contentionpb.SingleTxnContention, txnCache.Len())
	var txnCount int
	txnCache.Do(func(e *cache.Entry) {
		txns[txnCount].TxnID = e.Key.(uuid.UUID)
		txns[txnCount].Count = e.Value.(uint64)
		txnCount++
	})
	sortSingleTxnContention(txns)
	return txns
}

// Serialize returns the serialized representation of the registry. Refer to
// the comments on contentionpb.SerializedRegistry for the orderings that are
// maintained at different levels of objects.
func (r *Registry) Serialize() contentionpb.SerializedRegistry {
	r.globalLock.Lock()
	defer r.globalLock.Unlock()
	var resp contentionpb.SerializedRegistry

	// Process the information about all SQL keys.
	resp.IndexContentionEvents = make([]contentionpb.IndexContentionEvents, r.indexMap.internalCache.Len())
	var iceCount int
	r.indexMap.internalCache.Do(func(e *cache.Entry) {
		ice := &resp.IndexContentionEvents[iceCount]
		key := e.Key.(indexMapKey)
		ice.TableID = key.tableID
		ice.IndexID = key.indexID
		v := e.Value.(*indexMapValue)
		ice.NumContentionEvents = v.numContentionEvents
		ice.CumulativeContentionTime = v.cumulativeContentionTime
		ice.Events = make([]contentionpb.SingleKeyContention, v.orderedKeyMap.Len())
		var skcCount int
		v.orderedKeyMap.Do(func(k, txnCacheInterface interface{}) bool {
			txnCache := txnCacheInterface.(*cache.UnorderedCache)
			skc := &ice.Events[skcCount]
			skc.Key = roachpb.Key(k.(comparableKey))
			skc.Txns = serializeTxnCache(txnCache)
			skcCount++
			return false
		})
		iceCount++
	})
	sortIndexContentionEvents(resp.IndexContentionEvents)

	// Process the information about all non-SQL keys.
	resp.NonSQLKeysContention = make([]contentionpb.SingleNonSQLKeyContention, r.nonSQLKeysMap.Len())
	var snkcCount int
	r.nonSQLKeysMap.Do(func(k, nonSQLKeyMapVal interface{}) bool {
		snkc := &resp.NonSQLKeysContention[snkcCount]
		snkc.Key = roachpb.Key(k.(comparableKey))
		v := nonSQLKeyMapVal.(*nonSQLKeyMapValue)
		snkc.NumContentionEvents = v.numContentionEvents
		snkc.CumulativeContentionTime = v.cumulativeContentionTime
		snkc.Txns = serializeTxnCache(v.txnCache)
		snkcCount++
		return false
	})
	return resp
}
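
// A sketch of consuming the serialized form (illustrative only; r is a
// *Registry populated elsewhere):
//
//	serialized := r.Serialize()
//	for _, ice := range serialized.IndexContentionEvents {
//		// Index-level entries are ordered by importance (most contention first).
//		fmt.Printf("table %d, index %d: %d events over %s\n",
//			ice.TableID, ice.IndexID, ice.NumContentionEvents, ice.CumulativeContentionTime)
//	}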

// sortIndexContentionEvents sorts all of the index contention events in-place
// according to their importance (as defined by the total number of contention
// events).
func sortIndexContentionEvents(ice []contentionpb.IndexContentionEvents) {
	sort.Slice(ice, func(i, j int) bool {
		if ice[i].NumContentionEvents != ice[j].NumContentionEvents {
			return ice[i].NumContentionEvents > ice[j].NumContentionEvents
		}
		return ice[i].CumulativeContentionTime > ice[j].CumulativeContentionTime
	})
}

// sortSingleTxnContention sorts the transactions in-place in descending order
// of the frequency of their occurrence.
func sortSingleTxnContention(txns []contentionpb.SingleTxnContention) {
	sort.Slice(txns, func(i, j int) bool {
		return txns[i].Count > txns[j].Count
	})
}

// String returns a string representation of the Registry.
func (r *Registry) String() string {
	var b strings.Builder
	serialized := r.Serialize()
	for i := range serialized.IndexContentionEvents {
		b.WriteString(serialized.IndexContentionEvents[i].String())
	}
	for i := range serialized.NonSQLKeysContention {
		b.WriteString(serialized.NonSQLKeysContention[i].String())
	}
	return b.String()
}

// MergeSerializedRegistries merges the serialized representations of two
// Registries into one. first is modified in-place.
//
// The result will contain at most indexMapMaxSize number of
// IndexContentionEvents objects (with the most important objects - as defined
// by the total number of contention events - kept from both arguments) and at
// most orderedKeyMapMaxSize number of SingleNonSQLKeyContention objects (those
// with lexicographically smaller keys are kept from both arguments). Other
// constants (orderedKeyMapMaxSize and maxNumTxns) are also respected by the
// internal slices.
//
// The returned representation has the same ordering guarantees as described
// for contentionpb.SerializedRegistry.
func MergeSerializedRegistries(
	first, second contentionpb.SerializedRegistry,
) contentionpb.SerializedRegistry {
	// Merge IndexContentionEvents.
	firstICE, secondICE := first.IndexContentionEvents, second.IndexContentionEvents
	for s := range secondICE {
		found := false
		for f := range firstICE {
			if firstICE[f].TableID == secondICE[s].TableID && firstICE[f].IndexID == secondICE[s].IndexID {
				firstICE[f] = mergeIndexContentionEvents(firstICE[f], secondICE[s])
				found = true
				break
			}
		}
		if !found {
			firstICE = append(firstICE, secondICE[s])
		}
	}
	// Sort all of the index contention events so that more frequent ones are at
	// the front and then truncate if needed.
	sortIndexContentionEvents(firstICE)
	if len(firstICE) > indexMapMaxSize {
		firstICE = firstICE[:indexMapMaxSize]
	}
	first.IndexContentionEvents = firstICE

	// Merge SingleNonSQLKeyContention.
	//
	// Go over the non-SQL keys from both inputs and merge them so that we stay
	// under the limit. We take advantage of the fact that non-SQL keys for both
	// inputs are already ordered.
	//
	// Note that this code is basically a copy of the corresponding code in
	// mergeIndexContentionEvents, but it is very hard to abstract it away and
	// reuse.
	firstNKC, secondNKC := first.NonSQLKeysContention, second.NonSQLKeysContention
	maxNumNonSQLKeys := len(firstNKC) + len(secondNKC)
	if maxNumNonSQLKeys > orderedKeyMapMaxSize {
		maxNumNonSQLKeys = orderedKeyMapMaxSize
	}
	result := make([]contentionpb.SingleNonSQLKeyContention, maxNumNonSQLKeys)
	resultIdx, firstIdx, secondIdx := 0, 0, 0
	// Iterate while we haven't taken enough non-SQL keys and at least one input
	// is not exhausted.
	for resultIdx < maxNumNonSQLKeys && (firstIdx < len(firstNKC) || secondIdx < len(secondNKC)) {
		var cmp int
		if firstIdx == len(firstNKC) {
			// We've exhausted the list of non-SQL keys from the first input, so
			// we will need to take from the second input.
			cmp = 1
		} else if secondIdx == len(secondNKC) {
			// We've exhausted the list of non-SQL keys from the second input,
			// so we will need to take from the first input.
			cmp = -1
		} else {
			cmp = firstNKC[firstIdx].Key.Compare(secondNKC[secondIdx].Key)
		}
		if cmp == 0 {
			// The keys are the same, so we're merging the contention
			// information from both inputs.
			f := firstNKC[firstIdx]
			s := secondNKC[secondIdx]
			result[resultIdx].Key = f.Key
			result[resultIdx].NumContentionEvents = f.NumContentionEvents + s.NumContentionEvents
			result[resultIdx].CumulativeContentionTime = f.CumulativeContentionTime + s.CumulativeContentionTime
			result[resultIdx].Txns = mergeSingleKeyContention(f.Txns, s.Txns)
			firstIdx++
			secondIdx++
		} else if cmp < 0 {
			// The first non-SQL key is smaller, so we will take it as is.
			result[resultIdx] = firstNKC[firstIdx]
			firstIdx++
		} else {
			// The second non-SQL key is smaller, so we will take it as is.
			result[resultIdx] = secondNKC[secondIdx]
			secondIdx++
		}
		resultIdx++
	}
	// We might have merged the contention information for some non-SQL keys so
	// that we allocated more than necessary, so we need to truncate if that's
	// the case.
	result = result[:resultIdx]
	first.NonSQLKeysContention = result
	return first
}
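
// An illustrative sketch of combining contention data from two registries
// (for example, from two nodes; the registry variables are hypothetical):
//
//	merged := MergeSerializedRegistries(node1Registry.Serialize(), node2Registry.Serialize())
//	// merged respects the same size limits (indexMapMaxSize, orderedKeyMapMaxSize,
//	// maxNumTxns) and ordering guarantees as a single serialized registry.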

// mergeIndexContentionEvents merges two lists of contention events that
// occurred on the same index. It will panic if the indexes are different.
//
// The result will contain at most orderedKeyMapMaxSize number of single key
// contention events (ordered by the keys).
func mergeIndexContentionEvents(
	first contentionpb.IndexContentionEvents, second contentionpb.IndexContentionEvents,
) contentionpb.IndexContentionEvents {
	if first.TableID != second.TableID || first.IndexID != second.IndexID {
		panic(fmt.Sprintf("attempting to merge contention events from different indexes\n%s%s", first.String(), second.String()))
	}
	var result contentionpb.IndexContentionEvents
	result.TableID = first.TableID
	result.IndexID = first.IndexID
	result.NumContentionEvents = first.NumContentionEvents + second.NumContentionEvents
	result.CumulativeContentionTime = first.CumulativeContentionTime + second.CumulativeContentionTime
	// Go over the events from both inputs and merge them so that we stay under
	// the limit. We take advantage of the fact that events for both inputs are
	// already ordered by their keys.
	//
	// Note that this code is basically a copy of the corresponding code in
	// MergeSerializedRegistries for non-SQL keys, but it is very hard to
	// abstract it away and reuse.
	maxNumEvents := len(first.Events) + len(second.Events)
	if maxNumEvents > orderedKeyMapMaxSize {
		maxNumEvents = orderedKeyMapMaxSize
	}
	result.Events = make([]contentionpb.SingleKeyContention, maxNumEvents)
	resultIdx, firstIdx, secondIdx := 0, 0, 0
	// Iterate while we haven't taken enough events and at least one input is
	// not exhausted.
	for resultIdx < maxNumEvents && (firstIdx < len(first.Events) || secondIdx < len(second.Events)) {
		var cmp int
		if firstIdx == len(first.Events) {
			// We've exhausted the list of events from the first input, so we
			// will need to take from the second input.
			cmp = 1
		} else if secondIdx == len(second.Events) {
			// We've exhausted the list of events from the second input, so we
			// will need to take from the first input.
			cmp = -1
		} else {
			cmp = first.Events[firstIdx].Key.Compare(second.Events[secondIdx].Key)
		}
		if cmp == 0 {
			// The keys are the same, so we're merging the events from both
			// inputs.
			f := first.Events[firstIdx]
			s := second.Events[secondIdx]
			result.Events[resultIdx].Key = f.Key
			result.Events[resultIdx].Txns = mergeSingleKeyContention(f.Txns, s.Txns)
			firstIdx++
			secondIdx++
		} else if cmp < 0 {
			// The first event is smaller, so we will take it as is.
			result.Events[resultIdx] = first.Events[firstIdx]
			firstIdx++
		} else {
			// The second event is smaller, so we will take it as is.
			result.Events[resultIdx] = second.Events[secondIdx]
			secondIdx++
		}
		resultIdx++
	}
	// We might have merged some events so that we allocated more than
	// necessary, so we need to truncate if that's the case.
	result.Events = result.Events[:resultIdx]
	return result
}

// mergeSingleKeyContention merges two lists of contention events that occurred
// on the same key, updating first in-place. Objects are ordered by the count
// (after the merge when applicable) in descending order.
//
// The result will contain at most maxNumTxns number of transactions.
func mergeSingleKeyContention(
	first, second []contentionpb.SingleTxnContention,
) []contentionpb.SingleTxnContention {
	for s := range second {
		found := false
		for f := range first {
			if bytes.Equal(first[f].TxnID.GetBytes(), second[s].TxnID.GetBytes()) {
				first[f].Count += second[s].Count
				found = true
				break
			}
		}
		if !found {
			first = append(first, second[s])
		}
	}
	// Sort all of the transactions so that more frequent ones are at the front
	// and then truncate if needed.
	sortSingleTxnContention(first)
	if len(first) > maxNumTxns {
		first = first[:maxNumTxns]
	}
	return first
}