diff --git a/Makefile b/Makefile
index ebdbb2b5..70872595 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
 # Variables
 BINARY_NAME=milvus-backup
-VERSION=$(shell git describe --tags --always --dirty)
+VERSION=$(shell git describe --tags --always)
 COMMIT=$(shell git rev-parse --short HEAD)
 DATE=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ')
diff --git a/configs/backup.yaml b/configs/backup.yaml
index c6703811..286ab555 100644
--- a/configs/backup.yaml
+++ b/configs/backup.yaml
@@ -21,9 +21,8 @@ milvus:
 
 # Related configuration of minio, which is responsible for data persistence for Milvus.
 minio:
-  # cloudProvider: "minio" # deprecated use storageType instead
+  # Milvus storage configs, make them the same as your Milvus config
   storageType: "minio" # support storage type: local, minio, s3, aws, gcp, ali(aliyun), azure, tc(tencent)
-
   address: localhost # Address of MinIO/S3
   port: 9000 # Port of MinIO/S3
   accessKeyID: minioadmin # accessKeyID of MinIO/S3
@@ -31,14 +30,15 @@ minio:
   useSSL: false # Access to MinIO/S3 with SSL
   useIAM: false
   iamEndpoint: ""
-
   bucketName: "a-bucket" # Milvus Bucket name in MinIO/S3, make it the same as your milvus instance
   rootPath: "files" # Milvus storage root path in MinIO/S3, make it the same as your milvus instance
 
-  # only for azure
+  # Backup storage configs, the storage where you want to put the backup data
+  backupStorageType: "minio" # support storage type: local, minio, s3, aws, gcp, ali(aliyun), azure, tc(tencent)
+  backupAddress: localhost # Address of MinIO/S3
+  backupPort: 9000 # Port of MinIO/S3
   backupAccessKeyID: minioadmin # accessKeyID of MinIO/S3
   backupSecretAccessKey: minioadmin # MinIO/S3 encryption string
-
   backupBucketName: "a-bucket" # Bucket name to store backup data. Backup data will store to backupBucketName/backupRootPath
   backupRootPath: "backup" # Rootpath to store backup data. Backup data will store to backupBucketName/backupRootPath
@@ -60,4 +60,9 @@ backup:
   gcPause:
     enable: true
     seconds: 7200
-    address: http://localhost:9091
\ No newline at end of file
+    address: http://localhost:9091
+
+  # If you need to back up or restore data between two different storage systems,
+  # direct client-side copying is not supported.
+  # Set this option to true to enable data transfer through Milvus Backup.
+  copyByServer: "false"
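For orientation, a minimal cross-storage configuration might look like the following sketch; the S3 endpoint, bucket names, and region are placeholders, not values taken from this change:

```yaml
minio:
  # Milvus side: a local MinIO deployment
  storageType: "minio"
  address: localhost
  port: 9000
  bucketName: "a-bucket"
  rootPath: "files"

  # Backup side: a hypothetical S3 bucket in another region
  backupStorageType: "s3"
  backupAddress: s3.us-west-2.amazonaws.com
  backupPort: 443
  backupUseSSL: true
  backupBucketName: "my-backup-bucket"
  backupRootPath: "backup"

backup:
  # client-side copy cannot bridge two different storage systems,
  # so stream the data through the milvus-backup server instead
  copyByServer: "true"
```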
+ copyByServer: "false" diff --git a/core/backup_context.go b/core/backup_context.go index 93abe508..b7355dde 100644 --- a/core/backup_context.go +++ b/core/backup_context.go @@ -45,7 +45,11 @@ type BackupContext struct { milvusClient *MilvusClient // data storage client - storageClient *storage.ChunkManager + milvusStorageClient storage.ChunkManager + backupStorageClient storage.ChunkManager + backupCopier *storage.Copier + restoreCopier *storage.Copier + milvusBucketName string backupBucketName string milvusRootPath string @@ -82,13 +86,28 @@ func CreateMilvusClient(ctx context.Context, params paramtable.BackupParams) (go return c, nil } -func CreateStorageClient(ctx context.Context, params paramtable.BackupParams) (storage.ChunkManager, error) { +// Deprecated +func createStorageClient(ctx context.Context, params paramtable.BackupParams) (storage.ChunkManager, error) { minioEndPoint := params.MinioCfg.Address + ":" + params.MinioCfg.Port log.Debug("Start minio client", zap.String("address", minioEndPoint), zap.String("bucket", params.MinioCfg.BucketName), zap.String("backupBucket", params.MinioCfg.BackupBucketName)) - minioClient, err := storage.NewChunkManager(ctx, params) + + storageConfig := &storage.StorageConfig{ + StorageType: params.MinioCfg.StorageType, + Address: minioEndPoint, + BucketName: params.MinioCfg.BucketName, + AccessKeyID: params.MinioCfg.AccessKeyID, + SecretAccessKeyID: params.MinioCfg.SecretAccessKey, + UseSSL: params.MinioCfg.UseSSL, + UseIAM: params.MinioCfg.UseIAM, + IAMEndpoint: params.MinioCfg.IAMEndpoint, + RootPath: params.MinioCfg.RootPath, + CreateBucket: true, + } + + minioClient, err := storage.NewChunkManager(ctx, params, storageConfig) return minioClient, err } @@ -135,16 +154,94 @@ func (b *BackupContext) getMilvusClient() *MilvusClient { return b.milvusClient } -func (b *BackupContext) getStorageClient() storage.ChunkManager { - if b.storageClient == nil { - storageClient, err := CreateStorageClient(b.ctx, b.params) +func (b *BackupContext) getMilvusStorageClient() storage.ChunkManager { + if b.milvusStorageClient == nil { + minioEndPoint := b.params.MinioCfg.Address + ":" + b.params.MinioCfg.Port + log.Debug("create milvus storage client", + zap.String("address", minioEndPoint), + zap.String("bucket", b.params.MinioCfg.BucketName), + zap.String("backupBucket", b.params.MinioCfg.BackupBucketName)) + + storageConfig := &storage.StorageConfig{ + StorageType: b.params.MinioCfg.StorageType, + Address: minioEndPoint, + BucketName: b.params.MinioCfg.BucketName, + AccessKeyID: b.params.MinioCfg.AccessKeyID, + SecretAccessKeyID: b.params.MinioCfg.SecretAccessKey, + UseSSL: b.params.MinioCfg.UseSSL, + UseIAM: b.params.MinioCfg.UseIAM, + IAMEndpoint: b.params.MinioCfg.IAMEndpoint, + RootPath: b.params.MinioCfg.RootPath, + CreateBucket: true, + } + + storageClient, err := storage.NewChunkManager(b.ctx, b.params, storageConfig) if err != nil { log.Error("failed to initial storage client", zap.Error(err)) panic(err) } - b.storageClient = &storageClient + b.milvusStorageClient = storageClient } - return *b.storageClient + return b.milvusStorageClient +} + +func (b *BackupContext) getBackupStorageClient() storage.ChunkManager { + if b.backupStorageClient == nil { + minioEndPoint := b.params.MinioCfg.BackupAddress + ":" + b.params.MinioCfg.BackupPort + log.Debug("create backup storage client", + zap.String("address", minioEndPoint), + zap.String("bucket", b.params.MinioCfg.BucketName), + zap.String("backupBucket", b.params.MinioCfg.BackupBucketName)) + + 
storageConfig := &storage.StorageConfig{ + StorageType: b.params.MinioCfg.BackupStorageType, + Address: minioEndPoint, + BucketName: b.params.MinioCfg.BackupBucketName, + AccessKeyID: b.params.MinioCfg.BackupAccessKeyID, + SecretAccessKeyID: b.params.MinioCfg.BackupSecretAccessKey, + UseSSL: b.params.MinioCfg.BackupUseSSL, + UseIAM: b.params.MinioCfg.BackupUseIAM, + IAMEndpoint: b.params.MinioCfg.BackupIAMEndpoint, + RootPath: b.params.MinioCfg.BackupRootPath, + CreateBucket: true, + } + + storageClient, err := storage.NewChunkManager(b.ctx, b.params, storageConfig) + if err != nil { + log.Error("failed to initial storage client", zap.Error(err)) + panic(err) + } + b.backupStorageClient = storageClient + } + return b.backupStorageClient +} + +func (b *BackupContext) getBackupCopier() *storage.Copier { + if b.backupCopier == nil { + b.backupCopier = storage.NewCopier( + b.getMilvusStorageClient(), + b.getBackupStorageClient(), + storage.CopyOption{ + WorkerNum: b.params.BackupCfg.BackupCopyDataParallelism, + RPS: RPS, + CopyByServer: b.params.BackupCfg.CopyByServer, + }) + } + return b.backupCopier +} + +func (b *BackupContext) getRestoreCopier() *storage.Copier { + if b.restoreCopier == nil { + b.restoreCopier = storage.NewCopier( + b.getBackupStorageClient(), + b.getMilvusStorageClient(), + storage.CopyOption{ + WorkerNum: b.params.BackupCfg.BackupCopyDataParallelism, + RPS: RPS, + CopyByServer: b.params.BackupCfg.CopyByServer, + }) + } + return b.restoreCopier } func (b *BackupContext) getBackupCollectionWorkerPool() *common.WorkerPool { @@ -308,7 +405,7 @@ func (b *BackupContext) ListBackups(ctx context.Context, request *backuppb.ListB } // 1, trigger inner sync to get the newest backup list in the milvus cluster - backupPaths, _, err := b.getStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false) + backupPaths, _, err := b.getBackupStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false) if err != nil { log.Error("Fail to list backup directory", zap.Error(err)) resp.Code = backuppb.ResponseCode_Fail @@ -394,7 +491,7 @@ func (b *BackupContext) DeleteBackup(ctx context.Context, request *backuppb.Dele BackupName: request.GetBackupName(), }) // always trigger a remove to make sure it is deleted - err := b.getStorageClient().RemoveWithPrefix(ctx, b.backupBucketName, BackupDirPath(b.backupRootPath, request.GetBackupName())) + err := b.getBackupStorageClient().RemoveWithPrefix(ctx, b.backupBucketName, BackupDirPath(b.backupRootPath, request.GetBackupName())) if getResp.GetCode() == backuppb.ResponseCode_Request_Object_Not_Found { resp.Code = backuppb.ResponseCode_Request_Object_Not_Found @@ -465,7 +562,7 @@ func (b *BackupContext) readBackup(ctx context.Context, bucketName string, backu partitionMetaPath := backupMetaDirPath + SEPERATOR + PARTITION_META_FILE segmentMetaPath := backupMetaDirPath + SEPERATOR + SEGMENT_META_FILE - exist, err := b.getStorageClient().Exist(ctx, bucketName, backupMetaPath) + exist, err := b.getBackupStorageClient().Exist(ctx, bucketName, backupMetaPath) if err != nil { log.Error("check backup meta file failed", zap.String("path", backupMetaPath), zap.Error(err)) return nil, err @@ -475,22 +572,22 @@ func (b *BackupContext) readBackup(ctx context.Context, bucketName string, backu return nil, err } - backupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, backupMetaPath) + backupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, backupMetaPath) if err != nil { 
log.Error("Read backup meta failed", zap.String("path", backupMetaPath), zap.Error(err)) return nil, err } - collectionBackupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, collectionMetaPath) + collectionBackupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, collectionMetaPath) if err != nil { log.Error("Read collection meta failed", zap.String("path", collectionMetaPath), zap.Error(err)) return nil, err } - partitionBackupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, partitionMetaPath) + partitionBackupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, partitionMetaPath) if err != nil { log.Error("Read partition meta failed", zap.String("path", partitionMetaPath), zap.Error(err)) return nil, err } - segmentBackupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, segmentMetaPath) + segmentBackupMetaBytes, err := b.getBackupStorageClient().Read(ctx, bucketName, segmentMetaPath) if err != nil { log.Error("Read segment meta failed", zap.String("path", segmentMetaPath), zap.Error(err)) return nil, err @@ -573,7 +670,7 @@ func (b *BackupContext) Check(ctx context.Context) string { "backup-rootpath: %s\n", version, b.milvusBucketName, b.milvusRootPath, b.backupBucketName, b.backupRootPath) - paths, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR, false) + paths, _, err := b.getMilvusStorageClient().ListWithPrefix(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR, false) if err != nil { return "Failed to connect to storage milvus path\n" + info + err.Error() } @@ -582,27 +679,27 @@ func (b *BackupContext) Check(ctx context.Context) string { return "Milvus storage is empty. Please verify whether your cluster is really empty. If not, the configs(minio address, port, bucket, rootPath) may be wrong\n" + info } - paths, _, err = b.getStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false) + paths, _, err = b.getBackupStorageClient().ListWithPrefix(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR, false) if err != nil { return "Failed to connect to storage backup path " + info + err.Error() } CHECK_PATH := "milvus_backup_check_" + time.Now().String() - err = b.getStorageClient().Write(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, []byte{1}) + err = b.getMilvusStorageClient().Write(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, []byte{1}) if err != nil { return "Failed to connect to storage milvus path\n" + info + err.Error() } defer func() { - b.getStorageClient().Remove(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH) + b.getMilvusStorageClient().Remove(ctx, b.milvusBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH) }() - err = b.getStorageClient().Copy(ctx, b.milvusBucketName, b.backupBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, b.backupRootPath+SEPERATOR+CHECK_PATH) + err = b.getMilvusStorageClient().Copy(ctx, b.milvusBucketName, b.backupBucketName, b.milvusRootPath+SEPERATOR+CHECK_PATH, b.backupRootPath+SEPERATOR+CHECK_PATH) if err != nil { return "Failed to copy file from milvus storage to backup storage\n" + info + err.Error() } defer func() { - b.getStorageClient().Remove(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR+CHECK_PATH) + b.getBackupStorageClient().Remove(ctx, b.backupBucketName, b.backupRootPath+SEPERATOR+CHECK_PATH) }() return "Succeed to connect to milvus and storage.\n" + info diff --git a/core/backup_context_test.go b/core/backup_context_test.go index 
diff --git a/core/backup_context_test.go b/core/backup_context_test.go
index 88cfb6d2..773df830 100644
--- a/core/backup_context_test.go
+++ b/core/backup_context_test.go
@@ -228,7 +228,7 @@ func TestGetBackupFaultBackup(t *testing.T) {
 	resp := backupContext.CreateBackup(context, req)
 	assert.Equal(t, backuppb.ResponseCode_Success, resp.GetCode())
 
-	backupContext.getStorageClient().RemoveWithPrefix(context, params.MinioCfg.BackupBucketName, BackupMetaPath(params.MinioCfg.BackupRootPath, resp.GetData().GetName()))
+	backupContext.getBackupStorageClient().RemoveWithPrefix(context, params.MinioCfg.BackupBucketName, BackupMetaPath(params.MinioCfg.BackupRootPath, resp.GetData().GetName()))
 
 	backup := backupContext.GetBackup(context, &backuppb.GetBackupRequest{
 		BackupName: randBackupName,
diff --git a/core/backup_impl_create_backup.go b/core/backup_impl_create_backup.go
index 306d042c..672d7d56 100644
--- a/core/backup_impl_create_backup.go
+++ b/core/backup_impl_create_backup.go
@@ -19,6 +19,7 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/zilliztech/milvus-backup/core/proto/backuppb"
+	"github.com/zilliztech/milvus-backup/core/storage"
 	"github.com/zilliztech/milvus-backup/core/utils"
 	"github.com/zilliztech/milvus-backup/internal/log"
 	"github.com/zilliztech/milvus-backup/internal/util/retry"
@@ -55,7 +56,7 @@ func (b *BackupContext) CreateBackup(ctx context.Context, request *backuppb.Crea
 		request.BackupName = "backup_" + fmt.Sprint(time.Now().UTC().Format("2006_01_02_15_04_05_")) + fmt.Sprint(time.Now().Nanosecond())
 	}
 	if request.GetBackupName() != "" {
-		exist, err := b.getStorageClient().Exist(b.ctx, b.backupBucketName, b.backupRootPath+SEPERATOR+request.GetBackupName())
+		exist, err := b.getBackupStorageClient().Exist(b.ctx, b.backupBucketName, b.backupRootPath+SEPERATOR+request.GetBackupName())
 		if err != nil {
 			errMsg := fmt.Sprintf("fail to check whether exist backup with name: %s", request.GetBackupName())
 			log.Error(errMsg, zap.Error(err))
@@ -777,12 +778,12 @@ func (b *BackupContext) writeBackupInfoMeta(ctx context.Context, id string) erro
 	}
 	log.Debug("channel cp meta", zap.String("value", string(channelCPsBytes)))
 
-	b.getStorageClient().Write(ctx, b.backupBucketName, BackupMetaPath(b.backupRootPath, backupInfo.GetName()), output.BackupMetaBytes)
-	b.getStorageClient().Write(ctx, b.backupBucketName, CollectionMetaPath(b.backupRootPath, backupInfo.GetName()), output.CollectionMetaBytes)
-	b.getStorageClient().Write(ctx, b.backupBucketName, PartitionMetaPath(b.backupRootPath, backupInfo.GetName()), output.PartitionMetaBytes)
-	b.getStorageClient().Write(ctx, b.backupBucketName, SegmentMetaPath(b.backupRootPath, backupInfo.GetName()), output.SegmentMetaBytes)
-	b.getStorageClient().Write(ctx, b.backupBucketName, FullMetaPath(b.backupRootPath, backupInfo.GetName()), output.FullMetaBytes)
-	b.getStorageClient().Write(ctx, b.backupBucketName, ChannelCPMetaPath(b.backupRootPath, backupInfo.GetName()), channelCPsBytes)
+	b.getBackupStorageClient().Write(ctx, b.backupBucketName, BackupMetaPath(b.backupRootPath, backupInfo.GetName()), output.BackupMetaBytes)
+	b.getBackupStorageClient().Write(ctx, b.backupBucketName, CollectionMetaPath(b.backupRootPath, backupInfo.GetName()), output.CollectionMetaBytes)
+	b.getBackupStorageClient().Write(ctx, b.backupBucketName, PartitionMetaPath(b.backupRootPath, backupInfo.GetName()), output.PartitionMetaBytes)
+	b.getBackupStorageClient().Write(ctx, b.backupBucketName, SegmentMetaPath(b.backupRootPath, backupInfo.GetName()), output.SegmentMetaBytes)
+	b.getBackupStorageClient().Write(ctx, b.backupBucketName, FullMetaPath(b.backupRootPath, backupInfo.GetName()), output.FullMetaBytes)
+	b.getBackupStorageClient().Write(ctx, b.backupBucketName, ChannelCPMetaPath(b.backupRootPath, backupInfo.GetName()), channelCPsBytes)
 
 	log.Info("finish writeBackupInfoMeta",
 		zap.String("path", BackupDirPath(b.backupRootPath, backupInfo.GetName())),
@@ -840,7 +841,7 @@ func (b *BackupContext) copySegment(ctx context.Context, backupBinlogPath string
 		}
 		//binlog := binlog
-		exist, err := b.getStorageClient().Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
+		exist, err := b.getMilvusStorageClient().Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
 		if err != nil {
 			log.Info("Fail to check file exist",
 				zap.Error(err),
@@ -855,7 +856,8 @@ func (b *BackupContext) copySegment(ctx context.Context, backupBinlogPath string
 		}
 		err = retry.Do(ctx, func() error {
-			return b.getStorageClient().Copy(ctx, b.milvusBucketName, b.backupBucketName, binlog.GetLogPath(), targetPath)
+			attr := storage.ObjectAttr{Key: binlog.GetLogPath()}
+			return b.getBackupCopier().Copy(ctx, attr, targetPath, b.milvusBucketName, b.backupBucketName)
 		}, retry.Sleep(2*time.Second), retry.Attempts(5))
 		if err != nil {
 			log.Info("Fail to copy file after retry",
@@ -885,7 +887,7 @@ func (b *BackupContext) copySegment(ctx context.Context, backupBinlogPath string
 		}
 		//binlog := binlog
-		exist, err := b.getStorageClient().Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
+		exist, err := b.getMilvusStorageClient().Exist(ctx, b.milvusBucketName, binlog.GetLogPath())
 		if err != nil {
 			log.Info("Fail to check file exist",
 				zap.Error(err),
@@ -899,7 +901,8 @@ func (b *BackupContext) copySegment(ctx context.Context, backupBinlogPath string
 			return errors.New("Binlog file not exist " + binlog.GetLogPath())
 		}
 		err = retry.Do(ctx, func() error {
-			return b.getStorageClient().Copy(ctx, b.milvusBucketName, b.backupBucketName, binlog.GetLogPath(), targetPath)
+			attr := storage.ObjectAttr{Key: binlog.GetLogPath()}
+			return b.getBackupCopier().Copy(ctx, attr, targetPath, b.milvusBucketName, b.backupBucketName)
 		}, retry.Sleep(2*time.Second), retry.Attempts(5))
 		if err != nil {
 			log.Info("Fail to copy file after retry",
@@ -930,7 +933,7 @@ func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackup
 	insertPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "insert_log", segmentBackupInfo.GetCollectionId(), segmentBackupInfo.GetPartitionId(), segmentBackupInfo.GetSegmentId())
 	log.Debug("insertPath", zap.String("bucket", b.milvusBucketName), zap.String("insertPath", insertPath))
-	fieldsLogDir, _, err := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, insertPath, false)
+	fieldsLogDir, _, err := b.getMilvusStorageClient().ListWithPrefix(ctx, b.milvusBucketName, insertPath, false)
 	// handle segment level isL0
 	isL0 := false
 	if len(fieldsLogDir) == 0 {
@@ -943,7 +946,7 @@ func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackup
 	log.Debug("fieldsLogDir", zap.String("bucket", b.milvusBucketName), zap.Any("fieldsLogDir", fieldsLogDir))
 	insertLogs := make([]*backuppb.FieldBinlog, 0)
 	for _, fieldLogDir := range fieldsLogDir {
-		binlogPaths, sizes, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, fieldLogDir, false)
+		binlogPaths, sizes, _ := b.getMilvusStorageClient().ListWithPrefix(ctx, b.milvusBucketName, fieldLogDir, false)
 		fieldIdStr := strings.Replace(strings.Replace(fieldLogDir, insertPath, "", 1), SEPERATOR, "", -1)
 		fieldId, _ := strconv.ParseInt(fieldIdStr, 10, 64)
 		binlogs := make([]*backuppb.Binlog, 0)
@@ -961,10 +964,10 @@ func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackup
 	}
 	deltaLogPath := fmt.Sprintf("%s%s/%v/%v/%v/", rootPath, "delta_log", segmentBackupInfo.GetCollectionId(), segmentBackupInfo.GetPartitionId(), segmentBackupInfo.GetSegmentId())
-	deltaFieldsLogDir, _, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, deltaLogPath, false)
+	deltaFieldsLogDir, _, _ := b.getMilvusStorageClient().ListWithPrefix(ctx, b.milvusBucketName, deltaLogPath, false)
 	deltaLogs := make([]*backuppb.FieldBinlog, 0)
 	for _, deltaFieldLogDir := range deltaFieldsLogDir {
-		binlogPaths, sizes, _ := b.getStorageClient().ListWithPrefix(ctx, b.milvusBucketName, deltaFieldLogDir, false)
+		binlogPaths, sizes, _ := b.getMilvusStorageClient().ListWithPrefix(ctx, b.milvusBucketName, deltaFieldLogDir, false)
 		fieldIdStr := strings.Replace(strings.Replace(deltaFieldLogDir, deltaLogPath, "", 1), SEPERATOR, "", -1)
 		fieldId, _ := strconv.ParseInt(fieldIdStr, 10, 64)
 		binlogs := make([]*backuppb.Binlog, 0)
@@ -987,10 +990,10 @@ func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackup
 	}
 
 	//statsLogPath := fmt.Sprintf("%s/%s/%v/%v/%v/", b.params.MinioCfg.RootPath, "stats_log", collectionID, partitionID, segmentID)
-	//statsFieldsLogDir, _, _ := b.storageClient.ListWithPrefix(ctx, b.milvusBucketName, statsLogPath, false)
+	//statsFieldsLogDir, _, _ := b.milvusStorageClient.ListWithPrefix(ctx, b.milvusBucketName, statsLogPath, false)
 	//statsLogs := make([]*backuppb.FieldBinlog, 0)
 	//for _, statsFieldLogDir := range statsFieldsLogDir {
-	//	binlogPaths, sizes, _ := b.storageClient.ListWithPrefix(ctx, b.milvusBucketName, statsFieldLogDir, false)
+	//	binlogPaths, sizes, _ := b.milvusStorageClient.ListWithPrefix(ctx, b.milvusBucketName, statsFieldLogDir, false)
 	//	fieldIdStr := strings.Replace(strings.Replace(statsFieldLogDir, statsLogPath, "", 1), SEPERATOR, "", -1)
 	//	fieldId, _ := strconv.ParseInt(fieldIdStr, 10, 64)
 	//	binlogs := make([]*backuppb.Binlog, 0)
diff --git a/core/backup_impl_restore_backup.go b/core/backup_impl_restore_backup.go
index ed67f523..bf08571f 100644
--- a/core/backup_impl_restore_backup.go
+++ b/core/backup_impl_restore_backup.go
@@ -3,6 +3,7 @@ package core
 import (
 	"context"
 	"fmt"
+	"path"
 	"strings"
 	"time"
@@ -15,6 +16,7 @@ import (
 	"go.uber.org/zap"
 
 	"github.com/zilliztech/milvus-backup/core/proto/backuppb"
+	"github.com/zilliztech/milvus-backup/core/storage"
 	"github.com/zilliztech/milvus-backup/core/utils"
 	"github.com/zilliztech/milvus-backup/internal/common"
 	"github.com/zilliztech/milvus-backup/internal/log"
@@ -562,11 +564,12 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
 	tempDir := fmt.Sprintf("restore-temp-%s-%s-%s%s", parentTaskID, task.TargetDbName, task.TargetCollectionName, SEPERATOR)
 	isSameBucket := b.milvusBucketName == backupBucketName
+	isSameStorage := b.getMilvusStorageClient().Config().StorageType == b.getBackupStorageClient().Config().StorageType
 	// clean the temporary file
 	defer func() {
 		if !isSameBucket && !b.params.BackupCfg.KeepTempFiles {
 			log.Info("Delete temporary file", zap.String("dir", tempDir))
-			err := b.getStorageClient().RemoveWithPrefix(ctx, b.milvusBucketName, tempDir)
+			err := b.getMilvusStorageClient().RemoveWithPrefix(ctx, b.milvusBucketName, tempDir)
 			if err != nil {
 				log.Warn("Delete temporary file failed", zap.Error(err))
 			}
@@ -577,22 +580,24 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
 	copyAndBulkInsert := func(dbName, collectionName, partitionName string, files []string, isL0 bool, skipDiskQuotaCheck bool) error {
 		realFiles := make([]string, len(files))
 		// if milvus bucket and backup bucket are not the same, should copy the data first
-		if !isSameBucket {
-			log.Info("milvus bucket and backup bucket are not the same, copy the data first", zap.Strings("files", files))
+		if !isSameBucket || !isSameStorage {
+			log.Info("milvus and backup data are stored in different buckets or storage systems, copy the data first", zap.Strings("files", files), zap.String("copyDataPath", tempDir))
 			for i, file := range files {
 				// empty delta file, no need to copy
 				if file == "" {
 					realFiles[i] = file
 				} else {
-					log.Debug("Copy temporary restore file", zap.String("from", file), zap.String("to", tempDir+file))
+					tempFilekey := path.Join(tempDir, strings.Replace(file, b.params.MinioCfg.BackupRootPath, "", 1))
+					log.Debug("Copy temporary restore file", zap.String("from", file), zap.String("to", tempFilekey))
 					err := retry.Do(ctx, func() error {
-						return b.getStorageClient().Copy(ctx, backupBucketName, b.milvusBucketName, file, tempDir+file)
+						attr := storage.ObjectAttr{Key: file}
+						return b.getRestoreCopier().Copy(ctx, attr, tempFilekey, backupBucketName, b.milvusBucketName)
 					}, retry.Sleep(2*time.Second), retry.Attempts(5))
 					if err != nil {
 						log.Error("fail to copy backup date from backup bucket to restore target milvus bucket after retry", zap.Error(err))
 						return err
 					}
-					realFiles[i] = tempDir + file
+					realFiles[i] = tempFilekey
 				}
 			}
 		} else {
@@ -845,7 +850,7 @@ func (b *BackupContext) getBackupPartitionPaths(ctx context.Context, bucketName
 	insertPath := fmt.Sprintf("%s/%s/%s/%v/%v/", backupPath, BINGLOG_DIR, INSERT_LOG_DIR, partition.GetCollectionId(), partition.GetPartitionId())
 	deltaPath := fmt.Sprintf("%s/%s/%s/%v/%v/", backupPath, BINGLOG_DIR, DELTA_LOG_DIR, partition.GetCollectionId(), partition.GetPartitionId())
 
-	exist, err := b.getStorageClient().Exist(ctx, bucketName, deltaPath)
+	exist, err := b.getBackupStorageClient().Exist(ctx, bucketName, deltaPath)
 	if err != nil {
 		log.Warn("check binlog exist fail", zap.Error(err))
 		return []string{}, 0, err
@@ -877,7 +882,7 @@ func (b *BackupContext) getBackupPartitionPathsWithGroupID(ctx context.Context,
 		}
 	}
 
-	exist, err := b.getStorageClient().Exist(ctx, bucketName, deltaPath)
+	exist, err := b.getBackupStorageClient().Exist(ctx, bucketName, deltaPath)
 	if err != nil {
 		log.Warn("check binlog exist fail", zap.Error(err))
 		return []string{}, 0, err
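The temp key derivation above strips the backup root path from the source key before joining it onto the temp dir, so restored files no longer nest the whole backup root under the temporary directory. A quick standalone illustration, with made-up values:

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

func main() {
	tempDir := "restore-temp-123-db1-coll1/"
	backupRootPath := "backup"
	file := "backup/my_backup/binlogs/insert_log/445/446/447/100/1"

	// same derivation as copyAndBulkInsert: drop the backup root, keep the rest
	tempFileKey := path.Join(tempDir, strings.Replace(file, backupRootPath, "", 1))
	fmt.Println(tempFileKey)
	// Output: restore-temp-123-db1-coll1/my_backup/binlogs/insert_log/445/446/447/100/1
}
```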
diff --git a/core/milvus_storage_test.go b/core/milvus_storage_test.go
index 9ca83cd3..1c109c6a 100644
--- a/core/milvus_storage_test.go
+++ b/core/milvus_storage_test.go
@@ -27,16 +27,19 @@ func newMinioChunkManager(ctx context.Context, bucketName string) (*storage.Mini
 	secretAccessKey, _ := Params.Load("minio.secretAccessKey")
 	useSSLStr, _ := Params.Load("minio.useSSL")
 	useSSL, _ := strconv.ParseBool(useSSLStr)
-	client, err := storage.NewMinioChunkManager(ctx,
-		storage.Address(endPoint),
-		storage.AccessKeyID(accessKeyID),
-		storage.SecretAccessKeyID(secretAccessKey),
-		storage.UseSSL(useSSL),
-		storage.BucketName(bucketName),
-		storage.UseIAM(false),
-		storage.IAMEndpoint(""),
-		storage.CreateBucket(true),
-	)
+
+	storageConfig := &storage.StorageConfig{
+		StorageType:       "minio",
+		Address:           endPoint,
+		BucketName:        bucketName,
+		AccessKeyID:       accessKeyID,
+		SecretAccessKeyID: secretAccessKey,
+		UseSSL:            useSSL,
+		CreateBucket:      true,
+		UseIAM:            false,
+		IAMEndpoint:       "",
+	}
+
+	client, err := storage.NewMinioChunkManagerWithConfig(ctx, storageConfig)
 	return client, err
 }
 
@@ -90,7 +93,7 @@ func TestReadMilvusData(t *testing.T) {
 	context := context.Background()
 	//backupContext := CreateBackupContext(context, params)
 
-	client, err := CreateStorageClient(context, params)
+	client, err := createStorageClient(context, params)
 	assert.NoError(t, err)
 	paths, _, err := client.ListWithPrefix(context, params.MinioCfg.BucketName, "file/insert_log/437296492118216229/437296492118216230/", true)
 	assert.NoError(t, err)
diff --git a/core/paramtable/params.go b/core/paramtable/params.go
index 17da26d0..80994dce 100644
--- a/core/paramtable/params.go
+++ b/core/paramtable/params.go
@@ -43,6 +43,8 @@ type BackupConfig struct {
 	GcPauseEnable  bool
 	GcPauseSeconds int
 	GcPauseAddress string
+
+	CopyByServer bool
 }
 
 func (p *BackupConfig) init(base *BaseTable) {
@@ -56,6 +58,7 @@ func (p *BackupConfig) init(base *BaseTable) {
 	p.initGcPauseEnable()
 	p.initGcPauseSeconds()
 	p.initGcPauseAddress()
+	p.initCopyByServer()
 }
 
 func (p *BackupConfig) initMaxSegmentGroupSize() {
@@ -101,6 +104,15 @@ func (p *BackupConfig) initGcPauseAddress() {
 	p.GcPauseAddress = address
 }
 
+func (p *BackupConfig) initCopyByServer() {
+	copyByServer := p.Base.LoadWithDefault("backup.copyByServer", "false")
+	var err error
+	p.CopyByServer, err = strconv.ParseBool(copyByServer)
+	if err != nil {
+		panic("parse bool copyByServer:" + err.Error())
+	}
+}
+
 type MilvusConfig struct {
 	Base *BaseTable
 
@@ -194,6 +206,9 @@ var supportedStorageType = map[string]bool{
 type MinioConfig struct {
 	Base *BaseTable
 
+	StorageType string
+	// Deprecated
+	CloudProvider string
 	Address         string
 	Port            string
 	AccessKeyID     string
@@ -202,15 +217,18 @@ type MinioConfig struct {
 	BucketName      string
 	RootPath        string
 	UseIAM          bool
-	CloudProvider   string
 	IAMEndpoint     string
 
+	BackupStorageType     string
+	BackupAddress         string
+	BackupPort            string
 	BackupAccessKeyID     string
 	BackupSecretAccessKey string
+	BackupUseSSL          bool
 	BackupBucketName      string
 	BackupRootPath        string
-
-	StorageType string
+	BackupUseIAM          bool
+	BackupIAMEndpoint     string
 }
 
 func (p *MinioConfig) init(base *BaseTable) {
@@ -228,10 +246,16 @@ func (p *MinioConfig) init(base *BaseTable) {
 	p.initCloudProvider()
 	p.initIAMEndpoint()
 
+	p.initBackupStorageType()
+	p.initBackupAddress()
+	p.initBackupPort()
 	p.initBackupAccessKeyID()
 	p.initBackupSecretAccessKey()
+	p.initBackupUseSSL()
 	p.initBackupBucketName()
 	p.initBackupRootPath()
+	p.initBackupUseIAM()
+	p.initBackupIAMEndpoint()
 }
 
 func (p *MinioConfig) initAddress() {
@@ -256,7 +280,11 @@ func (p *MinioConfig) initSecretAccessKey() {
 
 func (p *MinioConfig) initUseSSL() {
 	usessl := p.Base.LoadWithDefault("minio.useSSL", DefaultMinioUseSSL)
-	p.UseSSL, _ = strconv.ParseBool(usessl)
+	var err error
+	p.UseSSL, err = strconv.ParseBool(usessl)
+	if err != nil {
+		panic("parse bool useSSL:" + err.Error())
+	}
 }
 
 func (p *MinioConfig) initBucketName() {
@@ -290,36 +318,88 @@ func (p *MinioConfig) initIAMEndpoint() {
 	p.IAMEndpoint = iamEndpoint
 }
 
+func (p *MinioConfig) initStorageType() {
+	engine := p.Base.LoadWithDefault("storage.storageType",
+		p.Base.LoadWithDefault("minio.storageType",
+			p.Base.LoadWithDefault("minio.cloudProvider", DefaultStorageType)))
+	if !supportedStorageType[engine] {
+		panic("unsupported storage type:" + engine)
+	}
+	p.StorageType = engine
+}
+
+func (p *MinioConfig) initBackupStorageType() {
+	engine := p.Base.LoadWithDefault("storage.backupStorageType",
+		p.Base.LoadWithDefault("minio.backupStorageType",
+			p.StorageType))
+	if !supportedStorageType[engine] {
+		panic("unsupported storage type:" + engine)
+	}
+	p.BackupStorageType = engine
+}
+
+func (p *MinioConfig) initBackupAddress() {
+	endpoint := p.Base.LoadWithDefault("minio.backupAddress",
+		p.Base.LoadWithDefault("minio.address", DefaultMinioAddress))
+	p.BackupAddress = endpoint
+}
+
+func (p *MinioConfig) initBackupPort() {
+	port := p.Base.LoadWithDefault("minio.backupPort",
+		p.Base.LoadWithDefault("minio.port", DefaultMinioPort))
+	p.BackupPort = port
+}
+
+func (p *MinioConfig) initBackupUseSSL() {
+	usessl := p.Base.LoadWithDefault("minio.backupUseSSL",
+		p.Base.LoadWithDefault("minio.useSSL", DefaultMinioUseSSL))
+	var err error
+	p.BackupUseSSL, err = strconv.ParseBool(usessl)
+	if err != nil {
+		panic("parse bool backupUseSSL:" + err.Error())
+	}
+}
+
+func (p *MinioConfig) initBackupUseIAM() {
+	useIAM := p.Base.LoadWithDefault("minio.backupUseIAM",
+		p.Base.LoadWithDefault("minio.useIAM", DefaultMinioUseIAM))
+	var err error
+	p.BackupUseIAM, err = strconv.ParseBool(useIAM)
+	if err != nil {
+		panic("parse bool backupUseIAM:" + err.Error())
+	}
+}
+
+func (p *MinioConfig) initBackupIAMEndpoint() {
+	iamEndpoint := p.Base.LoadWithDefault("minio.backupIamEndpoint",
+		p.Base.LoadWithDefault("minio.iamEndpoint", DefaultMinioIAMEndpoint))
+	p.BackupIAMEndpoint = iamEndpoint
+}
+
 func (p *MinioConfig) initBackupAccessKeyID() {
-	keyID := p.Base.LoadWithDefault("minio.backupAccessKeyID", DefaultMinioAccessKey)
+	keyID := p.Base.LoadWithDefault("minio.backupAccessKeyID",
+		p.Base.LoadWithDefault("minio.accessKeyID", DefaultMinioAccessKey))
 	p.BackupAccessKeyID = keyID
 }
 
 func (p *MinioConfig) initBackupSecretAccessKey() {
-	key := p.Base.LoadWithDefault("minio.backupSecretAccessKey", DefaultMinioSecretAccessKey)
+	key := p.Base.LoadWithDefault("minio.backupSecretAccessKey",
+		p.Base.LoadWithDefault("minio.secretAccessKey", DefaultMinioSecretAccessKey))
 	p.BackupSecretAccessKey = key
 }
 
 func (p *MinioConfig) initBackupBucketName() {
-	bucketName := p.Base.LoadWithDefault("minio.backupBucketName", DefaultMinioBackupBucketName)
+	bucketName := p.Base.LoadWithDefault("minio.backupBucketName",
+		p.Base.LoadWithDefault("minio.bucketName", DefaultMinioBackupBucketName))
 	p.BackupBucketName = bucketName
 }
 
 func (p *MinioConfig) initBackupRootPath() {
-	rootPath := p.Base.LoadWithDefault("minio.backupRootPath", DefaultMinioBackupRootPath)
+	rootPath := p.Base.LoadWithDefault("minio.backupRootPath",
+		p.Base.LoadWithDefault("minio.rootPath", DefaultMinioBackupRootPath))
 	p.BackupRootPath = rootPath
 }
 
-func (p *MinioConfig) initStorageType() {
-	engine := p.Base.LoadWithDefault("storage.storageType",
-		p.Base.LoadWithDefault("minio.storageType",
-			p.Base.LoadWithDefault("minio.cloudProvider", DefaultStorageType)))
-	if !supportedStorageType[engine] {
-		panic("unsupported storage type:" + engine)
-	}
-	p.StorageType = engine
-}
-
 type HTTPConfig struct {
 	Base *BaseTable
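Every backup* option now falls back to its Milvus-side counterpart before the built-in default, so existing configs that only set the minio.* keys keep working unchanged. A toy model of that lookup order (loadWithDefault here stands in for BaseTable.LoadWithDefault and is not the real implementation):

```go
package main

import "fmt"

// loadWithDefault mimics BaseTable.LoadWithDefault: return the configured
// value if the key is set, otherwise the supplied default.
func loadWithDefault(cfg map[string]string, key, dflt string) string {
	if v, ok := cfg[key]; ok {
		return v
	}
	return dflt
}

func main() {
	// only the Milvus-side address is configured
	cfg := map[string]string{"minio.address": "milvus-minio.internal"}

	// same chain as initBackupAddress: backupAddress -> address -> built-in default
	backupAddress := loadWithDefault(cfg, "minio.backupAddress",
		loadWithDefault(cfg, "minio.address", "localhost"))

	fmt.Println(backupAddress) // milvus-minio.internal
}
```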
diff --git a/core/storage/azure_chunk_manager.go b/core/storage/azure_chunk_manager.go
index e4f094af..746ee20e 100644
--- a/core/storage/azure_chunk_manager.go
+++ b/core/storage/azure_chunk_manager.go
@@ -38,16 +38,37 @@ import (
 
 type AzureChunkManager struct {
 	aos *AzureObjectStorage
+
+	config *StorageConfig
 	//cli *azblob.Client
 	// ctx        context.Context
-	//bucketName string
-	//rootPath   string
+	//BucketName string
+	//RootPath   string
+}
+
+func (mcm *AzureChunkManager) UploadObject(ctx context.Context, i UploadObjectInput) error {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (mcm *AzureChunkManager) GetObject(ctx context.Context, bucket, key string) (*Object, error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (mcm *AzureChunkManager) HeadObject(ctx context.Context, bucket, key string) (ObjectAttr, error) {
+	//TODO implement me
+	panic("implement me")
+}
+
+func (mcm *AzureChunkManager) ListObjectsPage(ctx context.Context, bucket, prefix string) (ListObjectsPaginator, error) {
+	//TODO implement me
+	panic("implement me")
 }
 
 var _ ChunkManager = (*AzureChunkManager)(nil)
 
-func NewAzureChunkManager(ctx context.Context, c *config) (*AzureChunkManager, error) {
-	aos, err := newAzureObjectStorageWithConfig(ctx, c)
+func NewAzureChunkManager(ctx context.Context, config *StorageConfig) (*AzureChunkManager, error) {
+	aos, err := newAzureObjectStorageWithConfig(ctx, config)
 	if err != nil {
 		return nil, err
 	}
@@ -57,19 +78,19 @@ func NewAzureChunkManager(ctx context.Context, c *config) (*AzureChunkManager, e
 	//	return nil, err
 	//}
 	mcm := &AzureChunkManager{
-		aos: aos,
+		aos:    aos,
+		config: config,
 		//cli: cli,
-		//bucketName: c.bucketName,
-		//rootPath: strings.TrimLeft(c.rootPath, "/"),
+		//BucketName: c.BucketName,
+		//RootPath: strings.TrimLeft(c.RootPath, "/"),
 	}
 	log.Info("Azure chunk manager init success.")
 	return mcm, nil
 }
 
-// RootPath returns minio root path.
-//func (mcm *AzureChunkManager) RootPath() string {
-//	return mcm.rootPath
-//}
+func (mcm *AzureChunkManager) Config() *StorageConfig {
+	return mcm.config
+}
 
 func (mcm *AzureChunkManager) Copy(ctx context.Context, fromBucketName string, toBucketName string, fromPath string, toPath string) error {
 	objectkeys, _, err := mcm.ListWithPrefix(ctx, fromBucketName, fromPath, true)
@@ -120,7 +141,6 @@ func (mcm *AzureChunkManager) Size(ctx context.Context, bucketName string, fileP
 	return objectInfo, nil
 }
 
-//
 // Write writes the data to minio storage.
 func (mcm *AzureChunkManager) Write(ctx context.Context, bucketName string, filePath string, content []byte) error {
 	err := mcm.putObject(ctx, bucketName, filePath, bytes.NewReader(content), int64(len(content)))
@@ -339,7 +359,7 @@ func (mcm *AzureChunkManager) ListWithPrefix(ctx context.Context, bucketName str
 }
 
 func (mcm *AzureChunkManager) getObject(ctx context.Context, bucketName, objectName string, offset int64, size int64) (FileReader, error) {
-	//resp, err := mcm.cli.DownloadStream(ctx, bucketName, objectName, nil)
+	//resp, err := mcm.cli.DownloadStream(ctx, BucketName, objectName, nil)
 	//if err != nil {
 	//	return nil, fmt.Errorf("storage: azure download stream %w", err)
 	//}
diff --git a/core/storage/azure_object_storage.go b/core/storage/azure_object_storage.go
index 34ea568b..9f624173 100644
--- a/core/storage/azure_object_storage.go
+++ b/core/storage/azure_object_storage.go
@@ -51,15 +51,15 @@ type innerAzureClient struct {
 type AzureObjectStorage struct {
 	//Client *service.Client
 	clients map[string]*innerAzureClient
-	//config *config
+	//StorageConfig *StorageConfig
 }
 
-//func NewAzureClient(ctx context.Context, cfg *config) (*azblob.Client, error) {
-//	cred, err := azblob.NewSharedKeyCredential(cfg.accessKeyID, cfg.secretAccessKeyID)
+//func NewAzureClient(ctx context.Context, cfg *StorageConfig) (*azblob.Client, error) {
+//	cred, err := azblob.NewSharedKeyCredential(cfg.AccessKeyID, cfg.SecretAccessKeyID)
 //	if err != nil {
 //		return nil, fmt.Errorf("storage: new azure shared key credential %w", err)
 //	}
-//	endpoint := fmt.Sprintf("https://%s.blob.core.windows.net", cfg.accessKeyID)
+//	endpoint := fmt.Sprintf("https://%s.blob.core.windows.net", cfg.AccessKeyID)
 //	cli, err := azblob.NewClientWithSharedKeyCredential(endpoint, cred, nil)
 //	if err != nil {
 //		return nil, fmt.Errorf("storage: new azure aos %w", err)
@@ -68,22 +68,22 @@ type AzureObjectStorage struct {
 //	}
 //
 //	return cli, nil
 //}
 
-func newAzureObjectStorageWithConfig(ctx context.Context, c *config) (*AzureObjectStorage, error) {
-	client, err := newAzureObjectClient(ctx, c.address, c.accessKeyID, c.secretAccessKeyID, c.bucketName, c.useIAM, c.createBucket)
+func newAzureObjectStorageWithConfig(ctx context.Context, c *StorageConfig) (*AzureObjectStorage, error) {
+	client, err := newAzureObjectClient(ctx, c.Address, c.AccessKeyID, c.SecretAccessKeyID, c.BucketName, c.UseIAM, c.CreateBucket)
 	if err != nil {
 		return nil, err
 	}
-	backupClient, err := newAzureObjectClient(ctx, c.address, c.backupAccessKeyID, c.backupSecretAccessKeyID, c.backupBucketName, c.useIAM, c.createBucket)
+	backupClient, err := newAzureObjectClient(ctx, c.Address, c.backupAccessKeyID, c.backupSecretAccessKeyID, c.backupBucketName, c.UseIAM, c.CreateBucket)
 	if err != nil {
 		return nil, err
 	}
 	clients := map[string]*innerAzureClient{
-		c.bucketName:       client,
+		c.BucketName:       client,
 		c.backupBucketName: backupClient,
 	}
 	return &AzureObjectStorage{
 		clients: clients,
-		//config: c,
+		//StorageConfig: c,
 	}, nil
 }
 
@@ -178,7 +178,7 @@ func (aos *AzureObjectStorage) ListObjects(ctx context.Context, bucketName strin
 	pager := aos.clients[bucketName].client.NewContainerClient(bucketName).NewListBlobsFlatPager(&azblob.ListBlobsFlatOptions{
 		Prefix: &prefix,
 	})
-	// pager := aos.Client.NewContainerClient(bucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
+	// pager := aos.Client.NewContainerClient(BucketName).NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
 	//	Prefix: &prefix,
 	// })
diff --git a/core/storage/chunk_manager.go b/core/storage/chunk_manager.go
index 850c895e..947435b5 100644
--- a/core/storage/chunk_manager.go
+++ b/core/storage/chunk_manager.go
@@ -6,47 +6,59 @@ import (
 	"github.com/zilliztech/milvus-backup/core/paramtable"
 )
 
-func NewChunkManager(ctx context.Context, params paramtable.BackupParams) (ChunkManager, error) {
-	engine := params.MinioCfg.StorageType
-	switch engine {
+// ChunkManager manages chunks.
+type ChunkManager interface {
+	Config() *StorageConfig
+
+	// Write writes @content to @filePath.
+	Write(ctx context.Context, bucketName string, filePath string, content []byte) error
+	// Exist returns true if @filePath exists.
+	Exist(ctx context.Context, bucketName string, filePath string) (bool, error)
+	// Read reads @filePath and returns its content.
+	Read(ctx context.Context, bucketName string, filePath string) ([]byte, error)
+	// ListWithPrefix lists all objects with the same @prefix.
+	ListWithPrefix(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []int64, error)
+	// Remove deletes @filePath.
+	Remove(ctx context.Context, bucketName string, filePath string) error
+	// RemoveWithPrefix removes all files with the same @prefix.
+	RemoveWithPrefix(ctx context.Context, bucketName string, prefix string) error
+	// Copy copies files from fromPath into toPath recursively.
+	Copy(ctx context.Context, fromBucketName string, toBucketName string, fromPath string, toPath string) error
+
+	// ListObjectsPage paginates the listing of all objects.
+	ListObjectsPage(ctx context.Context, bucket, prefix string) (ListObjectsPaginator, error)
+	// HeadObject determines whether an object exists and you have permission to access it.
+	HeadObject(ctx context.Context, bucket, key string) (ObjectAttr, error)
+	// GetObject gets an object.
+	GetObject(ctx context.Context, bucket, key string) (*Object, error)
+	// UploadObject uploads an object as a stream.
+	UploadObject(ctx context.Context, i UploadObjectInput) error
+}
+
+func NewChunkManager(ctx context.Context, params paramtable.BackupParams, config *StorageConfig) (ChunkManager, error) {
+	switch config.StorageType {
 	case paramtable.Local:
-		return newLocalChunkManagerWithParams(ctx, params)
+		return NewLocalChunkManager(ctx, config)
 	case paramtable.CloudProviderAzure:
-		return newAzureChunkManagerWithParams(ctx, params)
+		// todo @wayblink
+		return newAzureChunkManagerWithParams(ctx, params, config)
 	default:
-		return newMinioChunkManagerWithParams(ctx, params)
+		return NewMinioChunkManagerWithConfig(ctx, config)
 	}
 }
 
-func newMinioChunkManagerWithParams(ctx context.Context, params paramtable.BackupParams) (*MinioChunkManager, error) {
-	c := newDefaultConfig()
-	c.address = params.MinioCfg.Address + ":" + params.MinioCfg.Port
-	c.accessKeyID = params.MinioCfg.AccessKeyID
-	c.secretAccessKeyID = params.MinioCfg.SecretAccessKey
-	c.useSSL = params.MinioCfg.UseSSL
-	c.bucketName = params.MinioCfg.BackupBucketName
-	c.rootPath = params.MinioCfg.RootPath
-	//c.cloudProvider = params.MinioCfg.CloudProvider
-	c.storageType = params.MinioCfg.StorageType
-	c.useIAM = params.MinioCfg.UseIAM
-	c.iamEndpoint = params.MinioCfg.IAMEndpoint
-	c.createBucket = true
-	return newMinioChunkManagerWithConfig(ctx, c)
-}
-
-func newAzureChunkManagerWithParams(ctx context.Context, params paramtable.BackupParams) (*AzureChunkManager, error) {
+func newAzureChunkManagerWithParams(ctx context.Context, params paramtable.BackupParams, config *StorageConfig) (*AzureChunkManager, error) {
 	c := newDefaultConfig()
-	c.address = params.MinioCfg.Address + ":" + params.MinioCfg.Port
-	c.accessKeyID = params.MinioCfg.AccessKeyID
-	c.secretAccessKeyID = params.MinioCfg.SecretAccessKey
-	c.useSSL = params.MinioCfg.UseSSL
-	c.bucketName = params.MinioCfg.BucketName
-	c.rootPath = params.MinioCfg.RootPath
-	//c.cloudProvider = params.MinioCfg.CloudProvider
-	c.storageType = params.MinioCfg.StorageType
-	c.useIAM = params.MinioCfg.UseIAM
-	c.iamEndpoint = params.MinioCfg.IAMEndpoint
-	c.createBucket = true
+	c.Address = params.MinioCfg.Address + ":" + params.MinioCfg.Port
+	c.AccessKeyID = params.MinioCfg.AccessKeyID
+	c.SecretAccessKeyID = params.MinioCfg.SecretAccessKey
+	c.UseSSL = params.MinioCfg.UseSSL
+	c.BucketName = params.MinioCfg.BucketName
+	c.RootPath = params.MinioCfg.RootPath
+	c.StorageType = params.MinioCfg.StorageType
+	c.UseIAM = params.MinioCfg.UseIAM
+	c.IAMEndpoint = params.MinioCfg.IAMEndpoint
+	c.CreateBucket = true
 
 	c.backupAccessKeyID = params.MinioCfg.BackupAccessKeyID
 	c.backupSecretAccessKeyID = params.MinioCfg.BackupSecretAccessKey
@@ -55,13 +67,3 @@ func newAzureChunkManagerWithParams(ctx context.Context, params paramtable.Backu
 
 	return NewAzureChunkManager(ctx, c)
 }
-
-func newLocalChunkManagerWithParams(ctx context.Context, params paramtable.BackupParams) (*LocalChunkManager, error) {
-	c := newDefaultConfig()
-	c.rootPath = params.MinioCfg.RootPath
-	//c.cloudProvider = params.MinioCfg.CloudProvider
-	c.storageType = params.MinioCfg.StorageType
-	c.backupRootPath = params.MinioCfg.BackupRootPath
-
-	return NewLocalChunkManager(ctx, c)
-}
diff --git a/core/storage/copier.go b/core/storage/copier.go
new file mode 100644
index 00000000..3d0afccb
--- /dev/null
+++ b/core/storage/copier.go
@@ -0,0 +1,286 @@
+package storage
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"io"
+	"strings"
+
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+	"golang.org/x/time/rate"
+
+	"github.com/zilliztech/milvus-backup/internal/common"
+	"github.com/zilliztech/milvus-backup/internal/log"
+)
+
+const (
+	_32M  = 32 << 20
+	_100M = 100 << 20
+)
+
+const _copyWorkerNum = 10
+
+// limReader is a speed-limited reader.
+type limReader struct {
+	r   io.Reader
+	lim *rate.Limiter
+	ctx context.Context
+}
+
+func (r *limReader) Read(p []byte) (int, error) {
+	n, err := r.r.Read(p)
+	if err != nil {
+		return n, err
+	}
+
+	if err := r.lim.WaitN(r.ctx, n); err != nil {
+		return n, err
+	}
+
+	return n, err
+}
+
+type CopyOption struct {
+	// BytePS byte/s copy speed limit, 0 is unlimited, default is unlimited
+	BytePS float64
+	// WorkerNum the number of copy task workers, default is 10
+	WorkerNum int
+	// RPS the number of copy requests initiated per second, 0 is unlimited, default is unlimited
+	RPS int32
+	// BufSizeByte the size of the buffer that the copier can use, default is 100MB
+	BufSizeByte int
+	// CopyByServer copy data through this server when it can't be copied by the client directly
+	CopyByServer bool
+}
+
+type Copier struct {
+	src  ChunkManager
+	dest ChunkManager
+
+	// lim stream copy speed limiter
+	lim                  *rate.Limiter
+	workerNum            int
+	bufSizeBytePerWorker int
+	rps                  int32
+
+	totalSize atomic.Uint64
+	totalCnt  atomic.Uint64
+
+	size atomic.Uint64
+	cnt  atomic.Uint64
+
+	useCopyByServer bool
+}
+
+func NewCopier(src, dest ChunkManager, opt CopyOption) *Copier {
+	var lim *rate.Limiter
+	if opt.BytePS != 0 {
+		lim = rate.NewLimiter(rate.Limit(opt.BytePS), _32M)
+	}
+
+	workerNum := _copyWorkerNum
+	if opt.WorkerNum != 0 {
+		workerNum = opt.WorkerNum
+	}
+	bufSizeBytePerWorker := _100M / workerNum
+	if opt.BufSizeByte != 0 {
+		bufSizeBytePerWorker = opt.BufSizeByte / workerNum
+	}
+
+	return &Copier{
+		src:                  src,
+		dest:                 dest,
+		lim:                  lim,
+		rps:                  opt.RPS,
+		useCopyByServer:      opt.CopyByServer,
+		workerNum:            workerNum,
+		bufSizeBytePerWorker: bufSizeBytePerWorker,
+	}
+}
+
+type Process struct {
+	TotalSize uint64
+	TotalCnt  uint64
+
+	Size uint64
+	Cnt  uint64
+}
+
+func (c *Copier) Process() Process {
+	return Process{
+		TotalSize: c.totalSize.Load(),
+		TotalCnt:  c.totalCnt.Load(),
+
+		Size: c.size.Load(),
+		Cnt:  c.cnt.Load(),
+	}
+}
+
+type CopyPathInput struct {
+	SrcBucket string
+	SrcPrefix string
+
+	DestBucket string
+	DestKeyFn  func(attr ObjectAttr) string
+
+	// optional
+	CopySuffix string
+
+	// OnSuccess is called when an object copy succeeds.
+	// It may be executed concurrently, so pay attention to thread safety.
+	OnSuccess func(attr ObjectAttr)
+}
+
+// getAttrs returns the attributes of all objects under bucket/prefix.
+func (c *Copier) getAttrs(ctx context.Context, bucket, prefix string, copySuffix string) ([]ObjectAttr, error) {
+	var attrs []ObjectAttr
+
+	p, err := c.src.ListObjectsPage(ctx, bucket, prefix)
+	if err != nil {
+		return nil, err
+	}
+	for p.HasMorePages() {
+		page, err := p.NextPage(ctx)
+		if err != nil {
+			return nil, fmt.Errorf("storage: copier list objects %w", err)
+		}
+		for _, attr := range page.Contents {
+			if attr.IsEmpty() {
+				continue
+			}
+
+			if copySuffix != "" && !strings.HasSuffix(attr.Key, copySuffix) {
+				continue
+			}
+
+			attrs = append(attrs, attr)
+			c.totalSize.Add(uint64(attr.Length))
+			c.totalCnt.Add(1)
+		}
+	}
+
+	return attrs, nil
+}
+
+// CopyPrefix copies all files under the src path.
+func (c *Copier) CopyPrefix(ctx context.Context, i CopyPathInput) error {
+	srcAttrs, err := c.getAttrs(ctx, i.SrcBucket, i.SrcPrefix, i.CopySuffix)
+	if err != nil {
+		return fmt.Errorf("storage: copier get src attrs %w", err)
+	}
+
+	wp, err := common.NewWorkerPool(ctx, c.workerNum, c.rps)
+	if err != nil {
+		return fmt.Errorf("storage: copier new worker pool %w", err)
+	}
+	wp.Start()
+	fn := c.selectCopyFn()
+	for _, srcAttr := range srcAttrs {
+		attr := srcAttr
+		job := func(ctx context.Context) error {
+			destKey := i.DestKeyFn(attr)
+			// copy
+			destAttr, err := c.dest.HeadObject(ctx, i.DestBucket, destKey)
+			if err != nil || !attr.SameAs(destAttr) {
+				if err := fn(ctx, attr, destKey, i.SrcBucket, i.DestBucket); err != nil {
+					return fmt.Errorf("storage: copier copy object %w", err)
+				}
+			}
+			// check
+			destAttr, err = c.dest.HeadObject(ctx, i.DestBucket, destKey)
+			if err != nil {
+				return fmt.Errorf("storage: after copy %w", err)
+			}
+			if destAttr.Length != attr.Length {
+				return fmt.Errorf("storage: dest len %d != src len %d", destAttr.Length, attr.Length)
+			}
+
+			if i.OnSuccess != nil {
+				i.OnSuccess(attr)
+			}
+			c.cnt.Add(1)
+
+			return nil
+		}
+
+		wp.Submit(job)
+	}
+	wp.Done()
+
+	if err := wp.Wait(); err != nil {
+		return fmt.Errorf("storage: copier copy prefix %w", err)
+	}
+	return nil
+}
+
+func (c *Copier) Copy(ctx context.Context, attr ObjectAttr, destPrefix, srcBucket, destBucket string) error {
+	fn := c.selectCopyFn()
+	srcAttrs, err := c.getAttrs(ctx, srcBucket, attr.Key, "")
+	if err != nil {
+		return fmt.Errorf("storage: copier get src attrs %w", err)
+	}
+	for _, srcAttr := range srcAttrs {
+		destKey := strings.Replace(srcAttr.Key, attr.Key, destPrefix, 1)
+		err := fn(ctx, srcAttr, destKey, srcBucket, destBucket)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+type copyFn func(ctx context.Context, attr ObjectAttr, destKey, srcBucket, destBucket string) error
+
+func (c *Copier) selectCopyFn() copyFn {
+	if c.useCopyByServer {
+		return c.copyByServer
+	}
+	return c.copyRemote
+}
+
+func (c *Copier) copyRemote(ctx context.Context, attr ObjectAttr, destKey, srcBucket, destBucket string) error {
+	log.Debug("copyRemote", zap.String("srcBucket", srcBucket), zap.String("destBucket", destBucket), zap.String("key", attr.Key), zap.String("destKey", destKey))
+	if err := c.dest.Copy(ctx, srcBucket, destBucket, attr.Key, destKey); err != nil {
+		return fmt.Errorf("storage: copier copy object %w", err)
+	}
+
+	return nil
+}
+
+func (c *Copier) copyByServer(ctx context.Context, attr ObjectAttr, destKey, srcBucket, destBucket string) error {
+	log.Debug("copyByServer", zap.String("srcBucket", srcBucket), zap.String("destBucket", destBucket), zap.String("key", attr.Key), zap.String("destKey", destKey))
+	obj, err := c.src.GetObject(ctx, srcBucket, attr.Key)
+	if err != nil {
+		log.Warn("storage: copier get object", zap.String("bucket", srcBucket), zap.String("key", attr.Key), zap.Error(err))
+		return err
+	}
+	defer obj.Body.Close()
+
+	body := c.newProcessReader(bufio.NewReaderSize(obj.Body, c.bufSizeBytePerWorker))
+	if c.lim != nil {
+		body = &limReader{r: body, lim: c.lim, ctx: ctx}
+	}
+	i := UploadObjectInput{Body: body, Bucket: destBucket, Key: destKey, Size: attr.Length}
+	if err := c.dest.UploadObject(ctx, i); err != nil {
+		log.Warn("storage: copier upload object", zap.String("bucket", destBucket), zap.String("key", destKey), zap.Error(err))
+		return err
+	}
+
+	return nil
+}
+
+// processReader counts the bytes read through it into len.
+type processReader struct {
+	src io.Reader
+	len *atomic.Uint64
+}
+
+func (r *processReader) Read(p []byte) (int, error) {
+	n, err := r.src.Read(p)
+	r.len.Add(uint64(n))
+	return n, err
+}
+
+func (c *Copier) newProcessReader(src io.Reader) io.Reader {
+	return &processReader{src: src, len: &c.size}
+}
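Beyond single-object Copy, the Copier can mirror a whole prefix with CopyPrefix. A sketch of that usage, with invented buckets, prefix, and key mapping:

```go
package main

import (
	"context"
	"log"
	"strings"

	"github.com/zilliztech/milvus-backup/core/storage"
)

// mirrorPrefix sketches CopyPrefix: copy everything under a-bucket/files/
// into my-backup-bucket/backup/files/. All names are example values.
func mirrorPrefix(ctx context.Context, src, dest storage.ChunkManager) error {
	copier := storage.NewCopier(src, dest, storage.CopyOption{
		WorkerNum:    8,
		BytePS:       64 << 20, // throttle streamed copies to ~64 MiB/s
		CopyByServer: true,
	})
	return copier.CopyPrefix(ctx, storage.CopyPathInput{
		SrcBucket:  "a-bucket",
		SrcPrefix:  "files/",
		DestBucket: "my-backup-bucket",
		DestKeyFn: func(attr storage.ObjectAttr) string {
			return "backup/" + attr.Key // rewrite each key under the backup root
		},
		OnSuccess: func(attr storage.ObjectAttr) {
			log.Println("copied", strings.TrimPrefix(attr.Key, "files/"))
		},
	})
}
```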
*Copier) newProcessReader(src io.Reader) io.Reader { + return &processReader{src: src, len: &c.size} +} diff --git a/core/storage/local_chunk_manager.go b/core/storage/local_chunk_manager.go index 833d038a..71fa338e 100644 --- a/core/storage/local_chunk_manager.go +++ b/core/storage/local_chunk_manager.go @@ -27,7 +27,6 @@ import ( "strings" "github.com/cockroachdb/errors" - "github.com/zilliztech/milvus-backup/internal/log" ) @@ -39,18 +38,24 @@ func WrapErrFileNotFound(key string) error { type LocalChunkManager struct { rootPath string backupRootPath string + config *StorageConfig } var _ ChunkManager = (*LocalChunkManager)(nil) // NewLocalChunkManager create a new local manager object. -func NewLocalChunkManager(ctx context.Context, c *config) (*LocalChunkManager, error) { +func NewLocalChunkManager(ctx context.Context, config *StorageConfig) (*LocalChunkManager, error) { return &LocalChunkManager{ - rootPath: c.rootPath, - backupRootPath: c.backupRootPath, + rootPath: config.RootPath, + backupRootPath: config.backupRootPath, + config: config, }, nil } +func (mcm *LocalChunkManager) Config() *StorageConfig { + return mcm.config +} + // RootPath returns lcm root path. func (lcm *LocalChunkManager) RootPath() string { return lcm.rootPath @@ -201,6 +206,119 @@ func (lcm *LocalChunkManager) Copy(ctx context.Context, fromBucketName string, t } } +func (lcm *LocalChunkManager) UploadObject(ctx context.Context, i UploadObjectInput) error { + dir := path.Dir(i.Key) + exist, err := lcm.Exist(ctx, i.Bucket, dir) + if err != nil { + return err + } + if !exist { + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + return WrapErrFileNotFound(i.Key) + } + } + + // Open or create a local file + file, err := os.Create(i.Key) + if err != nil { + fmt.Println("Error creating file:", err) + return err + } + //defer file.Close() + + // Write the contents of the reader to the file using io.Copy + _, err = io.Copy(file, i.Body) + if err != nil { + fmt.Println("Error writing to file:", err) + return err + } + + fmt.Println("Successfully written to file!") + return nil +} + +type fileReader struct { + *os.File +} + +type FileObject struct { + Key string + Size int64 +} + +type LocalListObjectsPaginator struct { + Files []ObjectAttr + //PageSize int + //TotalPages int + //CurrentPage int + currentFile int +} + +func (p *LocalListObjectsPaginator) HasMorePages() bool { + return p.currentFile < len(p.Files) +} + +func (p *LocalListObjectsPaginator) NextPage(_ context.Context) (*Page, error) { + if p.currentFile >= len(p.Files) { + return nil, errors.New("storage: no more pages") + } + + contents := make([]ObjectAttr, 0) + obj := p.Files[p.currentFile] + contents = append(contents, obj) + p.currentFile = p.currentFile + 1 + + return &Page{Contents: contents}, nil +} + +func (lcm *LocalChunkManager) GetObject(ctx context.Context, bucket, key string) (*Object, error) { + // Open the local file + f, err := os.Open(key) + if err != nil { + return nil, err + } + size, err := lcm.Size(ctx, bucket, key) + if err != nil { + return nil, err + } + + // Wrap the file in the SeekableReadCloser interface + var src SeekableReadCloser = &fileReader{f} + return &Object{Length: size, Body: src}, nil +} + +func (lcm *LocalChunkManager) HeadObject(ctx context.Context, bucket, key string) (ObjectAttr, error) { + size, err := lcm.Size(ctx, bucket, key) + if err != nil { + return ObjectAttr{}, err + } + return ObjectAttr{Key: key, Length: size}, nil +} + +func (lcm *LocalChunkManager) ListObjectsPage(ctx context.Context, bucket, 
prefix string) (ListObjectsPaginator, error) { + files, sizes, err := lcm.ListWithPrefix(ctx, bucket, prefix, true) + if err != nil { + return nil, err + } + + // Create file objects to simulate S3 objects + fileObjects := []ObjectAttr{} + for i, fileKey := range files { + fileObjects = append(fileObjects, ObjectAttr{ + Key: fileKey, + Length: sizes[i], + }) + } + + paginator := &LocalListObjectsPaginator{ + Files: fileObjects, + currentFile: 0, + } + + return paginator, nil +} + func CopyDir(source string, dest string) (err error) { // get properties of source dir sourceinfo, err := os.Stat(source) diff --git a/core/storage/minio_chunk_manager.go b/core/storage/minio_chunk_manager.go index 42e02fd3..a37aa3ce 100644 --- a/core/storage/minio_chunk_manager.go +++ b/core/storage/minio_chunk_manager.go @@ -5,18 +5,18 @@ import ( "context" "errors" "fmt" - "github.com/zilliztech/milvus-backup/core/storage/tencent" - "golang.org/x/sync/errgroup" "io" "strings" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/zilliztech/milvus-backup/core/paramtable" "github.com/zilliztech/milvus-backup/core/storage/aliyun" "github.com/zilliztech/milvus-backup/core/storage/gcp" + "github.com/zilliztech/milvus-backup/core/storage/tencent" "github.com/zilliztech/milvus-backup/internal/log" "github.com/zilliztech/milvus-backup/internal/util/errorutil" "github.com/zilliztech/milvus-backup/internal/util/retry" @@ -42,76 +42,67 @@ var CheckBucketRetryAttempts uint = 20 type MinioChunkManager struct { *minio.Client - // ctx context.Context + provider string bucketName string rootPath string + + config *StorageConfig } var _ ChunkManager = (*MinioChunkManager)(nil) -// NewMinioChunkManager create a new local manager object. -// Do not call this directly! Use factory.NewPersistentStorageChunkManager instead. 
diff --git a/core/storage/minio_chunk_manager.go b/core/storage/minio_chunk_manager.go
index 42e02fd3..a37aa3ce 100644
--- a/core/storage/minio_chunk_manager.go
+++ b/core/storage/minio_chunk_manager.go
@@ -5,18 +5,18 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/zilliztech/milvus-backup/core/storage/tencent"
-	"golang.org/x/sync/errgroup"
 	"io"
 	"strings"
 
 	"github.com/minio/minio-go/v7"
 	"github.com/minio/minio-go/v7/pkg/credentials"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/zilliztech/milvus-backup/core/paramtable"
 	"github.com/zilliztech/milvus-backup/core/storage/aliyun"
 	"github.com/zilliztech/milvus-backup/core/storage/gcp"
+	"github.com/zilliztech/milvus-backup/core/storage/tencent"
 	"github.com/zilliztech/milvus-backup/internal/log"
 	"github.com/zilliztech/milvus-backup/internal/util/errorutil"
 	"github.com/zilliztech/milvus-backup/internal/util/retry"
@@ -42,76 +42,67 @@ var CheckBucketRetryAttempts uint = 20
 type MinioChunkManager struct {
 	*minio.Client
 
-	// ctx context.Context
+	provider   string
 	bucketName string
 	rootPath   string
+
+	config *StorageConfig
 }
 
 var _ ChunkManager = (*MinioChunkManager)(nil)
 
-// NewMinioChunkManager create a new local manager object.
-// Do not call this directly! Use factory.NewPersistentStorageChunkManager instead.
-func NewMinioChunkManager(ctx context.Context, opts ...Option) (*MinioChunkManager, error) {
-	c := newDefaultConfig()
-	for _, opt := range opts {
-		opt(c)
-	}
-
-	return newMinioChunkManagerWithConfig(ctx, c)
-}
-
-func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunkManager, error) {
+func NewMinioChunkManagerWithConfig(ctx context.Context, config *StorageConfig) (*MinioChunkManager, error) {
 	var creds *credentials.Credentials
 	var newMinioFn = minio.New
 	var bucketLookupType = minio.BucketLookupAuto
 
-	switch c.storageType {
+	switch config.StorageType {
 	case paramtable.CloudProviderAliyun:
 		// auto doesn't work for aliyun, so we set to dns deliberately
 		bucketLookupType = minio.BucketLookupDNS
-		if c.useIAM {
+		if config.UseIAM {
 			newMinioFn = aliyun.NewMinioClient
 		} else {
-			creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
+			creds = credentials.NewStaticV4(config.AccessKeyID, config.SecretAccessKeyID, "")
 		}
 	case paramtable.CloudProviderAli:
 		// auto doesn't work for aliyun, so we set to dns deliberately
 		bucketLookupType = minio.BucketLookupDNS
-		if c.useIAM {
+		if config.UseIAM {
 			newMinioFn = aliyun.NewMinioClient
 		} else {
-			creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
+			creds = credentials.NewStaticV4(config.AccessKeyID, config.SecretAccessKeyID, "")
 		}
 	case paramtable.CloudProviderGCP:
 		newMinioFn = gcp.NewMinioClient
-		if !c.useIAM {
-			creds = credentials.NewStaticV2(c.accessKeyID, c.secretAccessKeyID, "")
+		if !config.UseIAM {
+			creds = credentials.NewStaticV2(config.AccessKeyID, config.SecretAccessKeyID, "")
 		}
 	case paramtable.CloudProviderTencentShort:
 		bucketLookupType = minio.BucketLookupDNS
 		newMinioFn = tencent.NewMinioClient
-		if !c.useIAM {
-			creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
+		if !config.UseIAM {
+			creds = credentials.NewStaticV4(config.AccessKeyID, config.SecretAccessKeyID, "")
 		}
 	case paramtable.CloudProviderTencent:
 		bucketLookupType = minio.BucketLookupDNS
 		newMinioFn = tencent.NewMinioClient
-		if !c.useIAM {
-			creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
+		if !config.UseIAM {
+			creds = credentials.NewStaticV4(config.AccessKeyID, config.SecretAccessKeyID, "")
 		}
 	default: // aws, minio
-		if c.useIAM {
+		if config.UseIAM {
 			creds = credentials.NewIAM("")
 		} else {
-			creds = credentials.NewStaticV4(c.accessKeyID, c.secretAccessKeyID, "")
+			creds = credentials.NewStaticV4(config.AccessKeyID, config.SecretAccessKeyID, "")
 		}
 	}
 
 	minioOpts := &minio.Options{
 		BucketLookup: bucketLookupType,
 		Creds:        creds,
-		Secure:       c.useSSL,
+		Secure:       config.UseSSL,
 	}
 
-	minIOClient, err := newMinioFn(c.address, minioOpts)
+	minIOClient, err := newMinioFn(config.Address, minioOpts)
 	// options nil or invalid formatted endpoint, don't need to retry
 	if err != nil {
 		return nil, err
 	}
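Reviewer note (illustrative, not part of the patch): call sites now build a StorageConfig explicitly instead of passing functional options. A minimal sketch with placeholder values; the field names match the StorageConfig introduced in options.go further down:

func newDemoManager(ctx context.Context) (*MinioChunkManager, error) {
	cfg := &StorageConfig{
		StorageType:       "minio", // the switch above also handles aws/s3, gcp, ali(aliyun), tc(tencent)
		Address:           "localhost:9000",
		BucketName:        "a-bucket",
		AccessKeyID:       "minioadmin",
		SecretAccessKeyID: "minioadmin",
		UseSSL:            false,
		UseIAM:            false,
		RootPath:          "files",
		CreateBucket:      true,
	}
	// The constructor picks credentials and a bucket-lookup mode per StorageType,
	// then verifies (and optionally creates) the bucket.
	return NewMinioChunkManagerWithConfig(ctx, cfg)
}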
@@ -119,21 +110,21 @@ func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunk
 	var bucketExists bool
 	// check valid in first query
 	checkBucketFn := func() error {
-		bucketExists, err = minIOClient.BucketExists(ctx, c.bucketName)
+		bucketExists, err = minIOClient.BucketExists(ctx, config.BucketName)
 		if err != nil {
-			log.Warn("failed to check blob bucket exist", zap.String("bucket", c.bucketName), zap.Error(err))
+			log.Warn("failed to check blob bucket exist", zap.String("bucket", config.BucketName), zap.Error(err))
 			return err
 		}
 		if !bucketExists {
-			if c.createBucket {
-				log.Info("blob bucket not exist, create bucket.", zap.Any("bucket name", c.bucketName))
-				err := minIOClient.MakeBucket(ctx, c.bucketName, minio.MakeBucketOptions{})
+			if config.CreateBucket {
+				log.Info("blob bucket not exist, create bucket.", zap.Any("bucket name", config.BucketName))
+				err := minIOClient.MakeBucket(ctx, config.BucketName, minio.MakeBucketOptions{})
 				if err != nil {
-					log.Warn("failed to create blob bucket", zap.String("bucket", c.bucketName), zap.Error(err))
+					log.Warn("failed to create blob bucket", zap.String("bucket", config.BucketName), zap.Error(err))
 					return err
 				}
 			} else {
-				return fmt.Errorf("bucket %s not Existed", c.bucketName)
+				return fmt.Errorf("bucket %s does not exist", config.BucketName)
 			}
 		}
 		return nil
@@ -145,13 +136,19 @@ func newMinioChunkManagerWithConfig(ctx context.Context, c *config) (*MinioChunk
 
 	mcm := &MinioChunkManager{
 		Client:     minIOClient,
-		bucketName: c.bucketName,
+		bucketName: config.BucketName,
+		provider:   config.StorageType,
+		config:     config,
 	}
-	mcm.rootPath = mcm.normalizeRootPath(c.rootPath)
-	log.Info("minio chunk manager init success.", zap.String("bucketname", c.bucketName), zap.String("root", mcm.RootPath()))
+	mcm.rootPath = mcm.normalizeRootPath(config.RootPath)
+	log.Info("minio chunk manager init success.", zap.String("bucketname", config.BucketName), zap.String("root", mcm.RootPath()))
 	return mcm, nil
 }
 
+func (mcm *MinioChunkManager) Config() *StorageConfig {
+	return mcm.config
+}
+
 // normalizeRootPath
 func (mcm *MinioChunkManager) normalizeRootPath(rootPath string) string {
 	// no leading "/"
@@ -231,7 +228,7 @@ func (mcm *MinioChunkManager) MultiWrite(ctx context.Context, bucketName string,
 
 // Exist checks whether chunk is saved to minio storage.
 func (mcm *MinioChunkManager) Exist(ctx context.Context, bucketName string, filePath string) (bool, error) {
-	//_, err := mcm.Client.StatObject(ctx, bucketName, filePath, minio.StatObjectOptions{})
 	paths, _, err := mcm.ListWithPrefix(ctx, bucketName, filePath, false)
 	if err != nil {
 		errResponse := minio.ToErrorResponse(err)
@@ -254,7 +251,7 @@ func (mcm *MinioChunkManager) Exist(ctx context.Context, bucketName string, file
 
 // Read reads the minio storage data if exists.
 func (mcm *MinioChunkManager) Read(ctx context.Context, bucketName string, filePath string) ([]byte, error) {
-	//object, err := mcm.Client.GetObject(ctx, bucketName, filePath, minio.GetObjectOptions{})
 	//if err != nil {
 	//	log.Warn("failed to get object", zap.String("path", filePath), zap.Error(err))
 	//	return nil, err
@@ -457,21 +454,73 @@ func (mcm *MinioChunkManager) Copy(ctx context.Context, fromBucketName string, t
 	return nil
 }
 
-// Learn from file.ReadFile
-func Read(r io.Reader, size int64) ([]byte, error) {
-	data := make([]byte, 0, size)
-	for {
-		if len(data) >= cap(data) {
-			d := append(data[:cap(data)], 0)
-			data = d[:len(data)]
-		}
-		n, err := r.Read(data[len(data):cap(data)])
-		data = data[:len(data)+n]
-		if err != nil {
-			if err == io.EOF {
-				err = nil
-			}
-			return data, err
-		}
-	}
-}
+const _defaultPageSize = 1000
+
+type MinioListObjectPaginator struct {
+	cli *minio.Client
+
+	objCh    <-chan minio.ObjectInfo
+	pageSize int32
+	hasMore  bool
+}
+
+func (p *MinioListObjectPaginator) HasMorePages() bool { return p.hasMore }
+
+func (p *MinioListObjectPaginator) NextPage(_ context.Context) (*Page, error) {
+	if !p.hasMore {
+		return nil, errors.New("storage: no more pages")
+	}
+
+	contents := make([]ObjectAttr, 0, p.pageSize)
+	for obj := range p.objCh {
+		if obj.Err != nil {
+			return nil, fmt.Errorf("storage: list objects %w", obj.Err)
+		}
+		contents = append(contents, ObjectAttr{Key: obj.Key, Length: obj.Size, ETag: obj.ETag})
+		if len(contents) == int(p.pageSize) {
+			return &Page{Contents: contents}, nil
+		}
+	}
+	p.hasMore = false
+
+	return &Page{Contents: contents}, nil
+}
+
+func (mcm *MinioChunkManager) HeadObject(ctx context.Context, bucket, key string) (ObjectAttr, error) {
+	attr, err := mcm.Client.StatObject(ctx, bucket, key, minio.StatObjectOptions{})
+	if err != nil {
+		return ObjectAttr{}, fmt.Errorf("storage: %s head object %w", mcm.provider, err)
+	}
+
+	return ObjectAttr{Key: attr.Key, Length: attr.Size, ETag: attr.ETag}, nil
+}
+
+func (mcm *MinioChunkManager) ListObjectsPage(ctx context.Context, bucket, prefix string) (ListObjectsPaginator, error) {
+	objCh := mcm.Client.ListObjects(ctx, bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: true})
+	return &MinioListObjectPaginator{cli: mcm.Client, objCh: objCh, pageSize: _defaultPageSize, hasMore: true}, nil
+}
+
+func (mcm *MinioChunkManager) GetObject(ctx context.Context, bucket, key string) (*Object, error) {
+	obj, err := mcm.Client.GetObject(ctx, bucket, key, minio.GetObjectOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("storage: %s get object %w", mcm.provider, err)
+	}
+	attr, err := obj.Stat()
+	if err != nil {
+		return nil, fmt.Errorf("storage: %s get object attr %w", mcm.provider, err)
+	}
+	return &Object{Length: attr.Size, Body: obj}, nil
+}
+
+func (mcm *MinioChunkManager) UploadObject(ctx context.Context, i UploadObjectInput) error {
+	opt := minio.PutObjectOptions{}
+	// -1 tells the MinIO client the size is unknown and the body must be streamed
+	size := int64(-1)
+	if i.Size > 0 {
+		size = i.Size
+	}
+	if _, err := mcm.Client.PutObject(ctx, i.Bucket, i.Key, i.Body, size, opt); err != nil {
+		return fmt.Errorf("storage: %s upload object %s %w", mcm.provider, i.Key, err)
+	}
+
+	return nil
+}
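Reviewer note (illustrative, not part of the patch): GetObject on one manager plus UploadObject on another is the primitive that makes a server-mediated copy between two different storage systems possible; the real logic lives in the Copier, so treat this as a sketch of the idea only. The objectStore interface and copyOne helper are hypothetical names:

type objectStore interface {
	GetObject(ctx context.Context, bucket, key string) (*Object, error)
	UploadObject(ctx context.Context, i UploadObjectInput) error
}

func copyOne(ctx context.Context, src, dest objectStore, in CopyObjectInput) error {
	obj, err := src.GetObject(ctx, in.SrcBucket, in.SrcKey)
	if err != nil {
		return err
	}
	defer obj.Body.Close()

	return dest.UploadObject(ctx, UploadObjectInput{
		Bucket: in.DestBucket,
		Key:    in.DestKey,
		Body:   obj.Body,
		Size:   obj.Length, // a known size lets the uploader avoid buffering the whole object
	})
}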
diff --git a/core/storage/options.go b/core/storage/options.go
index a68f8649..db976f82 100644
--- a/core/storage/options.go
+++ b/core/storage/options.go
@@ -1,20 +1,17 @@
 package storage
 
 // Option for setting params used by chunk manager client.
-type config struct {
-	address           string
-	bucketName        string
-	accessKeyID       string
-	secretAccessKeyID string
-	useSSL            bool
-	createBucket      bool
-	rootPath          string
-	useIAM            bool
-	iamEndpoint       string
-
-	// deprecated
-	cloudProvider string
-	storageType   string
+type StorageConfig struct {
+	StorageType       string
+	Address           string
+	BucketName        string
+	AccessKeyID       string
+	SecretAccessKeyID string
+	UseSSL            bool
+	CreateBucket      bool
+	RootPath          string
+	UseIAM            bool
+	IAMEndpoint       string
 
 	backupAccessKeyID       string
 	backupSecretAccessKeyID string
@@ -22,62 +19,56 @@ type config struct {
 	backupRootPath string
 }
 
-func newDefaultConfig() *config {
-	return &config{}
+func newDefaultConfig() *StorageConfig {
+	return &StorageConfig{}
 }
 
-// Option is used to config the retry function.
-type Option func(*config)
+// Option is used to set optional fields of a StorageConfig.
+type Option func(*StorageConfig)
 
 func Address(addr string) Option {
-	return func(c *config) {
-		c.address = addr
+	return func(c *StorageConfig) {
+		c.Address = addr
 	}
 }
 
 func BucketName(bucketName string) Option {
-	return func(c *config) {
-		c.bucketName = bucketName
+	return func(c *StorageConfig) {
+		c.BucketName = bucketName
 	}
 }
 
 func AccessKeyID(accessKeyID string) Option {
-	return func(c *config) {
-		c.accessKeyID = accessKeyID
+	return func(c *StorageConfig) {
+		c.AccessKeyID = accessKeyID
 	}
 }
 
 func SecretAccessKeyID(secretAccessKeyID string) Option {
-	return func(c *config) {
-		c.secretAccessKeyID = secretAccessKeyID
+	return func(c *StorageConfig) {
+		c.SecretAccessKeyID = secretAccessKeyID
 	}
 }
 
 func UseSSL(useSSL bool) Option {
-	return func(c *config) {
-		c.useSSL = useSSL
+	return func(c *StorageConfig) {
+		c.UseSSL = useSSL
 	}
 }
 
 func CreateBucket(createBucket bool) Option {
-	return func(c *config) {
-		c.createBucket = createBucket
-	}
-}
-
-func RootPath(rootPath string) Option {
-	return func(c *config) {
-		c.rootPath = rootPath
+	return func(c *StorageConfig) {
+		c.CreateBucket = createBucket
 	}
 }
 
 func UseIAM(useIAM bool) Option {
-	return func(c *config) {
-		c.useIAM = useIAM
+	return func(c *StorageConfig) {
+		c.UseIAM = useIAM
 	}
 }
 
 func IAMEndpoint(iamEndpoint string) Option {
-	return func(c *config) {
-		c.iamEndpoint = iamEndpoint
+	return func(c *StorageConfig) {
+		c.IAMEndpoint = iamEndpoint
 	}
 }
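Reviewer note (illustrative, not part of the patch): the functional options survive the rename and now mutate StorageConfig directly. Applying them by hand looks like this (newDefaultConfig is unexported, so this only works from inside package storage):

cfg := newDefaultConfig()
for _, opt := range []Option{
	Address("localhost:9000"),
	BucketName("a-bucket"),
	UseSSL(false),
} {
	opt(cfg)
}
// cfg.Address == "localhost:9000", cfg.BucketName == "a-bucket"

Worth noting: this hunk deletes the RootPath option, so any remaining option-based caller must set cfg.RootPath on the struct directly.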
diff --git a/core/storage/types.go b/core/storage/types.go
index 46166033..c85a9581 100644
--- a/core/storage/types.go
+++ b/core/storage/types.go
@@ -10,20 +10,99 @@ type FileReader interface {
 	io.Closer
 }
 
-// ChunkManager is to manager chunks.
-type ChunkManager interface {
-	// Write writes @content to @filePath.
-	Write(ctx context.Context, bucketName string, filePath string, content []byte) error
-	// Exist returns true if @filePath exists.
-	Exist(ctx context.Context, bucketName string, filePath string) (bool, error)
-	// Read reads @filePath and returns content.
-	Read(ctx context.Context, bucketName string, filePath string) ([]byte, error)
-	// ListWithPrefix list all objects with same @prefix
-	ListWithPrefix(ctx context.Context, bucketName string, prefix string, recursive bool) ([]string, []int64, error)
-	// Remove delete @filePath.
-	Remove(ctx context.Context, bucketName string, filePath string) error
-	// RemoveWithPrefix remove files with same @prefix.
-	RemoveWithPrefix(ctx context.Context, bucketName string, prefix string) error
-	// Copy files from fromPath into toPath recursively
-	Copy(ctx context.Context, fromBucketName string, toBucketName string, fromPath string, toPath string) error
+type CopyObjectInput struct {
+	SrcCli ChunkManager
+
+	SrcBucket string
+	SrcKey    string
+
+	DestBucket string
+	DestKey    string
+}
+
+type ListObjectPageInput struct {
+	// The name of the bucket
+	Bucket string
+	// Limits the response to keys that begin with the specified prefix
+	Prefix string
+}
+
+type GetObjectInput struct {
+	Bucket string
+	Key    string
+}
+
+type DeleteObjectsInput struct {
+	Bucket string
+	Keys   []string
+}
+
+type DeletePrefixInput struct {
+	Bucket string
+	Prefix string
+}
+
+type UploadObjectInput struct {
+	Bucket string
+	Key    string
+	Body   io.Reader
+
+	// The size of the payload to be uploaded; if unknown, set it to 0 or a negative value.
+	// Providing an accurate size helps reduce memory usage during upload.
+	Size int64
+}
+
+type SeekableReadCloser interface {
+	io.ReaderAt
+	io.Seeker
+	io.ReadCloser
+}
+
+type Object struct {
+	Length int64
+	Body   SeekableReadCloser
+}
+
+type ObjectAttr struct {
+	Key    string
+	Length int64
+
+	// Note: per the S3 documentation, an ETag may NOT be an MD5 digest of the object data.
+	ETag string
+}
+
+// SameAs returns true if two ObjectAttr are considered the same:
+// they have the same length and the same ETag.
+func (o *ObjectAttr) SameAs(other ObjectAttr) bool {
+	return o.Length == other.Length && o.ETag == other.ETag
+}
+
+func (o *ObjectAttr) IsEmpty() bool { return o.Length == 0 }
+
+type Page struct {
+	Contents []ObjectAttr
+}
+
+type ListObjectsPaginator interface {
+	HasMorePages() bool
+	NextPage(ctx context.Context) (*Page, error)
+}
+
+// Read drains r into a byte slice, using size as a capacity hint,
+// in the manner of os.ReadFile.
+func Read(r io.Reader, size int64) ([]byte, error) {
+	data := make([]byte, 0, size)
+	for {
+		if len(data) >= cap(data) {
+			d := append(data[:cap(data)], 0)
+			data = d[:len(data)]
+		}
+		n, err := r.Read(data[len(data):cap(data)])
+		data = data[:len(data)+n]
+		if err != nil {
+			if err == io.EOF {
+				err = nil
+			}
+			return data, err
+		}
+	}
+}
diff --git a/scripts/gen_swag.sh b/scripts/gen_swag.sh
index 6d704f61..f7822877 100755
--- a/scripts/gen_swag.sh
+++ b/scripts/gen_swag.sh
@@ -1,7 +1,5 @@
 #!/usr/bin/env bash
 
-PWD := $(shell pwd)
-
 PROGRAM=${PWD}
 GOPATH=$(go env GOPATH)
 BACK_PROTO_DIR=$PROGRAM/core/proto/
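Reviewer note (illustrative, not part of the patch): ObjectAttr.SameAs is the cheap change-detection hook for copy and restore; compare length plus ETag via HeadObject before re-transferring an object. A hedged sketch with a hypothetical needCopy helper; a real implementation should distinguish a NotFound error from other HeadObject failures, and keep the ETag caveat above in mind:

type objectHeader interface {
	HeadObject(ctx context.Context, bucket, key string) (ObjectAttr, error)
}

func needCopy(ctx context.Context, src, dest objectHeader, srcBucket, destBucket, key string) (bool, error) {
	srcAttr, err := src.HeadObject(ctx, srcBucket, key)
	if err != nil {
		return false, err
	}
	destAttr, err := dest.HeadObject(ctx, destBucket, key)
	if err != nil {
		// Assume the destination object is missing and copy it.
		return true, nil
	}
	return !srcAttr.SameAs(destAttr), nil
}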