Skip to content

Commit

Permalink
Merge #64001
Browse files Browse the repository at this point in the history
64001: kvserver: fix tscache read summaries r=andreimatei a=andreimatei

r.ClosedTimestampV2() was recently broken in that it was only
consultingh the side-transport, not the raft closed timestamp. This lead
to read summaries (used by lease transfers and range merges) to not
properly incorporate the closed timestamp.

This broke in edc4b53

Release note: None

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
  • Loading branch information
craig[bot] and andreimatei committed Apr 21, 2021
2 parents c8820e6 + f8e1c90 commit 8ae7146
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 4 deletions.
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_closedts.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -208,7 +207,7 @@ type sidetransportReceiver interface {
) (hlc.Timestamp, ctpb.LAI)
}

func (st *sidetransportAccess) init(receiver *sidetransport.Receiver, rangeID roachpb.RangeID) {
func (st *sidetransportAccess) init(receiver sidetransportReceiver, rangeID roachpb.RangeID) {
if receiver != nil {
// Avoid st.receiver becoming a typed nil.
st.receiver = receiver
Expand Down
66 changes: 66 additions & 0 deletions pkg/kv/kvserver/replica_closedts_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ package kvserver
import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -137,3 +139,67 @@ func (r *mockReceiver) GetClosedTimestamp(
) (hlc.Timestamp, ctpb.LAI) {
return r.closed, r.lai
}

// Test that r.ClosedTimestampV2() mixes its sources of information correctly.
func TestReplicaClosedTimestampV2(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
ts1 := hlc.Timestamp{WallTime: 1}
ts2 := hlc.Timestamp{WallTime: 2}

for _, test := range []struct {
name string
applied ctpb.LAI
raftClosed hlc.Timestamp
sidetransportClosed hlc.Timestamp
sidetransportLAI ctpb.LAI
expClosed hlc.Timestamp
}{
{
name: "raft closed ahead",
applied: 10,
raftClosed: ts2,
sidetransportClosed: ts1,
sidetransportLAI: 5,
expClosed: ts2,
},
{
name: "sidetrans closed ahead",
applied: 10,
raftClosed: ts1,
sidetransportClosed: ts2,
sidetransportLAI: 5,
expClosed: ts2,
},
{
name: "sidetrans ahead but replication behind",
applied: 10,
raftClosed: ts1,
sidetransportClosed: ts2,
sidetransportLAI: 11,
expClosed: ts1,
},
} {
t.Run(test.name, func(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

receiver := &mockReceiver{
closed: test.sidetransportClosed,
lai: test.sidetransportLAI,
}
var tc testContext
tc.manualClock = hlc.NewManualClock(123) // required by StartWithStoreConfig
cfg := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond))
cfg.TestingKnobs.DontCloseTimestamps = true
cfg.ClosedTimestampReceiver = receiver
tc.StartWithStoreConfig(t, stopper, cfg)
tc.repl.mu.Lock()
tc.repl.mu.state.RaftClosedTimestamp = test.raftClosed
tc.repl.mu.state.LeaseAppliedIndex = uint64(test.applied)
tc.repl.mu.Unlock()
require.Equal(t, test.expClosed, tc.repl.ClosedTimestampV2(ctx))
})
}
}
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ func (r *Replica) ClosedTimestampV2(ctx context.Context) hlc.Timestamp {
r.mu.RLock()
appliedLAI := ctpb.LAI(r.mu.state.LeaseAppliedIndex)
leaseholder := r.mu.state.Lease.Replica.NodeID
raftClosed := r.mu.state.RaftClosedTimestamp
r.mu.RUnlock()
return r.sideTransportClosedTimestamp.get(ctx, leaseholder, appliedLAI, hlc.Timestamp{} /* sufficient */)
sideTransportClosed := r.sideTransportClosedTimestamp.get(ctx, leaseholder, appliedLAI, hlc.Timestamp{} /* sufficient */)
raftClosed.Forward(sideTransportClosed)
return raftClosed
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ type StoreConfig struct {

ClosedTimestamp *container.Container
ClosedTimestampSender *sidetransport.Sender
ClosedTimestampReceiver *sidetransport.Receiver
ClosedTimestampReceiver sidetransportReceiver

// SQLExecutor is used by the store to execute SQL statements.
SQLExecutor sqlutil.InternalExecutor
Expand Down

0 comments on commit 8ae7146

Please sign in to comment.