Skip to content

Commit

Permalink
Merge #43786
Browse files Browse the repository at this point in the history
43786: colexec: add external sort r=yuzefovich a=yuzefovich

**execinfra: minor refactor of getting working memory limit**

This commit introduces a helper method for getting the working memory
limit in bytes from a combination of testing knobs and a cluster
setting.

Release note: None

**colexec: add simple implementation of an external sort**

This commit adds a simple implementation of external sort that works
in two stages:
1. it will use a combination of a newly introduced input partitioner
and in-memory sorter to divide up all batches from the input into
partitions, sort each partition in memory, and write sorted partitions
to disk
2. it will use OrderedSynchronizer to merge the partitions.

Closes: #42409.

Release note: None

**colexec: refactor all spooler to not require allocator when exporting**

This commit modifies 'spooler' interface to add a method that allows
accessing spooled tuples as "windowed batches" so that when exporting
buffered tuples an allocator is not required and no copying is not
necessary. Instead, windowed batches are returned which are safe to use
until the next call and need to be copied somewhere else. Such behavior
is acceptable because these batches will be spilled to disk.

Release note: None

**cmd: increase timeout and make TESTS regex more specific in pull-request-make**

This commit increases the timeout from 5 to 10 minutes (I ran into
a situation where the CI `testrace` was timing out). Also, it adds `$$`
symbols to the end of each test so that full length matching occurred
instead of prefix matching (e.g. `TestSort` also matches
`TestSortedDistinct` which could be not modified in the PR to run
`testrace` for).

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Jan 21, 2020
2 parents cdee208 + c90fcb1 commit f231ee8
Show file tree
Hide file tree
Showing 25 changed files with 999 additions and 243 deletions.
6 changes: 3 additions & 3 deletions pkg/cmd/github-pull-request-make/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ func main() {
log.Fatal(err)
}
if len(pkgs) > 0 {
// 5 minutes total seems OK, but at least a minute per test.
duration := (5 * time.Minute) / time.Duration(len(pkgs))
// 10 minutes total seems OK, but at least a minute per test.
duration := (10 * time.Minute) / time.Duration(len(pkgs))
if duration < time.Minute {
duration = time.Minute
}
Expand All @@ -267,7 +267,7 @@ func main() {
for name, pkg := range pkgs {
tests := "-"
if len(pkg.tests) > 0 {
tests = "(" + strings.Join(pkg.tests, "|") + ")"
tests = "(" + strings.Join(pkg.tests, "$$|") + "$$)"
}

cmd := exec.Command(
Expand Down
8 changes: 4 additions & 4 deletions pkg/col/coldata/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (b *Bytes) AssertOffsetsAreNonDecreasing(n uint64) {
// set on it.
func (b *Bytes) UpdateOffsetsToBeNonDecreasing(n uint64) {
// Note that we're not checking whether this Bytes is a window because
// although this function modifies the "window" Bytes, it maintains the
// although this function might modify the "window" Bytes, it maintains the
// invariant that we need to have.
prev := b.offsets[0]
for j := uint64(1); j <= n; j++ {
Expand All @@ -90,9 +90,9 @@ func (b *Bytes) UpdateOffsetsToBeNonDecreasing(n uint64) {
// b.maxSetIndex+1 are non-decreasing. Note that this method can be a noop when
// i <= b.maxSetIndex+1.
func (b *Bytes) maybeBackfillOffsets(i int) {
if b.isWindow {
panic("maybeBackfillOffsets is called on a window into Bytes")
}
// Note that we're not checking whether this Bytes is a window because
// although this function might modify the "window" Bytes, it maintains the
// invariant that we need to have.
for j := b.maxSetIndex + 2; j <= i; j++ {
b.offsets[j] = b.offsets[b.maxSetIndex+1]
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/sql/colexec/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,15 @@ func (a *Allocator) PerformOperation(destVecs []coldata.Vec, operation func()) {
}
}

// TODO(yuzefovich): extend Allocator so that it could free up the memory (and
// resize the memory account accordingly) when the caller is done with the
// batches.
// Used returns the number of bytes currently allocated through this allocator.
func (a *Allocator) Used() int64 {
return a.acc.Used()
}

// Clear clears up the memory account of the allocator.
func (a *Allocator) Clear() {
a.acc.Clear(a.ctx)
}

const (
sizeOfBool = int(unsafe.Sizeof(true))
Expand Down
16 changes: 8 additions & 8 deletions pkg/sql/colexec/and_or_projection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,11 @@ func TestAndOrOps(t *testing.T) {
},
}
args := NewColOperatorArgs{
Spec: spec,
Inputs: input,
StreamingMemAccount: testMemAcc,
UseStreamingMemAccountForBuffering: true,
Spec: spec,
Inputs: input,
StreamingMemAccount: testMemAcc,
}
args.TestingKnobs.UseStreamingMemAccountForBuffering = true
result, err := NewColOperator(ctx, flowCtx, args)
if err != nil {
return nil, err
Expand Down Expand Up @@ -279,11 +279,11 @@ func benchmarkLogicalProjOp(
}

args := NewColOperatorArgs{
Spec: spec,
Inputs: []Operator{input},
StreamingMemAccount: testMemAcc,
UseStreamingMemAccountForBuffering: true,
Spec: spec,
Inputs: []Operator{input},
StreamingMemAccount: testMemAcc,
}
args.TestingKnobs.UseStreamingMemAccountForBuffering = true
result, err := NewColOperator(ctx, flowCtx, args)
if err != nil {
b.Fatal(err)
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/case_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ func TestCaseOp(t *testing.T) {
spec.Input[0].ColumnTypes = tc.inputTypes
spec.Post.RenderExprs[0].Expr = tc.renderExpr
args := NewColOperatorArgs{
Spec: spec,
Inputs: inputs,
StreamingMemAccount: testMemAcc,
UseStreamingMemAccountForBuffering: true,
Spec: spec,
Inputs: inputs,
StreamingMemAccount: testMemAcc,
}
args.TestingKnobs.UseStreamingMemAccountForBuffering = true
result, err := NewColOperator(ctx, flowCtx, args)
if err != nil {
return nil, err
Expand Down
26 changes: 12 additions & 14 deletions pkg/sql/colexec/disk_spiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ type bufferingInMemoryOperator interface {
//
// Calling ExportBuffered may invalidate the contents of the last batch
// returned by ExportBuffered.
// TODO(yuzefovich): it might be possible to avoid the need for an Allocator
// when exporting buffered tuples. This will require the refactor of the
// buffering in-memory operators.
ExportBuffered(*Allocator) coldata.Batch
ExportBuffered() coldata.Batch
}

// oneInputDiskSpiller is an Operator that manages the fallback from an
Expand Down Expand Up @@ -78,22 +75,20 @@ type bufferingInMemoryOperator interface {
type oneInputDiskSpiller struct {
NonExplainable

allocator *Allocator
initialized bool
spilled bool

input Operator
inMemoryOp bufferingInMemoryOperator
inMemoryMemMonitorName string
diskBackedOp Operator
spillingCallbackFn func()
}

var _ Operator = &oneInputDiskSpiller{}

// newOneInputDiskSpiller returns a new oneInputDiskSpiller. It takes the
// following arguments:
// - allocator - this Allocator is used (if spilling occurs) when copying the
// buffered tuples from the in-memory operator into the disk-backed one.
// - inMemoryOp - the in-memory operator that will be consuming input and doing
// computations until it either successfully processes the whole input or
// reaches its memory limit.
Expand All @@ -104,20 +99,22 @@ var _ Operator = &oneInputDiskSpiller{}
// operator when given an input operator. We take in a constructor rather
// than an already created operator in order to hide the complexity of buffer
// exporting operator that serves as the input to the disk-backed operator.
// - spillingCallbackFn will be called when the spilling from in-memory to disk
// backed operator occurs. It should only be set in tests.
func newOneInputDiskSpiller(
allocator *Allocator,
input Operator,
inMemoryOp bufferingInMemoryOperator,
inMemoryMemMonitorName string,
diskBackedOpConstructor func(input Operator) Operator,
spillingCallbackFn func(),
) Operator {
diskBackedOpInput := newBufferExportingOperator(allocator, inMemoryOp, input)
diskBackedOpInput := newBufferExportingOperator(inMemoryOp, input)
return &oneInputDiskSpiller{
allocator: allocator,
input: input,
inMemoryOp: inMemoryOp,
inMemoryMemMonitorName: inMemoryMemMonitorName,
diskBackedOp: diskBackedOpConstructor(diskBackedOpInput),
spillingCallbackFn: spillingCallbackFn,
}
}

Expand Down Expand Up @@ -147,6 +144,9 @@ func (d *oneInputDiskSpiller) Next(ctx context.Context) coldata.Batch {
if sqlbase.IsOutOfMemoryError(err) &&
strings.Contains(err.Error(), d.inMemoryMemMonitorName) {
d.spilled = true
if d.spillingCallbackFn != nil {
d.spillingCallbackFn()
}
d.diskBackedOp.Init()
return d.Next(ctx)
}
Expand Down Expand Up @@ -203,7 +203,6 @@ type bufferExportingOperator struct {
ZeroInputNode
NonExplainable

allocator *Allocator
firstSource bufferingInMemoryOperator
secondSource Operator
firstSourceDone bool
Expand All @@ -212,10 +211,9 @@ type bufferExportingOperator struct {
var _ Operator = &bufferExportingOperator{}

func newBufferExportingOperator(
allocator *Allocator, firstSource bufferingInMemoryOperator, secondSource Operator,
firstSource bufferingInMemoryOperator, secondSource Operator,
) Operator {
return &bufferExportingOperator{
allocator: allocator,
firstSource: firstSource,
secondSource: secondSource,
}
Expand All @@ -230,7 +228,7 @@ func (b *bufferExportingOperator) Next(ctx context.Context) coldata.Batch {
if b.firstSourceDone {
return b.secondSource.Next(ctx)
}
batch := b.firstSource.ExportBuffered(b.allocator)
batch := b.firstSource.ExportBuffered()
if batch.Length() == 0 {
b.firstSourceDone = true
return b.Next(ctx)
Expand Down
91 changes: 62 additions & 29 deletions pkg/sql/colexec/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,23 @@ func wrapRowSources(
// NewColOperatorArgs is a helper struct that encompasses all of the input
// arguments to NewColOperator call.
type NewColOperatorArgs struct {
Spec *execinfrapb.ProcessorSpec
Inputs []Operator
StreamingMemAccount *mon.BoundAccount
// UseStreamingMemAccountForBuffering specifies whether to use
// StreamingMemAccount when creating buffering operators and should only be
// set to 'true' in tests.
UseStreamingMemAccountForBuffering bool
ProcessorConstructor execinfra.ProcessorConstructor
Spec *execinfrapb.ProcessorSpec
Inputs []Operator
StreamingMemAccount *mon.BoundAccount
ProcessorConstructor execinfra.ProcessorConstructor
TestingKnobs struct {
// UseStreamingMemAccountForBuffering specifies whether to use
// StreamingMemAccount when creating buffering operators and should only be
// set to 'true' in tests. The idea behind this flag is reducing the number
// of memory accounts and monitors we need to close, so we plumbed it into
// the planning code so that it doesn't create extra memory monitoring
// infrastructure (and so that we could use testMemAccount defined in
// main_test.go).
UseStreamingMemAccountForBuffering bool
// SpillingCallbackFn will be called when the spilling from an in-memory to
// disk-backed operator occurs. It should only be set in tests.
SpillingCallbackFn func()
}
}

// NewColOperatorResult is a helper struct that encompasses all of the return
Expand Down Expand Up @@ -364,7 +373,7 @@ func NewColOperator(
spec := args.Spec
inputs := args.Inputs
streamingMemAccount := args.StreamingMemAccount
useStreamingMemAccountForBuffering := args.UseStreamingMemAccountForBuffering
useStreamingMemAccountForBuffering := args.TestingKnobs.UseStreamingMemAccountForBuffering
processorConstructor := args.ProcessorConstructor

log.VEventf(ctx, 2, "planning col operator for spec %q", spec)
Expand Down Expand Up @@ -557,7 +566,7 @@ func NewColOperator(
if needHash {
hashAggregatorMemAccount := streamingMemAccount
if !useStreamingMemAccountForBuffering {
hashAggregatorMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "hash-aggregator-limited")
hashAggregatorMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "hash-aggregator")
}
result.Op, err = NewHashAggregator(
NewAllocator(ctx, hashAggregatorMemAccount), inputs[0], typs, aggFns,
Expand Down Expand Up @@ -627,7 +636,7 @@ func NewColOperator(

hashJoinerMemAccount := streamingMemAccount
if !useStreamingMemAccountForBuffering {
hashJoinerMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "hash-joiner-limited")
hashJoinerMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "hash-joiner")
}
result.Op, err = NewEqHashJoinerOp(
NewAllocator(ctx, hashJoinerMemAccount),
Expand Down Expand Up @@ -708,7 +717,7 @@ func NewColOperator(
mergeJoinerMemAccount := streamingMemAccount
if !result.IsStreaming && !useStreamingMemAccountForBuffering {
// Whether the merge joiner is streaming is already set above.
mergeJoinerMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "merge-joiner-limited")
mergeJoinerMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "merge-joiner")
}
result.Op, err = NewMergeJoinOp(
NewAllocator(ctx, mergeJoinerMemAccount),
Expand Down Expand Up @@ -751,7 +760,7 @@ func NewColOperator(
if useStreamingMemAccountForBuffering {
sortChunksMemAccount = streamingMemAccount
} else {
sortChunksMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "sort-chunks-limited")
sortChunksMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "sort-chunks")
}
result.Op, err = NewSortChunks(
NewAllocator(ctx, sortChunksMemAccount), input, inputTypes,
Expand All @@ -769,7 +778,7 @@ func NewColOperator(
result.IsStreaming = true
} else {
// No optimizations possible. Default to the standard sort operator.
sorterMemMonitorName := fmt.Sprintf("sort-all-limited-%d", spec.ProcessorID)
sorterMemMonitorName := fmt.Sprintf("sort-all-%d", spec.ProcessorID)
var sorterMemAccount *mon.BoundAccount
if useStreamingMemAccountForBuffering {
sorterMemAccount = streamingMemAccount
Expand All @@ -784,22 +793,31 @@ func NewColOperator(
if err != nil {
return result, err
}
var diskSpillerMemAccount *mon.BoundAccount
if useStreamingMemAccountForBuffering {
diskSpillerMemAccount = streamingMemAccount
} else {
diskSpillerMemAccount = result.createBufferingMemAccount(
ctx, flowCtx, "disk-spiller-sort-all-limited",
)
}
diskSpillerAllocator := NewAllocator(ctx, diskSpillerMemAccount)
result.Op = newOneInputDiskSpiller(
diskSpillerAllocator,
input, inMemorySorter.(bufferingInMemoryOperator),
sorterMemMonitorName,
func(input Operator) Operator {
return newExternalSorter(diskSpillerAllocator, input, inputTypes, orderingCols)
})
monitorNamePrefix := "external-sorter-"
// We are using an unlimited memory monitor here because external
// sort itself is responsible for making sure that we stay within
// the memory limit.
unlimitedAllocator := NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(
ctx, flowCtx, monitorNamePrefix,
))
diskQueuesUnlimitedAllocator := NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(
ctx, flowCtx, monitorNamePrefix+"disk-queues",
))
return newExternalSorter(
unlimitedAllocator,
input, inputTypes, core.Sorter.OutputOrdering,
execinfra.GetWorkMemLimit(flowCtx.Cfg),
diskQueuesUnlimitedAllocator,
)
},
args.TestingKnobs.SpillingCallbackFn,
)
}
result.ColumnTypes = spec.Input[0].ColumnTypes

Expand All @@ -821,7 +839,7 @@ func NewColOperator(
// which kind of partitioner to use should come from the optimizer.
windowSortingPartitionerMemAccount := streamingMemAccount
if !useStreamingMemAccountForBuffering {
windowSortingPartitionerMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "window-sorting-partitioner-limited")
windowSortingPartitionerMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "window-sorting-partitioner")
}
input, err = NewWindowSortingPartitioner(
NewAllocator(ctx, windowSortingPartitionerMemAccount), input, typs,
Expand All @@ -832,7 +850,7 @@ func NewColOperator(
if len(wf.Ordering.Columns) > 0 {
windowSorterMemAccount := streamingMemAccount
if !useStreamingMemAccountForBuffering {
windowSorterMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "window-sorter-limited")
windowSorterMemAccount = result.createBufferingMemAccount(ctx, flowCtx, "window-sorter")
}
input, err = NewSorter(
NewAllocator(ctx, windowSorterMemAccount), input, typs,
Expand Down Expand Up @@ -1097,14 +1115,29 @@ func (r *NewColOperatorResult) createBufferingMemAccount(
ctx context.Context, flowCtx *execinfra.FlowCtx, name string,
) *mon.BoundAccount {
bufferingOpMemMonitor := execinfra.NewLimitedMonitor(
ctx, flowCtx.EvalCtx.Mon, flowCtx.Cfg, name,
ctx, flowCtx.EvalCtx.Mon, flowCtx.Cfg, name+"-limited",
)
r.BufferingOpMemMonitors = append(r.BufferingOpMemMonitors, bufferingOpMemMonitor)
bufferingMemAccount := bufferingOpMemMonitor.MakeBoundAccount()
r.BufferingOpMemAccounts = append(r.BufferingOpMemAccounts, &bufferingMemAccount)
return &bufferingMemAccount
}

// createBufferingUnlimitedMemAccount instantiates an unlimited memory monitor
// and a memory account to be used with a buffering disk-backed Operator. The
// receiver is updated to have references to both objects.
func (r *NewColOperatorResult) createBufferingUnlimitedMemAccount(
ctx context.Context, flowCtx *execinfra.FlowCtx, name string,
) *mon.BoundAccount {
bufferingOpUnlimitedMemMonitor := execinfra.NewMonitor(
ctx, flowCtx.EvalCtx.Mon, name+"-unlimited",
)
r.BufferingOpMemMonitors = append(r.BufferingOpMemMonitors, bufferingOpUnlimitedMemMonitor)
bufferingMemAccount := bufferingOpUnlimitedMemMonitor.MakeBoundAccount()
r.BufferingOpMemAccounts = append(r.BufferingOpMemAccounts, &bufferingMemAccount)
return &bufferingMemAccount
}

// setProjectedByJoinerColumnTypes sets column types on r according to a
// joiner handled projection.
// NOTE: r.ColumnTypes is updated.
Expand Down
Loading

0 comments on commit f231ee8

Please sign in to comment.