move ioresult to iorequest to avoid allocation
samiskin committed Mar 21, 2023
1 parent a8f13a0 commit 7e7c5ff
Showing 6 changed files with 78 additions and 79 deletions.
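The gist of the change, as a minimal standalone sketch: instead of wrapping every completed request in a freshly allocated ioResult{request, err}, the error is now stored on the request itself via SetError, so the same request value can be forwarded on resultCh with no per-result allocation. Names other than IORequest, SetError, and resultCh are hypothetical stand-ins, and Keys() is omitted for brevity.

package main

import "fmt"

// IORequest mirrors the interface in parallel_io.go (Keys() omitted here).
type IORequest interface {
	SetError(error)
}

// demoBatch is a hypothetical request type standing in for sinkBatchBuffer.
type demoBatch struct {
	name     string
	flushErr error
}

func (b *demoBatch) SetError(err error) { b.flushErr = err }

func main() {
	resultCh := make(chan IORequest, 1)
	req := &demoBatch{name: "batch-1"}

	// Old shape (removed by this commit):
	//   resultCh <- &ioResult{request: req, err: emitErr}
	// New shape: attach any emit error to the request and forward it as-is.
	var emitErr error // result of the (simulated) IO handler
	if emitErr != nil {
		req.SetError(emitErr)
	}
	resultCh <- req

	done := (<-resultCh).(*demoBatch)
	fmt.Println(done.name, done.flushErr) // batch-1 <nil>
}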
19 changes: 13 additions & 6 deletions pkg/ccl/changefeedccl/batching_sink.go
@@ -201,6 +201,8 @@ type sinkBatchBuffer struct {

alloc kvevent.Alloc
hasher hash.Hash32

flushErr error
}

// FinalizePayload closes the writer to produce a payload that is ready to be
@@ -219,6 +221,11 @@ func (sb *sinkBatchBuffer) Keys() intsets.Fast {
return sb.keys
}

// SetError implements the IORequest interface.
func (sb *sinkBatchBuffer) SetError(err error) {
sb.flushErr = err
}

func (sb *sinkBatchBuffer) isEmpty() bool {
return sb.numMessages == 0
}
@@ -276,7 +283,7 @@ func (bs *batchingSink) runBatchingWorker(ctx context.Context) {
ioEmitter := newParallelIO(ctx, bs.retryOpts, bs.ioWorkers, ioHandler, bs.metrics)
defer ioEmitter.Close()

var handleResult func(result *ioResult)
var handleResult func(result IORequest)

tryFlushBatch := func() {
if batchBuffer.isEmpty() {
@@ -312,11 +319,11 @@ func (bs *batchingSink) runBatchingWorker(ctx context.Context) {
inflight := 0
var sinkFlushWaiter chan struct{}

handleResult = func(result *ioResult) {
batch, _ := result.request.(*sinkBatchBuffer)
handleResult = func(result IORequest) {
batch, _ := result.(*sinkBatchBuffer)

if result.err != nil {
bs.handleError(result.err)
if batch.flushErr != nil {
bs.handleError(batch.flushErr)
} else {
bs.metrics.recordEmittedBatch(
batch.bufferTime, batch.numMessages, batch.mvcc, batch.numKVBytes, sinkDoesNotCompress,
@@ -325,7 +332,7 @@ func (bs *batchingSink) runBatchingWorker(ctx context.Context) {

inflight -= batch.numMessages

if (result.err != nil || inflight == 0) && sinkFlushWaiter != nil {
if (batch.flushErr != nil || inflight == 0) && sinkFlushWaiter != nil {
close(sinkFlushWaiter)
sinkFlushWaiter = nil
}
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -287,8 +287,7 @@ var NewWebhookSinkEnabled = settings.RegisterBoolSetting(
"changefeed.new_webhook_sink_enabled",
"if enabled, this setting enables a new implementation of the webhook sink"+
" that allows for a much higher throughput",
true,
// util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false),
util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false),
)

var SinkParallelism = settings.RegisterIntSetting(
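The settings.go hunk restores the metamorphic default that had been commented out while the new sink was under development. As a rough, hypothetical illustration of the idea (a toy, not the real util package or its signature): a metamorphic test constant resolves to its given default in normal builds, but may be randomly flipped in metamorphic test builds so both sink implementations get exercised by the test suite.

package main

import (
	"fmt"
	"math/rand"
)

// constantWithMetamorphicTestBool is a toy stand-in for
// util.ConstantWithMetamorphicTestBool: outside of metamorphic test builds it
// simply returns defaultVal; in metamorphic builds it may return either value.
func constantWithMetamorphicTestBool(name string, defaultVal, metamorphicBuild bool, rng *rand.Rand) bool {
	if metamorphicBuild && rng.Intn(2) == 0 {
		fmt.Printf("metamorphic: flipped %q to %t\n", name, !defaultVal)
		return !defaultVal
	}
	return defaultVal
}

func main() {
	rng := rand.New(rand.NewSource(1))
	enabled := constantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false, true, rng)
	fmt.Println("new webhook sink enabled:", enabled)
}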
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -830,11 +830,11 @@ func randomSinkType(opts ...feedTestOption) string {

func randomSinkTypeWithOptions(options feedTestOptions) string {
sinkWeights := map[string]int{
"kafka": 0,
"enterprise": 0,
"kafka": 3,
"enterprise": 1,
"webhook": 1,
"pubsub": 0,
"sinkless": 0,
"pubsub": 1,
"sinkless": 2,
"cloudstorage": 0,
}
if options.externalIODir != "" {
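The weights above re-enable randomized sink selection in tests (previously every sink but webhook was zeroed out). The helper that consumes the map is not shown in this hunk; a plausible weighted pick looks roughly like the following hypothetical sketch, which is not the repo's implementation:

package main

import (
	"fmt"
	"math/rand"
)

// pickWeighted returns one key from weights with probability proportional to
// its weight; zero-weight entries are never chosen.
func pickWeighted(weights map[string]int, rng *rand.Rand) string {
	total := 0
	for _, w := range weights {
		total += w
	}
	if total == 0 {
		return ""
	}
	n := rng.Intn(total)
	for name, w := range weights {
		if n < w {
			return name
		}
		n -= w
	}
	return "" // unreachable when total > 0
}

func main() {
	sinkWeights := map[string]int{
		"kafka":      3,
		"enterprise": 1,
		"webhook":    1,
		"pubsub":     1,
		"sinkless":   2,
	}
	rng := rand.New(rand.NewSource(42))
	fmt.Println("chosen sink:", pickWeighted(sinkWeights, rng))
}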
117 changes: 56 additions & 61 deletions pkg/ccl/changefeedccl/parallel_io.go
@@ -21,8 +21,9 @@ import (
// which ordering is preserved.
// Example: if the events [[a,b], [b,c], [c,d], [e,f]] are all submitted in that
// order, [a,b] and [e,f] can be emitted concurrently while [b,c] will block
// until [a,b] completes, then [c,d] will block until [b,c] completes.
// If [c,d] errored, [b,c] would never be sent.
// until [a,b] completes, then [c,d] will block until [b,c] completes. If [c,d]
// errored, [b,c] would never be sent, and SetError would be called on [c,d]
// prior to it being returned on resultCh.
type parallelIO struct {
retryOpts retry.Options
wg ctxgroup.Group
Expand All @@ -32,25 +33,20 @@ type parallelIO struct {
ioHandler IOHandler

requestCh chan IORequest
resultCh chan *ioResult
resultCh chan IORequest
}

// IORequest represents an abstract unit of IO that has a set of keys upon which
// sequential ordering of fulfillment must be enforced.
// sequential ordering of fulfillment must be enforced, and allows the storing
// of an error if one is encountered during handling.
type IORequest interface {
Keys() intsets.Fast
SetError(error)
}

// IOHandler performs a blocking IO operation on an IORequest
type IOHandler func(context.Context, IORequest) error

// ioResult contains a completed request along with the error from IOHandler if
// despite retries the request could not be handled.
type ioResult struct {
request IORequest
err error
}

func newParallelIO(
ctx context.Context,
retryOpts retry.Options,
@@ -65,7 +61,7 @@ func newParallelIO(
metrics: metrics,
ioHandler: handler,
requestCh: make(chan IORequest, numWorkers),
resultCh: make(chan *ioResult, numWorkers),
resultCh: make(chan IORequest, numWorkers),
doneCh: make(chan struct{}),
}

@@ -77,8 +73,7 @@ }
}

// Close stops all workers immediately and returns once they shut down. Inflight
// requests sent to requestCh may never result in their corresponding ioResult
// being sent to resultCh.
// requests sent to requestCh may never result in being sent to resultCh.
func (pe *parallelIO) Close() {
close(pe.doneCh)
_ = pe.wg.Wait()
@@ -99,28 +94,37 @@ func (pe *parallelIO) runWorkers(ctx context.Context, numEmitWorkers int) error
// Multiple worker routines handle the IO operations, retrying when necessary.
emitCh := make(chan IORequest, numEmitWorkers)
defer close(emitCh)
emitResultCh := make(chan *ioResult, numEmitWorkers)
emitSuccessCh := make(chan IORequest, numEmitWorkers)

for i := 0; i < numEmitWorkers; i++ {
pe.wg.GoCtx(func(ctx context.Context) error {
for req := range emitCh {
select {
case <-ctx.Done():
return ctx.Err()
case <-pe.doneCh:
return nil
case emitResultCh <- &ioResult{
err: emitWithRetries(ctx, req),
request: req,
}:
err := emitWithRetries(ctx, req)
if err != nil {
req.SetError(err)
select {
case <-ctx.Done():
return ctx.Err()
case <-pe.doneCh:
return nil
case pe.resultCh <- req:
}
} else {
select {
case <-ctx.Done():
return ctx.Err()
case <-pe.doneCh:
return nil
case emitSuccessCh <- req:
}
}
}
return nil
})
}

var handleResult func(*ioResult)
var pendingResults []*ioResult
var handleSuccess func(IORequest)
var pendingResults []IORequest

sendToWorker := func(ctx context.Context, req IORequest) {
for {
@@ -131,7 +135,7 @@ func (pe *parallelIO) runWorkers(ctx context.Context, numEmitWorkers int) error
return
case emitCh <- req:
return
case res := <-emitResultCh:
case res := <-emitSuccessCh:
// Must also handle results to avoid the above emit being able to block
// forever on all workers being busy trying to emit results.
pendingResults = append(pendingResults, res)
@@ -146,43 +150,34 @@ func (pe *parallelIO) runWorkers(ctx context.Context, numEmitWorkers int) error
var inflight intsets.Fast
var pending []IORequest

// do pending keys

handleResult = func(res *ioResult) {
if res.err == nil {
// Clear out the completed keys to check for newly valid pending requests
inflight.DifferenceWith(res.request.Keys())

var stillPending = pending[:0] // Reuse underlying space
for _, pendingReq := range pending {
overlap := res.request.Keys().Intersection(pendingReq.Keys())

// If no overlap, nothing changed for this request
if overlap.Empty() {
stillPending = append(stillPending, pendingReq)
continue
}
handleSuccess = func(req IORequest) {
// Clear out the completed keys to check for newly valid pending requests
inflight.DifferenceWith(req.Keys())

// If it is now free to send, send it
if !inflight.Intersects(pendingReq.Keys()) {
sendToWorker(ctx, pendingReq)
} else {
stillPending = append(stillPending, pendingReq)
}
var stillPending = pending[:0] // Reuse underlying space
for _, pendingReq := range pending {
// If no intersection, nothing changed for this request's validity
if !req.Keys().Intersects(pendingReq.Keys()) {
stillPending = append(stillPending, pendingReq)
continue
}

// Re-add whatever keys in the pending request that were removed
inflight.UnionWith(overlap)
// If it is now free to send, send it
if !inflight.Intersects(pendingReq.Keys()) {
sendToWorker(ctx, pendingReq)
} else {
stillPending = append(stillPending, pendingReq)
}
pending = stillPending
}

// If res.err != nil, inflight remains unchanged thereby blocking any
// further conflicting requests from ever being processed.
// Re-add whatever keys in the pending request that were removed
inflight.UnionWith(pendingReq.Keys())
}
pending = stillPending

select {
case <-ctx.Done():
case <-pe.doneCh:
case pe.resultCh <- res:
case pe.resultCh <- req:
}
}

@@ -192,10 +187,10 @@ func (pe *parallelIO) runWorkers(ctx context.Context, numEmitWorkers int) error
// inside sendToWorker, as having a re-entrant sendToWorker -> handleSuccess
// -> sendToWorker -> handleSuccess chain creates complexity with managing
// pending requests
unhandledResults := pendingResults
unhandled := pendingResults
pendingResults = nil
for _, res := range unhandledResults {
handleResult(res)
for _, res := range unhandled {
handleSuccess(res)
}

select {
@@ -211,8 +206,8 @@ func (pe *parallelIO) runWorkers(ctx context.Context, numEmitWorkers int) error
inflight.UnionWith(req.Keys())
pending = append(pending, req)
}
case res := <-emitResultCh:
handleResult(res)
case res := <-emitSuccessCh:
handleSuccess(res)
case <-ctx.Done():
return ctx.Err()
case <-pe.doneCh:
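The ordering contract described at the top of parallel_io.go (requests with overlapping key sets complete in submission order, disjoint ones run concurrently) can be modeled in a few dozen lines. The following is a simplified sketch using plain maps instead of intsets.Fast and ignoring workers, retries, and error propagation; all names are hypothetical and it is only the scheduling idea, not the repo's implementation.

package main

import "fmt"

type request struct {
	name string
	keys []int
}

func overlaps(keys []int, set map[int]bool) bool {
	for _, k := range keys {
		if set[k] {
			return true
		}
	}
	return false
}

func addKeys(keys []int, set map[int]bool) {
	for _, k := range keys {
		set[k] = true
	}
}

type scheduler struct {
	inflight   map[int]bool // keys of requests currently being emitted
	queuedKeys map[int]bool // keys of requests waiting in pending
	pending    []request    // queued requests, in submission order
}

func (s *scheduler) dispatch(r request) {
	addKeys(r.keys, s.inflight)
	fmt.Println("dispatch", r.name)
}

// submit emits r immediately unless it shares a key with in-flight or queued
// work, in which case it is queued to preserve per-key submission order.
func (s *scheduler) submit(r request) {
	if overlaps(r.keys, s.inflight) || overlaps(r.keys, s.queuedKeys) {
		s.pending = append(s.pending, r)
		addKeys(r.keys, s.queuedKeys)
		return
	}
	s.dispatch(r)
}

// complete frees r's keys and re-scans the queue in order, emitting anything
// that no longer conflicts with in-flight work or an earlier queued request.
func (s *scheduler) complete(r request) {
	fmt.Println("complete", r.name)
	for _, k := range r.keys {
		delete(s.inflight, k)
	}
	stillPending := s.pending[:0] // reuse underlying space, as the real code does
	stillQueued := map[int]bool{}
	for _, p := range s.pending {
		if overlaps(p.keys, s.inflight) || overlaps(p.keys, stillQueued) {
			stillPending = append(stillPending, p)
			addKeys(p.keys, stillQueued)
			continue
		}
		s.dispatch(p)
	}
	s.pending = stillPending
	s.queuedKeys = stillQueued
}

func main() {
	s := &scheduler{inflight: map[int]bool{}, queuedKeys: map[int]bool{}}
	s.submit(request{"[a,b]", []int{1, 2}}) // dispatched
	s.submit(request{"[b,c]", []int{2, 3}}) // queued behind [a,b]
	s.submit(request{"[c,d]", []int{3, 4}}) // queued behind [b,c]
	s.submit(request{"[e,f]", []int{5, 6}}) // dispatched concurrently
	s.complete(request{"[a,b]", []int{1, 2}})
	// [b,c] dispatches now; [c,d] keeps waiting until [b,c] completes.
	s.complete(request{"[b,c]", []int{2, 3}})
}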
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink_webhook_v2.go
@@ -305,7 +305,7 @@ func (jw *webhookJSONWriter) Close() (SinkPayload, error) {
suffix := fmt.Sprintf("],\"length\":%d}", len(jw.messages))

// Grow all at once to avoid reallocations
buffer.Grow(len(prefix) + jw.numBytes + len(jw.messages) + len(suffix))
buffer.Grow(len(prefix) + jw.numBytes /* kvs */ + len(jw.messages) /* commas */ + len(suffix))

buffer.WriteString(prefix)
for i, msg := range jw.messages {
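The Grow call above only gains clarifying comments, but the arithmetic is worth spelling out: the final payload is prefix + all key/value bytes + one comma per message (an upper bound, since n messages need n-1 commas) + suffix, so reserving that many bytes up front lets the whole JSON body be assembled with a single allocation. A standalone sketch with a made-up prefix and messages (the real prefix is not shown in this hunk):

package main

import (
	"bytes"
	"fmt"
)

func main() {
	// Hypothetical pre-serialized rows; in the sink these are the buffered KVs.
	messages := [][]byte{[]byte(`{"k":1}`), []byte(`{"k":2}`), []byte(`{"k":3}`)}
	numBytes := 0
	for _, m := range messages {
		numBytes += len(m)
	}

	prefix := `{"payload":[` // assumed wrapper for the sketch
	suffix := fmt.Sprintf("],\"length\":%d}", len(messages))

	var buffer bytes.Buffer
	// Grow once: prefix + kvs + commas (upper bound) + suffix.
	buffer.Grow(len(prefix) + numBytes + len(messages) + len(suffix))

	buffer.WriteString(prefix)
	for i, msg := range messages {
		if i > 0 {
			buffer.WriteByte(',')
		}
		buffer.Write(msg)
	}
	buffer.WriteString(suffix)

	fmt.Println(buffer.String()) // {"payload":[{"k":1},{"k":2},{"k":3}],"length":3}
}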
8 changes: 3 additions & 5 deletions pkg/cmd/roachtest/tests/cdc.go
@@ -1225,19 +1225,17 @@ func registerCDC(r registry.Registry) {

ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"})

if _, err := ct.DB().Exec("SET CLUSTER SETTING changefeed.new_webhook_sink_enabled = true"); err != nil {
if _, err := ct.DB().Exec("SET CLUSTER SETTING changefeed.new_webhook_sink_enabled = true;"); err != nil {
ct.t.Fatal(err)
}

feed := ct.newChangefeed(feedArgs{
sinkType: webhookSink,
targets: allTpccTargets,
opts: map[string]string{
"metrics_label": "'webhook'",
"initial_scan": "'only'",
// Webhook sink without batching is currently too slow to handle even 1 warehouse
"metrics_label": "'webhook'",
"initial_scan": "'only'",
"webhook_sink_config": `'{"Flush": { "Messages": 100, "Frequency": "5s" } }'`,
// "kafka_sink_config": `'{"Flush": { "Messages": 100, "Frequency": "5s" } }'`,
},
})

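The roachtest now always supplies webhook_sink_config, since (per the comment in the diff) the unbatched webhook sink cannot keep up with even one TPC-C warehouse. Conceptually the Flush block means "emit when 100 messages have accumulated or 5s have passed, whichever comes first". A minimal hypothetical sketch of that count-or-timer flush loop, not the sink's actual code:

package main

import (
	"fmt"
	"time"
)

// batchAndFlush drains in, flushing whenever flushCount messages have
// accumulated or flushEvery has elapsed since the last flush.
func batchAndFlush(in <-chan string, flushCount int, flushEvery time.Duration, flush func([]string)) {
	var batch []string
	timer := time.NewTimer(flushEvery)
	defer timer.Stop()

	doFlush := func() {
		if len(batch) > 0 {
			flush(batch)
			batch = nil
		}
		// Restart the timer; drain it first if it already fired.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(flushEvery)
	}

	for {
		select {
		case msg, ok := <-in:
			if !ok {
				doFlush()
				return
			}
			batch = append(batch, msg)
			if len(batch) >= flushCount {
				doFlush()
			}
		case <-timer.C:
			doFlush()
		}
	}
}

func main() {
	in := make(chan string)
	go func() {
		for i := 0; i < 250; i++ {
			in <- fmt.Sprintf("row-%d", i)
		}
		close(in)
	}()
	batchAndFlush(in, 100, 5*time.Second, func(b []string) {
		fmt.Printf("flushing %d messages\n", len(b))
	})
}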
