Skip to content

Commit

Permalink
remove getRelationById in some case (matrixorigin#20978)
Browse files Browse the repository at this point in the history
remove getRelationById in some case

Approved by: @zhangxu19830126, @badboynt1, @qingxinhome, @aunjgr
  • Loading branch information
ouyuanning committed Dec 27, 2024
1 parent 3665a3c commit 30ad053
Show file tree
Hide file tree
Showing 14 changed files with 514 additions and 359 deletions.
518 changes: 289 additions & 229 deletions pkg/pb/pipeline/pipeline.pb.go

Large diffs are not rendered by default.

276 changes: 168 additions & 108 deletions pkg/pb/plan/plan.pb.go

Large diffs are not rendered by default.

21 changes: 20 additions & 1 deletion pkg/sql/colexec/lockop/lock_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ func (lockOp *LockOp) Prepare(proc *process.Process) error {
GetFetchRowsFunc(lockOp.targets[idx].primaryColumnType))
}
}
if len(lockOp.ctr.relations) == 0 {
lockOp.ctr.relations = make([]engine.Relation, len(lockOp.targets))
for i, target := range lockOp.targets {
if target.objRef != nil {
rel, _, err := colexec.GetRelAndPartitionRelsByObjRef(proc.Ctx, proc, lockOp.engine, target.objRef, nil)
if err != nil {
return err
}
lockOp.ctr.relations[i] = rel
}
}
}
lockOp.ctr.parker = types.NewPacker()
return nil
}
Expand Down Expand Up @@ -184,7 +196,7 @@ func performLock(
proc.Ctx,
lockOp.engine,
analyzer,
nil,
lockOp.ctr.relations[idx],
target.tableID,
proc,
priVec,
Expand Down Expand Up @@ -780,6 +792,7 @@ func (lockOp *LockOp) CopyToPipelineTarget() []*pipeline.LockTarget {
Mode: target.mode,
LockRows: plan.DeepCopyExpr(target.lockRows),
LockTableAtTheEnd: target.lockTableAtTheEnd,
ObjRef: plan.DeepCopyObjectRef(target.objRef),
}
}
return targets
Expand All @@ -788,13 +801,15 @@ func (lockOp *LockOp) CopyToPipelineTarget() []*pipeline.LockTarget {
// AddLockTarget add lock target, LockMode_Exclusive will used
func (lockOp *LockOp) AddLockTarget(
tableID uint64,
objRef *plan.ObjectRef,
primaryColumnIndexInBatch int32,
primaryColumnType types.Type,
refreshTimestampIndexInBatch int32,
lockRows *plan.Expr,
lockTableAtTheEnd bool) *LockOp {
return lockOp.AddLockTargetWithMode(
tableID,
objRef,
lock.LockMode_Exclusive,
primaryColumnIndexInBatch,
primaryColumnType,
Expand All @@ -806,6 +821,7 @@ func (lockOp *LockOp) AddLockTarget(
// AddLockTargetWithMode add lock target with lock mode
func (lockOp *LockOp) AddLockTargetWithMode(
tableID uint64,
objRef *plan.ObjectRef,
mode lock.LockMode,
primaryColumnIndexInBatch int32,
primaryColumnType types.Type,
Expand All @@ -814,6 +830,7 @@ func (lockOp *LockOp) AddLockTargetWithMode(
lockTableAtTheEnd bool) *LockOp {
lockOp.targets = append(lockOp.targets, lockTarget{
tableID: tableID,
objRef: objRef,
primaryColumnIndexInBatch: primaryColumnIndexInBatch,
primaryColumnType: primaryColumnType,
refreshTimestampIndexInBatch: refreshTimestampIndexInBatch,
Expand Down Expand Up @@ -897,6 +914,7 @@ func (lockOp *LockOp) AddLockTargetWithPartitionAndMode(
// only one partition table, process as normal table
if len(tableIDs) == 1 {
return lockOp.AddLockTarget(tableIDs[0],
nil,
primaryColumnIndexInBatch,
primaryColumnType,
refreshTimestampIndexInBatch,
Expand Down Expand Up @@ -925,6 +943,7 @@ func (lockOp *LockOp) Reset(proc *process.Process, pipelineFailed bool, err erro
lockOp.resetParker()
lockOp.ctr.retryError = nil
lockOp.ctr.defChanged = false
lockOp.ctr.relations = nil
}

// Free free mem
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/lockop/lock_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func runLockNonBlockingOpTest(
IsLast: false,
}
for idx, table := range tables {
arg.AddLockTarget(table, offset, pkType, offset+1, nil, true)
arg.AddLockTarget(table, nil, offset, pkType, offset+1, nil, true)

vec := vector.NewVec(pkType)
vector.AppendFixedList(vec, values[idx], nil, proc.Mp())
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colexec/lockop/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (lockOp *LockOp) Release() {

type lockTarget struct {
tableID uint64
objRef *plan.ObjectRef
primaryColumnIndexInBatch int32
refreshTimestampIndexInBatch int32
primaryColumnType types.Type
Expand Down Expand Up @@ -133,6 +134,7 @@ type state struct {
retryError error
defChanged bool
fetchers []FetchLockRowsFunc
relations []engine.Relation
hasNewVersionInRange hasNewVersionInRangeFunc
lockCount int64
}
41 changes: 23 additions & 18 deletions pkg/sql/colexec/preinsert/preinsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,26 +283,31 @@ func genAutoIncrCol(bat *batch.Batch, proc *proc, preInsert *PreInsert) error {
currentTxn.AddWorkspace(ws)
ws.BindTxnOp(currentTxn)
}
if _, _, rel, err := eng.GetRelationById(proc.Ctx, currentTxn, tableID); err == nil {
for col, idx := range needReCheck {
vec := bat.GetVector(int32(idx))
from, err := proc.GetIncrService().GetLastAllocateTS(proc.Ctx, tableID, col)
if err != nil {
return err
}
fromTs := types.TimestampToTS(from)
toTs := types.TimestampToTS(proc.Base.TxnOperator.SnapshotTS())
if mayChanged, err := rel.PrimaryKeysMayBeUpserted(proc.Ctx, fromTs, toTs, vec); err == nil {
if mayChanged {
logutil.Debugf("user may have manually specified the value to be inserted into the auto pk col before this transaction.")
return moerr.NewTxnNeedRetry(proc.Ctx)
}
} else {
return err
db, err := eng.Database(proc.Ctx, preInsert.SchemaName, currentTxn)
if err != nil {
return err
}
rel, err := db.Relation(proc.Ctx, preInsert.TableDef.Name, nil)
if err != nil {
return err
}

for col, idx := range needReCheck {
vec := bat.GetVector(int32(idx))
from, err := proc.GetIncrService().GetLastAllocateTS(proc.Ctx, tableID, col)
if err != nil {
return err
}
fromTs := types.TimestampToTS(from)
toTs := types.TimestampToTS(proc.Base.TxnOperator.SnapshotTS())
if mayChanged, err := rel.PrimaryKeysMayBeUpserted(proc.Ctx, fromTs, toTs, vec); err == nil {
if mayChanged {
logutil.Debugf("user may have manually specified the value to be inserted into the auto pk col before this transaction.")
return moerr.NewTxnNeedRetry(proc.Ctx)
}
} else {
return err
}
} else {
return err
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/compile/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ func constructLockOp(n *plan.Node, eng engine.Engine) (*lockop.LockOp, error) {
if target.IsPartitionTable {
arg.AddLockTargetWithPartition(target.GetPartitionTableIds(), target.GetPrimaryColIdxInBat(), typ, target.GetRefreshTsIdxInBat(), target.GetLockRows(), target.GetLockTableAtTheEnd(), target.GetFilterColIdxInBat())
} else {
arg.AddLockTarget(target.GetTableId(), target.GetPrimaryColIdxInBat(), typ, target.GetRefreshTsIdxInBat(), target.GetLockRows(), target.GetLockTableAtTheEnd())
arg.AddLockTarget(target.GetTableId(), target.GetObjRef(), target.GetPrimaryColIdxInBat(), typ, target.GetRefreshTsIdxInBat(), target.GetLockRows(), target.GetLockTableAtTheEnd())
}

}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/compile/remoterun.go
Original file line number Diff line number Diff line change
Expand Up @@ -942,7 +942,7 @@ func convertToVmOperator(opr *pipeline.Instruction, ctx *scopeContext, eng engin
lockArg := lockop.NewArgumentByEngine(eng)
for _, target := range t.Targets {
typ := plan2.MakeTypeByPlan2Type(target.PrimaryColTyp)
lockArg.AddLockTarget(target.GetTableId(), target.GetPrimaryColIdxInBat(), typ, target.GetRefreshTsIdxInBat(), target.GetLockRows(), target.GetLockTableAtTheEnd())
lockArg.AddLockTarget(target.GetTableId(), target.GetObjRef(), target.GetPrimaryColIdxInBat(), typ, target.GetRefreshTsIdxInBat(), target.GetLockRows(), target.GetLockTableAtTheEnd())
}
for _, target := range t.Targets {
if target.LockTable {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/plan/bind_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func (builder *QueryBuilder) bindDelete(stmt *tree.Delete, bindCtx *BindContext)
if col.Name == tableDef.Pkey.PkeyColName {
lockTarget := &plan.LockTarget{
TableId: tableDef.TblId,
ObjRef: DeepCopyObjectRef(dmlCtx.objRefs[i]),
PrimaryColIdxInBat: int32(pkPos),
PrimaryColRelPos: selectNodeTag,
PrimaryColTyp: col.Typ,
Expand Down Expand Up @@ -310,6 +311,7 @@ func (builder *QueryBuilder) bindDelete(stmt *tree.Delete, bindCtx *BindContext)
if col.Name == idxNode.TableDef.Pkey.PkeyColName {
lockTargets = append(lockTargets, &plan.LockTarget{
TableId: idxNode.TableDef.TblId,
ObjRef: DeepCopyObjectRef(idxNode.ObjRef),
PrimaryColIdxInBat: int32(pkPos),
PrimaryColRelPos: idxNode.BindingTags[0],
PrimaryColTyp: col.Typ,
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/plan/bind_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert(
if col.Name == pkName && pkName != catalog.FakePrimaryKeyColName {
lockTarget := &plan.LockTarget{
TableId: tableDef.TblId,
ObjRef: DeepCopyObjectRef(objRef),
PrimaryColIdxInBat: int32(colName2Idx[tableDef.Name+"."+col.Name]),
PrimaryColRelPos: selectTag,
PrimaryColTyp: col.Typ,
Expand Down Expand Up @@ -218,6 +219,7 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert(
}
lockTarget := &plan.LockTarget{
TableId: idxTableDef.TblId,
ObjRef: DeepCopyObjectRef(dmlCtx.objRefs[0]),
PrimaryColIdxInBat: pkIdxInBat,
PrimaryColRelPos: selectTag,
PrimaryColTyp: selectNode.ProjectList[int(pkIdxInBat)].Typ,
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/plan/bind_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ func (builder *QueryBuilder) bindUpdate(stmt *tree.Update, bindCtx *BindContext)
if col.Name == tableDef.Pkey.PkeyColName {
lockTarget := &plan.LockTarget{
TableId: tableDef.TblId,
ObjRef: DeepCopyObjectRef(dmlCtx.objRefs[i]),
PrimaryColIdxInBat: int32(finalColIdx),
PrimaryColRelPos: finalProjTag,
PrimaryColTyp: col.Typ,
Expand All @@ -337,6 +338,7 @@ func (builder *QueryBuilder) bindUpdate(stmt *tree.Update, bindCtx *BindContext)
// need lock oldPk by old partition idx, lock new pk by new partition idx
lockTarget := &plan.LockTarget{
TableId: tableDef.TblId,
ObjRef: DeepCopyObjectRef(dmlCtx.objRefs[i]),
PrimaryColIdxInBat: int32(finalColIdx),
PrimaryColRelPos: finalProjTag,
PrimaryColTyp: col.Typ,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/plan/deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func DeepCopyLockTarget(target *plan.LockTarget) *plan.LockTarget {
}
return &plan.LockTarget{
TableId: target.TableId,
ObjRef: DeepCopyObjectRef(target.ObjRef),
PrimaryColIdxInBat: target.PrimaryColIdxInBat,
PrimaryColTyp: target.PrimaryColTyp,
RefreshTsIdxInBat: target.RefreshTsIdxInBat,
Expand Down
1 change: 1 addition & 0 deletions proto/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ message LockTarget {
lock.LockMode Mode = 8;
plan.Expr lock_rows = 9;
bool lock_table_at_the_end = 10;
plan.ObjectRef obj_ref = 11;
}

message LockOp {
Expand Down
1 change: 1 addition & 0 deletions proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,7 @@ message LockTarget {
int32 filter_col_rel_pos = 12;
Expr lock_rows = 13;
bool lock_table_at_the_end = 14;
ObjectRef obj_ref = 15;
}

message PreInsertUkCtx {
Expand Down

0 comments on commit 30ad053

Please sign in to comment.