Skip to content

Commit

Permalink
kv,storage: persist gateway node id in transaction intents
Browse files Browse the repository at this point in the history
This change augments the `TxnMeta` protobuf structure to include the
gateway node ID (responsible for initiating the transaction) when
serializing the intent.  By doing so, this commit enables the Contention
Event Store proposed in #71965, utilizing option 2.

Release note: None
  • Loading branch information
AlexTalks committed Dec 17, 2021
1 parent c0fe9cf commit f7b66bf
Show file tree
Hide file tree
Showing 50 changed files with 284 additions and 181 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestCanSendToFollower(t *testing.T) {
future := clock.Now().Add(2*clock.MaxOffset().Nanoseconds(), 0)

txn := func(ts hlc.Timestamp) *roachpb.Transaction {
txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0)
txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0, 1)
return &txn
}
withWriteTimestamp := func(txn *roachpb.Transaction, ts hlc.Timestamp) *roachpb.Transaction {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
t.Run(fmt.Sprintf("direct-txn-%d", i), func(t *testing.T) {
db := setup(test.nodeID)
now := db.Clock().NowAsClockTimestamp()
kvTxn := roachpb.MakeTransaction("unnamed", nil /*baseKey*/, roachpb.NormalUserPriority, now.ToTimestamp(), db.Clock().MaxOffset().Nanoseconds())
kvTxn := roachpb.MakeTransaction("unnamed", nil /* baseKey */, roachpb.NormalUserPriority, now.ToTimestamp(), db.Clock().MaxOffset().Nanoseconds(), int32(test.nodeID))
txn := kv.NewTxnFromProto(ctx, db, test.nodeID, now, test.typ, &kvTxn)
ots := txn.TestingCloneTxn().ObservedTimestamps
if (len(ots) == 1 && ots[0].NodeID == test.nodeID) != test.expObserved {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestRangeLookupWithOpenTransaction(t *testing.T) {
// engine.
key := testutils.MakeKey(keys.Meta1Prefix, roachpb.KeyMax)
now := s.Clock().Now()
txn := roachpb.MakeTransaction("txn", roachpb.Key("foobar"), 0, now, 0)
txn := roachpb.MakeTransaction("txn", roachpb.Key("foobar"), 0, now, 0, int32(s.SQLInstanceID()))
if err := storage.MVCCPutProto(
context.Background(), s.Engines()[0],
nil, key, now, &txn, &roachpb.RangeDescriptor{}); err != nil {
Expand Down Expand Up @@ -1218,7 +1218,7 @@ func TestMultiRangeScanDeleteRange(t *testing.T) {
}

now := s.Clock().NowAsClockTimestamp()
txnProto := roachpb.MakeTransaction("MyTxn", nil, 0, now.ToTimestamp(), 0)
txnProto := roachpb.MakeTransaction("MyTxn", nil, 0, now.ToTimestamp(), 0, int32(s.SQLInstanceID()))
txn := kv.NewTxnFromProto(ctx, db, s.NodeID(), now, kv.RootTxn, &txnProto)

scan := roachpb.NewScan(writes[0], writes[len(writes)-1].Next(), false)
Expand Down
15 changes: 11 additions & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ func TestImmutableBatchArgs(t *testing.T) {

txn := roachpb.MakeTransaction(
"test", nil /* baseKey */, roachpb.NormalUserPriority,
clock.Now(), clock.MaxOffset().Nanoseconds(),
clock.Now(), clock.MaxOffset().Nanoseconds(), int32(ds.getNodeID()),
)
origTxnTs := txn.WriteTimestamp

Expand Down Expand Up @@ -2167,7 +2167,14 @@ func TestMultiRangeGapReverse(t *testing.T) {

ds := NewDistSender(cfg)

txn := roachpb.MakeTransaction("foo", nil, 1.0, clock.Now(), 0)
txn := roachpb.MakeTransaction(
"foo",
nil, // baseKey
1.0, // userPriority
clock.Now(),
0, // maxOffsetNs
1, // coordinatorNodeID
)

var ba roachpb.BatchRequest
ba.Txn = &txn
Expand Down Expand Up @@ -2965,7 +2972,7 @@ func TestParallelCommitsDetectIntentMissingCause(t *testing.T) {
key := roachpb.Key("a")
txn := roachpb.MakeTransaction(
"test", key, roachpb.NormalUserPriority,
clock.Now(), clock.MaxOffset().Nanoseconds(),
clock.Now(), clock.MaxOffset().Nanoseconds(), 1, /* coordinatorNodeID */
)

txnRecordPresent := true
Expand Down Expand Up @@ -3300,7 +3307,7 @@ func TestMultipleErrorsMerged(t *testing.T) {

txn := roachpb.MakeTransaction(
"test", nil /* baseKey */, roachpb.NormalUserPriority,
clock.Now(), clock.MaxOffset().Nanoseconds(),
clock.Now(), clock.MaxOffset().Nanoseconds(), 1, /* coordinatorNodeID */
)
// We're also going to check that the highest bumped WriteTimestamp makes it
// to the merged error.
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@ func TestTxnCoordSenderTxnUpdatedOnError(t *testing.T) {
roachpb.UserPriority(0),
now.ToTimestamp(),
clock.MaxOffset().Nanoseconds(),
0, /* coordinatorNodeID */
)
// TODO(andrei): I've monkeyed with the priorities on this initial
// Transaction to keep the test happy from a previous version in which the
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ func makeMockTxnPipeliner(iter condensableSpanSetRangeIterator) (txnPipeliner, *
}

func makeTxnProto() roachpb.Transaction {
return roachpb.MakeTransaction("test", []byte("key"), 0, hlc.Timestamp{WallTime: 10}, 0)
return roachpb.MakeTransaction("test", []byte("key"), 0, hlc.Timestamp{WallTime: 10},
0 /* maxOffsetNs */, 0 /* coordinatorNodeID */)
}

// TestTxnPipeliner1PCTransaction tests that the writes performed by 1PC
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvnemesis/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func withTimestamp(op Operation, ts int) Operation {
nil, // baseKey
roachpb.NormalUserPriority,
hlc.Timestamp{WallTime: int64(ts)},
0,
0, // maxOffsetNs
0, // coordinatorNodeID
)
switch o := op.GetValue().(type) {
case *ClosureTxnOperation:
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func TestEvalAddSSTable(t *testing.T) {
defer engine.Close()

// Write initial data.
intentTxn := roachpb.MakeTransaction("intentTxn", nil, 0, hlc.Timestamp{WallTime: intentTS}, 0)
intentTxn := roachpb.MakeTransaction("intentTxn", nil, 0, hlc.Timestamp{WallTime: intentTS}, 0, 1)
b := engine.NewBatch()
for i := len(tc.data) - 1; i >= 0; i-- { // reverse, older timestamps first
kv := tc.data[i]
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestEndTxnUpdatesTransactionRecord(t *testing.T) {

k, k2 := roachpb.Key("a"), roachpb.Key("b")
ts, ts2, ts3 := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}, hlc.Timestamp{WallTime: 3}
txn := roachpb.MakeTransaction("test", k, 0, ts, 0)
txn := roachpb.MakeTransaction("test", k, 0, ts, 0, 1)
writes := []roachpb.SequencedWrite{{Key: k, Sequence: 0}}
intents := []roachpb.Span{{Key: k2}}

Expand Down Expand Up @@ -983,7 +983,7 @@ func TestPartialRollbackOnEndTransaction(t *testing.T) {
k := roachpb.Key("a")
ts := hlc.Timestamp{WallTime: 1}
ts2 := hlc.Timestamp{WallTime: 2}
txn := roachpb.MakeTransaction("test", k, 0, ts, 0)
txn := roachpb.MakeTransaction("test", k, 0, ts, 0, 1)
endKey := roachpb.Key("z")
desc := roachpb.RangeDescriptor{
RangeID: 99,
Expand Down Expand Up @@ -1138,7 +1138,7 @@ func TestCommitWaitBeforeIntentResolutionIfCommitTrigger(t *testing.T) {

now := clock.Now()
commitTS := cfg.commitTS(now)
txn := roachpb.MakeTransaction("test", desc.StartKey.AsRawKey(), 0, now, 0)
txn := roachpb.MakeTransaction("test", desc.StartKey.AsRawKey(), 0, now, 0, 1)
txn.ReadTimestamp = commitTS
txn.WriteTimestamp = commitTS

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestQueryResolvedTimestamp(t *testing.T) {
require.NoError(t, storage.MVCCDelete(ctx, db, nil, roachpb.Key(k), makeTS(ts), nil))
}
writeIntent := func(k string, ts int64) {
txn := roachpb.MakeTransaction("test", roachpb.Key(k), 0, makeTS(ts), 0)
txn := roachpb.MakeTransaction("test", roachpb.Key(k), 0, makeTS(ts), 0, 1)
require.NoError(t, storage.MVCCDelete(ctx, db, nil, roachpb.Key(k), makeTS(ts), &txn))
}
writeInline := func(k string) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_recover_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestRecoverTxn(t *testing.T) {
ctx := context.Background()
k, k2 := roachpb.Key("a"), roachpb.Key("b")
ts := hlc.Timestamp{WallTime: 1}
txn := roachpb.MakeTransaction("test", k, 0, ts, 0)
txn := roachpb.MakeTransaction("test", k, 0, ts, 0, 1)
txn.Status = roachpb.STAGING
txn.LockSpans = []roachpb.Span{{Key: k}}
txn.InFlightWrites = []roachpb.SequencedWrite{{Key: k2, Sequence: 0}}
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestRecoverTxnRecordChanged(t *testing.T) {
ctx := context.Background()
k := roachpb.Key("a")
ts := hlc.Timestamp{WallTime: 1}
txn := roachpb.MakeTransaction("test", k, 0, ts, 0)
txn := roachpb.MakeTransaction("test", k, 0, ts, 0, 1)
txn.Status = roachpb.STAGING

testCases := []struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_resolve_intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestResolveIntentAfterPartialRollback(t *testing.T) {
ts := hlc.Timestamp{WallTime: 1}
ts2 := hlc.Timestamp{WallTime: 2}
endKey := roachpb.Key("z")
txn := roachpb.MakeTransaction("test", k, 0, ts, 0)
txn := roachpb.MakeTransaction("test", k, 0, ts, 0, 1)
desc := roachpb.RangeDescriptor{
RangeID: 99,
StartKey: roachpb.RKey(k),
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_revert_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestCmdRevertRange(t *testing.T) {
})
}

txn := roachpb.MakeTransaction("test", nil, roachpb.NormalUserPriority, tsC, 1)
txn := roachpb.MakeTransaction("test", nil, roachpb.NormalUserPriority, tsC, 1, 1)
if err := storage.MVCCPut(
ctx, eng, &stats, []byte("0012"), tsC, roachpb.MakeValueFromBytes([]byte("i")), &txn,
); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestCollectIntentsUsesSameIterator(t *testing.T) {

// Write an intent.
val := roachpb.MakeValueFromBytes([]byte("val"))
txn := roachpb.MakeTransaction("test", key, roachpb.NormalUserPriority, ts, 0)
txn := roachpb.MakeTransaction("test", key, roachpb.NormalUserPriority, ts, 0, 1)
var err error
if delete {
err = storage.MVCCDelete(ctx, db, nil, key, ts, &txn)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestUpdateAbortSpan(t *testing.T) {
}
as := abortspan.New(desc.RangeID)

txn := roachpb.MakeTransaction("test", txnKey, 0, hlc.Timestamp{WallTime: 1}, 0)
txn := roachpb.MakeTransaction("test", txnKey, 0, hlc.Timestamp{WallTime: 1}, 0, 1)
newTxnAbortSpanEntry := roachpb.AbortSpanEntry{
Key: txn.Key,
Timestamp: txn.WriteTimestamp,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
return m
},
emptySum: 7551962144604783939,
populatedSum: 6784975417727259950,
populatedSum: 6170112718709472849,
},
reflect.TypeOf(&enginepb.RangeAppliedState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,8 @@ func mergeCheckingTimestampCaches(
// Simulate a txn abort on the RHS from a node with a newer clock. Because
// the transaction record for the pushee was not yet written, this will bump
// the timestamp cache to record the abort.
pushee := roachpb.MakeTransaction("pushee", rhsKey, roachpb.MinUserPriority, readTS, 0)
pusher := roachpb.MakeTransaction("pusher", rhsKey, roachpb.MaxUserPriority, readTS, 0)
pushee := roachpb.MakeTransaction("pushee", rhsKey, roachpb.MinUserPriority, readTS, 0, 0)
pusher := roachpb.MakeTransaction("pusher", rhsKey, roachpb.MaxUserPriority, readTS, 0, 0)
ba = roachpb.BatchRequest{}
ba.Timestamp = readTS.Next()
ba.RangeID = rhsDesc.RangeID
Expand Down Expand Up @@ -4833,7 +4833,7 @@ func sendWithTxn(
args roachpb.Request,
) error {
txn := roachpb.MakeTransaction("test txn", desc.StartKey.AsRawKey(),
0, ts, maxOffset.Nanoseconds())
0, ts, maxOffset.Nanoseconds(), 0)
_, pErr := kv.SendWrappedWith(context.Background(), store.TestSender(), roachpb.Header{Txn: &txn}, args)
return pErr.GoError()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestStoreResolveMetrics(t *testing.T) {
require.NoError(t, err)
span := roachpb.Span{Key: key, EndKey: key.Next()}

txn := roachpb.MakeTransaction("foo", span.Key, roachpb.MinUserPriority, hlc.Timestamp{WallTime: 123}, 999)
txn := roachpb.MakeTransaction("foo", span.Key, roachpb.MinUserPriority, hlc.Timestamp{WallTime: 123}, 999, int32(s.NodeID()))

const resolveCommitCount = int64(200)
const resolveAbortCount = int64(800)
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func TestTxnReadWithinUncertaintyInterval(t *testing.T) {
now := s.Clock().Now()
maxOffset := s.Clock().MaxOffset().Nanoseconds()
require.NotZero(t, maxOffset)
txn := roachpb.MakeTransaction("test", key, 1, now, maxOffset)
txn := roachpb.MakeTransaction("test", key, 1, now, maxOffset, int32(s.SQLInstanceID()))
require.True(t, txn.ReadTimestamp.Less(txn.GlobalUncertaintyLimit))
require.Len(t, txn.ObservedTimestamps, 0)

Expand Down Expand Up @@ -641,7 +641,7 @@ func TestTxnReadWithinUncertaintyIntervalAfterLeaseTransfer(t *testing.T) {
now := clocks[1].Now()
maxOffset := clocks[1].MaxOffset().Nanoseconds()
require.NotZero(t, maxOffset)
txn := roachpb.MakeTransaction("test", keyB, 1, now, maxOffset)
txn := roachpb.MakeTransaction("test", keyB, 1, now, maxOffset, int32(tc.Servers[1].SQLInstanceID()))
require.True(t, txn.ReadTimestamp.Less(txn.GlobalUncertaintyLimit))
require.Len(t, txn.ObservedTimestamps, 0)

Expand Down Expand Up @@ -1374,7 +1374,7 @@ func TestRangeLocalUncertaintyLimitAfterNewLease(t *testing.T) {
}

// Start a transaction using node2 as a gateway.
txn := roachpb.MakeTransaction("test", keyA, 1, tc.Servers[1].Clock().Now(), tc.Servers[1].Clock().MaxOffset().Nanoseconds() /* maxOffsetNs */)
txn := roachpb.MakeTransaction("test", keyA, 1 /* userPriority */, tc.Servers[1].Clock().Now(), tc.Servers[1].Clock().MaxOffset().Nanoseconds() /* maxOffsetNs */, int32(tc.Servers[1].SQLInstanceID()))
// Simulate a read to another range on node2 by setting the observed timestamp.
txn.UpdateObservedTimestamp(2, tc.Servers[1].Clock().NowAsClockTimestamp())

Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestStoreSplitAbortSpan(t *testing.T) {
left, middle, right := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")

txn := func(key roachpb.Key, ts hlc.Timestamp) *roachpb.Transaction {
txn := roachpb.MakeTransaction("test", key, 0, ts, 0)
txn := roachpb.MakeTransaction("test", key, 0, ts, 0, int32(s.SQLInstanceID()))
return &txn
}

Expand Down Expand Up @@ -556,7 +556,7 @@ func TestStoreRangeSplitIdempotency(t *testing.T) {
// Increments are a good way of testing idempotency. Up here, we
// address them to the original range, then later to the one that
// contains the key.
txn := roachpb.MakeTransaction("test", []byte("c"), 10, store.Clock().Now(), 0)
txn := roachpb.MakeTransaction("test", []byte("c"), 10, store.Clock().Now(), 0, int32(s.SQLInstanceID()))
lIncArgs := incrementArgs([]byte("apoptosis"), 100)
lTxn := txn
lTxn.Sequence++
Expand Down Expand Up @@ -3166,7 +3166,8 @@ func TestRangeLookupAsyncResolveIntent(t *testing.T) {
t.Fatal(err)
}
txn := roachpb.MakeTransaction("test", key2, 1,
store.Clock().Now(), store.Clock().MaxOffset().Nanoseconds())
store.Clock().Now(), store.Clock().MaxOffset().Nanoseconds(),
int32(s.SQLInstanceID()))
// Officially begin the transaction. If not for this, the intent resolution
// machinery would simply remove the intent we write below, see #3020.
// We send directly to Replica throughout this test, so there's no danger
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestClosedTimestampCanServe(t *testing.T) {
var baWrite roachpb.BatchRequest
r := &roachpb.DeleteRequest{}
r.Key = desc.StartKey.AsRawKey()
txn := roachpb.MakeTransaction("testwrite", r.Key, roachpb.NormalUserPriority, ts, 100)
txn := roachpb.MakeTransaction("testwrite", r.Key, roachpb.NormalUserPriority, ts, 100, int32(tc.Server(0).SQLInstanceID()))
baWrite.Txn = &txn
baWrite.Add(r)
baWrite.RangeID = repls[0].RangeID
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestClosedTimestampCantServeWithConflictingIntent(t *testing.T) {
// replica.
txnKey := desc.StartKey.AsRawKey()
txnKey = txnKey[:len(txnKey):len(txnKey)] // avoid aliasing
txn := roachpb.MakeTransaction("txn", txnKey, 0, tc.Server(0).Clock().Now(), 0)
txn := roachpb.MakeTransaction("txn", txnKey, 0, tc.Server(0).Clock().Now(), 0, int32(tc.Server(0).SQLInstanceID()))
var keys []roachpb.Key
for i := range repls {
key := append(txnKey, []byte(strconv.Itoa(i))...)
Expand Down Expand Up @@ -1321,7 +1321,7 @@ func verifyCanReadFromAllRepls(
}

func makeTxnReadBatchForDesc(desc roachpb.RangeDescriptor, ts hlc.Timestamp) roachpb.BatchRequest {
txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0)
txn := roachpb.MakeTransaction("txn", nil, 0, ts, 0, 0)

var baRead roachpb.BatchRequest
baRead.Header.RangeID = desc.RangeID
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func setupLockTableWaiterTest() (
}

func makeTxnProto(name string) roachpb.Transaction {
return roachpb.MakeTransaction(name, []byte("key"), 0, hlc.Timestamp{WallTime: 10}, 0)
return roachpb.MakeTransaction(name, []byte("key"), 0, hlc.Timestamp{WallTime: 10}, 0, 6)
}

// TestLockTableWaiterWithTxn tests the lockTableWaiter's behavior under
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/gc/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestIntentAgeThresholdSetting(t *testing.T) {
intentHlc := hlc.Timestamp{
WallTime: intentTs.Nanoseconds(),
}
txn := roachpb.MakeTransaction("txn", key, roachpb.NormalUserPriority, intentHlc, 1000)
txn := roachpb.MakeTransaction("txn", key, roachpb.NormalUserPriority, intentHlc, 1000, 0)
require.NoError(t, storage.MVCCPut(ctx, eng, nil, key, intentHlc, value, &txn))
require.NoError(t, eng.Flush())

Expand Down Expand Up @@ -169,7 +169,7 @@ func TestIntentCleanupBatching(t *testing.T) {
}
for _, prefix := range txnPrefixes {
key := []byte{prefix, objectKeys[0]}
txn := roachpb.MakeTransaction("txn", key, roachpb.NormalUserPriority, intentHlc, 1000)
txn := roachpb.MakeTransaction("txn", key, roachpb.NormalUserPriority, intentHlc, 1000, 0)
for _, suffix := range objectKeys {
key := []byte{prefix, suffix}
require.NoError(t, storage.MVCCPut(ctx, eng, nil, key, intentHlc, value, &txn))
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/intent_resolver_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ func TestReliableIntentCleanup(t *testing.T) {
roachpb.MaxUserPriority,
now.ToTimestamp(),
srv.Clock().MaxOffset().Nanoseconds(),
int32(srv.SQLInstanceID()),
)
pusher := kv.NewTxnFromProto(ctx, db, srv.NodeID(), now, kv.RootTxn, &pusherProto)
if err := pusher.Put(ctx, txnKey, []byte("pushit")); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/intentresolver/intent_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ func newTransaction(
offset = clock.MaxOffset().Nanoseconds()
now = clock.Now()
}
txn := roachpb.MakeTransaction(name, baseKey, userPriority, now, offset)
txn := roachpb.MakeTransaction(name, baseKey, userPriority, now, offset, 1 /* coordinatorNodeID */)
return &txn
}

Expand Down
Loading

0 comments on commit f7b66bf

Please sign in to comment.