Skip to content

Commit

Permalink
Merge #37094
Browse files Browse the repository at this point in the history
37094: changefeedccl: signal memBuffer with a channel instead of polling r=nvanbenschoten a=danhhz

With RangeFeed now firmly in place in changefeeds, we're very close to
end-to-end push, which will dramatically improve latencies of row
changes (resolved timestamps, however, will continue to be limited by
closed timestamp latency).

This commit removes one of two remaining places where we poll (the other
is detailed in #36289). No functional change besides more responsiveness
after the first changed kv gets written to memBuffer after it being
empty for a while (and so the old polling would have already backed off
to the max interval).

Touches #36289

Release note: None

Co-authored-by: Daniel Harrison <daniel.harrison@gmail.com>
  • Loading branch information
craig[bot] and danhhz committed Apr 25, 2019
2 parents f96a395 + 886e48d commit d7953f4
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions pkg/ccl/changefeedccl/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)
Expand Down Expand Up @@ -101,14 +100,17 @@ var memBufferColTypes = []types.T{

// memBuffer is an in-memory buffer for changed KV and resolved timestamp
// events. It's size is limited only by the BoundAccount passed to the
// constructor.
// constructor. memBuffer is only for use with single-producer single-consumer.
type memBuffer struct {
metrics *Metrics

mu struct {
syncutil.Mutex
entries rowcontainer.RowContainer
}
// signalCh can be selected on to learn when an entry is written to
// mu.entries.
signalCh chan struct{}

allocMu struct {
syncutil.Mutex
Expand All @@ -117,7 +119,10 @@ type memBuffer struct {
}

func makeMemBuffer(acc mon.BoundAccount, metrics *Metrics) *memBuffer {
b := &memBuffer{metrics: metrics}
b := &memBuffer{
metrics: metrics,
signalCh: make(chan struct{}, 1),
}
b.mu.entries.Init(acc, sqlbase.ColTypeInfoFromColTypes(memBufferColTypes), 0 /* rowCapacity */)
return b
}
Expand Down Expand Up @@ -206,17 +211,17 @@ func (b *memBuffer) addRow(ctx context.Context, row tree.Datums) error {
_, err := b.mu.entries.AddRow(ctx, row)
b.mu.Unlock()
b.metrics.BufferEntriesIn.Inc(1)
select {
case b.signalCh <- struct{}{}:
default:
// Already signaled, don't need to signal again.
}
return err
}

func (b *memBuffer) getRow(ctx context.Context) (tree.Datums, error) {
retryOpts := retry.Options{
InitialBackoff: time.Millisecond,
MaxBackoff: time.Second,
}

var row tree.Datums
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
for {
var row tree.Datums
b.mu.Lock()
if b.mu.entries.Len() > 0 {
row = b.mu.entries.At(0)
Expand All @@ -227,6 +232,11 @@ func (b *memBuffer) getRow(ctx context.Context) (tree.Datums, error) {
b.metrics.BufferEntriesOut.Inc(1)
return row, nil
}

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-b.signalCh:
}
}
return nil, ctx.Err()
}

0 comments on commit d7953f4

Please sign in to comment.