60946: changefeedccl: Envelope schema name reflects schema config r=stevendanna a=HonoreDB

Fixes a bug where the Avro envelope schema name, as opposed to the subject,
did not honor the schema prefix and full table name options.

Release note (bug fix): The envelope schema in the Avro schema registry now honors the schema_prefix and full_table_name options.
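
As a purely hypothetical sketch of the intent (this is not the changefeed implementation; `envelopeName` and its naming scheme are made up for illustration, reusing the `supermovr`/`public.drivers` names from the test below): both the registry subject and the envelope schema's own name should carry the configured prefix and the fully qualified table name.

```go
package main

import "fmt"

// envelopeName is a hypothetical helper illustrating that the envelope
// schema's name, like the subject, embeds the prefix and full table name.
func envelopeName(schemaPrefix, fullTableName string) string {
	return schemaPrefix + fullTableName + "_envelope" // made-up naming scheme
}

func main() {
	fmt.Println(envelopeName("supermovr.", "public.drivers"))
}
```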

61662: rangefeed: handle closed timestamps with logical parts r=andreimatei a=andreimatei

Before this patch, the resolved timestamp computation code didn't handle
closed timestamps with logical parts properly. I believe we didn't use to
have such closed timestamps under "the old" closed timestamp mechanism,
and we also don't have them with the "new one" [*]. Still, the resolved
timestamp should support them.

The trouble arose when the resolved timestamp was tracking an intent at,
say, 11.0, with the closed timestamp at 10.2.
resolvedTimestamp.recompute() was computing FloorPrev(11.0) == 10.0 and
comparing that with the closed timestamp, and it panicked when the
FloorPrev result dropped below the closed timestamp.
This patch reworks the logic to handle this case.

[*] Before #61559, the new closed ts code did sometimes close logical
timestamps: when we were trying to close above the lease expiration, we
were doing Backward(leaseExpiration) (and leases with a logical
expiration are possible, although that seems separately dubious to me).
This changed in #61559, which added physical rounding to the
respective lease expiration.

Fixes #61176, although I think the bug went back to being hidden after #61559.

Release note: None
Release justification: Sort of a bug fix for new functionality. But more
importantly, this affords extra flexibility to 21.1 in dealing with
potential changes in 21.2.
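
To make the arithmetic concrete, here is a small standalone sketch (not the rangefeed code itself) assuming the `FloorPrev`/`Prev`/`Backward` semantics of `pkg/util/hlc`, using the 11.0 intent and 10.2 closed timestamp from the example above:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	intentTS := hlc.Timestamp{WallTime: 11, Logical: 0}
	closedTS := hlc.Timestamp{WallTime: 10, Logical: 2}

	// Old behavior: FloorPrev(11.0) == 10.0, which is below the closed
	// timestamp 10.2 and tripped the sanity check.
	oldCandidate := intentTS.FloorPrev()

	// New behavior: take min(closedTS, Prev(intent)), then drop the logical
	// part so we never push timestamps toward Logical=MaxInt32.
	newResolved := closedTS
	newResolved.Backward(intentTS.Prev()) // min(10.2, 10.2147483647) == 10.2
	newResolved.Logical = 0

	fmt.Println(oldCandidate, newResolved)
}
```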

61790: colexec: propagate disk full error as expected r=yuzefovich a=yuzefovich

Previously, we would always propagate the errors emitted by the spilling
queues and disk queues as "internal", which resulted in the errors being
annotated. However, "disk full" errors are expected to occur, so this
commit cleans that up. Additionally, it plumbs the error propagation into
the spilling queue's `Enqueue` method itself to remove some of the
duplicated code.

Fixes: #61769.

Release note: None
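
As a hedged illustration of the resulting calling pattern (the `enqueueAll` helper is an assumption, not code from this commit): `Enqueue` no longer returns an error; failures are thrown through `colexecerror` and recovered at the boundary of the vectorized flow, with disk-full errors surfacing as expected errors rather than annotated internal ones.

```go
package colexecexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/col/coldata"
	"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
	"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
)

// enqueueAll is a hypothetical helper showing the panic-based propagation:
// Enqueue may throw (via HandleErrorFromDiskQueue internally), and the error
// is recovered here as a regular error value.
func enqueueAll(ctx context.Context, q *colexecutils.SpillingQueue, batches []coldata.Batch) error {
	return colexecerror.CatchVectorizedRuntimeError(func() {
		for _, b := range batches {
			q.Enqueue(ctx, b)
		}
	})
}
```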

Co-authored-by: Aaron Zinger <zinger@cockroachlabs.com>
Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
4 people committed Mar 11, 2021
4 parents 2b8d784 + d825112 + c59b0d5 + a8dc0bf commit d01a9e5
Showing 17 changed files with 145 additions and 142 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/encoder.go
@@ -414,6 +414,7 @@ func (e *confluentAvroEncoder) EncodeValue(ctx context.Context, row encodeRow) (

opts := avroEnvelopeOpts{afterField: true, beforeField: e.beforeField, updatedField: e.updatedField}
registered.schema, err = envelopeToAvroSchema(e.rawTableName(row.tableDesc), opts, beforeDataSchema, afterDataSchema, e.schemaPrefix)

if err != nil {
return nil, err
}
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/encoder_test.go
@@ -482,6 +482,10 @@ func TestAvroSchemaNaming(t *testing.T) {
`supermovr.public.drivers-key`,
`supermovr.public.drivers-value`,
})

// Both changes to the subject are also reflected in the schema name in the posted schemas.
require.Contains(t, reg.mu.schemas[reg.mu.subjects[`supermovr.public.drivers-key`]], `supermovr`)
require.Contains(t, reg.mu.schemas[reg.mu.subjects[`supermovr.public.drivers-value`]], `supermovr`)
}

t.Run(`enterprise`, enterpriseTest(testFn))
20 changes: 17 additions & 3 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp.go
@@ -217,13 +217,27 @@ func (rts *resolvedTimestamp) recompute() bool {
if !rts.IsInit() {
return false
}
if rts.closedTS.Less(rts.resolvedTS) {
panic(fmt.Sprintf("closed timestamp below resolved timestamp: %s < %s",
rts.closedTS, rts.resolvedTS))
}
newTS := rts.closedTS

// Take into account the intents that haven't been yet resolved - their
// timestamps cannot be resolved yet.
if txn := rts.intentQ.Oldest(); txn != nil {
txnTS := txn.timestamp.FloorPrev()
if txnTS.Less(newTS) {
newTS = txnTS
if txn.timestamp.LessEq(rts.resolvedTS) {
panic(fmt.Sprintf("unresolved txn equal to or below resolved timestamp: %s <= %s",
txn.timestamp, rts.resolvedTS))
}
// txn.timestamp cannot be resolved, so the resolved timestamp must be Prev.
txnTS := txn.timestamp.Prev()
newTS.Backward(txnTS)
}
// Truncate the logical part. It might have come from a Prev call above, and
// it's dangerous to start pushing things above Logical=MaxInt32.
newTS.Logical = 0

if newTS.Less(rts.resolvedTS) {
panic(fmt.Sprintf("resolved timestamp regression, was %s, recomputed as %s",
rts.resolvedTS, newTS))
41 changes: 41 additions & 0 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp_test.go
@@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)
@@ -563,3 +564,43 @@ func TestResolvedTimestampTxnAborted(t *testing.T) {
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 25}, rts.Get())
}

// Test that things go well when the closed timestamp has a non-zero logical part.
func TestClosedTimestampLogicalPart(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
rts := makeResolvedTimestamp()
rts.Init()

// Set a new closed timestamp. Resolved timestamp advances.
fwd := rts.ForwardClosedTS(hlc.Timestamp{WallTime: 10, Logical: 2})
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get())

// Add an intent for a new transaction.
txn1 := uuid.MakeV4()
fwd = rts.ConsumeLogicalOp(writeIntentOp(txn1, hlc.Timestamp{WallTime: 10, Logical: 4}))
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get())

// Set a new closed timestamp. Resolved timestamp doesn't advance, since it
// could only theoretically advance up to 10.4, and it doesn't do logical
// parts.
fwd = rts.ForwardClosedTS(hlc.Timestamp{WallTime: 11, Logical: 6})
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 10, Logical: 0}, rts.Get())

// Abort txn1. Resolved timestamp advances.
fwd = rts.ConsumeLogicalOp(abortTxnOp(txn1))
require.True(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get())

// Create an intent one tick above the closed ts. Resolved timestamp doesn't
// advance. This tests the case where the closed timestamp has a logical part,
// and an intent is in the next wall tick; this used to cause an issue because
// of the rounding logic.
txn2 := uuid.MakeV4()
fwd = rts.ConsumeLogicalOp(writeIntentOp(txn2, hlc.Timestamp{WallTime: 12, Logical: 7}))
require.False(t, fwd)
require.Equal(t, hlc.Timestamp{WallTime: 11, Logical: 0}, rts.Get())
}
8 changes: 2 additions & 6 deletions pkg/sql/colexec/colexecjoin/crossjoiner.go
@@ -168,9 +168,7 @@ func (c *crossJoiner) consumeInputs(ctx context.Context) {
if needLeftTuples {
for {
batch := c.inputOne.Next(ctx)
if err := c.left.tuples.Enqueue(ctx, batch); err != nil {
colexecerror.InternalError(err)
}
c.left.tuples.Enqueue(ctx, batch)
if batch.Length() == 0 {
break
}
@@ -180,9 +178,7 @@ func (c *crossJoiner) consumeInputs(ctx context.Context) {
if needRightTuples {
for {
batch := c.inputTwo.Next(ctx)
if err := c.right.tuples.Enqueue(ctx, batch); err != nil {
colexecerror.InternalError(err)
}
c.right.tuples.Enqueue(ctx, batch)
if batch.Length() == 0 {
break
}
9 changes: 2 additions & 7 deletions pkg/sql/colexec/colexecjoin/mergejoiner.go
@@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -577,9 +576,7 @@ func (o *mergeJoinBase) appendToBufferedGroup(
if batch.Length() == 0 || groupLength == 0 {
// We have finished appending to this buffered group, so we need to
// Enqueue a zero-length batch per the contract of the spilling queue.
if err := bufferedTuples.Enqueue(ctx, coldata.ZeroBatch); err != nil {
colexecerror.InternalError(err)
}
bufferedTuples.Enqueue(ctx, coldata.ZeroBatch)
return
}
// TODO(yuzefovich): for LEFT/RIGHT ANTI joins we only need to store the
@@ -625,9 +622,7 @@ func (o *mergeJoinBase) appendToBufferedGroup(
}
bufferedGroup.scratchBatch.SetLength(groupLength)
})
if err := bufferedTuples.Enqueue(ctx, bufferedGroup.scratchBatch); err != nil {
colexecerror.InternalError(err)
}
bufferedTuples.Enqueue(ctx, bufferedGroup.scratchBatch)
}

// setBuilderSourceToBatch sets the builder state to use groups from the
1 change: 1 addition & 0 deletions pkg/sql/colexec/colexecutils/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
"//pkg/sql/colexecerror",
"//pkg/sql/colexecop",
"//pkg/sql/colmem",
"//pkg/sql/sqlerrors",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/cancelchecker",
17 changes: 8 additions & 9 deletions pkg/sql/colexec/colexecutils/spilling_queue.go
@@ -147,19 +147,19 @@ func NewRewindableSpillingQueue(args *NewSpillingQueueArgs) *SpillingQueue {
// The ownership of the batch still lies with the caller, so the caller is
// responsible for accounting for the memory used by batch (although the
// spilling queue will account for memory used by the in-memory copies).
func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) error {
func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) {
if q.rewindable && q.rewindableState.numItemsDequeued > 0 {
return errors.Errorf("attempted to Enqueue to rewindable SpillingQueue after Dequeue has been called")
colexecerror.InternalError(errors.Errorf("attempted to Enqueue to rewindable SpillingQueue after Dequeue has been called"))
}

n := batch.Length()
if n == 0 {
if q.diskQueue != nil {
if err := q.diskQueue.Enqueue(ctx, batch); err != nil {
return err
HandleErrorFromDiskQueue(err)
}
}
return nil
return
}
q.testingKnobs.numEnqueues++

@@ -175,7 +175,7 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) error
// in-memory buffer
// so we have to add batch to the disk queue.
if err := q.maybeSpillToDisk(ctx); err != nil {
return err
HandleErrorFromDiskQueue(err)
}
if sel := batch.Selection(); sel != nil {
// We need to perform the deselection since the disk queue
@@ -204,10 +204,10 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) error
batch = q.diskQueueDeselectionScratch
}
if err := q.diskQueue.Enqueue(ctx, batch); err != nil {
return err
HandleErrorFromDiskQueue(err)
}
q.numOnDiskItems++
return nil
return
}

if q.numInMemoryItems == len(q.items) {
@@ -261,7 +261,7 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) error
if alreadyCopied == n {
// We were able to append all of the tuples, so we return early
// since we don't need to update any of the state.
return nil
return
}
}
}
@@ -315,7 +315,6 @@ func (q *SpillingQueue) Enqueue(ctx context.Context, batch coldata.Batch) error
q.curTailIdx = 0
}
q.numInMemoryItems++
return nil
}

// Dequeue returns the next batch from the queue which is valid only until the
14 changes: 7 additions & 7 deletions pkg/sql/colexec/colexecutils/spilling_queue_test.go
@@ -180,7 +180,7 @@ func TestSpillingQueue(t *testing.T) {

for {
b = op.Next(ctx)
require.NoError(t, q.Enqueue(ctx, b))
q.Enqueue(ctx, b)
if b.Length() == 0 {
break
}
@@ -299,7 +299,7 @@ func TestSpillingQueueDidntSpill(t *testing.T) {

for {
b := op.Next(ctx)
require.NoError(t, q.Enqueue(ctx, b))
q.Enqueue(ctx, b)
b, err := q.Dequeue(ctx)
require.NoError(t, err)
if b.Length() == 0 {
@@ -388,7 +388,7 @@ func TestSpillingQueueMemoryAccounting(t *testing.T) {
return int64(batchesAccountedFor) * batchSize
}
for numEnqueuedBatches := 1; numEnqueuedBatches <= numInputBatches; numEnqueuedBatches++ {
require.NoError(t, q.Enqueue(ctx, batch))
q.Enqueue(ctx, batch)
if rng.Float64() < dequeueProbability {
b, err := q.Dequeue(ctx)
require.NoError(t, err)
@@ -397,7 +397,7 @@ func TestSpillingQueueMemoryAccounting(t *testing.T) {
}
require.Equal(t, getExpectedMemUsage(numEnqueuedBatches), q.unlimitedAllocator.Used())
}
require.NoError(t, q.Enqueue(ctx, coldata.ZeroBatch))
q.Enqueue(ctx, coldata.ZeroBatch)
for {
b, err := q.Dequeue(ctx)
require.NoError(t, err)
@@ -476,7 +476,7 @@ func TestSpillingQueueMovingTailWhenSpilling(t *testing.T) {
sequenceValue := rng.Int63()
batch.ColVec(0).Int64()[0] = sequenceValue
expectedBatchSequence = append(expectedBatchSequence, sequenceValue)
require.NoError(t, q.Enqueue(ctx, batch))
q.Enqueue(ctx, batch)
}
// All enqueued batches should fit under the memory limit (to be
// precise, the last enqueued batch has just crossed the limit, but
@@ -488,7 +488,7 @@ func TestSpillingQueueMovingTailWhenSpilling(t *testing.T) {
sequenceValue := rng.Int63()
batch.ColVec(0).Int64()[0] = sequenceValue
expectedBatchSequence = append(expectedBatchSequence, sequenceValue)
require.NoError(t, q.Enqueue(ctx, batch))
q.Enqueue(ctx, batch)
numExtraInputBatches = 1
} else {
require.NoError(t, q.maybeSpillToDisk(ctx))
@@ -501,7 +501,7 @@ func TestSpillingQueueMovingTailWhenSpilling(t *testing.T) {
require.Equal(t, int64(0), q.unlimitedAllocator.Used())
require.Equal(t, numInputBatches+numExtraInputBatches, q.numOnDiskItems)

require.NoError(t, q.Enqueue(ctx, coldata.ZeroBatch))
q.Enqueue(ctx, coldata.ZeroBatch)

// Now check that all the batches are in the correct order.
batchCount := 0
14 changes: 14 additions & 0 deletions pkg/sql/colexec/colexecutils/utils.go
@@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)
@@ -267,3 +268,16 @@ var (
// length.
ZeroUint64Column = make([]uint64, coldata.MaxBatchSize)
)

// HandleErrorFromDiskQueue takes in non-nil error emitted by colcontainer.Queue
// or colcontainer.PartitionedDiskQueue implementations and propagates it
// throughout the vectorized engine.
func HandleErrorFromDiskQueue(err error) {
if sqlerrors.IsDiskFullError(err) {
// We don't want to annotate the disk full error, so we propagate it
// as expected one.
colexecerror.ExpectedError(err)
} else {
colexecerror.InternalError(err)
}
}