Skip to content

Commit

Permalink
fix(region): snapshot policy with multi disk on same guest fix(region…
Browse files Browse the repository at this point in the history
…): delete overdued snapshots on cleanup snapshot task only
  • Loading branch information
wanyaoqi committed Jan 14, 2025
1 parent e216b8a commit eb19002
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 66 deletions.
105 changes: 59 additions & 46 deletions pkg/compute/models/disks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2804,52 +2804,82 @@ func (manager *SDiskManager) AutoDiskSnapshot(ctx context.Context, userCred mccl
log.Errorf("Get auto snapshot disks id failed: %s", err)
return
}
log.Debugf("auto snapshot %d disks", len(disks))
now := time.Now()
log.Infof("auto snapshot %d disks", len(disks))

guestDps := map[string][]SSnapshotPolicyDisk{}
for i := 0; i < len(disks); i++ {
disk, err := disks[i].GetDisk()
if err != nil {
log.Errorf("get disk error: %v", err)
continue
}

err = func() error {
policy, err := disks[i].GetSnapshotPolicy()
if err != nil {
return errors.Wrapf(err, "GetSnapshotPolicy")
}

if len(disk.ExternalId) == 0 {
err = disk.validateDiskAutoCreateSnapshot()
if err != nil {
return errors.Wrapf(err, "validateDiskAutoCreateSnapshot")
}
}

snapshot, err := disk.CreateSnapshotAuto(ctx, userCred, policy)
if err != nil {
return errors.Wrapf(err, "CreateSnapshotAuto")
if guest := disk.GetGuest(); guest != nil {
if dps, ok := guestDps[guest.Id]; ok {
guestDps[guest.Id] = append(dps, disks[i])
} else {
guestDps[guest.Id] = []SSnapshotPolicyDisk{disks[i]}
}
continue
}

if err = disk.CleanOverduedSnapshots(ctx, userCred, policy, now); err != nil {
log.Errorf("failed clean overdued snapshots %s", err)
}
db.OpsLog.LogEvent(disk, db.ACT_DISK_AUTO_SNAPSHOT, snapshot.Name, userCred)
policy.ExecuteNotify(ctx, userCred, disk.GetName())
return nil
}()
err = manager.DoAutoSnapshot(ctx, userCred, &disks[i], disk, "")
if err != nil {
log.Errorf("auto snapshot %s error: %v", disk.Name, err)
db.OpsLog.LogEvent(disk, db.ACT_DISK_AUTO_SNAPSHOT_FAIL, err.Error(), userCred)
notifyclient.NotifySystemErrorWithCtx(ctx, disk.Id, disk.Name, db.ACT_DISK_AUTO_SNAPSHOT_FAIL, errors.Wrapf(err, "Disk auto create snapshot").Error())
}
}

for gid, diskSnapshotPolicies := range guestDps {
guest := GuestManager.FetchGuestById(gid)
err = manager.OrderCreateDisksSnapshotsBySnapshotPolicy(ctx, userCred, guest, diskSnapshotPolicies)
if err != nil {
log.Errorf("failed start OrderCreateDisksSnapshotsBySnapshotPolicy")
}
}
}

func (self *SDisk) CreateSnapshotAuto(
// OrderCreateDisksSnapshotsBySnapshotPolicy creates a
// "GuestDisksSnapshotPolicyExecuteTask" on guest and schedules it.
//
// The given snapshot-policy/disk bindings are serialized into the task
// parameters under the key "snapshot_policy_disks". An error is returned when
// the task cannot be created (wrapped as "NewTask") or cannot be scheduled.
func (manager *SDiskManager) OrderCreateDisksSnapshotsBySnapshotPolicy(
	ctx context.Context, userCred mcclient.TokenCredential, guest *SGuest, snapshotPolicyDisks []SSnapshotPolicyDisk,
) error {
	taskParams := jsonutils.NewDict()
	taskParams.Set("snapshot_policy_disks", jsonutils.Marshal(snapshotPolicyDisks))

	policyTask, err := taskman.TaskManager.NewTask(
		ctx, "GuestDisksSnapshotPolicyExecuteTask", guest, userCred, taskParams, "", "", nil,
	)
	if err == nil {
		return policyTask.ScheduleRun(nil)
	}
	return errors.Wrapf(err, "NewTask")
}

// DoAutoSnapshot creates one automatic snapshot of disk using the snapshot
// policy referenced by diskSnapshotPolicy.
//
// Steps, in order:
//  1. resolve the policy from diskSnapshotPolicy;
//  2. for disks without an ExternalId, run validateDiskAutoCreateSnapshot;
//  3. call disk.CreateSnapshotAuto, forwarding parentTaskId;
//  4. record an ACT_DISK_AUTO_SNAPSHOT ops-log event and trigger the policy's
//     notification.
//
// Any failure is returned wrapped with the name of the failing step; logging
// and notifying about the failure is left to the caller.
func (manager *SDiskManager) DoAutoSnapshot(
	ctx context.Context, userCred mcclient.TokenCredential,
	diskSnapshotPolicy *SSnapshotPolicyDisk, disk *SDisk, parentTaskId string,
) error {
	policy, err := diskSnapshotPolicy.GetSnapshotPolicy()
	if err != nil {
		return errors.Wrapf(err, "GetSnapshotPolicy")
	}

	// Validation only applies to disks that carry no ExternalId.
	if len(disk.ExternalId) == 0 {
		if err = disk.validateDiskAutoCreateSnapshot(); err != nil {
			return errors.Wrapf(err, "validateDiskAutoCreateSnapshot")
		}
	}

	snapshot, err := disk.CreateSnapshotAuto(ctx, userCred, policy, parentTaskId)
	if err != nil {
		return errors.Wrapf(err, "CreateSnapshotAuto")
	}

	db.OpsLog.LogEvent(disk, db.ACT_DISK_AUTO_SNAPSHOT, snapshot.Name, userCred)
	policy.ExecuteNotify(ctx, userCred, disk.GetName())
	return nil
}

func (self *SDisk) CreateSnapshotAuto(
ctx context.Context, userCred mcclient.TokenCredential, policy *SSnapshotPolicy, parentTaskId string,
) (*SSnapshot, error) {
storage, err := self.GetStorage()
if err != nil {
Expand Down Expand Up @@ -2892,30 +2922,13 @@ func (self *SDisk) CreateSnapshotAuto(
}

db.OpsLog.LogEvent(snapshot, db.ACT_CREATE, "disk create snapshot auto", userCred)
err = snapshot.StartSnapshotCreateTask(ctx, userCred, nil, "")
err = snapshot.StartSnapshotCreateTask(ctx, userCred, nil, parentTaskId)
if err != nil {
return nil, errors.Wrap(err, "disk auto snapshot start snapshot task")
}
return snapshot, nil
}

// CleanOverduedSnapshots starts a delete task for the oldest automatic
// snapshot of this disk when that snapshot has expired.
//
// Only the single oldest (by created_at) snapshot with created_by ==
// api.SNAPSHOT_AUTO and fake_deleted == false is examined. The sp parameter is
// not used by this function. An error is returned when the lookup fails
// (wrapped as "get snapshot") or when the delete task cannot be started.
func (self *SDisk) CleanOverduedSnapshots(ctx context.Context, userCred mcclient.TokenCredential, sp *SSnapshotPolicy, now time.Time) error {
	oldest := new(SSnapshot)
	q := SnapshotManager.Query().
		Equals("disk_id", self.Id).
		Equals("created_by", api.SNAPSHOT_AUTO).
		Equals("fake_deleted", false).
		Asc("created_at")
	if err := q.First(oldest); err != nil {
		return errors.Wrap(err, "get snapshot")
	}
	oldest.SetModelManager(SnapshotManager, oldest)

	// A zero ExpiredAt compares as "before now" and would otherwise cause the
	// snapshot to be deleted.
	// NOTE(review): assumes a zero ExpiredAt means no expiry was set — confirm.
	if oldest.ExpiredAt.IsZero() || !oldest.ExpiredAt.Before(now) {
		return nil
	}
	return oldest.StartSnapshotDeleteTask(ctx, userCred, false, self.Id, 0, 0)
}

func (self *SDisk) StartCreateBackupTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
if task, err := taskman.TaskManager.NewTask(ctx, "DiskCreateBackupTask", self, userCred, nil, parentTaskId, "", nil); err != nil {
log.Errorln(err)
Expand Down
11 changes: 11 additions & 0 deletions pkg/compute/models/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"fmt"
"sync/atomic"
"time"

"yunion.io/x/cloudmux/pkg/cloudprovider"
Expand Down Expand Up @@ -1166,7 +1167,17 @@ func (manager *SSnapshotManager) GetResourceCount() ([]db.SScopeResourceCount, e
return db.CalculateResourceCount(virts, "tenant_id")
}

// SnapshotCleanupTaskRunning is a process-wide flag that is read and written
// with sync/atomic operations; the value 1 is what
// SnapshotCleanupTaskIsRunning reports as "running".
var SnapshotCleanupTaskRunning int32

// SnapshotCleanupTaskIsRunning reports whether SnapshotCleanupTaskRunning
// currently holds the value 1.
func SnapshotCleanupTaskIsRunning() bool {
	state := atomic.LoadInt32(&SnapshotCleanupTaskRunning)
	return state == 1
}

func (manager *SSnapshotManager) CleanupSnapshots(ctx context.Context, userCred mcclient.TokenCredential, isStart bool) {
if SnapshotCleanupTaskIsRunning() {
log.Errorf("Previous CleanupSnapshots tasks still running !!!")
return
}
var now = time.Now()
var snapshot = new(SSnapshot)
err := manager.Query().
Expand Down
68 changes: 49 additions & 19 deletions pkg/compute/tasks/disk_clean_overdued_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"fmt"
"sync/atomic"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
Expand All @@ -40,10 +41,23 @@ type SnapshotCleanupTask struct {
taskman.STask
}

// taskFailed logs reason, resets models.SnapshotCleanupTaskRunning from 1 to 0
// and then marks the current task stage as failed with that reason.
func (self *SnapshotCleanupTask) taskFailed(ctx context.Context, reason *jsonutils.JSONString) {
	log.Infof("SnapshotCleanupTask failed %s", reason)

	// Compare-and-swap: the flag is only touched when it currently holds 1.
	runningFlag := &models.SnapshotCleanupTaskRunning
	atomic.CompareAndSwapInt32(runningFlag, 1, 0)

	self.SetStageFailed(ctx, reason)
}

// taskCompleted logs data, resets models.SnapshotCleanupTaskRunning from 1 to
// 0 and then marks the current task stage as complete. data is only logged;
// the stage is completed with a nil result.
func (self *SnapshotCleanupTask) taskCompleted(ctx context.Context, data jsonutils.JSONObject) {
	log.Infof("SnapshotCleanupTask completed %s", data)

	// Compare-and-swap: the flag is only touched when it currently holds 1.
	runningFlag := &models.SnapshotCleanupTaskRunning
	atomic.CompareAndSwapInt32(runningFlag, 1, 0)

	self.SetStageComplete(ctx, nil)
}

func (self *SnapshotCleanupTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
atomic.CompareAndSwapInt32(&models.SnapshotCleanupTaskRunning, 0, 1)
now, err := self.Params.GetTime("tick")
if err != nil {
self.SetStageFailed(ctx, jsonutils.NewString("failed get tick"))
self.taskFailed(ctx, jsonutils.NewString("failed get tick"))
return
}
var snapshots = make([]models.SSnapshot, 0)
Expand All @@ -52,49 +66,65 @@ func (self *SnapshotCleanupTask) OnInit(ctx context.Context, obj db.IStandaloneM
Equals("created_by", compute.SNAPSHOT_AUTO).
LE("expired_at", now).All(&snapshots)
if err == sql.ErrNoRows {
self.SetStageComplete(ctx, nil)
self.taskCompleted(ctx, nil)
return
} else if err != nil {
self.SetStageFailed(ctx, jsonutils.NewString(fmt.Sprintf("failed get snapshot %s", err)))
self.taskFailed(ctx, jsonutils.NewString(fmt.Sprintf("failed get snapshot %s", err)))
return
}
self.StartSnapshotsDelete(ctx, snapshots)
snapshotIds := make([]string, len(snapshots))
for i := range snapshots {
snapshotIds[i] = snapshots[i].Id
}
self.StartSnapshotsDelete(ctx, snapshotIds)
}

// OnInitFailed logs the failure payload and then runs OnInit again with the
// same arguments.
func (self *SnapshotCleanupTask) OnInitFailed(
	ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject,
) {
	log.Errorf("delete snapshots failed %s", data)
	// Re-enter the initial stage with the unchanged object and payload.
	self.OnInit(ctx, obj, data)
}

func (self *SnapshotCleanupTask) StartSnapshotsDelete(ctx context.Context, snapshots []models.SSnapshot) {
snapshot := snapshots[0]
snapshots = snapshots[1:]
if len(snapshots) > 0 {
self.Params.Set("snapshots", jsonutils.Marshal(snapshots))
func (self *SnapshotCleanupTask) StartSnapshotsDelete(ctx context.Context, snapshotIds []string) {
snapshotId := snapshotIds[0]
snapshotIds = snapshotIds[1:]
if len(snapshotIds) > 0 {
self.Params.Set("snapshots", jsonutils.Marshal(snapshotIds))
}
self.SetStage("OnDeleteSnapshot", nil)
snapshot.SetModelManager(models.SnapshotManager, &snapshot)
err := snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId(), 0, 0)

iSnapshot, err := models.SnapshotManager.FetchById(snapshotId)
if err != nil {
self.OnDeleteSnapshotFailed(ctx, self.GetObject(), nil)
return
}
snapshot := iSnapshot.(*models.SSnapshot)
if snapshot.Status == compute.SNAPSHOT_DELETING {
self.OnDeleteSnapshot(ctx, snapshot, nil)
return
}

snapshot.SetModelManager(models.SnapshotManager, snapshot)
err = snapshot.StartSnapshotDeleteTask(ctx, self.UserCred, false, self.GetId(), 0, 0)
if err != nil {
self.OnDeleteSnapshotFailed(ctx, self.GetObject(), nil)
}
}

func (self *SnapshotCleanupTask) OnDeleteSnapshot(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
var snapshots = make([]models.SSnapshot, 0)
err := self.Params.Unmarshal(&snapshots, "snapshots")
var snapshotIds = make([]string, 0)
err := self.Params.Unmarshal(&snapshotIds, "snapshots")
if err != nil {
self.SetStageFailed(ctx, jsonutils.NewString(err.Error()))
self.taskFailed(ctx, jsonutils.NewString(err.Error()))
return
}
if len(snapshots) > 0 {
self.StartSnapshotsDelete(ctx, snapshots)
if len(snapshotIds) > 0 {
self.StartSnapshotsDelete(ctx, snapshotIds)
} else {
self.SetStageComplete(ctx, nil)
self.taskCompleted(ctx, nil)
}
}

// OnDeleteSnapshotFailed fails the cleanup task exactly once through
// taskFailed, which also logs the reason and resets the running flag.
//
// data carries the failure detail and may be nil; in that case the reason is
// reported without a detail suffix.
func (self *SnapshotCleanupTask) OnDeleteSnapshotFailed(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
	reason := "snapshot delete faield"
	if data != nil {
		reason = fmt.Sprintf("snapshot delete faield %s", data)
	}
	self.taskFailed(ctx, jsonutils.NewString(reason))
}
1 change: 0 additions & 1 deletion pkg/compute/tasks/guest_disk_snapshot_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func (self *GuestDiskSnapshotTask) OnDiskSnapshotComplete(ctx context.Context, g
return
}

guest.SetStatus(ctx, self.UserCred, api.VM_SNAPSHOT_SUCC, "")
self.TaskComplete(ctx, guest, nil)
}

Expand Down
44 changes: 44 additions & 0 deletions pkg/compute/tasks/snapshot_create_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"

api "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
Expand All @@ -33,6 +35,7 @@ type SnapshotCreateTask struct {

// init registers the task types declared in this file with taskman:
// SnapshotCreateTask and GuestDisksSnapshotPolicyExecuteTask.
func init() {
	taskman.RegisterTask(SnapshotCreateTask{})
	taskman.RegisterTask(GuestDisksSnapshotPolicyExecuteTask{})
}

func (self *SnapshotCreateTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
Expand Down Expand Up @@ -72,3 +75,44 @@ func (self *SnapshotCreateTask) OnCreateSnapshot(ctx context.Context, snapshot *
// OnCreateSnapshotFailed is the failure callback of the create-snapshot
// stage; it forwards the snapshot and the failure payload to TaskFailed.
func (self *SnapshotCreateTask) OnCreateSnapshotFailed(
	ctx context.Context, snapshot *models.SSnapshot, data jsonutils.JSONObject,
) {
	self.TaskFailed(ctx, snapshot, data)
}

// GuestDisksSnapshotPolicyExecuteTask is a task type that embeds
// taskman.STask. Its methods work through the "snapshot_policy_disks" task
// parameter one entry at a time, creating an automatic snapshot per entry.
type GuestDisksSnapshotPolicyExecuteTask struct {
taskman.STask
}

// OnInit is the task entry point; it hands over directly to OnDiskSnapshot
// with the same arguments.
func (self *GuestDisksSnapshotPolicyExecuteTask) OnInit(
	ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject,
) {
	self.OnDiskSnapshot(ctx, obj, data)
}

// OnDiskSnapshot takes the next entry from the "snapshot_policy_disks" task
// parameter and starts an automatic snapshot for its disk. It is both the
// first step (called from OnInit) and the stage callback that runs after each
// snapshot.
//
// For every entry the remaining list is written back to the task parameters
// and the stage is set to "OnDiskSnapshot" before the snapshot is started.
// When an entry cannot be processed (disk lookup or DoAutoSnapshot fails) the
// error is logged, and for DoAutoSnapshot failures also recorded and notified,
// and the next entry is tried right away. The task completes once no entries
// remain.
//
// A "snapshot_policy_disks" parameter that cannot be decoded fails the task
// instead of being silently treated as an empty list.
func (self *GuestDisksSnapshotPolicyExecuteTask) OnDiskSnapshot(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
	pending := make([]models.SSnapshotPolicyDisk, 0)
	if err := self.Params.Unmarshal(&pending, "snapshot_policy_disks"); err != nil {
		log.Errorf("failed unmarshal snapshot_policy_disks: %s", err)
		self.SetStageFailed(ctx, jsonutils.NewString(errors.Wrapf(err, "unmarshal snapshot_policy_disks").Error()))
		return
	}

	for len(pending) > 0 {
		current := pending[0]
		pending = pending[1:]
		// Store the remaining work before starting the snapshot so the
		// callback continues from the right position.
		self.Params.Set("snapshot_policy_disks", jsonutils.Marshal(pending))
		self.SetStage("OnDiskSnapshot", nil)

		disk, err := current.GetDisk()
		if err != nil {
			log.Errorf("disk snapshot policy %s failed get disk %s", current.SnapshotpolicyId, err)
			continue
		}

		err = models.DiskManager.DoAutoSnapshot(ctx, self.UserCred, &current, disk, self.GetTaskId())
		if err == nil {
			// Snapshot started with this task as parent; the original
			// code likewise returns here with the stage left at
			// "OnDiskSnapshot".
			return
		}
		log.Errorf("disk.CreateSnapshotAuto failed %s %s", disk.Id, err)
		db.OpsLog.LogEvent(disk, db.ACT_DISK_AUTO_SNAPSHOT_FAIL, err.Error(), self.UserCred)
		notifyclient.NotifySystemErrorWithCtx(ctx, disk.Id, disk.Name, db.ACT_DISK_AUTO_SNAPSHOT_FAIL, errors.Wrapf(err, "Disk auto create snapshot").Error())
	}

	self.SetStageComplete(ctx, nil)
}

// OnDiskSnapshotFailed is the failure callback of the "OnDiskSnapshot" stage.
// It logs the failure and then calls OnDiskSnapshot again with the same
// arguments.
func (self *GuestDisksSnapshotPolicyExecuteTask) OnDiskSnapshotFailed(
	ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject,
) {
	log.Errorf("Guest create snapshot failed %s: %s", obj.GetId(), data)
	self.OnDiskSnapshot(ctx, obj, data)
}

0 comments on commit eb19002

Please sign in to comment.