Skip to content

Commit

Permalink
Merge branch 'main' into fix-gc-tombstone
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Dec 27, 2024
2 parents 1df3736 + 6731696 commit 76ff503
Show file tree
Hide file tree
Showing 44 changed files with 1,801 additions and 886 deletions.
12 changes: 8 additions & 4 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func TestBackupData(t *testing.T) {
locations = append(locations, backupTime.Format(time.DateTime))
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
assert.Nil(t, err)
db.BGCheckpointRunner.DisableCheckpoint()
err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
assert.NoError(t, err)
locations = append(locations, location)
checkpoints := db.BGCheckpointRunner.GetAllCheckpoints()
files := make(map[string]string, 0)
Expand Down Expand Up @@ -232,7 +233,8 @@ func TestBackupData2(t *testing.T) {
locations = append(locations, backupTime.Format(time.DateTime))
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
assert.Nil(t, err)
db.BGCheckpointRunner.DisableCheckpoint()
err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
assert.NoError(t, err)
locations = append(locations, location)
compacted := db.BGCheckpointRunner.GetCompacted()
checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted)
Expand Down Expand Up @@ -302,7 +304,8 @@ func TestBackupData3(t *testing.T) {
locations = append(locations, backupTime.Format(time.DateTime))
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
assert.Nil(t, err)
db.BGCheckpointRunner.DisableCheckpoint()
err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
assert.NoError(t, err)
locations = append(locations, location)
compacted := db.BGCheckpointRunner.GetCompacted()
checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted)
Expand Down Expand Up @@ -382,7 +385,8 @@ func TestBackupData4(t *testing.T) {
locations = append(locations, backupTime.Format(time.DateTime))
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
assert.Nil(t, err)
db.BGCheckpointRunner.DisableCheckpoint()
err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
assert.NoError(t, err)
locations = append(locations, location)
compacted := db.BGCheckpointRunner.GetCompacted()
checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted)
Expand Down
16 changes: 14 additions & 2 deletions pkg/frontend/show_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,23 @@ func tryGetSizeFromMTS(
de = eng.(*engine.EntireEngine).Engine.(*disttae.Engine)
}

vals, accs, err, ok = de.QueryTableStatsByAccounts(
// if len(account id) == 1 and is not sys account
//
// case 1:
// show accounts for new account
//
// case 2:
// show accounts like "xxx"
forceUpdate := false
if len(accs) == 1 && accs[0] != uint64(sysAccountID) {
forceUpdate = true
}

vals, accs, ok, err = de.QueryTableStatsByAccounts(
ctx,
[]int{disttae.TableStatsTableSize},
accs,
false,
forceUpdate,
false,
)

Expand Down
6 changes: 3 additions & 3 deletions pkg/objectio/injects.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func CheckpointSaveInjected() (string, bool) {
return sarg, injected
}

func InjectCheckpointSave(msg string) (rmFault func(), err error) {
func InjectCheckpointSave(msg string) (rmFault func() (bool, error), err error) {
if err = fault.AddFaultPoint(
context.Background(),
FJ_CheckpointSave,
Expand All @@ -281,8 +281,8 @@ func InjectCheckpointSave(msg string) (rmFault func(), err error) {
); err != nil {
return
}
rmFault = func() {
fault.RemoveFaultPoint(context.Background(), FJ_CheckpointSave)
rmFault = func() (ok bool, err error) {
return fault.RemoveFaultPoint(context.Background(), FJ_CheckpointSave)
}
return
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/pb/query/query.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pkg/sql/colexec/anti/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func (antiJoin *AntiJoin) Prepare(proc *process.Process) (err error) {
}
}

return antiJoin.PrepareProjection(proc)
}
return nil
}
Expand Down
26 changes: 5 additions & 21 deletions pkg/sql/colexec/anti/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type AntiJoin struct {
JoinMapTag int32

vm.OperatorBase
colexec.Projection
}

func (antiJoin *AntiJoin) GetOperatorBase() *vm.OperatorBase {
Expand Down Expand Up @@ -114,18 +113,11 @@ func (antiJoin *AntiJoin) Reset(proc *process.Process, pipelineFailed bool, err
ctr.batchRowCount = 0
ctr.eligible = ctr.eligible[:0]

if antiJoin.ProjectList != nil {
if antiJoin.OpAnalyzer != nil {
antiJoin.OpAnalyzer.Alloc(antiJoin.ProjectAllocSize + antiJoin.ctr.maxAllocSize)
}
antiJoin.ctr.maxAllocSize = 0
antiJoin.ResetProjection(proc)
} else {
if antiJoin.OpAnalyzer != nil {
antiJoin.OpAnalyzer.Alloc(antiJoin.ctr.maxAllocSize)
}
antiJoin.ctr.maxAllocSize = 0
if antiJoin.OpAnalyzer != nil {
antiJoin.OpAnalyzer.Alloc(antiJoin.ctr.maxAllocSize)
}
antiJoin.ctr.maxAllocSize = 0

}

func (antiJoin *AntiJoin) Free(proc *process.Process, pipelineFailed bool, err error) {
Expand All @@ -136,18 +128,10 @@ func (antiJoin *AntiJoin) Free(proc *process.Process, pipelineFailed bool, err e
ctr.cleanExprExecutor()
ctr.cleanBatch(proc)

if antiJoin.ProjectList != nil {
antiJoin.FreeProjection(proc)
}
}

func (antiJoin *AntiJoin) ExecProjection(proc *process.Process, input *batch.Batch) (*batch.Batch, error) {
batch := input
var err error
if antiJoin.ProjectList != nil {
batch, err = antiJoin.EvalProjection(input, proc)
}
return batch, err
return input, nil
}

func (ctr *container) resetExprExecutor() {
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/colexec/indexjoin/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ func (indexJoin *IndexJoin) Prepare(proc *process.Process) (err error) {
indexJoin.OpAnalyzer.Reset()
}

if indexJoin.ProjectList != nil {
err = indexJoin.PrepareProjection(proc)
}
if indexJoin.ctr.buf == nil {
indexJoin.ctr.buf = batch.NewWithSize(len(indexJoin.Result))
}
Expand Down
16 changes: 1 addition & 15 deletions pkg/sql/colexec/indexjoin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/reuse"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)
Expand All @@ -41,7 +40,6 @@ type IndexJoin struct {
RuntimeFilterSpecs []*plan.RuntimeFilterSpec

vm.OperatorBase
colexec.Projection
}

func (indexJoin *IndexJoin) GetOperatorBase() *vm.OperatorBase {
Expand Down Expand Up @@ -80,27 +78,15 @@ func (indexJoin *IndexJoin) Reset(proc *process.Process, pipelineFailed bool, er
if indexJoin.ctr.buf != nil {
indexJoin.ctr.buf.CleanOnlyData()
}
if indexJoin.ProjectList != nil {
if indexJoin.OpAnalyzer != nil {
indexJoin.OpAnalyzer.Alloc(indexJoin.ProjectAllocSize)
}
indexJoin.ResetProjection(proc)
}
}

func (indexJoin *IndexJoin) Free(proc *process.Process, pipelineFailed bool, err error) {
if indexJoin.ctr.buf != nil {
indexJoin.ctr.buf.Clean(proc.Mp())
indexJoin.ctr.buf = nil
}
indexJoin.FreeProjection(proc)
}

func (indexJoin *IndexJoin) ExecProjection(proc *process.Process, input *batch.Batch) (*batch.Batch, error) {
batch := input
var err error
if indexJoin.ProjectList != nil {
batch, err = indexJoin.EvalProjection(input, proc)
}
return batch, err
return input, nil
}
1 change: 0 additions & 1 deletion pkg/sql/colexec/join/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func (innerJoin *InnerJoin) Prepare(proc *process.Process) (err error) {
return err
}
}
return innerJoin.PrepareProjection(proc)
}
return err
}
Expand Down
25 changes: 5 additions & 20 deletions pkg/sql/colexec/join/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ type InnerJoin struct {
JoinMapTag int32

vm.OperatorBase
colexec.Projection
}

func (innerJoin *InnerJoin) GetOperatorBase() *vm.OperatorBase {
Expand Down Expand Up @@ -130,19 +129,11 @@ func (innerJoin *InnerJoin) Reset(proc *process.Process, pipelineFailed bool, er
ctr.state = Build
ctr.batchRowCount = 0

if innerJoin.ProjectList != nil {
if innerJoin.OpAnalyzer != nil {
innerJoin.OpAnalyzer.Alloc(innerJoin.ProjectAllocSize + innerJoin.ctr.maxAllocSize)
}

innerJoin.ctr.maxAllocSize = 0
innerJoin.ResetProjection(proc)
} else {
if innerJoin.OpAnalyzer != nil {
innerJoin.OpAnalyzer.Alloc(innerJoin.ctr.maxAllocSize)
}
innerJoin.ctr.maxAllocSize = 0
if innerJoin.OpAnalyzer != nil {
innerJoin.OpAnalyzer.Alloc(innerJoin.ctr.maxAllocSize)
}
innerJoin.ctr.maxAllocSize = 0

}

func (innerJoin *InnerJoin) Free(proc *process.Process, pipelineFailed bool, err error) {
Expand All @@ -152,16 +143,10 @@ func (innerJoin *InnerJoin) Free(proc *process.Process, pipelineFailed bool, err
ctr.cleanExprExecutor()
ctr.cleanBatch(proc)

innerJoin.FreeProjection(proc)
}

func (innerJoin *InnerJoin) ExecProjection(proc *process.Process, input *batch.Batch) (*batch.Batch, error) {
batch := input
var err error
if innerJoin.ProjectList != nil {
batch, err = innerJoin.EvalProjection(input, proc)
}
return batch, err
return input, nil
}

func (ctr *container) resetExprExecutor() {
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/colexec/left/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func (leftJoin *LeftJoin) Prepare(proc *process.Process) (err error) {
return err
}
}
return leftJoin.PrepareProjection(proc)
}
return err
}
Expand Down Expand Up @@ -137,10 +136,7 @@ func (leftJoin *LeftJoin) Call(proc *process.Process) (vm.CallResult, error) {
return result, moerr.NewInternalErrorNoCtx("left join hanging")
}

result.Batch, err = leftJoin.EvalProjection(probeResult.Batch, proc)
if err != nil {
return result, err
}
result.Batch = probeResult.Batch
return result, nil

default:
Expand Down
16 changes: 3 additions & 13 deletions pkg/sql/colexec/left/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ type LeftJoin struct {
JoinMapTag int32

vm.OperatorBase
colexec.Projection
}

func (leftJoin *LeftJoin) GetOperatorBase() *vm.OperatorBase {
Expand Down Expand Up @@ -118,18 +117,10 @@ func (leftJoin *LeftJoin) Reset(proc *process.Process, pipelineFailed bool, err
ctr.state = Build
ctr.batchRowCount = 0

if leftJoin.ProjectList != nil {
if leftJoin.OpAnalyzer != nil {
leftJoin.OpAnalyzer.Alloc(leftJoin.ProjectAllocSize + leftJoin.ctr.maxAllocSize)
}
leftJoin.ctr.maxAllocSize = 0
leftJoin.ResetProjection(proc)
} else {
if leftJoin.OpAnalyzer != nil {
leftJoin.OpAnalyzer.Alloc(leftJoin.ctr.maxAllocSize)
}
leftJoin.ctr.maxAllocSize = 0
if leftJoin.OpAnalyzer != nil {
leftJoin.OpAnalyzer.Alloc(leftJoin.ctr.maxAllocSize)
}
leftJoin.ctr.maxAllocSize = 0
}

func (leftJoin *LeftJoin) Free(proc *process.Process, pipelineFailed bool, err error) {
Expand All @@ -139,7 +130,6 @@ func (leftJoin *LeftJoin) Free(proc *process.Process, pipelineFailed bool, err e
ctr.cleanExprExecutor()
ctr.cleanBatch(proc)

leftJoin.FreeProjection(proc)
}

func (leftJoin *LeftJoin) ExecProjection(proc *process.Process, input *batch.Batch) (*batch.Batch, error) {
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/colexec/loopjoin/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ func (loopJoin *LoopJoin) Prepare(proc *process.Process) error {
return err
}
}

if loopJoin.ProjectList != nil && loopJoin.ProjectExecutors == nil {
err = loopJoin.PrepareProjection(proc)
}
return err
}

Expand Down
18 changes: 1 addition & 17 deletions pkg/sql/colexec/loopjoin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ type LoopJoin struct {
MarkPos int

vm.OperatorBase
colexec.Projection
}

func (loopJoin *LoopJoin) GetOperatorBase() *vm.OperatorBase {
Expand Down Expand Up @@ -107,13 +106,6 @@ func (loopJoin *LoopJoin) Reset(proc *process.Process, pipelineFailed bool, err
ctr.cleanHashMap()
ctr.state = Build
ctr.inbat = nil

if loopJoin.ProjectList != nil {
if loopJoin.OpAnalyzer != nil {
loopJoin.OpAnalyzer.Alloc(loopJoin.ProjectAllocSize)
}
loopJoin.ResetProjection(proc)
}
}

func (loopJoin *LoopJoin) Free(proc *process.Process, pipelineFailed bool, err error) {
Expand All @@ -122,18 +114,10 @@ func (loopJoin *LoopJoin) Free(proc *process.Process, pipelineFailed bool, err e
ctr.cleanBatch(proc.Mp())
ctr.cleanExprExecutor()

if loopJoin.ProjectList != nil {
loopJoin.FreeProjection(proc)
}
}

func (loopJoin *LoopJoin) ExecProjection(proc *process.Process, input *batch.Batch) (*batch.Batch, error) {
batch := input
var err error
if loopJoin.ProjectList != nil {
batch, err = loopJoin.EvalProjection(input, proc)
}
return batch, err
return input, nil
}

func (ctr *container) cleanBatch(mp *mpool.MPool) {
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/colexec/product/product.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ func (product *Product) Prepare(proc *process.Process) error {
product.OpAnalyzer.Reset()
}

if product.ProjectList != nil {
return product.PrepareProjection(proc)
}
return nil
}

Expand Down
Loading

0 comments on commit 76ff503

Please sign in to comment.