-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
buffer.go
92 lines (80 loc) · 2.98 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package spanconfigsqlwatcher
import (
"context"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
// buffer is a helper struct for the SQLWatcher. It buffers events generated by
// the SQLWatcher's rangefeeds over system.zones and system.descriptors. All
// methods lock internally so they can be called concurrently.
//
// The buffer tracks frontier timestamps for both these rangefeeds as well. It
// maintains the notion of the combined frontier timestamp computed as the
// minimum of the two. This is used when flushing the buffer periodically.
type buffer struct {
mu struct {
syncutil.Mutex
// rangefeed.Buffer stores spanconfigsqlwatcher.Events.
buffer *rangefeedbuffer.Buffer
// rangefeedFrontiers tracks the frontier timestamps of individual
// rangefeeds established by the SQLWatcher.
rangefeedFrontiers [numRangefeeds]hlc.Timestamp
}
}
// newBuffer constructs and returns a new buffer.
func newBuffer() *buffer {
rangefeedBuffer := rangefeedbuffer.New(100 /* limit */)
eventBuffer := &buffer{}
eventBuffer.mu.buffer = rangefeedBuffer
return eventBuffer
}
// advance advances the frontier for the given rangefeed.
func (b *buffer) advance(rangefeed rangefeedKind, timestamp hlc.Timestamp) {
b.mu.Lock()
defer b.mu.Unlock()
b.mu.rangefeedFrontiers[rangefeed].Forward(timestamp)
}
// add records the given event in the buffer.
func (b *buffer) add(ev event) error {
b.mu.Lock()
defer b.mu.Unlock()
return b.mu.buffer.Add(ev)
}
// flush computes the combined frontier timestamp of the buffer and returns a
// list of unique spanconfig.SQLWatcherUpdates below this timestamp. The
// combined frontier timestamp is also returned.
func (b *buffer) flush(
ctx context.Context,
) (updates []spanconfig.SQLWatcherUpdate, combinedFrontierTS hlc.Timestamp) {
b.mu.Lock()
defer b.mu.Unlock()
seenIDs := make(map[descpb.ID]struct{})
// First we determine the checkpoint timestamp, which is the minimum
// checkpoint timestamp of all event types.
combinedFrontierTS = hlc.MaxTimestamp
for _, ts := range b.mu.rangefeedFrontiers {
combinedFrontierTS.Backward(ts)
}
events := b.mu.buffer.Flush(ctx, combinedFrontierTS)
for _, ev := range events {
update := ev.(event).update
// De-duplicate IDs from the returned result.
if _, seen := seenIDs[update.ID]; !seen {
seenIDs[update.ID] = struct{}{}
updates = append(updates, update)
}
}
return updates, combinedFrontierTS
}