Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cross storage backup/restore #398

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Variables
BINARY_NAME=milvus-backup
VERSION=$(shell git describe --tags --always --dirty)
VERSION=$(shell git describe --tags --always)
COMMIT=$(shell git rev-parse --short HEAD)
DATE=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ')

Expand Down
17 changes: 11 additions & 6 deletions configs/backup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,24 @@ milvus:

# Related configuration of minio, which is responsible for data persistence for Milvus.
minio:
# cloudProvider: "minio" # deprecated use storageType instead
# Milvus storage configs, make them the same with milvus config
storageType: "minio" # support storage type: local, minio, s3, aws, gcp, ali(aliyun), azure, tc(tencent)

address: localhost # Address of MinIO/S3
port: 9000 # Port of MinIO/S3
accessKeyID: minioadmin # accessKeyID of MinIO/S3
secretAccessKey: minioadmin # MinIO/S3 encryption string
useSSL: false # Access to MinIO/S3 with SSL
useIAM: false
iamEndpoint: ""

bucketName: "a-bucket" # Milvus Bucket name in MinIO/S3, make it the same as your milvus instance
rootPath: "files" # Milvus storage root path in MinIO/S3, make it the same as your milvus instance

# only for azure
# Backup storage configs, the storage you want to put the backup data
backupStorageType: "minio" # support storage type: local, minio, s3, aws, gcp, ali(aliyun), azure, tc(tencent)
backupAddress: localhost # Address of MinIO/S3
backupPort: 9000 # Port of MinIO/S3
backupAccessKeyID: minioadmin # accessKeyID of MinIO/S3
backupSecretAccessKey: minioadmin # MinIO/S3 encryption string

backupBucketName: "a-bucket" # Bucket name to store backup data. Backup data is stored under backupBucketName/backupRootPath
backupRootPath: "backup" # Root path to store backup data. Backup data is stored under backupBucketName/backupRootPath

Expand All @@ -60,4 +60,9 @@ backup:
gcPause:
enable: true
seconds: 7200
address: http://localhost:9091
address: http://localhost:9091

# If you need to back up or restore data between two different storage systems,
# direct client-side copying is not supported.
# Set this option to true to enable data transfer through Milvus Backup.
copyByServer: "false"
139 changes: 118 additions & 21 deletions core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ type BackupContext struct {
milvusClient *MilvusClient

// data storage client
storageClient *storage.ChunkManager
milvusStorageClient storage.ChunkManager
backupStorageClient storage.ChunkManager
backupCopier *storage.Copier
restoreCopier *storage.Copier

milvusBucketName string
backupBucketName string
milvusRootPath string
Expand Down Expand Up @@ -81,13 +85,28 @@ func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (go
return c, nil
}

func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (storage.ChunkManager, error) {
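// createStorageClient builds a storage client from the Milvus storage settings in params.MinioCfg.
//
// Deprecated: use BackupContext.getMilvusStorageClient instead.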
func createStorageClient(ctx context.Context, params paramtable.BackupParams) (storage.ChunkManager, error) {
minioEndPoint := params.MinioCfg.Address + ":" + params.MinioCfg.Port
log.Debug("Start minio client",
zap.String("address", minioEndPoint),
zap.String("bucket", params.MinioCfg.BucketName),
zap.String("backupBucket", params.MinioCfg.BackupBucketName))
minioClient, err := storage.NewChunkManager(ctx, params)

storageConfig := &storage.StorageConfig{
StorageType: params.MinioCfg.StorageType,
Address: minioEndPoint,
BucketName: params.MinioCfg.BucketName,
AccessKeyID: params.MinioCfg.AccessKeyID,
SecretAccessKeyID: params.MinioCfg.SecretAccessKey,
UseSSL: params.MinioCfg.UseSSL,
UseIAM: params.MinioCfg.UseIAM,
IAMEndpoint: params.MinioCfg.IAMEndpoint,
RootPath: params.MinioCfg.RootPath,
CreateBucket: true,
}

minioClient, err := storage.NewChunkManager(ctx, params, storageConfig)
return minioClient, err
}

Expand Down Expand Up @@ -134,16 +153,94 @@ func (b *BackupContext) getMilvusClient() *MilvusClient {
return b.milvusClient
}

func (b *BackupContext) getStorageClient() storage.ChunkManager {
if b.storageClient == nil {
storageClient, err := CreateStorageClient(b.ctx, b.params)
func (b *BackupContext) getMilvusStorageClient() storage.ChunkManager {
if b.milvusStorageClient == nil {
minioEndPoint := b.params.MinioCfg.Address + ":" + b.params.MinioCfg.Port
log.Debug("create milvus storage client",
zap.String("address", minioEndPoint),
zap.String("bucket", b.params.MinioCfg.BucketName),
zap.String("backupBucket", b.params.MinioCfg.BackupBucketName))

storageConfig := &storage.StorageConfig{
StorageType: b.params.MinioCfg.StorageType,
Address: minioEndPoint,
BucketName: b.params.MinioCfg.BucketName,
AccessKeyID: b.params.MinioCfg.AccessKeyID,
SecretAccessKeyID: b.params.MinioCfg.SecretAccessKey,
UseSSL: b.params.MinioCfg.UseSSL,
UseIAM: b.params.MinioCfg.UseIAM,
IAMEndpoint: b.params.MinioCfg.IAMEndpoint,
RootPath: b.params.MinioCfg.RootPath,
CreateBucket: true,
}

storageClient, err := storage.NewChunkManager(b.ctx, b.params, storageConfig)
if err != nil {
log.Error("failed to initial storage client", zap.Error(err))
panic(err)
}
b.storageClient = &storageClient
b.milvusStorageClient = storageClient
}
return *b.storageClient
return b.milvusStorageClient
}

// getBackupStorageClient returns the storage client used for the backup
// storage, creating and caching it in b.backupStorageClient on first use.
//
// The client is configured from the Backup* fields of b.params.MinioCfg
// (storage type, address:port, bucket, credentials, SSL/IAM settings and
// root path) and is created with CreateBucket set to true.
//
// It panics if storage.NewChunkManager returns an error, because the
// signature offers no way to report the failure to the caller.
func (b *BackupContext) getBackupStorageClient() storage.ChunkManager {
	if b.backupStorageClient != nil {
		return b.backupStorageClient
	}

	backupEndPoint := b.params.MinioCfg.BackupAddress + ":" + b.params.MinioCfg.BackupPort
	log.Debug("create backup storage client",
		zap.String("address", backupEndPoint),
		zap.String("bucket", b.params.MinioCfg.BucketName),
		zap.String("backupBucket", b.params.MinioCfg.BackupBucketName))

	storageConfig := &storage.StorageConfig{
		StorageType:       b.params.MinioCfg.BackupStorageType,
		Address:           backupEndPoint,
		BucketName:        b.params.MinioCfg.BackupBucketName,
		AccessKeyID:       b.params.MinioCfg.BackupAccessKeyID,
		SecretAccessKeyID: b.params.MinioCfg.BackupSecretAccessKey,
		UseSSL:            b.params.MinioCfg.BackupUseSSL,
		UseIAM:            b.params.MinioCfg.BackupUseIAM,
		IAMEndpoint:       b.params.MinioCfg.BackupIAMEndpoint,
		RootPath:          b.params.MinioCfg.BackupRootPath,
		CreateBucket:      true,
	}

	storageClient, err := storage.NewChunkManager(b.ctx, b.params, storageConfig)
	if err != nil {
		// Name the backup client explicitly so this failure can be told apart
		// from a failure to create the Milvus storage client.
		log.Error("failed to initialize backup storage client", zap.Error(err))
		panic(err)
	}
	b.backupStorageClient = storageClient
	return b.backupStorageClient
}

// getBackupCopier returns the copier used when creating backups, building and
// caching it in b.backupCopier on first use.
//
// The copier is constructed with the Milvus storage client as the first
// argument and the backup storage client as the second (presumably source
// then destination; confirm against storage.NewCopier).
func (b *BackupContext) getBackupCopier() *storage.Copier {
	if b.backupCopier != nil {
		return b.backupCopier
	}

	// Resolve the clients in the same order as the constructor arguments:
	// Milvus storage first, backup storage second.
	milvusStore := b.getMilvusStorageClient()
	backupStore := b.getBackupStorageClient()
	copyOpt := storage.CopyOption{
		WorkerNum:    b.params.BackupCfg.BackupCopyDataParallelism,
		RPS:          RPS,
		CopyByServer: b.params.BackupCfg.CopyByServer,
	}

	b.backupCopier = storage.NewCopier(milvusStore, backupStore, copyOpt)
	return b.backupCopier
}

// getRestoreCopier returns the copier used when restoring backups, building
// and caching it in b.restoreCopier on first use.
//
// The copier is constructed with the backup storage client as the first
// argument and the Milvus storage client as the second (presumably source
// then destination; confirm against storage.NewCopier).
//
// NOTE(review): WorkerNum is taken from BackupCopyDataParallelism, the same
// setting the backup copier uses; confirm that sharing it for restore is
// intended.
func (b *BackupContext) getRestoreCopier() *storage.Copier {
	if b.restoreCopier != nil {
		return b.restoreCopier
	}

	// Resolve the clients in the same order as the constructor arguments:
	// backup storage first, Milvus storage second.
	backupStore := b.getBackupStorageClient()
	milvusStore := b.getMilvusStorageClient()
	copyOpt := storage.CopyOption{
		WorkerNum:    b.params.BackupCfg.BackupCopyDataParallelism,
		RPS:          RPS,
		CopyByServer: b.params.BackupCfg.CopyByServer,
	}

	b.restoreCopier = storage.NewCopier(backupStore, milvusStore, copyOpt)
	return b.restoreCopier
}

func (b *BackupContext) getBackupCollectionWorkerPool() *common.WorkerPool {
Expand Down Expand Up @@ -307,7 +404,7 @@ func (b *BackupContext) ListBackups(ctx context.Context, request *backuppb.ListB
}

// 1, trigger inner sync to get the newest backup list in the milvus cluster
backupPaths, _, err := b.getStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false)
backupPaths, _, err := b.getBackupStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false)
if err != nil {
log.Error("Fail to list backup directory", zap.Error(err))
resp.Code = backuppb.ResponseCode_Fail
Expand Down Expand Up @@ -393,7 +490,7 @@ func (b *BackupContext) DeleteBackup(ctx context.Context, request *backuppb.Dele
BackupName: request.GetBackupName(),
})
// always trigger a remove to make sure it is deleted
err := b.getStorageClient().RemoveWithPrefix(ctx, b.backupBucketName, BackupDirPath(b.backupRootPath, request.GetBackupName()))
err := b.getBackupStorageClient().RemoveWithPrefix(ctx, b.backupBucketName, BackupDirPath(b.backupRootPath, request.GetBackupName()))

if getResp.GetCode() == backuppb.ResponseCode_Request_Object_Not_Found {
resp.Code = backuppb.ResponseCode_Request_Object_Not_Found
Expand Down Expand Up @@ -434,7 +531,7 @@ func (b *BackupContext) readBackup(ctx context.Context, bucketName string, backu
partitionMetaPath := backupMetaDirPath + SEPERATOR + PARTITION_META_FILE
segmentMetaPath := backupMetaDirPath + SEPERATOR + SEGMENT_META_FILE

exist, err := b.getStorageClient().Exist(ctx, bucketName, backupMetaPath)
exist, err := b.getBackupStorageClient().Exist(ctx, bucketName, backupMetaPath)
if err != nil {
log.Error("check backup meta file failed", zap.String("path", backupMetaPath), zap.Error(err))
return nil, err
Expand All @@ -444,22 +541,22 @@ func (b *BackupContext) readBackup(ctx context.Context, bucketName string, backu
return nil, err
}

backupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, backupMetaPath)
backupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, backupMetaPath)
if err != nil {
log.Error("Read backup meta failed", zap.String("path", backupMetaPath), zap.Error(err))
return nil, err
}
collectionBackupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, collectionMetaPath)
collectionBackupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, collectionMetaPath)
if err != nil {
log.Error("Read collection meta failed", zap.String("path", collectionMetaPath), zap.Error(err))
return nil, err
}
partitionBackupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, partitionMetaPath)
partitionBackupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, partitionMetaPath)
if err != nil {
log.Error("Read partition meta failed", zap.String("path", partitionMetaPath), zap.Error(err))
return nil, err
}
segmentBackupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, segmentMetaPath)
segmentBackupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, segmentMetaPath)
if err != nil {
log.Error("Read segment meta failed", zap.String("path", segmentMetaPath), zap.Error(err))
return nil, err
Expand Down Expand Up @@ -542,7 +639,7 @@ func (b *BackupContext) Check(ctx context.Context) string {
"backup-rootpath: %s\n",
version, b.milvusBucketName, b.milvusRootPath, b.backupBucketName, b.backupRootPath)

paths, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR, false)
paths, _, err := b.getMilvusStorageClient().ListWithPrefix(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR, false)
if err != nil {
return "Failed to connect to storage milvus path\n" + info + err.Error()
}
Expand All @@ -551,27 +648,27 @@ func (b *BackupContext) Check(ctx context.Context) string {
return "Milvus storage is empty. Please verify whether your cluster is really empty. If not, the configs(minio address, port, bucket, rootPath) may be wrong\n" + info
}

paths, _, err = b.getStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false)
paths, _, err = b.getBackupStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false)
if err != nil {
return "Failed to connect to storage backup path " + info + err.Error()
}

CHECK_PATH := "milvus_backup_check_" + time.Now().String()

err = b.getStorageClient().Write(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, []byte{1})
err = b.getMilvusStorageClient().Write(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, []byte{1})
if err != nil {
return "Failed to connect to storage milvus path\n" + info + err.Error()
}
defer func() {
b.getStorageClient().Remove(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH)
b.getMilvusStorageClient().Remove(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH)
}()

err = b.getStorageClient().Copy(ctx, b.milvusBucketName, b.backupBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, b.backupRootPath+SEPERATOR+CHECK_PATH)
err = b.getMilvusStorageClient().Copy(ctx, b.milvusBucketName, b.backupBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, b.backupRootPath+SEPERATOR+CHECK_PATH)
if err != nil {
return "Failed to copy file from milvus storage to backup storage\n" + info + err.Error()
}
defer func() {
b.getStorageClient().Remove(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR+CHECK_PATH)
b.getBackupStorageClient().Remove(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR+CHECK_PATH)
}()

return "Succeed to connect to milvus and storage.\n" + info
Expand Down
2 changes: 1 addition & 1 deletion core/backup_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestGetBackupFaultBackup(t *testing.T) {
resp := backupContext.CreateBackup(context, req)
assert.Equal(t, backuppb.ResponseCode_Success, resp.GetCode())

backupContext.getStorageClient().RemoveWithPrefix(context, params.MinioCfg.BackupBucketName, BackupMetaPath(params.MinioCfg.BackupRootPath, resp.GetData().GetName()))
backupContext.getMilvusStorageClient().RemoveWithPrefix(context, params.MinioCfg.BackupBucketName, BackupMetaPath(params.MinioCfg.BackupRootPath, resp.GetData().GetName()))

backup := backupContext.GetBackup(context, &backuppb.GetBackupRequest{
BackupName: randBackupName,
Expand Down
Loading
Loading