Skip to content

Commit

Permalink
Update read write confilict check (#20897)
Browse files Browse the repository at this point in the history
Update read write confilict check. Not wait prev TXN to commit in queue.

Approved by: @XuPeng-SH
  • Loading branch information
jiangxinmeng1 authored Dec 28, 2024
1 parent 96ac25f commit 94569b4
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 7 deletions.
103 changes: 103 additions & 0 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10777,6 +10777,40 @@ func TestReplayDebugLog(t *testing.T) {
tae.Restart(ctx)
}

func TestRW2(t *testing.T) {
ctx := context.Background()
opts := config.WithLongScanAndCKPOpts(nil)
tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)

schema := catalog.MockSchemaAll(1, -1)
schema.Extra.BlockMaxRows = 2
schema.Extra.ObjectMaxBlocks = 4
tae.BindSchema(schema)
bat := catalog.MockBatch(schema, 1)
defer bat.Close()
/*
create obj
txn1 delete row/warchecker.add
txn2 drop obj(mock delete)
txn2 commit
txn1 commit
*/
tae.CreateRelAndAppend(bat, true)

txn, rel := tae.GetRelation()
obj := testutil.GetOneBlockMeta(rel)
rel.Append(ctx, bat)
{
tbl := obj.GetTable()
txn2, _ := tae.StartTxn(nil)
tbl.DropObjectEntry(obj.ID(), txn2, false)
txn2.ToPreparingLocked(tae.TxnMgr.Now())
}
err := txn.Commit(ctx)
assert.True(t, moerr.IsMoErrCode(err, moerr.ErrTxnRWConflict))
}

func Test_BasicTxnModeSwitch(t *testing.T) {
ctx := context.Background()
opts := config.WithLongScanAndCKPOpts(nil)
Expand Down Expand Up @@ -10817,3 +10851,72 @@ func Test_OpenReplayDB1(t *testing.T) {
assert.Error(t, db.AddCronJob(tae.DB, "unknown", false))
assert.Error(t, db.CheckCronJobs(tae.DB, db.DBTxnMode_Write))
}

func TestRW3(t *testing.T) {
ctx := context.Background()
opts := config.WithLongScanAndCKPOpts(nil)
tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)

objCount := 100
schema := catalog.MockSchemaAll(1, -1)
schema.Extra.BlockMaxRows = 1
schema.Extra.ObjectMaxBlocks = 4
tae.BindSchema(schema)
bat := catalog.MockBatch(schema, objCount)
defer bat.Close()
tae.CreateRelAndAppend(bat, true)

objs := make([]*catalog.ObjectEntry, 0)
txn, rel := tae.GetRelation()
iter := rel.MakeObjectIt(false)
for iter.Next() {
obj := iter.GetObject().GetMeta().(*catalog.ObjectEntry)
objs = append(objs, obj)
}
assert.Equal(t, objCount, len(objs))
err := txn.Commit(ctx)
assert.NoError(t, err)

var wg sync.WaitGroup

txn, rel = tae.GetRelation()
pkVec := containers.MakeVector(types.T_uint64.ToType(), common.DefaultAllocator)
defer pkVec.Close()
rowIDVec := containers.MakeVector(types.T_Rowid.ToType(), common.DefaultAllocator)
defer rowIDVec.Close()
for i, obj := range objs {
rowID := objectio.NewRowIDWithObjectIDBlkNumAndRowID(*obj.ID(), 0, 0)
rowIDVec.Append(rowID, false)
pkVec.Append(uint64(i), false)
}
err = rel.DeleteByPhyAddrKeys(rowIDVec, pkVec, handle.DT_Normal)
assert.NoError(t, err)
{
deleteObjectFn := func(offset int) func() {
return func() {
defer wg.Done()
txn, err := tae.StartTxn(nil)
assert.NoError(t, err)
obj := objs[offset]
task, err := jobs.NewFlushTableTailTask(nil, txn, []*catalog.ObjectEntry{obj}, nil, tae.Runtime)
assert.NoError(t, err)
err = task.OnExec(context.Background())
assert.NoError(t, err)
err = txn.Commit(ctx)
assert.NoError(t, err)
}
}
workers, err := ants.NewPool(50)
assert.NoError(t, err)
for i := 0; i < objCount; i++ {
wg.Add(1)
workers.Submit(deleteObjectFn(i))
}
}
err = txn.Commit(ctx)
assert.NoError(t, err)
wg.Wait()

tae.CheckRowsByScan(0, true)

}
18 changes: 14 additions & 4 deletions pkg/vm/engine/tae/txn/txnimpl/antidepend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,24 @@ import (

var ErrRWConflict = moerr.NewTxnRWConflictNoCtx()

func readWriteConfilictCheck(entry *catalog.ObjectEntry, ts types.TS) (err error) {
func readWriteConfilictCheck(entry *catalog.ObjectEntry, ts types.TS, inqueue bool) (err error) {
lastNode := entry.GetLatestNode()
if !lastNode.HasDropIntent() {
return nil
}
needWait, txnToWait := lastNode.GetLastMVCCNode().NeedWaitCommitting(ts)
// TODO:
// I don't think we need to wait here any more. `block` and `Object` are
// local metadata and never be involved in a 2PC txn. So a prepared `block`
// will never be rollbacked
if needWait {
if inqueue {
deleteTS := txnToWait.GetPrepareTS()
if deleteTS.LT(&ts) {
err = ErrRWConflict
}
return
}
txnToWait.GetTxnState(true)
lastNode = entry.GetLatestNode()
}
Expand Down Expand Up @@ -119,7 +129,7 @@ func (checker *warChecker) Insert(obj *catalog.ObjectEntry) {
checker.readSet[*obj.ID()] = obj
}

func (checker *warChecker) checkOne(id *common.ID, ts types.TS) (err error) {
func (checker *warChecker) checkOne(id *common.ID, ts types.TS, inqueue bool) (err error) {
// defer func() {
// logutil.Infof("checkOne blk=%s ts=%s err=%v", id.BlockString(), ts.ToString(), err)
// }()
Expand All @@ -131,12 +141,12 @@ func (checker *warChecker) checkOne(id *common.ID, ts types.TS) (err error) {
if entry == nil {
return
}
return readWriteConfilictCheck(entry, ts)
return readWriteConfilictCheck(entry, ts, inqueue)
}

func (checker *warChecker) checkAll(ts types.TS) (err error) {
for _, obj := range checker.readSet {
if err = readWriteConfilictCheck(obj, ts); err != nil {
if err = readWriteConfilictCheck(obj, ts, true); err != nil {
logutil.Error(
"Txn-Check-All",
zap.Error(err),
Expand Down
10 changes: 7 additions & 3 deletions pkg/vm/engine/tae/txn/txnimpl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (tbl *txnTable) TransferDeleteIntent(
panic(err)
}
ts := types.BuildTS(time.Now().UTC().UnixNano(), 0)
if err = readWriteConfilictCheck(entry, ts); err == nil {
if err = readWriteConfilictCheck(entry, ts, false); err == nil {
return
}
err = nil
Expand Down Expand Up @@ -420,6 +420,7 @@ func (tbl *txnTable) TransferDeletes(
if err = tbl.store.warChecker.checkOne(
id,
ts,
phase == txnif.PrePreparePhase,
); err == nil {
continue
}
Expand Down Expand Up @@ -470,6 +471,7 @@ func (tbl *txnTable) recurTransferDelete(
pkType *types.Type,
depth int,
ts types.TS,
phase string,
) error {

var page2 *common.PinnedItem[*model.TransferHashPage]
Expand Down Expand Up @@ -504,6 +506,7 @@ func (tbl *txnTable) recurTransferDelete(
err = readWriteConfilictCheck(
obj,
ts,
phase == txnif.PrePreparePhase,
)
if err == nil {
pkVec := tbl.store.rt.VectorPool.Small.GetVector(pkType)
Expand Down Expand Up @@ -554,7 +557,8 @@ func (tbl *txnTable) recurTransferDelete(
pk,
pkType,
depth+1,
ts)
ts,
phase)
}

func (tbl *txnTable) TransferDeleteRows(
Expand Down Expand Up @@ -610,7 +614,7 @@ func (tbl *txnTable) TransferDeleteRows(
page := pinned.Item()
depth := 0
if err = tbl.recurTransferDelete(
memo, page, id, row, pk, pkType, depth, ts); err != nil {
memo, page, id, row, pk, pkType, depth, ts, phase); err != nil {
return
}

Expand Down

0 comments on commit 94569b4

Please sign in to comment.