diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index e20e00b9399f..2e94b6f81142 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -414,6 +414,7 @@ func (e *confluentAvroEncoder) EncodeValue(ctx context.Context, row encodeRow) ( opts := avroEnvelopeOpts{afterField: true, beforeField: e.beforeField, updatedField: e.updatedField} registered.schema, err = envelopeToAvroSchema(e.rawTableName(row.tableDesc), opts, beforeDataSchema, afterDataSchema, e.schemaPrefix) + if err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index d35987db083b..1b3267501baf 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -482,6 +482,10 @@ func TestAvroSchemaNaming(t *testing.T) { `supermovr.public.drivers-key`, `supermovr.public.drivers-value`, }) + + // Both changes to the subject are also reflected in the schema name in the posted schemas. + require.Contains(t, reg.mu.schemas[reg.mu.subjects[`supermovr.public.drivers-key`]], `supermovr`) + require.Contains(t, reg.mu.schemas[reg.mu.subjects[`supermovr.public.drivers-value`]], `supermovr`) } t.Run(`enterprise`, enterpriseTest(testFn)) diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go index 22f22a25a57f..0119b735bc20 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp.go @@ -217,13 +217,27 @@ func (rts *resolvedTimestamp) recompute() bool { if !rts.IsInit() { return false } + if rts.closedTS.Less(rts.resolvedTS) { + panic(fmt.Sprintf("closed timestamp below resolved timestamp: %s < %s", + rts.closedTS, rts.resolvedTS)) + } newTS := rts.closedTS + + // Take into account the intents that haven't yet been resolved - their + // timestamps cannot be resolved yet. if txn := rts.intentQ.Oldest(); txn != nil { - txnTS := txn.timestamp.FloorPrev() - if txnTS.Less(newTS) { - newTS = txnTS + if txn.timestamp.LessEq(rts.resolvedTS) { + panic(fmt.Sprintf("unresolved txn equal to or below resolved timestamp: %s <= %s", + txn.timestamp, rts.resolvedTS)) } + // txn.timestamp cannot be resolved, so the resolved timestamp must stay below it (Prev). + txnTS := txn.timestamp.Prev() + newTS.Backward(txnTS) } + // Truncate the logical part. It might have come from a Prev call above, and + // it's dangerous to start pushing things above Logical=MaxInt32. + newTS.Logical = 0 + if newTS.Less(rts.resolvedTS) { panic(fmt.Sprintf("resolved timestamp regression, was %s, recomputed as %s", rts.resolvedTS, newTS)) diff --git a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go index a6573f932f8b..ff5f9795131d 100644 --- a/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go +++ b/pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) @@ -563,3 +564,43 @@ func TestResolvedTimestampTxnAborted(t *testing.T) { require.True(t, fwd) require.Equal(t, hlc.Timestamp{WallTime: 25}, rts.Get()) } + +// Test that things go well when the closed timestamp has a non-zero logical part.
+func TestClosedTimestampLogicalPart(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + rts := makeResolvedTimestamp() + rts.Init() + + // Set a new closed timestamp. Resolved timestamp advances. + fwd := rts.ForwardClosedTS(hlc.Timestamp{WallTime: 10, Logical: 2}) + require.True(t, fwd) + require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) + + // Add an intent for a new transaction. + txn1 := uuid.MakeV4() + fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4})) + require.False(t, fwd) + require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) + + // Set a new closed timestamp. Resolved timestamp doesn't advance, since it + // could only theoretically advance up to 10.4, and it doesn't do logical + // parts. + fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 11, Logical: 6}) + require.False(t, fwd) + require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get()) + + // Abort txn1. Resolved timestamp advances. + fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1)) + require.True(t, fwd) + require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) + + // Create an intent one tick above the closed ts. Resolved timestamp doesn't + // advance. This tests the case where the closed timestamp has a logical part, + // and an intent is in the next wall tick; this used to cause an issue because + // of the rounding logic. + txn2 := uuid.MakeV4() + fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7})) + require.False(t, fwd) + require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get()) +} diff --git a/pkg/sql/colexec/colexecjoin/crossjoiner.go b/pkg/sql/colexec/colexecjoin/crossjoiner.go index 5d7d07dc9a68..9550675d2cbb 100644 --- a/pkg/sql/colexec/colexecjoin/crossjoiner.go +++ b/pkg/sql/colexec/colexecjoin/crossjoiner.go @@ -168,9 +168,7 @@ func (c *crossJoiner) consumeInputs(ctx context.Context) { if needLeftTuples { for { batch := c.inputOne.Next(ctx) - if err := c.left.tuples.Enqueue(ctx, batch); err != nil { - colexecerror.InternalError(err) - } + c.left.tuples.Enqueue(ctx, batch) if batch.Length() == 0 { break } @@ -180,9 +178,7 @@ func (c *crossJoiner) consumeInputs(ctx context.Context) { if needRightTuples { for { batch := c.inputTwo.Next(ctx) - if err := c.right.tuples.Enqueue(ctx, batch); err != nil { - colexecerror.InternalError(err) - } + c.right.tuples.Enqueue(ctx, batch) if batch.Length() == 0 { break } diff --git a/pkg/sql/colexec/colexecjoin/mergejoiner.go b/pkg/sql/colexec/colexecjoin/mergejoiner.go index d6d7c78c6c7a..ce15baf672f1 100644 --- a/pkg/sql/colexec/colexecjoin/mergejoiner.go +++ b/pkg/sql/colexec/colexecjoin/mergejoiner.go @@ -20,7 +20,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecbase" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" - "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -577,9 +576,7 @@ func (o *mergeJoinBase) appendToBufferedGroup( if batch.Length() == 0 || groupLength == 0 { // We have finished appending to this buffered group, so we need to // Enqueue a zero-length batch per the contract of the spilling queue. 
- if err := bufferedTuples.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + bufferedTuples.Enqueue(ctx, coldata.ZeroBatch) return } // TODO(yuzefovich): for LEFT/RIGHT ANTI joins we only need to store the @@ -625,9 +622,7 @@ func (o *mergeJoinBase) appendToBufferedGroup( } bufferedGroup.scratchBatch.SetLength(groupLength) }) - if err := bufferedTuples.Enqueue(ctx, bufferedGroup.scratchBatch); err != nil { - colexecerror.InternalError(err) - } + bufferedTuples.Enqueue(ctx, bufferedGroup.scratchBatch) } // setBuilderSourceToBatch sets the builder state to use groups from the diff --git a/pkg/sql/colexec/colexecutils/BUILD.bazel b/pkg/sql/colexec/colexecutils/BUILD.bazel index 501a93394bf0..4c3abb7e697c 100644 --- a/pkg/sql/colexec/colexecutils/BUILD.bazel +++ b/pkg/sql/colexec/colexecutils/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/sql/colexecerror", "//pkg/sql/colexecop", "//pkg/sql/colmem", + "//pkg/sql/sqlerrors", "//pkg/sql/types", "//pkg/util", "//pkg/util/cancelchecker", diff --git a/pkg/sql/colexec/colexecutils/spilling_queue.go b/pkg/sql/colexec/colexecutils/spilling_queue.go index 151a8cecba0d..e255af67d72c 100644 --- a/pkg/sql/colexec/colexecutils/spilling_queue.go +++ b/pkg/sql/colexec/colexecutils/spilling_queue.go @@ -147,19 +147,19 @@ func NewRewindableSpillingQueue(args *NewSpillingQueueArgs) *SpillingQueue { // The ownership of the batch still lies with the caller, so the caller is // responsible for accounting for the memory used by batch (although the // spilling queue will account for memory used by the in-memory copies). -func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) error { +func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) { if q.rewindable && q.rewindableState.numItemsDequeued > 0 { - return errors.Errorf("attempted to Enqueue to rewindable SpillingQueue after Dequeue has been called") + colexecerror.InternalError(errors.Errorf("attempted to Enqueue to rewindable SpillingQueue after Dequeue has been called")) } n := batch.Length() if n == 0 { if q.diskQueue != nil { if err := q.diskQueue.Enqueue(ctx, batch); err != nil { - return err + HandleErrorFromDiskQueue(err) } } - return nil + return } q.testingKnobs.numEnqueues++ @@ -175,7 +175,7 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) error // in-memory buffer // so we have to add batch to the disk queue. if err := q.maybeSpillToDisk(ctx); err != nil { - return err + HandleErrorFromDiskQueue(err) } if sel := batch.Selection(); sel != nil { // We need to perform the deselection since the disk queue @@ -204,10 +204,10 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) error batch = q.diskQueueDeselectionScratch } if err := q.diskQueue.Enqueue(ctx, batch); err != nil { - return err + HandleErrorFromDiskQueue(err) } q.numOnDiskItems++ - return nil + return } if q.numInMemoryItems == len(q.items) { @@ -261,7 +261,7 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) error if alreadyCopied == n { // We were able to append all of the tuples, so we return early // since we don't need to update any of the state. 
- return nil + return } } } @@ -315,7 +315,6 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) error q.curTailIdx = 0 } q.numInMemoryItems++ - return nil } // Dequeue returns the next batch from the queue which is valid only until the diff --git a/pkg/sql/colexec/colexecutils/spilling_queue_test.go b/pkg/sql/colexec/colexecutils/spilling_queue_test.go index cea0bde12611..d0e68442af5d 100644 --- a/pkg/sql/colexec/colexecutils/spilling_queue_test.go +++ b/pkg/sql/colexec/colexecutils/spilling_queue_test.go @@ -180,7 +180,7 @@ func TestSpillingQueue(t *testing.T) { for { b = op.Next(ctx) - require.NoError(t, q.Enqueue(ctx, b)) + q.Enqueue(ctx, b) if b.Length() == 0 { break } @@ -299,7 +299,7 @@ func TestSpillingQueueDidntSpill(t *testing.T) { for { b := op.Next(ctx) - require.NoError(t, q.Enqueue(ctx, b)) + q.Enqueue(ctx, b) b, err := q.Dequeue(ctx) require.NoError(t, err) if b.Length() == 0 { @@ -388,7 +388,7 @@ func TestSpillingQueueMemoryAccounting(t *testing.T) { return int64(batchesAccountedFor) * batchSize } for numEnqueuedBatches := 1; numEnqueuedBatches <= numInputBatches; numEnqueuedBatches++ { - require.NoError(t, q.Enqueue(ctx, batch)) + q.Enqueue(ctx, batch) if rng.Float64() < dequeueProbability { b, err := q.Dequeue(ctx) require.NoError(t, err) @@ -397,7 +397,7 @@ func TestSpillingQueueMemoryAccounting(t *testing.T) { } require.Equal(t, getExpectedMemUsage(numEnqueuedBatches), q.unlimitedAllocator.Used()) } - require.NoError(t, q.Enqueue(ctx, coldata.ZeroBatch)) + q.Enqueue(ctx, coldata.ZeroBatch) for { b, err := q.Dequeue(ctx) require.NoError(t, err) @@ -476,7 +476,7 @@ func TestSpillingQueueMovingTailWhenSpilling(t *testing.T) { sequenceValue := rng.Int63() batch.ColVec(0).Int64()[0] = sequenceValue expectedBatchSequence = append(expectedBatchSequence, sequenceValue) - require.NoError(t, q.Enqueue(ctx, batch)) + q.Enqueue(ctx, batch) } // All enqueued batches should fit under the memory limit (to be // precise, the last enqueued batch has just crossed the limit, but @@ -488,7 +488,7 @@ func TestSpillingQueueMovingTailWhenSpilling(t *testing.T) { sequenceValue := rng.Int63() batch.ColVec(0).Int64()[0] = sequenceValue expectedBatchSequence = append(expectedBatchSequence, sequenceValue) - require.NoError(t, q.Enqueue(ctx, batch)) + q.Enqueue(ctx, batch) numExtraInputBatches = 1 } else { require.NoError(t, q.maybeSpillToDisk(ctx)) @@ -501,7 +501,7 @@ func TestSpillingQueueMovingTailWhenSpilling(t *testing.T) { require.Equal(t, int64(0), q.unlimitedAllocator.Used()) require.Equal(t, numInputBatches+numExtraInputBatches, q.numOnDiskItems) - require.NoError(t, q.Enqueue(ctx, coldata.ZeroBatch)) + q.Enqueue(ctx, coldata.ZeroBatch) // Now check that all the batches are in the correct order. batchCount := 0 diff --git a/pkg/sql/colexec/colexecutils/utils.go b/pkg/sql/colexec/colexecutils/utils.go index 550eee290392..a6d6eb591be2 100644 --- a/pkg/sql/colexec/colexecutils/utils.go +++ b/pkg/sql/colexec/colexecutils/utils.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colmem" + "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/errors" ) @@ -267,3 +268,16 @@ var ( // length. 
ZeroUint64Column = make([]uint64, coldata.MaxBatchSize) ) + +// HandleErrorFromDiskQueue takes in a non-nil error emitted by colcontainer.Queue +// or colcontainer.PartitionedDiskQueue implementations and propagates it +// throughout the vectorized engine. +func HandleErrorFromDiskQueue(err error) { + if sqlerrors.IsDiskFullError(err) { + // We don't want to annotate the disk full error, so we propagate it + // as an expected one. + colexecerror.ExpectedError(err) + } else { + colexecerror.InternalError(err) + } +} diff --git a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go index e85b929b2861..8b96ef3a018b 100644 --- a/pkg/sql/colexec/colexecwindow/relative_rank.eg.go +++ b/pkg/sql/colexec/colexecwindow/relative_rank.eg.go @@ -239,9 +239,7 @@ func (r *percentRankNoPartitionOp) Next(ctx context.Context) coldata.Batch { batch := r.Input.Next(ctx) n := batch.Length() if n == 0 { - if err := r.bufferedTuples.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + r.bufferedTuples.Enqueue(ctx, coldata.ZeroBatch) // We have fully consumed the input, so now we can populate the output. r.state = relativeRankEmitting continue @@ -267,9 +265,7 @@ func (r *percentRankNoPartitionOp) Next(ctx context.Context) coldata.Batch { } r.scratch.SetLength(n) }) - if err := r.bufferedTuples.Enqueue(ctx, r.scratch); err != nil { - colexecerror.InternalError(err) - } + r.bufferedTuples.Enqueue(ctx, r.scratch) // Then, we need to update the sizes of the partitions. // There is a single partition in the whole input. @@ -453,21 +449,15 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { batch := r.Input.Next(ctx) n := batch.Length() if n == 0 { - if err := r.bufferedTuples.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + r.bufferedTuples.Enqueue(ctx, coldata.ZeroBatch) // We need to flush the last vector of the running partitions // sizes, including the very last partition. runningPartitionsSizesCol := r.partitionsState.runningSizes.ColVec(0).Int64() runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.partitionsState.idx++ r.partitionsState.runningSizes.SetLength(r.partitionsState.idx) - if err := r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } - if err := r.partitionsState.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes) + r.partitionsState.Enqueue(ctx, coldata.ZeroBatch) // We have fully consumed the input, so now we can populate the output. r.state = relativeRankEmitting continue @@ -495,9 +485,7 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { } r.scratch.SetLength(n) }) - if err := r.bufferedTuples.Enqueue(ctx, r.scratch); err != nil { - colexecerror.InternalError(err) - } + r.bufferedTuples.Enqueue(ctx, r.scratch) // Then, we need to update the sizes of the partitions. partitionCol := batch.ColVec(r.partitionColIdx).Bool() @@ -518,9 +506,7 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { if r.partitionsState.idx == coldata.BatchSize() { // We need to flush the vector of partitions sizes.
r.partitionsState.runningSizes.SetLength(coldata.BatchSize()) - if err := r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } + r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes) r.partitionsState.idx = 0 r.partitionsState.runningSizes.ResetInternalBatch() } @@ -543,9 +529,7 @@ func (r *percentRankWithPartitionOp) Next(ctx context.Context) coldata.Batch { if r.partitionsState.idx == coldata.BatchSize() { // We need to flush the vector of partitions sizes. r.partitionsState.runningSizes.SetLength(coldata.BatchSize()) - if err := r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } + r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes) r.partitionsState.idx = 0 r.partitionsState.runningSizes.ResetInternalBatch() } @@ -755,21 +739,15 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { batch := r.Input.Next(ctx) n := batch.Length() if n == 0 { - if err := r.bufferedTuples.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + r.bufferedTuples.Enqueue(ctx, coldata.ZeroBatch) // We need to flush the last vector of the running peer groups // sizes, including the very last peer group. runningPeerGroupsSizesCol := r.peerGroupsState.runningSizes.ColVec(0).Int64() runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.peerGroupsState.idx++ r.peerGroupsState.runningSizes.SetLength(r.peerGroupsState.idx) - if err := r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } - if err := r.peerGroupsState.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes) + r.peerGroupsState.Enqueue(ctx, coldata.ZeroBatch) // We have fully consumed the input, so now we can populate the output. r.state = relativeRankEmitting continue @@ -795,9 +773,7 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { } r.scratch.SetLength(n) }) - if err := r.bufferedTuples.Enqueue(ctx, r.scratch); err != nil { - colexecerror.InternalError(err) - } + r.bufferedTuples.Enqueue(ctx, r.scratch) // Then, we need to update the sizes of the partitions. // There is a single partition in the whole input. @@ -822,9 +798,7 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { if r.peerGroupsState.idx == coldata.BatchSize() { // We need to flush the vector of peer group sizes. r.peerGroupsState.runningSizes.SetLength(coldata.BatchSize()) - if err := r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } + r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes) r.peerGroupsState.idx = 0 r.peerGroupsState.runningSizes.ResetInternalBatch() } @@ -847,9 +821,7 @@ func (r *cumeDistNoPartitionOp) Next(ctx context.Context) coldata.Batch { if r.peerGroupsState.idx == coldata.BatchSize() { // We need to flush the vector of peer group sizes. 
r.peerGroupsState.runningSizes.SetLength(coldata.BatchSize()) - if err := r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } + r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes) r.peerGroupsState.idx = 0 r.peerGroupsState.runningSizes.ResetInternalBatch() } @@ -1059,33 +1031,23 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { batch := r.Input.Next(ctx) n := batch.Length() if n == 0 { - if err := r.bufferedTuples.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + r.bufferedTuples.Enqueue(ctx, coldata.ZeroBatch) // We need to flush the last vector of the running partitions // sizes, including the very last partition. runningPartitionsSizesCol := r.partitionsState.runningSizes.ColVec(0).Int64() runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.partitionsState.idx++ r.partitionsState.runningSizes.SetLength(r.partitionsState.idx) - if err := r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } - if err := r.partitionsState.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes) + r.partitionsState.Enqueue(ctx, coldata.ZeroBatch) // We need to flush the last vector of the running peer groups // sizes, including the very last peer group. runningPeerGroupsSizesCol := r.peerGroupsState.runningSizes.ColVec(0).Int64() runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.peerGroupsState.idx++ r.peerGroupsState.runningSizes.SetLength(r.peerGroupsState.idx) - if err := r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } - if err := r.peerGroupsState.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes) + r.peerGroupsState.Enqueue(ctx, coldata.ZeroBatch) // We have fully consumed the input, so now we can populate the output. r.state = relativeRankEmitting continue @@ -1113,9 +1075,7 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { } r.scratch.SetLength(n) }) - if err := r.bufferedTuples.Enqueue(ctx, r.scratch); err != nil { - colexecerror.InternalError(err) - } + r.bufferedTuples.Enqueue(ctx, r.scratch) // Then, we need to update the sizes of the partitions. partitionCol := batch.ColVec(r.partitionColIdx).Bool() @@ -1136,9 +1096,7 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if r.partitionsState.idx == coldata.BatchSize() { // We need to flush the vector of partitions sizes. r.partitionsState.runningSizes.SetLength(coldata.BatchSize()) - if err := r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } + r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes) r.partitionsState.idx = 0 r.partitionsState.runningSizes.ResetInternalBatch() } @@ -1161,9 +1119,7 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if r.partitionsState.idx == coldata.BatchSize() { // We need to flush the vector of partitions sizes. 
r.partitionsState.runningSizes.SetLength(coldata.BatchSize()) - if err := r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } + r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes) r.partitionsState.idx = 0 r.partitionsState.runningSizes.ResetInternalBatch() } @@ -1192,9 +1148,7 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if r.peerGroupsState.idx == coldata.BatchSize() { // We need to flush the vector of peer group sizes. r.peerGroupsState.runningSizes.SetLength(coldata.BatchSize()) - if err := r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } + r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes) r.peerGroupsState.idx = 0 r.peerGroupsState.runningSizes.ResetInternalBatch() } @@ -1217,9 +1171,7 @@ func (r *cumeDistWithPartitionOp) Next(ctx context.Context) coldata.Batch { if r.peerGroupsState.idx == coldata.BatchSize() { // We need to flush the vector of peer group sizes. r.peerGroupsState.runningSizes.SetLength(coldata.BatchSize()) - if err := r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } + r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes) r.peerGroupsState.idx = 0 r.peerGroupsState.runningSizes.ResetInternalBatch() } diff --git a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go index d3358326f6c2..7352c6303bce 100644 --- a/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go +++ b/pkg/sql/colexec/colexecwindow/relative_rank_tmpl.go @@ -160,9 +160,7 @@ func _COMPUTE_PARTITIONS_SIZES(_HAS_SEL bool) { // */}} if r.partitionsState.idx == coldata.BatchSize() { // We need to flush the vector of partitions sizes. r.partitionsState.runningSizes.SetLength(coldata.BatchSize()) - if err := r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } + r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes) r.partitionsState.idx = 0 r.partitionsState.runningSizes.ResetInternalBatch() } @@ -194,9 +192,7 @@ func _COMPUTE_PEER_GROUPS_SIZES(_HAS_SEL bool) { // */}} if r.peerGroupsState.idx == coldata.BatchSize() { // We need to flush the vector of peer group sizes. r.peerGroupsState.runningSizes.SetLength(coldata.BatchSize()) - if err := r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } + r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes) r.peerGroupsState.idx = 0 r.peerGroupsState.runningSizes.ResetInternalBatch() } @@ -368,9 +364,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch { batch := r.Input.Next(ctx) n := batch.Length() if n == 0 { - if err := r.bufferedTuples.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + r.bufferedTuples.Enqueue(ctx, coldata.ZeroBatch) // {{if .HasPartition}} // We need to flush the last vector of the running partitions // sizes, including the very last partition. 
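The mechanical rewrites above and below follow from the new SpillingQueue.Enqueue contract: instead of returning an error, the queue throws it through colexecerror (ExpectedError for disk-full conditions, InternalError otherwise), and the panic is converted back into a plain error at the boundary of the vectorized flow, so callers no longer need to wrap every Enqueue in an `if err != nil` check. The snippet below is a minimal, self-contained model of that throw/catch pattern, not CockroachDB's actual colexecerror implementation; the type and function names here are illustrative only.

// A simplified sketch of panic-based error propagation: callees "throw" by
// panicking with a wrapper type, and the flow boundary recovers the panic and
// converts it back into an ordinary error return.
package main

import (
	"errors"
	"fmt"
)

type thrownError struct{ err error }

// expectedError mimics colexecerror.ExpectedError: it never returns.
func expectedError(err error) { panic(thrownError{err: err}) }

// catchThrownError mimics colexecerror.CatchVectorizedRuntimeError: it runs op
// and converts a thrown error back into a return value.
func catchThrownError(op func()) (retErr error) {
	defer func() {
		if r := recover(); r != nil {
			if t, ok := r.(thrownError); ok {
				retErr = t.err
				return
			}
			panic(r) // not one of ours; re-panic
		}
	}()
	op()
	return nil
}

func main() {
	err := catchThrownError(func() {
		// An Enqueue-like callee can simply throw instead of returning an error.
		expectedError(errors.New("disk full"))
	})
	fmt.Println(err) // disk full
}

Centralizing the error conversion at a single recovery point is what lets generated code such as relative_rank.eg.go shed the repeated error plumbing shown in these hunks.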
@@ -378,12 +372,8 @@ func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch { runningPartitionsSizesCol[r.partitionsState.idx] = r.numTuplesInPartition r.partitionsState.idx++ r.partitionsState.runningSizes.SetLength(r.partitionsState.idx) - if err := r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } - if err := r.partitionsState.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + r.partitionsState.Enqueue(ctx, r.partitionsState.runningSizes) + r.partitionsState.Enqueue(ctx, coldata.ZeroBatch) // {{end}} // {{if .IsCumeDist}} // We need to flush the last vector of the running peer groups @@ -392,12 +382,8 @@ func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch { runningPeerGroupsSizesCol[r.peerGroupsState.idx] = r.numPeers r.peerGroupsState.idx++ r.peerGroupsState.runningSizes.SetLength(r.peerGroupsState.idx) - if err := r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes); err != nil { - colexecerror.InternalError(err) - } - if err := r.peerGroupsState.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + r.peerGroupsState.Enqueue(ctx, r.peerGroupsState.runningSizes) + r.peerGroupsState.Enqueue(ctx, coldata.ZeroBatch) // {{end}} // We have fully consumed the input, so now we can populate the output. r.state = relativeRankEmitting @@ -431,9 +417,7 @@ func (r *_RELATIVE_RANK_STRINGOp) Next(ctx context.Context) coldata.Batch { } r.scratch.SetLength(n) }) - if err := r.bufferedTuples.Enqueue(ctx, r.scratch); err != nil { - colexecerror.InternalError(err) - } + r.bufferedTuples.Enqueue(ctx, r.scratch) // Then, we need to update the sizes of the partitions. // {{if .HasPartition}} diff --git a/pkg/sql/colexec/external_sort.go b/pkg/sql/colexec/external_sort.go index 176ba16f423d..d2a95219a65c 100644 --- a/pkg/sql/colexec/external_sort.go +++ b/pkg/sql/colexec/external_sort.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/col/coldata" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" + "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" @@ -453,7 +454,7 @@ func (s *externalSorter) enqueue(ctx context.Context, b coldata.Batch) { // performs a deselection when buffering up the tuples, and the in-memory // sorter has allSpooler as its input. 
if err := s.partitioner.Enqueue(ctx, s.currentPartitionIdx, b); err != nil { - colexecerror.InternalError(err) + colexecutils.HandleErrorFromDiskQueue(err) } } diff --git a/pkg/sql/colexec/hash_aggregator.go b/pkg/sql/colexec/hash_aggregator.go index 8b2069dfebfb..df9567a69c19 100644 --- a/pkg/sql/colexec/hash_aggregator.go +++ b/pkg/sql/colexec/hash_aggregator.go @@ -216,9 +216,7 @@ func (op *hashAggregator) Next(ctx context.Context) coldata.Batch { op.bufferingState.pendingBatch, op.bufferingState.unprocessedIdx = op.Input.Next(ctx), 0 n := op.bufferingState.pendingBatch.Length() if op.inputTrackingState.tuples != nil { - if err := op.inputTrackingState.tuples.Enqueue(ctx, op.bufferingState.pendingBatch); err != nil { - colexecerror.InternalError(err) - } + op.inputTrackingState.tuples.Enqueue(ctx, op.bufferingState.pendingBatch) op.inputTrackingState.zeroBatchEnqueued = n == 0 } if n == 0 { @@ -480,9 +478,7 @@ func (op *hashAggregator) ExportBuffered(ctx context.Context, _ colexecop.Operat if !op.inputTrackingState.zeroBatchEnqueued { // Per the contract of the spilling queue, we need to append a // zero-length batch. - if err := op.inputTrackingState.tuples.Enqueue(ctx, coldata.ZeroBatch); err != nil { - colexecerror.InternalError(err) - } + op.inputTrackingState.tuples.Enqueue(ctx, coldata.ZeroBatch) op.inputTrackingState.zeroBatchEnqueued = true } batch, err := op.inputTrackingState.tuples.Dequeue(ctx) diff --git a/pkg/sql/colexec/hash_based_partitioner.go b/pkg/sql/colexec/hash_based_partitioner.go index 0164a099f058..3763f29d83f0 100644 --- a/pkg/sql/colexec/hash_based_partitioner.go +++ b/pkg/sql/colexec/hash_based_partitioner.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecargs" "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexechash" + "github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils" "github.com/cockroachdb/cockroach/pkg/sql/colexecerror" "github.com/cockroachdb/cockroach/pkg/sql/colexecop" "github.com/cockroachdb/cockroach/pkg/sql/colmem" @@ -374,7 +375,7 @@ func (op *hashBasedPartitioner) partitionBatch( scratchBatch.SetLength(len(sel)) }) if err := op.partitioners[inputIdx].Enqueue(ctx, partitionIdx, scratchBatch); err != nil { - colexecerror.InternalError(err) + colexecutils.HandleErrorFromDiskQueue(err) } partitionInfo, ok := op.partitionsToProcessUsingMain[partitionIdx] if !ok { diff --git a/pkg/sql/colflow/routers.go b/pkg/sql/colflow/routers.go index 8fa5c212e7cb..2dc67e0c4724 100644 --- a/pkg/sql/colflow/routers.go +++ b/pkg/sql/colflow/routers.go @@ -334,12 +334,11 @@ func (o *routerOutputOp) addBatch(ctx context.Context, batch coldata.Batch) bool } o.mu.numUnread += batch.Length() - err := o.mu.data.Enqueue(ctx, batch) - if err == nil && o.testingKnobs.addBatchTestInducedErrorCb != nil { - err = o.testingKnobs.addBatchTestInducedErrorCb() - } - if err != nil { - colexecerror.InternalError(err) + o.mu.data.Enqueue(ctx, batch) + if o.testingKnobs.addBatchTestInducedErrorCb != nil { + if err := o.testingKnobs.addBatchTestInducedErrorCb(); err != nil { + colexecerror.InternalError(err) + } } if batch.Length() == 0 { diff --git a/pkg/sql/sqlerrors/errors.go b/pkg/sql/sqlerrors/errors.go index e3d31b892bf3..d7e777f6102e 100644 --- a/pkg/sql/sqlerrors/errors.go +++ b/pkg/sql/sqlerrors/errors.go @@ -223,6 +223,11 @@ func IsOutOfMemoryError(err error) bool { return errHasCode(err, pgcode.OutOfMemory) } +// IsDiskFullError checks whether this is a disk full error. 
+func IsDiskFullError(err error) bool { + return errHasCode(err, pgcode.DiskFull) +} + // IsUndefinedColumnError checks whether this is an undefined column error. func IsUndefinedColumnError(err error) bool { return errHasCode(err, pgcode.UndefinedColumn)
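To tie the pieces together: IsDiskFullError recognizes errors carrying pgcode.DiskFull, and HandleErrorFromDiskQueue rethrows such an error as an expected one so that it reaches the client as a regular SQL error instead of an internal assertion failure. The sketch below shows that round trip under the assumption that the disk queue's error is tagged with pgcode.DiskFull via pgerror; where exactly that tagging happens is not part of this diff, so the construction here is illustrative only.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
	"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
)

func main() {
	// Pretend the disk queue ran out of space and produced an error tagged
	// with the DiskFull pgcode (assumption: the tagging happens upstream of
	// HandleErrorFromDiskQueue).
	diskErr := pgerror.New(pgcode.DiskFull, "temporary disk storage exhausted")

	// HandleErrorFromDiskQueue never returns: it panics through
	// colexecerror.ExpectedError because the error is a disk-full error.
	// CatchVectorizedRuntimeError recovers that panic at the flow boundary
	// and hands the error back as a plain return value.
	retErr := colexecerror.CatchVectorizedRuntimeError(func() {
		colexecutils.HandleErrorFromDiskQueue(diskErr)
	})

	fmt.Println(sqlerrors.IsDiskFullError(retErr)) // true
}

The same classification is reused by external_sort.go and hash_based_partitioner.go above, so a full temp-storage disk is reported as an expected condition everywhere the vectorized engine spills.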