Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor checkpoint and gc related #20997

Merged
merged 31 commits into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 34 additions & 32 deletions pkg/backup/tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"crypto/sha256"
"encoding/json"
"fmt"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc/v3"
"io"
"os"
"path"
Expand All @@ -30,6 +29,9 @@ import (
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc/v3"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand All @@ -39,7 +41,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function/ctl"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
Expand Down Expand Up @@ -305,7 +306,7 @@ func execBackup(
if err != nil {
return err
}
key, err := blockio.EncodeLocationFromString(metaLoc)
key, err := objectio.StringToLocation(metaLoc)
if err != nil {
return err
}
Expand Down Expand Up @@ -372,7 +373,7 @@ func execBackup(
taeFileList = append(taeFileList, sizeList...)
now = time.Now()
if trimInfo != "" {
cnLocation, err := blockio.EncodeLocationFromString(cnLoc)
cnLocation, err := objectio.StringToLocation(cnLoc)
if err != nil {
return err
}
Expand Down Expand Up @@ -426,32 +427,34 @@ func execBackup(

// CopyCheckpointDir copy checkpoint dir from srcFs to dstFs
// return taeFile list
// copy: if copy is true,it means not to check the suffix name and copy all files.
// doCopy: if doCopy is true,it means not to check the suffix name and doCopy all files.
func copyFileAndGetMetaFiles(
ctx context.Context,
srcFs, dstFs fileservice.FileService,
dir string,
backup types.TS,
decodeFunc func(string) (types.TS, types.TS, string),
copy bool,
) ([]*taeFile, []*checkpoint.MetaFile, []fileservice.DirEntry, error) {
decoder func(string) ioutil.TSRangeFile,
doCopy bool,
) ([]*taeFile, []ioutil.TSRangeFile, []fileservice.DirEntry, error) {
files, err := fileservice.SortedList(srcFs.List(ctx, dir))
if err != nil {
return nil, nil, nil, err
}
taeFileList := make([]*taeFile, 0, len(files))
metaFiles := make([]*checkpoint.MetaFile, 0)
metaFiles := make([]ioutil.TSRangeFile, 0)
var checksum []byte
for i, file := range files {
if file.IsDir {
panic("not support dir")
}
start, end, ext := decodeFunc(file.Name)
if !backup.IsEmpty() && start.GE(&backup) {
meta := decoder(file.Name)
meta.SetIdx(i)

if !backup.IsEmpty() && meta.GetStart().GE(&backup) {
logutil.Infof("[Backup] skip file %v", file.Name)
continue
}
if copy || ext == blockio.AcctExt || ext == blockio.SnapshotExt {
if doCopy || meta.IsAcctExt() || meta.IsSnapshotExt() {
checksum, err = CopyFileWithRetry(ctx, srcFs, dstFs, file.Name, dir)
if err != nil {
return nil, nil, nil, err
Expand All @@ -465,19 +468,16 @@ func copyFileAndGetMetaFiles(
})
}

if copy || ext == blockio.CheckpointExt || ext == blockio.GCFullExt {
metaFile := checkpoint.NewMetaFile(i, start, end, file.Name)
metaFiles = append(metaFiles, metaFile)
if doCopy || meta.IsCKPFile() || meta.IsFullGCExt() {
metaFiles = append(metaFiles, meta)
}
}

if len(metaFiles) == 0 {
return taeFileList, metaFiles, files, nil
}
sort.Slice(metaFiles, func(i, j int) bool {
end1 := metaFiles[i].GetEnd()
end2 := metaFiles[j].GetEnd()
return end1.LT(&end2)
return metaFiles[i].GetEnd().LT(metaFiles[j].GetEnd())
})

return taeFileList, metaFiles, files, nil
Expand All @@ -492,17 +492,17 @@ func CopyGCDir(
var checksum []byte

taeFileList, metaFiles, files, err := copyFileAndGetMetaFiles(
ctx, srcFs, dstFs, dir, backup, blockio.DecodeGCMetadataFileName, false)
ctx, srcFs, dstFs, dir, backup, ioutil.DecodeGCMetadataName, false,
)
if err != nil {
return nil, err
}

copyFiles := make([]*checkpoint.MetaFile, 0)
copyFiles := make([]ioutil.TSRangeFile, 0)

for _, metaFile := range metaFiles {
name := metaFile.GetName()
window := gc.NewGCWindow(common.DebugAllocator, srcFs)
err = window.ReadTable(ctx, gc.GCMetaDir+name, srcFs)
err = window.ReadTable(ctx, metaFile.GetGCFullName(), srcFs)
if err != nil {
return nil, err
}
Expand All @@ -519,7 +519,7 @@ func CopyGCDir(
}
filesList = append(filesList, &taeFile{
path: object.ObjectName().String(),
size: files[metaFile.GetIndex()].Size,
size: files[metaFile.GetIdx()].Size,
checksum: checksum,
needCopy: true,
ts: backup,
Expand All @@ -534,13 +534,12 @@ func CopyGCDir(
for i, metaFile := range copyFiles {
name := metaFile.GetName()
if i == len(metaFiles)-1 {
end := metaFile.GetEnd()
if !min.IsEmpty() && end.LT(&min) {
if !min.IsEmpty() && metaFile.GetEnd().LT(&min) {
// It means that the gc consumption is too slow, and the gc water level needs to be raised.
// Otherwise, the gc will not work after the cluster is restored because it cannot find the checkpoint.
// The gc water level is determined by the name of the meta,
// so the name of the last gc meta needs to be modified.
name = blockio.UpdateGCMetadataFileName(name, end, min)
name = ioutil.InheritGCMetadataName(name, metaFile.GetEnd(), &min)
}
}
checksum, err = CopyFileWithRetry(ctx, srcFs, dstFs, metaFile.GetName(), dir, name)
Expand All @@ -549,7 +548,7 @@ func CopyGCDir(
}
taeFileList = append(taeFileList, &taeFile{
path: dir + string(os.PathSeparator) + name,
size: files[metaFile.GetIndex()].Size,
size: files[metaFile.GetIdx()].Size,
checksum: checksum,
needCopy: true,
ts: backup,
Expand All @@ -563,11 +562,14 @@ func CopyCheckpointDir(
srcFs, dstFs fileservice.FileService,
dir string, backup types.TS,
) ([]*taeFile, types.TS, error) {
decodeFunc := func(name string) (types.TS, types.TS, string) {
start, end, _ := blockio.DecodeCheckpointMetadataFileName(name)
return start, end, ""
decoder := func(name string) ioutil.TSRangeFile {
meta := ioutil.DecodeCKPMetaName(name)
meta.SetExt("")
return meta
}
taeFileList, metaFiles, _, err := copyFileAndGetMetaFiles(ctx, srcFs, dstFs, dir, backup, decodeFunc, true)
taeFileList, metaFiles, _, err := copyFileAndGetMetaFiles(
ctx, srcFs, dstFs, dir, backup, decoder, true,
)
if err != nil {
return nil, types.TS{}, err
}
Expand All @@ -577,7 +579,7 @@ func CopyCheckpointDir(
for i := len(metaFiles) - 1; i >= 0; i-- {
ckpStart := metaFiles[i].GetStart()
if ckpStart.IsEmpty() {
minTs = metaFiles[i].GetEnd()
minTs = *metaFiles[i].GetEnd()
break
}
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/objectio/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,16 @@ func (bm BlockObject) GenerateBlockInfo(objName ObjectName, sorted bool) BlockIn

return blkInfo
}

func SumSizeOfBlocks(blocks []BlockObject) (size, osize uint32) {
for _, block := range blocks {
meta := block.GetMeta()
count := meta.BlockHeader().MetaColumnCount()
for i := 0; i < int(count); i++ {
col := block.MustGetColumn(uint16(i))
size += col.Location().Length()
osize += col.Location().OriginSize()
}
}
return
}
82 changes: 82 additions & 0 deletions pkg/objectio/ckputil/funcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ckputil

import (
"context"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/fileservice"
"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
)

// GetMaxTSOfCompactCKP returns the max ts of the compact checkpoint
func GetMaxTSOfCompactCKP(
ctx context.Context,
fs fileservice.FileService,
) (ts types.TS, err error) {
var files []ioutil.TSRangeFile
if files, err = ioutil.ListTSRangeFiles(
ctx, ioutil.GetCheckpointDir(), fs,
); err != nil {
return
}
for _, file := range files {
if file.IsCompactExt() {
if ts.LT(file.GetEnd()) {
ts = *file.GetEnd()
}
}
}
return
}

// ListCKPMetaNames returns the names of all checkpoint meta files
func ListCKPMetaNames(
ctx context.Context,
fs fileservice.FileService,
) (files []string, err error) {
var tsFiles []ioutil.TSRangeFile
if tsFiles, err = ioutil.ListTSRangeFiles(
ctx, ioutil.GetCheckpointDir(), fs,
); err != nil {
return
}
for _, tsFile := range tsFiles {
if tsFile.IsMetadataFile() {
files = append(files, tsFile.GetName())
}
}
return
}

// ListCKPMetaFiles returns all checkpoint meta files
func ListCKPMetaFiles(
ctx context.Context,
fs fileservice.FileService,
) (files []ioutil.TSRangeFile, err error) {
var tsFiles []ioutil.TSRangeFile
if tsFiles, err = ioutil.ListTSRangeFiles(
ctx, ioutil.GetCheckpointDir(), fs,
); err != nil {
return
}
for _, tsFile := range tsFiles {
if tsFile.IsMetadataFile() {
files = append(files, tsFile)
}
}
return
}
15 changes: 15 additions & 0 deletions pkg/objectio/ckputil/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2021 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ckputil
Loading
Loading