Skip to content

Commit

Permalink
fix(backup): manually synchronizing backup volumes.
Browse files Browse the repository at this point in the history
When users make a requst to synchronize the backup target by
a new Backup Volume APIs `syncBackupTarget`, it will also trigger
the synchronization for thee backup volumes of the backup target.

Ref: 7982

Signed-off-by: James Lu <james.lu@suse.com>
  • Loading branch information
mantissahz authored and derekbit committed May 10, 2024
1 parent 22c952c commit e1a2b4c
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 12 deletions.
111 changes: 110 additions & 1 deletion api/backup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"fmt"
"net/http"

"github.com/gorilla/mux"
Expand All @@ -18,7 +19,61 @@ func (s *Server) BackupTargetList(w http.ResponseWriter, req *http.Request) erro
if err != nil {
return errors.Wrap(err, "failed to list backup targets")
}
apiContext.Write(toBackupTargetCollection(backupTargets))
apiContext.Write(toBackupTargetCollection(backupTargets, apiContext))
return nil
}

func (s *Server) BackupTargetSyncAll(w http.ResponseWriter, req *http.Request) error {
var input SyncBackupResource

apiContext := api.GetApiContext(req)
if err := apiContext.Read(&input); err != nil {
return err
}

bts, err := s.m.ListBackupTargetsSorted()
if err != nil {
return errors.Wrap(err, "failed to list backup targets")
}

if input.SyncAllBackupTargets {
for _, bt := range bts {
if _, err := s.m.SyncBackupTarget(bt); err != nil {
logrus.WithError(err).Warnf("Failed to synchronize backup target %v", bt.Name)
}
}
}

apiContext.Write(toBackupTargetCollection(bts, apiContext))
return nil
}

func (s *Server) BackupTargetSync(w http.ResponseWriter, req *http.Request) error {
var input SyncBackupResource

apiContext := api.GetApiContext(req)
if err := apiContext.Read(&input); err != nil {
return err
}

backupTargetName := mux.Vars(req)["backupTargetName"]
if backupTargetName == "" {
return fmt.Errorf("backup target name is required")
}

bt, err := s.m.GetBackupTarget(backupTargetName)
if err != nil {
return errors.Wrapf(err, "failed to get backup target %v", backupTargetName)
}

if input.SyncBackupTarget {
bt, err = s.m.SyncBackupTarget(bt)
if err != nil {
return errors.Wrapf(err, "failed to synchronize backup target %v", backupTargetName)
}
}

apiContext.Write(toBackupTargetResource(bt, apiContext))
return nil
}

Expand Down Expand Up @@ -54,6 +109,60 @@ func (s *Server) BackupVolumeGet(w http.ResponseWriter, req *http.Request) error
return nil
}

func (s *Server) BackupVolumeSyncAll(w http.ResponseWriter, req *http.Request) error {
var input SyncBackupResource

apiContext := api.GetApiContext(req)
if err := apiContext.Read(&input); err != nil {
return err
}

bvs, err := s.m.ListBackupVolumesSorted()
if err != nil {
return errors.Wrap(err, "failed to list backup volumes")
}

if input.SyncAllBackupVolumes {
for _, bv := range bvs {
if _, err := s.m.SyncBackupVolume(bv); err != nil {
logrus.WithError(err).Warnf("Failed to synchronize backup volume %v", bv.Name)
}
}
}

apiContext.Write(toBackupVolumeCollection(bvs, apiContext))
return nil
}

func (s *Server) SyncBackupVolume(w http.ResponseWriter, req *http.Request) error {
var input SyncBackupResource

apiContext := api.GetApiContext(req)
if err := apiContext.Read(&input); err != nil {
return err
}

volName := mux.Vars(req)["volName"]
if volName == "" {
return fmt.Errorf("backup volume name is required")
}

bv, err := s.m.GetBackupVolume(volName)
if err != nil {
return errors.Wrapf(err, "failed to get backup volume '%s'", volName)
}

if input.SyncBackupVolume {
bv, err = s.m.SyncBackupVolume(bv)
if err != nil {
return errors.Wrapf(err, "failed to synchronize backup volume %v", volName)
}
}

apiContext.Write(toBackupVolumeResource(bv, apiContext))
return nil
}

func (s *Server) BackupVolumeDelete(w http.ResponseWriter, req *http.Request) error {
volName := mux.Vars(req)["volName"]
if err := s.m.DeleteBackupVolume(volName); err != nil {
Expand Down
71 changes: 63 additions & 8 deletions api/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ type BackupVolume struct {
StorageClassName string `json:"storageClassName"`
}

// SyncBackupResource is used for the Backup*Sync* actions
type SyncBackupResource struct {
SyncAllBackupTargets bool `json:"syncAllBackupTargets"`
SyncBackupTarget bool `json:"syncBackupTarget"`
SyncAllBackupVolumes bool `json:"syncAllBackupVolumes"`
SyncBackupVolume bool `json:"syncBackupVolume"`
}

type Backup struct {
client.Resource

Expand Down Expand Up @@ -556,6 +564,16 @@ type VolumeRecurringJobInput struct {
longhorn.VolumeRecurringJob
}

type BackupTargetListOutput struct {
Data []BackupTarget `json:"data"`
Type string `json:"type"`
}

type BackupVolumeListOutput struct {
Data []BackupVolume `json:"data"`
Type string `json:"type"`
}

type BackupListOutput struct {
Data []Backup `json:"data"`
Type string `json:"type"`
Expand All @@ -581,10 +599,10 @@ func NewSchema() *client.Schemas {
schemas.AddType("detachInput", DetachInput{})
schemas.AddType("snapshotInput", SnapshotInput{})
schemas.AddType("snapshotCRInput", SnapshotCRInput{})
schemas.AddType("backupTarget", BackupTarget{})
schemas.AddType("backup", Backup{})
schemas.AddType("backupInput", BackupInput{})
schemas.AddType("backupStatus", BackupStatus{})
schemas.AddType("syncBackupResource", SyncBackupResource{})
schemas.AddType("orphan", Orphan{})
schemas.AddType("restoreStatus", RestoreStatus{})
schemas.AddType("purgeStatus", PurgeStatus{})
Expand Down Expand Up @@ -644,6 +662,7 @@ func NewSchema() *client.Schemas {
volumeSchema(schemas.AddType("volume", Volume{}))
snapshotSchema(schemas.AddType("snapshot", Snapshot{}))
snapshotCRSchema(schemas.AddType("snapshotCR", SnapshotCR{}))
backupTargetSchema(schemas.AddType("backupTarget", BackupTarget{}))
backupVolumeSchema(schemas.AddType("backupVolume", BackupVolume{}))
backupBackingImageSchema(schemas.AddType("backupBackingImage", BackupBackingImage{}))
settingSchema(schemas.AddType("setting", Setting{}))
Expand All @@ -654,6 +673,8 @@ func NewSchema() *client.Schemas {
diskSchema(schemas.AddType("diskUpdateInput", DiskUpdateInput{}))
diskInfoSchema(schemas.AddType("diskInfo", DiskInfo{}))
kubernetesStatusSchema(schemas.AddType("kubernetesStatus", longhorn.KubernetesStatus{}))
backupTargetListOutputSchema(schemas.AddType("backupTargetListOutput", BackupTargetListOutput{}))
backupVolumeListOutputSchema(schemas.AddType("backupVolumeListOutput", BackupVolumeListOutput{}))
backupListOutputSchema(schemas.AddType("backupListOutput", BackupListOutput{}))
snapshotListOutputSchema(schemas.AddType("snapshotListOutput", SnapshotListOutput{}))
systemBackupSchema(schemas.AddType("systemBackup", SystemBackup{}))
Expand Down Expand Up @@ -814,9 +835,21 @@ func kubernetesStatusSchema(status *client.Schema) {
status.ResourceFields["workloadsStatus"] = workloadsStatus
}

func backupTargetSchema(backupTarget *client.Schema) {
backupTarget.CollectionMethods = []string{"GET"}
backupTarget.ResourceMethods = []string{"GET", "PUT"}

backupTarget.ResourceActions = map[string]client.Action{
"backupTargetSync": {
Input: "syncBackupResource",
Output: "backupTargetListOutput",
},
}
}

func backupVolumeSchema(backupVolume *client.Schema) {
backupVolume.CollectionMethods = []string{"GET"}
backupVolume.ResourceMethods = []string{"GET", "DELETE"}
backupVolume.ResourceMethods = []string{"GET", "PUT", "DELETE"}
backupVolume.ResourceActions = map[string]client.Action{
"backupList": {
Output: "backupListOutput",
Expand All @@ -829,6 +862,10 @@ func backupVolumeSchema(backupVolume *client.Schema) {
Input: "backupInput",
Output: "backupVolume",
},
"backupVolumeSync": {
Input: "syncBackupResource",
Output: "backupVolumeListOutput",
},
}
}

Expand Down Expand Up @@ -1186,6 +1223,18 @@ func snapshotCRSchema(snapshotCR *client.Schema) {
snapshotCR.ResourceFields["children"] = children
}

func backupTargetListOutputSchema(backupTargetList *client.Schema) {
data := backupTargetList.ResourceFields["data"]
data.Type = "array[backupTarget]"
backupTargetList.ResourceFields["data"] = data
}

func backupVolumeListOutputSchema(backupVolumeList *client.Schema) {
data := backupVolumeList.ResourceFields["data"]
data.Type = "array[backupVolume]"
backupVolumeList.ResourceFields["data"] = data
}

func backupListOutputSchema(backupList *client.Schema) {
data := backupList.ResourceFields["data"]
data.Type = "array[backup]"
Expand Down Expand Up @@ -1699,7 +1748,7 @@ func toVolumeRecurringJobCollection(recurringJobs map[string]*longhorn.VolumeRec
return &client.GenericCollection{Data: data, Collection: client.Collection{ResourceType: "volumeRecurringJob"}}
}

func toBackupTargetResource(bt *longhorn.BackupTarget) *BackupTarget {
func toBackupTargetResource(bt *longhorn.BackupTarget, apiContext *api.ApiContext) *BackupTarget {
if bt == nil {
return nil
}
Expand All @@ -1711,13 +1760,18 @@ func toBackupTargetResource(bt *longhorn.BackupTarget) *BackupTarget {
Links: map[string]string{},
},
BackupTarget: engineapi.BackupTarget{
Name: bt.Name,
BackupTargetURL: bt.Spec.BackupTargetURL,
CredentialSecret: bt.Spec.CredentialSecret,
PollInterval: bt.Spec.PollInterval.Duration.String(),
Available: bt.Status.Available,
Message: types.GetCondition(bt.Status.Conditions, longhorn.BackupTargetConditionTypeUnavailable).Message,
},
}
res.Actions = map[string]string{
"backupTargetSync": apiContext.UrlBuilder.ActionLink(res.Resource, "backupTargetSync"),
}

return res
}

Expand All @@ -1744,17 +1798,18 @@ func toBackupVolumeResource(bv *longhorn.BackupVolume, apiContext *api.ApiContex
StorageClassName: bv.Status.StorageClassName,
}
b.Actions = map[string]string{
"backupList": apiContext.UrlBuilder.ActionLink(b.Resource, "backupList"),
"backupGet": apiContext.UrlBuilder.ActionLink(b.Resource, "backupGet"),
"backupDelete": apiContext.UrlBuilder.ActionLink(b.Resource, "backupDelete"),
"backupList": apiContext.UrlBuilder.ActionLink(b.Resource, "backupList"),
"backupGet": apiContext.UrlBuilder.ActionLink(b.Resource, "backupGet"),
"backupDelete": apiContext.UrlBuilder.ActionLink(b.Resource, "backupDelete"),
"backupVolumeSync": apiContext.UrlBuilder.ActionLink(b.Resource, "backupVolumeSync"),
}
return b
}

func toBackupTargetCollection(bts []*longhorn.BackupTarget) *client.GenericCollection {
func toBackupTargetCollection(bts []*longhorn.BackupTarget, apiContext *api.ApiContext) *client.GenericCollection {
data := []interface{}{}
for _, bt := range bts {
data = append(data, toBackupTargetResource(bt))
data = append(data, toBackupTargetResource(bt, apiContext))
}
return &client.GenericCollection{Data: data, Collection: client.Collection{ResourceType: "backupTarget"}}
}
Expand Down
15 changes: 12 additions & 3 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,22 @@ func NewRouter(s *Server) *mux.Router {
}

r.Methods("GET").Path("/v1/backuptargets").Handler(f(schemas, s.BackupTargetList))
r.Methods("PUT").Path("/v1/backuptargets").Handler(f(schemas, s.BackupTargetSyncAll))
backupTargetActions := map[string]func(http.ResponseWriter, *http.Request) error{
"backupTargetSync": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupTargetSync),
}
for name, action := range backupTargetActions {
r.Methods("POST").Path("/v1/backuptargets/{backupTargetName}").Queries("action", name).Handler(f(schemas, action))
}
r.Methods("GET").Path("/v1/backupvolumes").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeList)))
r.Methods("PUT").Path("/v1/backupvolumes").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeSyncAll)))
r.Methods("GET").Path("/v1/backupvolumes/{volName}").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeGet)))
r.Methods("DELETE").Path("/v1/backupvolumes/{volName}").Handler(f(schemas, s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupVolumeDelete)))
backupActions := map[string]func(http.ResponseWriter, *http.Request) error{
"backupList": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupList),
"backupGet": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupGet),
"backupDelete": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupDelete),
"backupList": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupList),
"backupGet": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupGet),
"backupDelete": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.BackupDelete),
"backupVolumeSync": s.fwd.Handler(s.fwd.HandleProxyRequestByNodeID, s.fwd.GetHTTPAddressByNodeID(NodeHasDefaultEngineImage(s.m)), s.SyncBackupVolume),
}
for name, action := range backupActions {
r.Methods("POST").Path("/v1/backupvolumes/{volName}").Queries("action", name).Handler(f(schemas, action))
Expand Down
1 change: 1 addition & 0 deletions engineapi/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ type Volume struct {
}

type BackupTarget struct {
Name string `json:"name"`
BackupTargetURL string `json:"backupTargetURL"`
CredentialSecret string `json:"credentialSecret"`
PollInterval string `json:"pollInterval"`
Expand Down
25 changes: 25 additions & 0 deletions manager/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (

const (
BackupStatusQueryInterval = 2 * time.Second

// minSyncBackupTargetIntervalSec interval in seconds to synchronize backup target to avoid frequent sync
minSyncBackupTargetIntervalSec = 10
)

func (m *VolumeManager) ListSnapshotInfos(volumeName string) (map[string]*longhorn.SnapshotInfo, error) {
Expand Down Expand Up @@ -328,6 +331,19 @@ func (m *VolumeManager) ListBackupTargetsSorted() ([]*longhorn.BackupTarget, err
return backupTargets, nil
}

func (m *VolumeManager) GetBackupTarget(backupTargetName string) (*longhorn.BackupTarget, error) {
return m.ds.GetBackupTarget(backupTargetName)
}

func (m *VolumeManager) SyncBackupTarget(backupTarget *longhorn.BackupTarget) (*longhorn.BackupTarget, error) {
now := metav1.Time{Time: time.Now().UTC()}
if now.Sub(backupTarget.Spec.SyncRequestedAt.Time).Seconds() < minSyncBackupTargetIntervalSec {
return nil, errors.Errorf("cannot synchronize backup target '%v' in %v seconds", backupTarget.Name, minSyncBackupTargetIntervalSec)
}
backupTarget.Spec.SyncRequestedAt = metav1.Time{Time: time.Now().UTC()}
return m.ds.UpdateBackupTarget(backupTarget)
}

func (m *VolumeManager) ListBackupVolumes() (map[string]*longhorn.BackupVolume, error) {
return m.ds.ListBackupVolumes()
}
Expand Down Expand Up @@ -363,6 +379,15 @@ func (m *VolumeManager) GetBackupVolume(volumeName string) (*longhorn.BackupVolu
return backupVolume, err
}

func (m *VolumeManager) SyncBackupVolume(backupVolume *longhorn.BackupVolume) (*longhorn.BackupVolume, error) {
now := metav1.Time{Time: time.Now().UTC()}
if now.Sub(backupVolume.Spec.SyncRequestedAt.Time).Seconds() < minSyncBackupTargetIntervalSec {
return nil, errors.Errorf("failed to synchronize backup volume '%v' in %v seconds", backupVolume.Name, minSyncBackupTargetIntervalSec)
}
backupVolume.Spec.SyncRequestedAt = metav1.Time{Time: time.Now().UTC()}
return m.ds.UpdateBackupVolume(backupVolume)
}

func (m *VolumeManager) DeleteBackupVolume(volumeName string) error {
return m.ds.DeleteBackupVolume(volumeName)
}
Expand Down

0 comments on commit e1a2b4c

Please sign in to comment.