Skip to content

Commit

Permalink
Support backup&restore RBAC (#405)
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink authored Aug 21, 2024
1 parent b76aea5 commit c85a7ac
Show file tree
Hide file tree
Showing 12 changed files with 1,286 additions and 200 deletions.
3 changes: 3 additions & 0 deletions cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var (
dbCollections string
force bool
metaOnly bool
rbac bool
)

var createBackupCmd = &cobra.Command{
Expand Down Expand Up @@ -63,6 +64,7 @@ var createBackupCmd = &cobra.Command{
DbCollections: utils.WrapDBCollections(dbCollections),
Force: force,
MetaOnly: metaOnly,
Rbac: rbac,
})

fmt.Println(resp.GetMsg())
Expand All @@ -78,6 +80,7 @@ func init() {
createBackupCmd.Flags().StringVarP(&dbCollections, "database_collections", "a", "", "databases and collections to backup, json format: {\"db1\":[\"c1\", \"c2\"],\"db2\":[]}")
createBackupCmd.Flags().BoolVarP(&force, "force", "f", false, "force backup, will skip flush, should make sure data has been stored into disk when using it")
createBackupCmd.Flags().BoolVarP(&metaOnly, "meta_only", "", false, "only backup collection meta instead of data")
createBackupCmd.Flags().BoolVarP(&rbac, "rbac", "", false, "whether backup RBAC meta")

createBackupCmd.Flags().SortFlags = false

Expand Down
3 changes: 3 additions & 0 deletions cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
restoreDropExistCollection bool
restoreDropExistIndex bool
restoreSkipCreateCollection bool
restoreRBAC bool
)

var restoreBackupCmd = &cobra.Command{
Expand Down Expand Up @@ -92,6 +93,7 @@ var restoreBackupCmd = &cobra.Command{
DropExistCollection: restoreDropExistCollection,
DropExistIndex: restoreDropExistIndex,
SkipCreateCollection: restoreSkipCreateCollection,
Rbac: restoreRBAC,
})

fmt.Println(resp.GetMsg())
Expand All @@ -114,6 +116,7 @@ func init() {
restoreBackupCmd.Flags().BoolVarP(&restoreDropExistCollection, "drop_exist_collection", "", false, "if true, drop existing target collection before create")
restoreBackupCmd.Flags().BoolVarP(&restoreDropExistIndex, "drop_exist_index", "", false, "if true, drop existing index of target collection before create")
restoreBackupCmd.Flags().BoolVarP(&restoreSkipCreateCollection, "skip_create_collection", "", false, "if true, will skip collection, use when collection exist, restore index or data")
restoreBackupCmd.Flags().BoolVarP(&restoreRBAC, "rbac", "", false, "whether restore RBAC meta")

// won't print flags in character order
restoreBackupCmd.Flags().SortFlags = false
Expand Down
33 changes: 32 additions & 1 deletion core/backup_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
Expand Down Expand Up @@ -241,7 +242,7 @@ func (b *BackupContext) GetBackup(ctx context.Context, request *backuppb.GetBack
backupBucketName = request.GetBucketName()
backupPath = request.GetPath() + SEPERATOR + request.GetBackupName()
}
backup, err := b.readBackup(ctx, backupBucketName, backupPath)
backup, err := b.readBackupV2(ctx, backupBucketName, backupPath)
if err != nil {
log.Warn("Fail to read backup",
zap.String("backupBucketName", backupBucketName),
Expand Down Expand Up @@ -427,6 +428,36 @@ func (b *BackupContext) DeleteBackup(ctx context.Context, request *backuppb.Dele
return resp
}

// read backup
// 1. first read backup from full meta
// 2. if full meta not exist, which means backup is a very old version, read from seperate files
func (b *BackupContext) readBackupV2(ctx context.Context, bucketName string, backupPath string) (*backuppb.BackupInfo, error) {
backupMetaDirPath := backupPath + SEPERATOR + META_PREFIX
fullMetaPath := backupMetaDirPath + SEPERATOR + FULL_META_FILE
exist, err := b.getStorageClient().Exist(ctx, bucketName, fullMetaPath)
if err != nil {
log.Error("check full meta file failed", zap.String("path", fullMetaPath), zap.Error(err))
return nil, err
}
if exist {
backupMetaBytes, err := b.getStorageClient().Read(ctx, bucketName, fullMetaPath)
if err != nil {
log.Error("Read backup meta failed", zap.String("path", fullMetaPath), zap.Error(err))
return nil, err
}
backupInfo := &backuppb.BackupInfo{}
err = json.Unmarshal(backupMetaBytes, backupInfo)
if err != nil {
log.Error("Read backup meta failed", zap.String("path", fullMetaPath), zap.Error(err))
return nil, err
}
return backupInfo, nil
} else {
return b.readBackup(ctx, bucketName, backupPath)
}
}

// read backup from seperated meta files
func (b *BackupContext) readBackup(ctx context.Context, bucketName string, backupPath string) (*backuppb.BackupInfo, error) {
backupMetaDirPath := backupPath + SEPERATOR + META_PREFIX
backupMetaPath := backupMetaDirPath + SEPERATOR + BACKUP_META_FILE
Expand Down
73 changes: 73 additions & 0 deletions core/backup_impl_create_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,15 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_SUCCESS
b.meta.UpdateBackup(backupInfo.Id, setStateCode(backuppb.BackupTaskStateCode_BACKUP_SUCCESS), setEndTime(time.Now().UnixNano()/int64(time.Millisecond)))

if request.GetRbac() {
err = b.backupRBAC(ctx, backupInfo)
if err != nil {
backupInfo.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL
backupInfo.ErrorMessage = err.Error()
return err
}
}

// 7, write meta data
err = b.writeBackupInfoMeta(ctx, backupInfo.GetId())
if err != nil {
Expand Down Expand Up @@ -1005,3 +1014,67 @@ func (b *BackupContext) fillSegmentBackupInfo(ctx context.Context, segmentBackup
log.Debug("fill segment info", zap.Int64("segId", segmentBackupInfo.GetSegmentId()), zap.Int64("size", size))
return nil
}

func (b *BackupContext) backupRBAC(ctx context.Context, backupInfo *backuppb.BackupInfo) error {
log.Info("backup RBAC")
rbacMeta, err := b.getMilvusClient().BackupRBAC(ctx)
if err != nil {
log.Error("fail in BackupMeta", zap.Error(err))
return err
}

users := make([]*backuppb.UserInfo, 0)
roles := make([]*backuppb.RoleEntity, 0)
grants := make([]*backuppb.GrantEntity, 0)

for _, user := range rbacMeta.Users {
roles := lo.Map(user.Roles, func(role string, index int) *backuppb.RoleEntity {
return &backuppb.RoleEntity{Name: role}
})
userP := &backuppb.UserInfo{
User: user.Name,
Password: user.Password,
Roles: roles,
}
users = append(users, userP)
}

for _, role := range rbacMeta.Roles {
roleP := &backuppb.RoleEntity{
Name: role.Name,
}
roles = append(roles, roleP)
}

for _, roleGrant := range rbacMeta.RoleGrants {
roleGrantP := &backuppb.GrantEntity{
Role: &backuppb.RoleEntity{
Name: roleGrant.RoleName,
},
Object: &backuppb.ObjectEntity{
Name: roleGrant.Object,
},
ObjectName: roleGrant.ObjectName,
Grantor: &backuppb.GrantorEntity{
User: &backuppb.UserEntity{
Name: roleGrant.GrantorName,
},
Privilege: &backuppb.PrivilegeEntity{
Name: roleGrant.PrivilegeName,
},
},
DbName: roleGrant.DbName,
}
grants = append(grants, roleGrantP)
}

rbacPb := &backuppb.RBACMeta{
Users: users,
Roles: roles,
Grants: grants,
}

log.Info("backup RBAC", zap.Int("users", len(users)), zap.Int("roles", len(roles)), zap.Int("grants", len(grants)))
b.meta.UpdateBackup(backupInfo.Id, setRBACMeta(rbacPb))
return nil
}
63 changes: 63 additions & 0 deletions core/backup_impl_restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
b.cleanRestoreWorkerPool(taskID)
}()

// restore rbac
if request.GetRbac() {
err := b.restoreRBAC(ctx, backup)
if err != nil {
log.Error("fail to restore RBAC", zap.Error(err))
resp.Code = backuppb.ResponseCode_Fail
resp.Msg = fmt.Sprintf("fail to restore RBAC, err: %s", err)
return resp
}
}

// 2, initial restoreCollectionTasks
toRestoreCollectionBackups := make([]*backuppb.CollectionBackupInfo, 0)

Expand Down Expand Up @@ -877,3 +888,55 @@ func (b *BackupContext) getBackupPartitionPathsWithGroupID(ctx context.Context,

return []string{insertPath, deltaPath}, totalSize, nil
}

func (b *BackupContext) restoreRBAC(ctx context.Context, backupInfo *backuppb.BackupInfo) error {
log.Info("restore RBAC")

rbacBackup := backupInfo.GetRbacMeta()
users := make([]*entity.UserInfo, 0)
roles := make([]*entity.Role, 0)
grants := make([]*entity.RoleGrants, 0)

for _, user := range rbacBackup.GetUsers() {
roles := lo.Map(user.GetRoles(), func(role *backuppb.RoleEntity, index int) string {
return role.Name
})
userEntity := &entity.UserInfo{
UserDescription: entity.UserDescription{
Name: user.GetUser(),
Roles: roles,
},
Password: user.Password,
}
users = append(users, userEntity)
}

for _, role := range rbacBackup.GetRoles() {
roleEntity := &entity.Role{
Name: role.GetName(),
}
roles = append(roles, roleEntity)
}

for _, roleGrant := range rbacBackup.GetGrants() {
roleGrantEntity := &entity.RoleGrants{
Object: roleGrant.Object.GetName(),
ObjectName: roleGrant.GetObjectName(),
RoleName: roleGrant.GetRole().GetName(),
GrantorName: roleGrant.GetGrantor().GetUser().GetName(),
PrivilegeName: roleGrant.GetGrantor().GetPrivilege().GetName(),
DbName: roleGrant.GetDbName(),
}
grants = append(grants, roleGrantEntity)
}

rbacMeta := &entity.RBACMeta{
Users: users,
Roles: roles,
RoleGrants: grants,
}

log.Info("restore RBAC", zap.Int("users", len(users)), zap.Int("roles", len(roles)), zap.Int("grants", len(grants)))
err := b.getMilvusClient().RestoreRBAC(ctx, rbacMeta)
return err
}
6 changes: 6 additions & 0 deletions core/backup_meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ func setMilvusVersion(milvusVersion string) BackupOpt {
}
}

func setRBACMeta(rbacMeta *backuppb.RBACMeta) BackupOpt {
return func(backup *backuppb.BackupInfo) {
backup.RbacMeta = rbacMeta
}
}

func (meta *MetaManager) UpdateBackup(backupID string, opts ...BackupOpt) {
meta.mu.Lock()
defer meta.mu.Unlock()
Expand Down
12 changes: 12 additions & 0 deletions core/milvus_sdk_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,15 @@ func (m *MilvusClient) DropIndex(ctx context.Context, db, collName string, index
}
return m.client.DropIndex(ctx, collName, "", gomilvus.WithIndexName(indexName))
}

func (m *MilvusClient) BackupRBAC(ctx context.Context) (*entity.RBACMeta, error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.client.BackupRBAC(ctx)
}

func (m *MilvusClient) RestoreRBAC(ctx context.Context, rbacMeta *entity.RBACMeta) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.client.RestoreRBAC(ctx, rbacMeta)
}
58 changes: 58 additions & 0 deletions core/proto/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ message BackupInfo {
repeated CollectionBackupInfo collection_backups = 9;
int64 size = 10;
string milvus_version = 11;
RBACMeta rbac_meta = 12;
}

/**
Expand Down Expand Up @@ -161,6 +162,8 @@ message CreateBackupRequest {
int32 gc_pause_seconds = 9;
// gc pause API address
string gc_pause_address = 10;
// whether backup RBAC
bool rbac = 11;
}

/**
Expand Down Expand Up @@ -278,6 +281,8 @@ message RestoreBackupRequest {
string id = 16;
// if true, skip the diskQuota in Import
bool skipImportDiskQuotaCheck = 17;
// whether restore RBAC
bool rbac = 18;
}

message RestorePartitionTask {
Expand Down Expand Up @@ -475,4 +480,57 @@ message MsgPosition {
message ChannelPosition {
string name = 1;
string position = 2;
}

message RoleEntity {
string name = 1;
}

message UserEntity {
string name = 1;
}

message ObjectEntity {
string name = 1;
}

message PrivilegeEntity {
string name = 1;
}

message GrantorEntity {
UserEntity user = 1;
PrivilegeEntity privilege = 2;
}

message GrantPrivilegeEntity {
repeated GrantorEntity entities = 1;
}

message GrantEntity {
// role
RoleEntity role = 1;
// object
ObjectEntity object = 2;
// object name
string object_name = 3;
// privilege
GrantorEntity grantor = 4;
// db name
string db_name = 5;
}

message UserInfo {
string user = 1;
string password = 2;
repeated RoleEntity roles = 3;
}

message RBACMeta {
// user
repeated UserInfo users = 1;
// role
repeated RoleEntity roles = 2;
// (role, object, previledge)
repeated GrantEntity grants = 3;
}
Loading

0 comments on commit c85a7ac

Please sign in to comment.