Skip to content

Commit

Permalink
Merge pull request #96182 from miretskiy/backport22.2-95798
Browse files Browse the repository at this point in the history
release-22.2: changefeedcc: Acquire memory prior to issuing scan RPCs
  • Loading branch information
miretskiy authored Jan 30, 2023
2 parents 799d8db + e80b623 commit 1d755df
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 18 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ bulkio.backup.file_size byte size 128 MiB target size for individual data files
bulkio.backup.read_timeout duration 5m0s amount of time after which a read attempt is considered timed out, which causes the backup to fail
bulkio.backup.read_with_priority_after duration 1m0s amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads
bulkio.stream_ingestion.minimum_flush_interval duration 5s the minimum timestamp between flushes; flushes may still occur if internal buffers fill up
changefeed.backfill.scan_request_size integer 524288 the maximum number of bytes returned by each scan request
changefeed.balance_range_distribution.enable boolean false if enabled, the ranges are balanced equally among all nodes
changefeed.event_consumer_worker_queue_size integer 16 if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer
changefeed.event_consumer_workers integer 0 the number of workers to use when processing events: <0 disables, 0 assigns a reasonable default, >0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled
Expand Down
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<tr><td><div id="setting-bulkio-backup-read-timeout" class="anchored"><code>bulkio.backup.read_timeout</code></div></td><td>duration</td><td><code>5m0s</code></td><td>amount of time after which a read attempt is considered timed out, which causes the backup to fail</td></tr>
<tr><td><div id="setting-bulkio-backup-read-with-priority-after" class="anchored"><code>bulkio.backup.read_with_priority_after</code></div></td><td>duration</td><td><code>1m0s</code></td><td>amount of time since the read-as-of time above which a BACKUP should use priority when retrying reads</td></tr>
<tr><td><div id="setting-bulkio-stream-ingestion-minimum-flush-interval" class="anchored"><code>bulkio.stream_ingestion.minimum_flush_interval</code></div></td><td>duration</td><td><code>5s</code></td><td>the minimum timestamp between flushes; flushes may still occur if internal buffers fill up</td></tr>
<tr><td><div id="setting-changefeed-backfill-scan-request-size" class="anchored"><code>changefeed.backfill.scan_request_size</code></div></td><td>integer</td><td><code>524288</code></td><td>the maximum number of bytes returned by each scan request</td></tr>
<tr><td><div id="setting-changefeed-balance-range-distribution-enable" class="anchored"><code>changefeed.balance_range_distribution.enable</code></div></td><td>boolean</td><td><code>false</code></td><td>if enabled, the ranges are balanced equally among all nodes</td></tr>
<tr><td><div id="setting-changefeed-event-consumer-worker-queue-size" class="anchored"><code>changefeed.event_consumer_worker_queue_size</code></div></td><td>integer</td><td><code>16</code></td><td>if changefeed.event_consumer_workers is enabled, this setting sets the maxmimum number of events which a worker can buffer</td></tr>
<tr><td><div id="setting-changefeed-event-consumer-workers" class="anchored"><code>changefeed.event_consumer_workers</code></div></td><td>integer</td><td><code>0</code></td><td>the number of workers to use when processing events: &lt;0 disables, 0 assigns a reasonable default, &gt;0 assigns the setting value. for experimental/core changefeeds and changefeeds using parquet format, this is disabled</td></tr>
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ var ScanRequestSize = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.backfill.scan_request_size",
"the maximum number of bytes returned by each scan request",
16<<20, // 16 MiB
)
1<<19, // 1/2 MiB
).WithPublic()

// SinkThrottleConfig describes throttling configuration for the sink.
// 0 values for any of the settings disable that setting.
Expand Down
41 changes: 26 additions & 15 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,28 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) {
return nil
}

// AcquireMemory acquires specified number of bytes form the memory monitor,
// blocking acquisition if needed.
func (b *blockingBuffer) AcquireMemory(ctx context.Context, n int64) (alloc Alloc, _ error) {
if l := changefeedbase.PerChangefeedMemLimit.Get(b.sv); n > l {
return alloc, errors.Newf("event size %d exceeds per changefeed limit %d", alloc, l)
}
alloc.init(n, &b.qp)
if err := func() error {
b.req.Lock()
defer b.req.Unlock()
b.req.memRequest = memRequest(n)
if err := b.qp.Acquire(ctx, &b.req); err != nil {
return err
}
return nil
}(); err != nil {
return alloc, err
}
b.metrics.BufferEntriesMemAcquired.Inc(n)
return alloc, nil
}

// Add implements Writer interface.
func (b *blockingBuffer) Add(ctx context.Context, e Event) error {
// Immediately enqueue event if it already has allocation,
Expand All @@ -237,24 +259,13 @@ func (b *blockingBuffer) Add(ctx context.Context, e Event) error {
}

// Acquire the quota first.
alloc := int64(changefeedbase.EventMemoryMultiplier.Get(b.sv) * float64(e.ApproximateSize()))
if l := changefeedbase.PerChangefeedMemLimit.Get(b.sv); alloc > l {
return errors.Newf("event size %d exceeds per changefeed limit %d", alloc, l)
}
e.alloc.init(alloc, &b.qp)
n := int64(changefeedbase.EventMemoryMultiplier.Get(b.sv) * float64(e.ApproximateSize()))
e.bufferAddTimestamp = timeutil.Now()
if err := func() error {
b.req.Lock()
defer b.req.Unlock()
b.req.memRequest = memRequest(alloc)
if err := b.qp.Acquire(ctx, &b.req); err != nil {
return err
}
return nil
}(); err != nil {
alloc, err := b.AcquireMemory(ctx, n)
if err != nil {
return err
}
b.metrics.BufferEntriesMemAcquired.Inc(alloc)
e.alloc = alloc
return b.enqueue(ctx, e)
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/kvevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ type Writer interface {
CloseWithReason(ctx context.Context, reason error) error
}

// MemAllocator is an interface for acquiring memory.
type MemAllocator interface {
AcquireMemory(ctx context.Context, n int64) (Alloc, error)
}

// Type indicates the type of the event.
// Different types indicate which methods will be meaningful.
// Events are implemented this way rather than as an interface to remove the
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,21 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
return errors.CombineErrors(err, g.Wait())
}

var spanAlloc kvevent.Alloc
if allocator, ok := sink.(kvevent.MemAllocator); ok {
// Sink implements memory allocator interface, so acquire
// memory needed to hold scan reply.
spanAlloc, err = allocator.AcquireMemory(ctx, changefeedbase.ScanRequestSize.Get(&p.settings.SV))
if err != nil {
cancel()
return errors.CombineErrors(err, g.Wait())
}
}

g.GoCtx(func(ctx context.Context) error {
defer limAlloc.Release()
defer spanAlloc.Release(ctx)

err := p.exportSpan(ctx, span, cfg.Timestamp, cfg.WithDiff, sink, cfg.Knobs)
finished := atomic.AddInt64(&atomicFinished, 1)
if backfillDec != nil {
Expand Down
10 changes: 9 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (
)

type recordResolvedWriter struct {
resolved []jobspb.ResolvedSpan
resolved []jobspb.ResolvedSpan
memAcquired bool
}

func (r *recordResolvedWriter) Add(ctx context.Context, e kvevent.Event) error {
Expand All @@ -46,6 +47,12 @@ func (r *recordResolvedWriter) CloseWithReason(ctx context.Context, reason error
return nil
}

func (r *recordResolvedWriter) AcquireMemory(ctx context.Context, n int64) (kvevent.Alloc, error) {
// Don't care to actually acquire memory; just testing that we try to do so.
r.memAcquired = true
return kvevent.Alloc{}, nil
}

var _ kvevent.Writer = (*recordResolvedWriter)(nil)

func TestEmitsResolvedDuringScan(t *testing.T) {
Expand Down Expand Up @@ -85,6 +92,7 @@ INSERT INTO t VALUES (1), (2), (3);

sink := &recordResolvedWriter{}
require.NoError(t, scanner.Scan(ctx, sink, cfg))
require.True(t, sink.memAcquired)

startKey := span.Key
require.Equal(t, 3, len(sink.resolved))
Expand Down

0 comments on commit 1d755df

Please sign in to comment.