Skip to content

Commit

Permalink
refactor checkpoint and gc related (#20997)
Browse files Browse the repository at this point in the history
refactor checkpoint and gc related for the future tool and service migration

Approved by: @LeftHandCold, @triump2020
  • Loading branch information
XuPeng-SH authored Dec 28, 2024
1 parent 94569b4 commit 5c1b750
Show file tree
Hide file tree
Showing 30 changed files with 907 additions and 581 deletions.
66 changes: 34 additions & 32 deletions pkg/backup/tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"crypto/sha256"
"encoding/json"
"fmt"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc/v3"
"io"
"os"
"path"
Expand All @@ -30,6 +29,9 @@ import (
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc/v3"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand All @@ -39,7 +41,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function/ctl"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
Expand Down Expand Up @@ -305,7 +306,7 @@ func execBackup(
if err != nil {
return err
}
key, err := blockio.EncodeLocationFromString(metaLoc)
key, err := objectio.StringToLocation(metaLoc)
if err != nil {
return err
}
Expand Down Expand Up @@ -372,7 +373,7 @@ func execBackup(
taeFileList = append(taeFileList, sizeList...)
now = time.Now()
if trimInfo != "" {
cnLocation, err := blockio.EncodeLocationFromString(cnLoc)
cnLocation, err := objectio.StringToLocation(cnLoc)
if err != nil {
return err
}
Expand Down Expand Up @@ -426,32 +427,34 @@ func execBackup(

// CopyCheckpointDir copy checkpoint dir from srcFs to dstFs
// return taeFile list
// copy: if copy is true,it means not to check the suffix name and copy all files.
// doCopy: if doCopy is true,it means not to check the suffix name and doCopy all files.
func copyFileAndGetMetaFiles(
ctx context.Context,
srcFs, dstFs fileservice.FileService,
dir string,
backup types.TS,
decodeFunc func(string) (types.TS, types.TS, string),
copy bool,
) ([]*taeFile, []*checkpoint.MetaFile, []fileservice.DirEntry, error) {
decoder func(string) ioutil.TSRangeFile,
doCopy bool,
) ([]*taeFile, []ioutil.TSRangeFile, []fileservice.DirEntry, error) {
files, err := fileservice.SortedList(srcFs.List(ctx, dir))
if err != nil {
return nil, nil, nil, err
}
taeFileList := make([]*taeFile, 0, len(files))
metaFiles := make([]*checkpoint.MetaFile, 0)
metaFiles := make([]ioutil.TSRangeFile, 0)
var checksum []byte
for i, file := range files {
if file.IsDir {
panic("not support dir")
}
start, end, ext := decodeFunc(file.Name)
if !backup.IsEmpty() && start.GE(&backup) {
meta := decoder(file.Name)
meta.SetIdx(i)

if !backup.IsEmpty() && meta.GetStart().GE(&backup) {
logutil.Infof("[Backup] skip file %v", file.Name)
continue
}
if copy || ext == blockio.AcctExt || ext == blockio.SnapshotExt {
if doCopy || meta.IsAcctExt() || meta.IsSnapshotExt() {
checksum, err = CopyFileWithRetry(ctx, srcFs, dstFs, file.Name, dir)
if err != nil {
return nil, nil, nil, err
Expand All @@ -465,19 +468,16 @@ func copyFileAndGetMetaFiles(
})
}

if copy || ext == blockio.CheckpointExt || ext == blockio.GCFullExt {
metaFile := checkpoint.NewMetaFile(i, start, end, file.Name)
metaFiles = append(metaFiles, metaFile)
if doCopy || meta.IsCKPFile() || meta.IsFullGCExt() {
metaFiles = append(metaFiles, meta)
}
}

if len(metaFiles) == 0 {
return taeFileList, metaFiles, files, nil
}
sort.Slice(metaFiles, func(i, j int) bool {
end1 := metaFiles[i].GetEnd()
end2 := metaFiles[j].GetEnd()
return end1.LT(&end2)
return metaFiles[i].GetEnd().LT(metaFiles[j].GetEnd())
})

return taeFileList, metaFiles, files, nil
Expand All @@ -492,17 +492,17 @@ func CopyGCDir(
var checksum []byte

taeFileList, metaFiles, files, err := copyFileAndGetMetaFiles(
ctx, srcFs, dstFs, dir, backup, blockio.DecodeGCMetadataFileName, false)
ctx, srcFs, dstFs, dir, backup, ioutil.DecodeGCMetadataName, false,
)
if err != nil {
return nil, err
}

copyFiles := make([]*checkpoint.MetaFile, 0)
copyFiles := make([]ioutil.TSRangeFile, 0)

for _, metaFile := range metaFiles {
name := metaFile.GetName()
window := gc.NewGCWindow(common.DebugAllocator, srcFs)
err = window.ReadTable(ctx, gc.GCMetaDir+name, srcFs)
err = window.ReadTable(ctx, metaFile.GetGCFullName(), srcFs)
if err != nil {
return nil, err
}
Expand All @@ -519,7 +519,7 @@ func CopyGCDir(
}
filesList = append(filesList, &taeFile{
path: object.ObjectName().String(),
size: files[metaFile.GetIndex()].Size,
size: files[metaFile.GetIdx()].Size,
checksum: checksum,
needCopy: true,
ts: backup,
Expand All @@ -534,13 +534,12 @@ func CopyGCDir(
for i, metaFile := range copyFiles {
name := metaFile.GetName()
if i == len(metaFiles)-1 {
end := metaFile.GetEnd()
if !min.IsEmpty() && end.LT(&min) {
if !min.IsEmpty() && metaFile.GetEnd().LT(&min) {
// It means that the gc consumption is too slow, and the gc water level needs to be raised.
// Otherwise, the gc will not work after the cluster is restored because it cannot find the checkpoint.
// The gc water level is determined by the name of the meta,
// so the name of the last gc meta needs to be modified.
name = blockio.UpdateGCMetadataFileName(name, end, min)
name = ioutil.InheritGCMetadataName(name, metaFile.GetEnd(), &min)
}
}
checksum, err = CopyFileWithRetry(ctx, srcFs, dstFs, metaFile.GetName(), dir, name)
Expand All @@ -549,7 +548,7 @@ func CopyGCDir(
}
taeFileList = append(taeFileList, &taeFile{
path: dir + string(os.PathSeparator) + name,
size: files[metaFile.GetIndex()].Size,
size: files[metaFile.GetIdx()].Size,
checksum: checksum,
needCopy: true,
ts: backup,
Expand All @@ -563,11 +562,14 @@ func CopyCheckpointDir(
srcFs, dstFs fileservice.FileService,
dir string, backup types.TS,
) ([]*taeFile, types.TS, error) {
decodeFunc := func(name string) (types.TS, types.TS, string) {
start, end, _ := blockio.DecodeCheckpointMetadataFileName(name)
return start, end, ""
decoder := func(name string) ioutil.TSRangeFile {
meta := ioutil.DecodeCKPMetaName(name)
meta.SetExt("")
return meta
}
taeFileList, metaFiles, _, err := copyFileAndGetMetaFiles(ctx, srcFs, dstFs, dir, backup, decodeFunc, true)
taeFileList, metaFiles, _, err := copyFileAndGetMetaFiles(
ctx, srcFs, dstFs, dir, backup, decoder, true,
)
if err != nil {
return nil, types.TS{}, err
}
Expand All @@ -577,7 +579,7 @@ func CopyCheckpointDir(
for i := len(metaFiles) - 1; i >= 0; i-- {
ckpStart := metaFiles[i].GetStart()
if ckpStart.IsEmpty() {
minTs = metaFiles[i].GetEnd()
minTs = *metaFiles[i].GetEnd()
break
}
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/objectio/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,16 @@ func (bm BlockObject) GenerateBlockInfo(objName ObjectName, sorted bool) BlockIn

return blkInfo
}

func SumSizeOfBlocks(blocks []BlockObject) (size, osize uint32) {
for _, block := range blocks {
meta := block.GetMeta()
count := meta.BlockHeader().MetaColumnCount()
for i := 0; i < int(count); i++ {
col := block.MustGetColumn(uint16(i))
size += col.Location().Length()
osize += col.Location().OriginSize()
}
}
return
}
82 changes: 82 additions & 0 deletions pkg/objectio/ckputil/funcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ckputil

import (
"context"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
)

// GetMaxTSOfCompactCKP returns the max ts of the compact checkpoint
func GetMaxTSOfCompactCKP(
ctx context.Context,
fs fileservice.FileService,
) (ts types.TS, err error) {
var files []ioutil.TSRangeFile
if files, err = ioutil.ListTSRangeFiles(
ctx, ioutil.GetCheckpointDir(), fs,
); err != nil {
return
}
for _, file := range files {
if file.IsCompactExt() {
if ts.LT(file.GetEnd()) {
ts = *file.GetEnd()
}
}
}
return
}

// ListCKPMetaNames returns the names of all checkpoint meta files
func ListCKPMetaNames(
ctx context.Context,
fs fileservice.FileService,
) (files []string, err error) {
var tsFiles []ioutil.TSRangeFile
if tsFiles, err = ioutil.ListTSRangeFiles(
ctx, ioutil.GetCheckpointDir(), fs,
); err != nil {
return
}
for _, tsFile := range tsFiles {
if tsFile.IsMetadataFile() {
files = append(files, tsFile.GetName())
}
}
return
}

// ListCKPMetaFiles returns all checkpoint meta files
func ListCKPMetaFiles(
ctx context.Context,
fs fileservice.FileService,
) (files []ioutil.TSRangeFile, err error) {
var tsFiles []ioutil.TSRangeFile
if tsFiles, err = ioutil.ListTSRangeFiles(
ctx, ioutil.GetCheckpointDir(), fs,
); err != nil {
return
}
for _, tsFile := range tsFiles {
if tsFile.IsMetadataFile() {
files = append(files, tsFile)
}
}
return
}
15 changes: 15 additions & 0 deletions pkg/objectio/ckputil/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ckputil
Loading

0 comments on commit 5c1b750

Please sign in to comment.