Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
samiskin committed Mar 21, 2023
1 parent 7e7c5ff commit f3c1faf
Show file tree
Hide file tree
Showing 4 changed files with 773 additions and 2,814 deletions.
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,11 +830,11 @@ func randomSinkType(opts ...feedTestOption) string {

func randomSinkTypeWithOptions(options feedTestOptions) string {
sinkWeights := map[string]int{
"kafka": 3,
"enterprise": 1,
"kafka": 0,
"enterprise": 0,
"webhook": 1,
"pubsub": 1,
"sinkless": 2,
"pubsub": 0,
"sinkless": 0,
"cloudstorage": 0,
}
if options.externalIODir != "" {
Expand Down
45 changes: 22 additions & 23 deletions pkg/ccl/changefeedccl/parallel_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,25 +154,17 @@ func (pe *parallelIO) runWorkers(ctx context.Context, numEmitWorkers int) error
// Clear out the completed keys to check for newly valid pending requests
inflight.DifferenceWith(req.Keys())

var stillPending = pending[:0] // Reuse underlying space
for _, pendingReq := range pending {
// If no intersection, nothing changed for this request's validity
if !req.Keys().Intersects(pendingReq.Keys()) {
stillPending = append(stillPending, pendingReq)
continue
}

// If it is now free to send, send it
if !inflight.Intersects(pendingReq.Keys()) {
pendingKeys := intsets.Fast{}
for i, pendingReq := range pending {
if !inflight.Intersects(pendingReq.Keys()) && !pendingKeys.Intersects(pendingReq.Keys()) {
inflight.UnionWith(pendingReq.Keys())
pending = append(pending[:i], pending[i+1:]...)
sendToWorker(ctx, pendingReq)
} else {
stillPending = append(stillPending, pendingReq)
break
}

// Re-add whatever keys in the pending request that were removed
inflight.UnionWith(pendingReq.Keys())
pendingKeys.UnionWith(pendingReq.Keys())
}
pending = stillPending

select {
case <-ctx.Done():
Expand All @@ -181,6 +173,18 @@ func (pe *parallelIO) runWorkers(ctx context.Context, numEmitWorkers int) error
}
}

keysInFlight := func(keys intsets.Fast) bool {
if inflight.Intersects(keys) {
return true
}
for _, pendingReq := range pending {
if pendingReq.Keys().Intersects(keys) {
return true
}
}
return false
}

for {
// Results read from sendToWorker need to be first added to a pendingResults
// list and then handled separately here rather than calling handleResult
Expand All @@ -195,16 +199,11 @@ func (pe *parallelIO) runWorkers(ctx context.Context, numEmitWorkers int) error

select {
case req := <-pe.requestCh:
if !inflight.Intersects(req.Keys()) {
inflight.UnionWith(req.Keys())
sendToWorker(ctx, req)
if keysInFlight(req.Keys()) {
pending = append(pending, req)
} else {
// Even if the request isn't going to be immediately sent out, it must
// still be considered "inflight" as future incoming events overlapping
// its keys must not be sent until this event is removed from the queue
// and successfully emitted.
inflight.UnionWith(req.Keys())
pending = append(pending, req)
sendToWorker(ctx, req)
}
case res := <-emitSuccessCh:
handleSuccess(res)
Expand Down
Loading

0 comments on commit f3c1faf

Please sign in to comment.