Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
125724: sql: reduce miscellaneous allocations r=yuzefovich a=yuzefovich

See each commit for details.

Combined improvement:
```
name                   old time/op    new time/op    delta
LastWriteWinsInsert-8    1.42ms ±16%    1.44ms ±13%    ~     (p=0.820 n=20+20)

name                   old alloc/op   new alloc/op   delta
LastWriteWinsInsert-8     267kB ± 2%     265kB ± 2%    ~     (p=0.057 n=19+20)

name                   old allocs/op  new allocs/op  delta
LastWriteWinsInsert-8     1.95k ± 0%     1.91k ± 0%  -2.38%  (p=0.000 n=17+18)
```

Epic: CRDB-39063.

125794: roachtest: improve logging in gossip/chaos/nodes=9 r=nvanbenschoten a=nvanbenschoten

Informs cockroachdb#124828.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
3 people committed Jun 17, 2024
3 parents 38165df + 30f8866 + 9145a21 commit ecc8d50
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 118 deletions.
4 changes: 3 additions & 1 deletion pkg/cmd/roachtest/tests/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ SELECT node_id
}

if len(expLiveNodes) == 0 {
t.L().Printf("%d: found %d live nodes\n", i, len(liveNodes))
expLiveNodes = liveNodes
continue
}
Expand All @@ -119,9 +120,10 @@ SELECT node_id
}

waitForGossip := func(deadNode int) {
t.Status("waiting for gossip to exclude dead node")
t.Status("waiting for gossip to exclude dead node %d", deadNode)
start := timeutil.Now()
for {
t.L().Printf("checking if gossip excludes dead node %d\n", deadNode)
if gossipOK(start, deadNode) {
return
}
Expand Down
165 changes: 81 additions & 84 deletions pkg/sql/colexec/colbuilder/execplan.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecargs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/sql/colcontainer",
"//pkg/sql/colexecerror",
"//pkg/sql/colexecop",
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/execinfra/execreleasable",
"//pkg/sql/execinfrapb",
Expand Down
10 changes: 7 additions & 3 deletions pkg/sql/colexec/colexecargs/op_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand Down Expand Up @@ -60,9 +61,12 @@ type OpWithMetaInfo struct {
// NewColOperatorArgs is a helper struct that encompasses all of the input
// arguments to NewColOperator call.
type NewColOperatorArgs struct {
Spec *execinfrapb.ProcessorSpec
Inputs []OpWithMetaInfo
StreamingMemAccount *mon.BoundAccount
Spec *execinfrapb.ProcessorSpec
Inputs []OpWithMetaInfo
// StreamingMemAccount, if nil, is allocated lazily in NewColOperator.
StreamingMemAccount *mon.BoundAccount
// StreamingAllocator will be allocated lazily in NewColOperator.
StreamingAllocator *colmem.Allocator
ProcessorConstructor execinfra.ProcessorConstructor
LocalProcessors []execinfra.LocalProcessor
// any is actually a coldata.Batch, see physicalplan.PhysicalInfrastructure comments.
Expand Down
42 changes: 20 additions & 22 deletions pkg/sql/colmem/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type Allocator struct {
// allocation is denied by acc.
unlimitedAcc *mon.BoundAccount
factory coldata.ColumnFactory
maxBatchSize int
}

// SelVectorSize returns the memory usage of the selection vector of the given
Expand Down Expand Up @@ -149,18 +148,6 @@ func NewLimitedAllocator(
}
}

// SetMaxBatchSize use this to get more or less than the coldata.BatchSize() default.
func (a *Allocator) SetMaxBatchSize(siz int) {
a.maxBatchSize = siz
}

func (a *Allocator) getMaxBatchSize() int {
if a.maxBatchSize == 0 {
return coldata.BatchSize()
}
return a.maxBatchSize
}

// NewMemBatchWithFixedCapacity allocates a new in-memory coldata.Batch with the
// given vector capacity.
// Note: consider whether you want the dynamic batch size behavior (in which
Expand Down Expand Up @@ -238,7 +225,7 @@ func growCapacity(oldCapacity int, minDesiredCapacity int, maxBatchSize int) int
//
// The method will grow the allocated capacity of the batch exponentially
// (possibly incurring a reallocation), until the batch reaches
// coldata.BatchSize() in capacity or maxBatchMemSize in the memory footprint if
// maxBatchSize in capacity or maxBatchMemSize in the memory footprint if
// desiredCapacitySufficient is false. When that parameter is true and the
// capacity of old batch is at least minDesiredCapacity, then the old batch is
// reused.
Expand All @@ -256,11 +243,12 @@ func growCapacity(oldCapacity int, minDesiredCapacity int, maxBatchSize int) int
// released, so it is expected that the caller will lose the references to the
// old batch.
// Note: the method assumes that minDesiredCapacity is at least 0 and will clamp
// minDesiredCapacity to be between 1 and coldata.BatchSize() inclusive.
// minDesiredCapacity to be between 1 and maxBatchSize inclusive.
func (a *Allocator) resetMaybeReallocate(
typs []*types.T,
oldBatch coldata.Batch,
minDesiredCapacity int,
maxBatchSize int,
maxBatchMemSize int64,
desiredCapacitySufficient bool,
alwaysReallocate bool,
Expand All @@ -269,8 +257,8 @@ func (a *Allocator) resetMaybeReallocate(
colexecerror.InternalError(errors.AssertionFailedf("invalid minDesiredCapacity %d", minDesiredCapacity))
} else if minDesiredCapacity == 0 {
minDesiredCapacity = 1
} else if minDesiredCapacity > a.getMaxBatchSize() {
minDesiredCapacity = a.getMaxBatchSize()
} else if minDesiredCapacity > maxBatchSize {
minDesiredCapacity = maxBatchSize
}
reallocated = true
if oldBatch == nil {
Expand All @@ -281,15 +269,15 @@ func (a *Allocator) resetMaybeReallocate(
var useOldBatch bool
// Avoid calculating the memory footprint if possible.
var oldBatchMemSize int64
if oldCapacity == a.getMaxBatchSize() {
if oldCapacity == maxBatchSize {
// If old batch is already of the largest capacity, we will reuse
// it.
useOldBatch = true
} else {
// Check that if we were to grow the capacity and allocate a new
// batch, the new batch would still not exceed the limit.
if estimatedMaxCapacity := truncateToMemoryLimit(
growCapacity(oldCapacity, minDesiredCapacity, a.getMaxBatchSize()), maxBatchMemSize, typs,
growCapacity(oldCapacity, minDesiredCapacity, maxBatchSize), maxBatchMemSize, typs,
); estimatedMaxCapacity < minDesiredCapacity {
// Reduce the ask according to the estimated maximum. Note that
// we do not set desiredCapacitySufficient to false since this
Expand Down Expand Up @@ -335,7 +323,7 @@ func (a *Allocator) resetMaybeReallocate(
newBatch = oldBatch
} else {
a.ReleaseMemory(oldBatchMemSize)
newCapacity := growCapacity(oldCapacity, minDesiredCapacity, a.getMaxBatchSize())
newCapacity := growCapacity(oldCapacity, minDesiredCapacity, maxBatchSize)
newCapacity = truncateToMemoryLimit(newCapacity, maxBatchMemSize, typs)
newBatch = a.NewMemBatchWithFixedCapacity(typs, newCapacity)
}
Expand All @@ -355,7 +343,7 @@ func (a *Allocator) ResetMaybeReallocateNoMemLimit(
typs []*types.T, oldBatch coldata.Batch, requiredCapacity int,
) (newBatch coldata.Batch, reallocated bool) {
newBatch, reallocated, _ = a.resetMaybeReallocate(
typs, oldBatch, requiredCapacity, noMemLimit,
typs, oldBatch, requiredCapacity, coldata.BatchSize() /* maxBatchSize */, noMemLimit,
true /* desiredCapacitySufficient */, false, /* alwaysReallocate */
)
return newBatch, reallocated
Expand Down Expand Up @@ -709,6 +697,9 @@ func GetFixedSizeTypeSize(t *types.T) (size int64) {
// used.
type AccountingHelper struct {
allocator *Allocator
// maxBatchSize determines the maximum size of the batches produced by the
// helper in rows (coldata.BatchSize() by default).
maxBatchSize int
// memoryLimit determines the maximum memory footprint of the batch.
memoryLimit int64
// maxCapacity if non-zero indicates the target capacity of the batch. It is
Expand Down Expand Up @@ -736,6 +727,7 @@ func (h *AccountingHelper) discardBatch(batchMemSize int64) bool {
// components.
func (h *AccountingHelper) Init(allocator *Allocator, memoryLimit int64) {
h.allocator = allocator
h.maxBatchSize = coldata.BatchSize()
if memoryLimit == 1 {
// The memory limit of 1 most likely indicates that we are in a "force
// disk spilling" scenario, but the helper should ignore that, so we
Expand Down Expand Up @@ -807,7 +799,7 @@ func (h *AccountingHelper) ResetMaybeReallocate(
}
var oldBatchReachedMemSize bool
newBatch, reallocated, oldBatchReachedMemSize = h.allocator.resetMaybeReallocate(
typs, oldBatch, minDesiredCapacity, h.memoryLimit, desiredCapacitySufficient, h.alwaysReallocate,
typs, oldBatch, minDesiredCapacity, h.maxBatchSize, h.memoryLimit, desiredCapacitySufficient, h.alwaysReallocate,
)
if oldBatchReachedMemSize && h.maxCapacity == 0 {
// The old batch has just reached the memory size for the first time, so
Expand Down Expand Up @@ -924,6 +916,12 @@ func (h *SetAccountingHelper) Init(
h.datumVecs = make([]coldata.DatumVec, h.varSizeVecIdxs.Len()-numDecimalVecs)
}

// SetMaxBatchSize use this to get more or less than the coldata.BatchSize()
// default.
func (h *SetAccountingHelper) SetMaxBatchSize(maxBatchSize int) {
h.helper.maxBatchSize = maxBatchSize
}

func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 {
var bytesLikeTotalSize int64
for _, b := range h.bytesLikeVectors {
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/colmem/reset_maybe_reallocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ func TestResetMaybeReallocate(t *testing.T) {
typs := []*types.T{types.Bytes}

// Allocate a new batch and modify it.
b, _, _ = testAllocator.resetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64, false /* desiredCapacitySufficient */, false /* alwaysReallocate */)
b, _, _ = testAllocator.resetMaybeReallocate(typs, b, coldata.BatchSize(), coldata.BatchSize(), math.MaxInt64, false /* desiredCapacitySufficient */, false /* alwaysReallocate */)
b.SetSelection(true)
b.Selection()[0] = 1
b.ColVec(0).Bytes().Set(1, []byte("foo"))

oldBatch := b
b, _, _ = testAllocator.resetMaybeReallocate(typs, b, coldata.BatchSize(), math.MaxInt64, false /* desiredCapacitySufficient */, false /* alwaysReallocate */)
b, _, _ = testAllocator.resetMaybeReallocate(typs, b, coldata.BatchSize(), coldata.BatchSize(), math.MaxInt64, false /* desiredCapacitySufficient */, false /* alwaysReallocate */)
// We should have used the same batch, and now it should be in a "reset"
// state.
require.Equal(t, oldBatch, b)
Expand All @@ -83,21 +83,21 @@ func TestResetMaybeReallocate(t *testing.T) {
// Allocate a new batch attempting to use the batch with too small of a
// capacity - new batch should **not** be allocated because the memory
// limit is already exceeded.
b, _, _ = testAllocator.resetMaybeReallocate(typs, smallBatch, minDesiredCapacity, smallMemSize, false /* desiredCapacitySufficient */, false /* alwaysReallocate */)
b, _, _ = testAllocator.resetMaybeReallocate(typs, smallBatch, minDesiredCapacity, coldata.BatchSize(), smallMemSize, false /* desiredCapacitySufficient */, false /* alwaysReallocate */)
require.Equal(t, smallBatch, b)
require.Equal(t, minDesiredCapacity/2, b.Capacity())

oldBatch := b

// Reset the batch asking for the same small desired capacity when it is
// sufficient - the same batch should be returned.
b, _, _ = testAllocator.resetMaybeReallocate(typs, b, minDesiredCapacity/2, smallMemSize, true /* desiredCapacitySufficient */, false /* alwaysReallocate */)
b, _, _ = testAllocator.resetMaybeReallocate(typs, b, minDesiredCapacity/2, coldata.BatchSize(), smallMemSize, true /* desiredCapacitySufficient */, false /* alwaysReallocate */)
require.Equal(t, smallBatch, b)
require.Equal(t, minDesiredCapacity/2, b.Capacity())

// Reset the batch and confirm that a new batch is allocated because we
// have given larger memory limit.
b, _, _ = testAllocator.resetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize, false /* desiredCapacitySufficient */, false /* alwaysReallocate */)
b, _, _ = testAllocator.resetMaybeReallocate(typs, b, minDesiredCapacity, coldata.BatchSize(), largeMemSize, false /* desiredCapacitySufficient */, false /* alwaysReallocate */)
require.NotEqual(t, oldBatch, b)
require.Equal(t, minDesiredCapacity, b.Capacity())

Expand All @@ -108,7 +108,7 @@ func TestResetMaybeReallocate(t *testing.T) {
// resetMaybeReallocate truncates the capacity at
// coldata.BatchSize(), so we run this part of the test only when
// doubled capacity will not be truncated.
b, _, _ = testAllocator.resetMaybeReallocate(typs, b, minDesiredCapacity, largeMemSize, false /* desiredCapacitySufficient */, false /* alwaysReallocate */)
b, _, _ = testAllocator.resetMaybeReallocate(typs, b, minDesiredCapacity, coldata.BatchSize(), largeMemSize, false /* desiredCapacitySufficient */, false /* alwaysReallocate */)
require.NotEqual(t, oldBatch, b)
require.Equal(t, 2*minDesiredCapacity, b.Capacity())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/copy_from.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,9 @@ func (c *copyMachine) initVectorizedCopy(ctx context.Context, typs []*types.T) e
c.vectorized = true
factory := coldataext.NewExtendedColumnFactory(c.p.EvalContext())
alloc := colmem.NewLimitedAllocator(ctx, &c.rowsMemAcc, nil /*optional unlimited memory account*/, factory)
alloc.SetMaxBatchSize(c.copyBatchRowSize)
// TODO(cucaroach): Avoid allocating selection vector.
c.accHelper.Init(alloc, c.maxRowMem, typs, false /*alwaysReallocate*/)
c.accHelper.Init(alloc, c.maxRowMem, typs, false /* alwaysReallocate */)
c.accHelper.SetMaxBatchSize(c.copyBatchRowSize)
// Start with small number of rows, compromise between going too big and
// overallocating memory and avoiding some doubling growth batches.
if err := colexecerror.CatchVectorizedRuntimeError(func() {
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func (ie *InternalExecutor) initConnEx(
mode: mode,
sync: syncCallback,
}
clientComm.results = clientComm.resultsScratch[:0]
clientComm.rowsAffectedState.rewind = func() {
var zero int
_ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero})
Expand Down Expand Up @@ -1431,6 +1432,8 @@ type internalClientComm struct {
// at any point in time (i.e. any command is created, evaluated, and then
// closed / discarded, and only after that a new command can be processed).
results []*streamingCommandResult
// resultsScratch is the underlying storage for results.
resultsScratch [4]*streamingCommandResult

// The results of the query execution will be written into w.
w ieResultWriter
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/physicalplan/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ var _ tree.Visitor = &ivarRemapper{}

func (v *ivarRemapper) VisitPre(expr tree.Expr) (recurse bool, newExpr tree.Expr) {
if ivar, ok := expr.(*tree.IndexedVar); ok {
if ivar.Idx == v.indexVarMap[ivar.Idx] {
// Avoid the identical remapping since it's redundant.
return false, expr
}
newIvar := v.allocIndexedVar()
*newIvar = *ivar
newIvar.Idx = v.indexVarMap[ivar.Idx]
Expand Down

0 comments on commit ecc8d50

Please sign in to comment.