Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update read write confilict check #20897

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10777,6 +10777,40 @@ func TestReplayDebugLog(t *testing.T) {
tae.Restart(ctx)
}

func TestRW2(t *testing.T) {
ctx := context.Background()
opts := config.WithLongScanAndCKPOpts(nil)
tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)

schema := catalog.MockSchemaAll(1, -1)
schema.Extra.BlockMaxRows = 2
schema.Extra.ObjectMaxBlocks = 4
tae.BindSchema(schema)
bat := catalog.MockBatch(schema, 1)
defer bat.Close()
/*
create obj
txn1 delete row/warchecker.add
txn2 drop obj(mock delete)

txn2 commit
txn1 commit
*/
tae.CreateRelAndAppend(bat, true)

txn, rel := tae.GetRelation()
obj := testutil.GetOneBlockMeta(rel)
rel.Append(ctx, bat)
{
tbl := obj.GetTable()
txn2, _ := tae.StartTxn(nil)
tbl.DropObjectEntry(obj.ID(), txn2, false)
txn2.ToPreparingLocked(tae.TxnMgr.Now())
}
err := txn.Commit(ctx)
assert.True(t, moerr.IsMoErrCode(err, moerr.ErrTxnRWConflict))
}

func Test_BasicTxnModeSwitch(t *testing.T) {
ctx := context.Background()
opts := config.WithLongScanAndCKPOpts(nil)
Expand Down Expand Up @@ -10817,3 +10851,72 @@ func Test_OpenReplayDB1(t *testing.T) {
assert.Error(t, db.AddCronJob(tae.DB, "unknown", false))
assert.Error(t, db.CheckCronJobs(tae.DB, db.DBTxnMode_Write))
}

func TestRW3(t *testing.T) {
ctx := context.Background()
opts := config.WithLongScanAndCKPOpts(nil)
tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)

objCount := 100
schema := catalog.MockSchemaAll(1, -1)
schema.Extra.BlockMaxRows = 1
schema.Extra.ObjectMaxBlocks = 4
tae.BindSchema(schema)
bat := catalog.MockBatch(schema, objCount)
defer bat.Close()
tae.CreateRelAndAppend(bat, true)

objs := make([]*catalog.ObjectEntry, 0)
txn, rel := tae.GetRelation()
iter := rel.MakeObjectIt(false)
for iter.Next() {
obj := iter.GetObject().GetMeta().(*catalog.ObjectEntry)
objs = append(objs, obj)
}
assert.Equal(t, objCount, len(objs))
err := txn.Commit(ctx)
assert.NoError(t, err)

var wg sync.WaitGroup

txn, rel = tae.GetRelation()
pkVec := containers.MakeVector(types.T_uint64.ToType(), common.DefaultAllocator)
defer pkVec.Close()
rowIDVec := containers.MakeVector(types.T_Rowid.ToType(), common.DefaultAllocator)
defer rowIDVec.Close()
for i, obj := range objs {
rowID := objectio.NewRowIDWithObjectIDBlkNumAndRowID(*obj.ID(), 0, 0)
rowIDVec.Append(rowID, false)
pkVec.Append(uint64(i), false)
}
err = rel.DeleteByPhyAddrKeys(rowIDVec, pkVec, handle.DT_Normal)
assert.NoError(t, err)
{
deleteObjectFn := func(offset int) func() {
return func() {
defer wg.Done()
txn, err := tae.StartTxn(nil)
assert.NoError(t, err)
obj := objs[offset]
task, err := jobs.NewFlushTableTailTask(nil, txn, []*catalog.ObjectEntry{obj}, nil, tae.Runtime)
assert.NoError(t, err)
err = task.OnExec(context.Background())
assert.NoError(t, err)
err = txn.Commit(ctx)
assert.NoError(t, err)
}
}
workers, err := ants.NewPool(50)
assert.NoError(t, err)
for i := 0; i < objCount; i++ {
wg.Add(1)
workers.Submit(deleteObjectFn(i))
}
}
err = txn.Commit(ctx)
assert.NoError(t, err)
wg.Wait()

tae.CheckRowsByScan(0, true)

}
18 changes: 14 additions & 4 deletions pkg/vm/engine/tae/txn/txnimpl/antidepend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,24 @@ import (

var ErrRWConflict = moerr.NewTxnRWConflictNoCtx()

func readWriteConfilictCheck(entry *catalog.ObjectEntry, ts types.TS) (err error) {
func readWriteConfilictCheck(entry *catalog.ObjectEntry, ts types.TS, inqueue bool) (err error) {
lastNode := entry.GetLatestNode()
if !lastNode.HasDropIntent() {
return nil
}
needWait, txnToWait := lastNode.GetLastMVCCNode().NeedWaitCommitting(ts)
// TODO:
// I don't think we need to wait here any more. `block` and `Object` are
// local metadata and never be involved in a 2PC txn. So a prepared `block`
// will never be rollbacked
if needWait {
if inqueue {
deleteTS := txnToWait.GetPrepareTS()
if deleteTS.LT(&ts) {
err = ErrRWConflict
}
return
}
txnToWait.GetTxnState(true)
lastNode = entry.GetLatestNode()
}
Expand Down Expand Up @@ -119,7 +129,7 @@ func (checker *warChecker) Insert(obj *catalog.ObjectEntry) {
checker.readSet[*obj.ID()] = obj
}

func (checker *warChecker) checkOne(id *common.ID, ts types.TS) (err error) {
func (checker *warChecker) checkOne(id *common.ID, ts types.TS, inqueue bool) (err error) {
// defer func() {
// logutil.Infof("checkOne blk=%s ts=%s err=%v", id.BlockString(), ts.ToString(), err)
// }()
Expand All @@ -131,12 +141,12 @@ func (checker *warChecker) checkOne(id *common.ID, ts types.TS) (err error) {
if entry == nil {
return
}
return readWriteConfilictCheck(entry, ts)
return readWriteConfilictCheck(entry, ts, inqueue)
}

func (checker *warChecker) checkAll(ts types.TS) (err error) {
for _, obj := range checker.readSet {
if err = readWriteConfilictCheck(obj, ts); err != nil {
if err = readWriteConfilictCheck(obj, ts, true); err != nil {
logutil.Error(
"Txn-Check-All",
zap.Error(err),
Expand Down
10 changes: 7 additions & 3 deletions pkg/vm/engine/tae/txn/txnimpl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (tbl *txnTable) TransferDeleteIntent(
panic(err)
}
ts := types.BuildTS(time.Now().UTC().UnixNano(), 0)
if err = readWriteConfilictCheck(entry, ts); err == nil {
if err = readWriteConfilictCheck(entry, ts, false); err == nil {
return
}
err = nil
Expand Down Expand Up @@ -420,6 +420,7 @@ func (tbl *txnTable) TransferDeletes(
if err = tbl.store.warChecker.checkOne(
id,
ts,
phase == txnif.PrePreparePhase,
); err == nil {
continue
}
Expand Down Expand Up @@ -470,6 +471,7 @@ func (tbl *txnTable) recurTransferDelete(
pkType *types.Type,
depth int,
ts types.TS,
phase string,
) error {

var page2 *common.PinnedItem[*model.TransferHashPage]
Expand Down Expand Up @@ -504,6 +506,7 @@ func (tbl *txnTable) recurTransferDelete(
err = readWriteConfilictCheck(
obj,
ts,
phase == txnif.PrePreparePhase,
)
if err == nil {
pkVec := tbl.store.rt.VectorPool.Small.GetVector(pkType)
Expand Down Expand Up @@ -554,7 +557,8 @@ func (tbl *txnTable) recurTransferDelete(
pk,
pkType,
depth+1,
ts)
ts,
phase)
}

func (tbl *txnTable) TransferDeleteRows(
Expand Down Expand Up @@ -610,7 +614,7 @@ func (tbl *txnTable) TransferDeleteRows(
page := pinned.Item()
depth := 0
if err = tbl.recurTransferDelete(
memo, page, id, row, pk, pkType, depth, ts); err != nil {
memo, page, id, row, pk, pkType, depth, ts, phase); err != nil {
return
}

Expand Down
Loading