Merge #60593 #60604
60593: colexec: make external sorter respect memory limit better r=yuzefovich a=yuzefovich

**colexec: register memory used by dequeued batches from partitioned queue**

Previously, we forgot to perform memory accounting for the batches
dequeued from the partitions in the external sort and in the hash-based
partitioner (this memory can be substantial when we're merging multiple
partitions at once and the tuples are wide). This is now fixed.

Additionally, this commit retains references to some internal operators
in the external sort in order to reuse the memory under the dequeued
batches (this will be beneficial if we perform repeated merging).

Also, this commit fixes an issue with repeated re-initializing of the
disk-backed operators in the disk spiller if the latter has been reset
(the problem would lead to redundant allocations and not reusing of the
available memory).

A slight complication with the accounting arose because we were using
the same allocator for all usages. That would be quite wrong because in
the merge phase we have two distinct memory usages with different
lifecycles: the memory under the dequeued batches is kept (and reused
later), whereas the memory under the output batch of the ordered
synchronizer is released. We now handle these lifecycles correctly by
using separate allocators.
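
The two lifecycles above can be sketched with a toy allocator (a hypothetical stand-in for `colmem.Allocator`, not its actual API):

```go
package main

import "fmt"

// Allocator tracks bytes registered against one memory account. It is a
// toy stand-in for colmem.Allocator, used only to illustrate why the two
// lifecycles need separate accounting.
type Allocator struct {
	name string
	used int64
}

// Register accounts for bytes that this allocator now owns.
func (a *Allocator) Register(bytes int64) { a.used += bytes }

// Release returns bytes to the account once the memory is no longer held.
func (a *Allocator) Release(bytes int64) { a.used -= bytes }

func main() {
	// Dequeued batches are retained (and reused) across merge iterations.
	merge := &Allocator{name: "merge"}
	// The output batch of the ordered synchronizer is transient.
	output := &Allocator{name: "output"}

	merge.Register(1 << 20)    // a batch dequeued from a partition: kept
	output.Register(512 << 10) // an output batch: accounted temporarily
	output.Release(512 << 10)  // released once the batch is emitted

	fmt.Println(merge.used, output.used)
}
```

With a single shared allocator, the release for the output batch could incorrectly offset the retained dequeued-batch memory; separate allocators keep the two balances independent.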

Release note (bug fix): CockroachDB previously didn't account for some
RAM used when disk-spilling operations (like sorts and hash joins) were
using the temporary storage in the vectorized execution engine. This
could result in OOM crashes, especially when the rows are large in size.

**colexec: make external sorter respect memory limit better**

This commit improves how the external sorter manages its available RAM.
There are two main usages that overlap because we keep references to
both at all times:
1. during the spilling/sorting phase, we use a single in-memory sorter
2. during the merging phase, we use the ordered synchronizer that reads
one batch from each of the partitions and also allocates an output
batch.

Previously, we would give the whole memory limit to the in-memory sorter
in 1., which resulted in the external sorter using at least 2x its
memory limit. This is now fixed by giving only half to the in-memory
sorter.
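
The halving described above can be sketched as follows (a hypothetical `splitBudget` helper under the assumption of an even 50/50 split; the real constants live in the external sorter's constructor):

```go
package main

import "fmt"

// splitBudget divides the external sorter's total memory limit between
// the in-memory sorter used during the spilling phase and the merge
// phase, so that holding references to both never exceeds the limit.
func splitBudget(total int64) (inMemSorter, mergePhase int64) {
	inMemSorter = total / 2
	mergePhase = total - inMemSorter
	return inMemSorter, mergePhase
}

func main() {
	// E.g. with a 64MiB workmem limit, each phase gets 32MiB.
	sorter, merge := splitBudget(64 << 20)
	fmt.Println(sorter, merge)
}
```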

The handling of 2. was even worse: we didn't have any logic to limit
the number of active partitions based on their memory footprint. If the
batches are large (say, 1GB in size), during the merge phase we could be
using on the order of 16GB of RAM (16 being determined by the number of
available file descriptors). Additionally, we would give the whole
memory limit to the output batch too.

This misbehavior is also now fixed: we track the maximum size of
a single batch in each active partition and use those sizes to compute
the actual maximum number of partitions to merge at once.
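
The partition-limiting logic might look roughly like this (a hypothetical `maxPartitionsToMerge` helper, not the actual implementation in the external sorter):

```go
package main

import "fmt"

// maxPartitionsToMerge returns how many of the given partitions can be
// merged at once without the sum of their largest observed batches
// exceeding the memory budget. fdLimit additionally caps the count by
// the number of available file descriptors.
func maxPartitionsToMerge(maxBatchSizes []int64, budget int64, fdLimit int) int {
	var used int64
	n := 0
	for _, size := range maxBatchSizes {
		if n >= fdLimit || used+size > budget {
			break
		}
		used += size
		n++
	}
	return n
}

func main() {
	// Four partitions whose largest batches are 1GiB each, a 2GiB budget,
	// and 16 available file descriptors: only two can be merged at a time,
	// instead of all 16 that the file-descriptor limit alone would allow.
	sizes := []int64{1 << 30, 1 << 30, 1 << 30, 1 << 30}
	fmt.Println(maxPartitionsToMerge(sizes, 2<<30, 16))
}
```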

Fixes: #60017.

Release note: None

60604: sql: remove QueryWithCols method from the internal executor r=yuzefovich a=yuzefovich

The previous commit removed this method from the interface, and this
commit follows up by removing the method entirely. This is done in a
similar fashion: by switching to `QueryRowExWithCols` (when at most one
row is expected) and to the iterator API (avoiding the buffering of rows
in all cases).
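
The buffering-vs-iterating distinction can be illustrated with a toy iterator (a hypothetical stand-in; the internal executor's actual iterator API has different signatures):

```go
package main

import "fmt"

// RowIterator is a toy stand-in for the internal executor's iterator
// API: rows are consumed one at a time instead of being buffered in a
// slice up front, keeping memory usage bounded.
type RowIterator struct {
	rows [][]int
	cur  int
}

// Next advances to the next row, returning false once exhausted.
func (it *RowIterator) Next() bool {
	it.cur++
	return it.cur <= len(it.rows)
}

// Cur returns the current row.
func (it *RowIterator) Cur() []int { return it.rows[it.cur-1] }

func main() {
	it := &RowIterator{rows: [][]int{{1}, {2}, {3}}}
	sum := 0
	for it.Next() {
		sum += it.Cur()[0] // process each row without buffering all of them
	}
	fmt.Println(sum)
}
```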

Addresses: #48595.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
craig[bot] and yuzefovich committed Feb 17, 2021
3 parents 738e60a + 750ef76 + e257275 commit 889c27a
Showing 12 changed files with 692 additions and 365 deletions.
537 changes: 351 additions & 186 deletions pkg/server/admin.go

Large diffs are not rendered by default.

28 changes: 14 additions & 14 deletions pkg/server/api_auth.go
@@ -295,22 +295,22 @@ func (r *roleAuthorizationMux) getRoleForUser(
 		// Shortcut.
 		return superUserRole, nil
 	}
-	rows, _, err := r.ie.QueryWithCols(
+	row, err := r.ie.QueryRowEx(
 		ctx, "check-is-admin", nil, /* txn */
 		sessiondata.InternalExecutorOverride{User: user},
 		"SELECT crdb_internal.is_admin()")
 	if err != nil {
 		return regularRole, err
 	}
-	if len(rows) != 1 {
-		return regularRole, errors.AssertionFailedf("hasAdminRole: expected 1 row, got %d", len(rows))
+	if row == nil {
+		return regularRole, errors.AssertionFailedf("hasAdminRole: expected 1 row, got 0")
 	}
-	if len(rows[0]) != 1 {
-		return regularRole, errors.AssertionFailedf("hasAdminRole: expected 1 column, got %d", len(rows[0]))
+	if len(row) != 1 {
+		return regularRole, errors.AssertionFailedf("hasAdminRole: expected 1 column, got %d", len(row))
 	}
-	dbDatum, ok := tree.AsDBool(rows[0][0])
+	dbDatum, ok := tree.AsDBool(row[0])
 	if !ok {
-		return regularRole, errors.AssertionFailedf("hasAdminRole: expected bool, got %T", rows[0][0])
+		return regularRole, errors.AssertionFailedf("hasAdminRole: expected bool, got %T", row[0])
 	}
 	if dbDatum {
 		return adminRole, nil
@@ -325,22 +325,22 @@ func (r *roleAuthorizationMux) hasRoleOption(
 		// Shortcut.
 		return true, nil
 	}
-	rows, _, err := r.ie.QueryWithCols(
+	row, err := r.ie.QueryRowEx(
 		ctx, "check-role-option", nil, /* txn */
 		sessiondata.InternalExecutorOverride{User: user},
 		"SELECT crdb_internal.has_role_option($1)", roleOption.String())
 	if err != nil {
 		return false, err
 	}
-	if len(rows) != 1 {
-		return false, errors.AssertionFailedf("hasRoleOption: expected 1 row, got %d", len(rows))
+	if row == nil {
+		return false, errors.AssertionFailedf("hasRoleOption: expected 1 row, got 0")
 	}
-	if len(rows[0]) != 1 {
-		return false, errors.AssertionFailedf("hasRoleOption: expected 1 column, got %d", len(rows[0]))
+	if len(row) != 1 {
+		return false, errors.AssertionFailedf("hasRoleOption: expected 1 column, got %d", len(row))
 	}
-	dbDatum, ok := tree.AsDBool(rows[0][0])
+	dbDatum, ok := tree.AsDBool(row[0])
 	if !ok {
-		return false, errors.AssertionFailedf("hasRoleOption: expected bool, got %T", rows[0][0])
+		return false, errors.AssertionFailedf("hasRoleOption: expected bool, got %T", row[0])
 	}
 	return bool(dbDatum), nil
 }
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
@@ -87,6 +87,7 @@ go_library(
 		"//pkg/util/cancelchecker",
 		"//pkg/util/duration", # keep
 		"//pkg/util/encoding", # keep
+		"//pkg/util/humanizeutil",
 		"//pkg/util/log",
 		"//pkg/util/mon",
 		"//pkg/util/randutil",
18 changes: 14 additions & 4 deletions pkg/sql/colexec/colbuilder/execplan.go
@@ -444,16 +444,26 @@ func (r opResult) createDiskBackedSort(
 			sorterMemMonitorName,
 			func(input colexecbase.Operator) colexecbase.Operator {
 				monitorNamePrefix := fmt.Sprintf("%sexternal-sorter", memMonitorNamePrefix)
-				// We are using an unlimited memory monitor here because external
+				// We are using unlimited memory monitors here because external
 				// sort itself is responsible for making sure that we stay within
 				// the memory limit.
-				unlimitedAllocator := colmem.NewAllocator(
+				sortUnlimitedAllocator := colmem.NewAllocator(
+					ctx, r.createBufferingUnlimitedMemAccount(
+						ctx, flowCtx, monitorNamePrefix+"-sort",
+					), factory)
+				mergeUnlimitedAllocator := colmem.NewAllocator(
+					ctx, r.createBufferingUnlimitedMemAccount(
+						ctx, flowCtx, monitorNamePrefix+"-merge",
+					), factory)
+				outputUnlimitedAllocator := colmem.NewAllocator(
 					ctx, r.createBufferingUnlimitedMemAccount(
-						ctx, flowCtx, monitorNamePrefix,
+						ctx, flowCtx, monitorNamePrefix+"-output",
 					), factory)
 				diskAccount := r.createDiskAccount(ctx, flowCtx, monitorNamePrefix)
 				es := colexec.NewExternalSorter(
-					unlimitedAllocator,
+					sortUnlimitedAllocator,
+					mergeUnlimitedAllocator,
+					outputUnlimitedAllocator,
 					input, inputTypes, ordering,
 					execinfra.GetWorkMemLimit(flowCtx.Cfg),
 					maxNumberPartitions,
9 changes: 7 additions & 2 deletions pkg/sql/colexec/disk_spiller.go
@@ -207,8 +207,13 @@ func (d *diskSpillerBase) Next(ctx context.Context) coldata.Batch {
 		if d.spillingCallbackFn != nil {
 			d.spillingCallbackFn()
 		}
-		d.diskBackedOp.Init()
-		d.distBackedOpInitStatus = OperatorInitialized
+		if d.distBackedOpInitStatus == OperatorNotInitialized {
+			// The disk spiller might be reset for reuse, in which case the
+			// disk-backed operator has already been initialized and we
+			// don't want to perform the initialization again.
+			d.diskBackedOp.Init()
+			d.distBackedOpInitStatus = OperatorInitialized
+		}
 		return d.diskBackedOp.Next(ctx)
 	}
 	// Either not an out of memory error or an OOM error coming from a