diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index e1c88806d0e3..58e18a07a9ac 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -576,6 +576,7 @@ func (e *ETCD) Register(handler http.Handler) (http.Handler, error) { e.config.Runtime.LeaderElectedClusterControllerStarts[version.Program+"-etcd"] = func(ctx context.Context) { registerEndpointsHandlers(ctx, e) registerMemberHandlers(ctx, e) + registerSnapshotHandlers(ctx, e) } } diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index 0721b22aed27..084cb915a9a6 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -143,6 +143,7 @@ func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.Conf }, Compressed: strings.HasSuffix(snapshot, compressedExtension), metadataSource: extraMetadata, + nodeSource: s.nodeName, } uploadInfo, err := s.uploadSnapshot(ctx, snapshotKey, snapshot) @@ -394,6 +395,7 @@ func (s *S3) listSnapshots(ctx context.Context) (map[string]snapshotFile, error) }, Status: successfulSnapshotStatus, Compressed: compressed, + nodeSource: obj.UserMetadata[nodeNameKey], } sfKey := generateSnapshotName(sf) snapshots[sfKey] = sf diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go index 888a5d6398fb..52d432e2a107 100644 --- a/pkg/etcd/snapshot.go +++ b/pkg/etcd/snapshot.go @@ -10,6 +10,7 @@ import ( "math/rand" "net/http" "os" + "path" "path/filepath" "runtime" "sort" @@ -17,7 +18,9 @@ import ( "strings" "time" + apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1" "github.com/k3s-io/k3s/pkg/daemons/config" + "github.com/k3s-io/k3s/pkg/util" "github.com/k3s-io/k3s/pkg/version" "github.com/minio/minio-go/v7" "github.com/pkg/errors" @@ -30,22 +33,29 @@ import ( "go.uber.org/zap" "golang.org/x/sync/semaphore" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" + "k8s.io/utils/pointer" ) const ( maxConcurrentSnapshots = 1 - pruneStepSize = 5 compressedExtension = ".zip" metadataDir = ".metadata" + errorTTL = 24 * time.Hour ) var ( snapshotExtraMetadataConfigMapName = version.Program + "-etcd-snapshot-extra-metadata" - snapshotConfigMapName = version.Program + "-etcd-snapshots" + labelStorageNode = "etcd." + version.Program + ".cattle.io/snapshot-storage-node" + annotationLocalReconciled = "etcd." + version.Program + ".cattle.io/local-snapshots-timestamp" + annotationS3Reconciled = "etcd." + version.Program + ".cattle.io/s3-snapshots-timestamp" // snapshotDataBackoff will retry at increasing steps for up to ~30 seconds. 
// If the ConfigMap update fails, the list won't be reconciled again until next time @@ -171,7 +181,7 @@ func (e *ETCD) decompressSnapshot(snapshotDir, snapshotFile string) (string, err defer ss.Close() if _, err := io.Copy(decompressed, ss); err != nil { - os.Remove("") + os.Remove(decompressed.Name()) return "", err } } @@ -271,7 +281,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } logrus.Errorf("Failed to take etcd snapshot: %v", err) if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to save local snapshot failure data to configmap") + return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } } @@ -311,7 +321,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to save local snapshot data to configmap") + return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil { @@ -353,7 +363,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error { } } if err := e.addSnapshotData(*sf); err != nil { - return errors.Wrap(err, "failed to save snapshot data to configmap") + return errors.Wrap(err, "failed to sync ETCDSnapshotFile") } if err := e.s3.snapshotRetention(ctx); err != nil { logrus.Errorf("Failed to apply s3 snapshot retention policy: %v", err) @@ -398,7 +408,10 @@ type snapshotFile struct { S3 *s3Config `json:"s3Config,omitempty"` Compressed bool `json:"compressed"` + // these fields are used for the internal representation of the snapshot + // to populate other fields before serialization to the legacy configmap. metadataSource *v1.ConfigMap `json:"-"` + nodeSource string `json:"-"` } // listLocalSnapshots provides a list of the currently stored @@ -542,7 +555,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { } if e.config.EtcdS3 { - if err := e.s3.deleteSnapshot(s); err != nil { + if err := e.s3.deleteSnapshot(ctx, s); err != nil { if isNotExist(err) { logrus.Infof("Snapshot %s not found in S3", s) } else { @@ -583,63 +596,67 @@ func marshalSnapshotFile(sf snapshotFile) ([]byte, error) { return json.Marshal(sf) } -// AddSnapshotData adds the given snapshot file information to the snapshot configmap, using the existing extra metadata -// available at the time. This is primarily necessary to record failures, as successful snapshots will have a file on disk -// or S3 that will be found when reconciling. +// addSnapshotData syncs an internal snapshotFile representation to an ETCDSnapshotFile resource +// of the same name. Resources will be created or updated as necessary. func (e *ETCD) addSnapshotData(sf snapshotFile) error { - // make sure the core.Factory is initialized. There can - // be a race between this core code startup. - for e.config.Runtime.Core == nil { + // make sure the K3s factory is initialized. 
+ for e.config.Runtime.K3s == nil { runtime.Gosched() } + snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile() sfKey := generateSnapshotName(sf) - marshalledSnapshotFile, err := marshalSnapshotFile(sf) - if err != nil { - return err - } - pruneCount := pruneStepSize - var lastErr error + var esf *apisv1.ETCDSnapshotFile return retry.OnError(snapshotDataBackoff, func(err error) bool { - return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) || isTooLargeError(err) - }, func() error { - snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) - - if apierrors.IsNotFound(getErr) { - cm := v1.ConfigMap{ + return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) + }, func() (err error) { + // Get current object or create new one + esf, err = snapshots.Get(sfKey, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return err + } + esf = &apisv1.ETCDSnapshotFile{ ObjectMeta: metav1.ObjectMeta{ - Name: snapshotConfigMapName, - Namespace: metav1.NamespaceSystem, + Name: sfKey, }, - Data: map[string]string{sfKey: string(marshalledSnapshotFile)}, } - _, err := e.config.Runtime.Core.Core().V1().ConfigMap().Create(&cm) - return err } - if snapshotConfigMap.Data == nil { - snapshotConfigMap.Data = make(map[string]string) - } + // mutate object + existing := esf.DeepCopyObject() + sf.toETCDSnapshotFile(esf) - // If the configmap update was rejected due to size, drop the oldest entries from the map. - // We will continue to remove an increasing number of old snapshots from the map until the request succeeds, - // or the number we would attempt to remove exceeds the number stored. - if isTooLargeError(lastErr) { - logrus.Warnf("Snapshot configmap is too large, attempting to elide %d oldest snapshots from list", pruneCount) - if err := pruneConfigMap(snapshotConfigMap, pruneCount); err != nil { - return err + // create or update as necessary + if esf.CreationTimestamp.IsZero() { + _, err = snapshots.Create(esf) + if err == nil { + // Only emit an event for the snapshot when creating the resource + e.emitEvent(esf) } - pruneCount += pruneStepSize + } else if !equality.Semantic.DeepEqual(existing, esf) { + _, err = snapshots.Update(esf) } - - snapshotConfigMap.Data[sfKey] = string(marshalledSnapshotFile) - - _, lastErr = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap) - return lastErr + return err }) } +func (e *ETCD) emitEvent(esf *apisv1.ETCDSnapshotFile) { + if e.config.Runtime.Event == nil { + return + } + if esf.Status.Error != nil { + message := "Snapshot save failed on " + esf.Spec.NodeName + if esf.Status.Error.Message != nil { + message += ": " + *esf.Status.Error.Message + } + e.config.Runtime.Event.Event(esf, v1.EventTypeWarning, "ETCDSnapshotFailed", message) + } else { + e.config.Runtime.Event.Event(esf, v1.EventTypeNormal, "ETCDSnapshotCreated", "Snapshot saved on "+esf.Spec.NodeName) + } +} + // generateSnapshotName generates a derived name for the snapshot that is safe for use // as a resource name or configmap key. func generateSnapshotName(sf snapshotFile) string { @@ -651,33 +668,7 @@ func generateSnapshotName(sf snapshotFile) string { return name.SafeConcatName(nodeName, snapshotName) } -// pruneConfigMap drops the oldest entries from the configMap. -// Note that the actual snapshot files are not removed, just the entries that track them in the configmap. 
-func pruneConfigMap(snapshotConfigMap *v1.ConfigMap, pruneCount int) error { - if pruneCount > len(snapshotConfigMap.Data) { - return errors.New("unable to reduce snapshot ConfigMap size by eliding old snapshots") - } - - var snapshotFiles []snapshotFile - retention := len(snapshotConfigMap.Data) - pruneCount - for name := range snapshotConfigMap.Data { - basename, compressed := strings.CutSuffix(name, compressedExtension) - ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64) - snapshotFiles = append(snapshotFiles, snapshotFile{Name: name, CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed}) - } - - // sort newest-first so we can prune entries past the retention count - sort.Slice(snapshotFiles, func(i, j int) bool { - return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt) - }) - - for _, snapshotFile := range snapshotFiles[retention:] { - delete(snapshotConfigMap.Data, snapshotFile.Name) - } - return nil -} - -// ReconcileSnapshotData reconciles snapshot data in the snapshot ConfigMap. +// ReconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources. // It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots // and reconcile snapshots from S3. func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { @@ -687,167 +678,159 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { runtime.Gosched() } - logrus.Infof("Reconciling etcd snapshot data in %s ConfigMap", snapshotConfigMapName) - defer logrus.Infof("Reconciliation of snapshot data in %s ConfigMap complete", snapshotConfigMapName) + logrus.Infof("Reconciling ETCDSnapshotFile resources") + defer logrus.Infof("Reconciliation of ETCDSnapshotFile resources complete") - pruneCount := pruneStepSize - var lastErr error - return retry.OnError(retry.DefaultBackoff, func(err error) bool { - return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) || isTooLargeError(err) - }, func() error { - snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) - if apierrors.IsNotFound(getErr) { - cm := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: snapshotConfigMapName, - Namespace: metav1.NamespaceSystem, - }, - } - cm, err := e.config.Runtime.Core.Core().V1().ConfigMap().Create(cm) - if err != nil { - return err - } - snapshotConfigMap = cm - } + // Get snapshots from local filesystem + snapshotFiles, err := e.listLocalSnapshots() + if err != nil { + return err + } - logrus.Debugf("Attempting to reconcile etcd snapshot data for configmap generation %d", snapshotConfigMap.Generation) - if snapshotConfigMap.Data == nil { - snapshotConfigMap.Data = map[string]string{} - } + nodeNames := []string{os.Getenv("NODE_NAME")} - snapshotFiles, err := e.listLocalSnapshots() - if err != nil { + // Get snapshots from S3 + if e.config.EtcdS3 { + if err := e.initS3IfNil(ctx); err != nil { return err } - // s3ListSuccessful is set to true if we are successful at listing snapshots from S3 to eliminate accidental - // clobbering of S3 snapshots in the configmap due to misconfigured S3 credentials/details - var s3ListSuccessful bool - - if e.config.EtcdS3 { - if err := e.initS3IfNil(ctx); err != nil { - logrus.Warnf("Unable to initialize S3 client: %v", err) - return err + if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil { + logrus.Errorf("Error retrieving S3 snapshots for reconciliation: 
%v", err) + } else { + for k, v := range s3Snapshots { + snapshotFiles[k] = v } + nodeNames = append(nodeNames, "s3") + } + } - if s3Snapshots, err := e.s3.listSnapshots(ctx); err != nil { - logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err) - } else { - for k, v := range s3Snapshots { - snapshotFiles[k] = v + // Try to load metadata from the legacy configmap, in case any local or s3 snapshots + // were created by an old release that does not write the metadata alongside the snapshot file. + snapshotConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + + if snapshotConfigMap != nil { + for sfKey, sf := range snapshotFiles { + // if the configmap has data for this snapshot, and local metadata is empty, + // deserialize the value from the configmap and attempt to load it. + if cmSnapshotValue := snapshotConfigMap.Data[sfKey]; cmSnapshotValue != "" && sf.Metadata == "" && sf.metadataSource == nil { + if err := json.Unmarshal([]byte(cmSnapshotValue), &sf); err != nil { + logrus.Warnf("Failed to unmarshal configmap data for snapshot %s: %v", sfKey, err) + continue } - s3ListSuccessful = true + snapshotFiles[sfKey] = sf } } + } - nodeName := os.Getenv("NODE_NAME") + labelSelector := &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{{ + Key: labelStorageNode, + Operator: metav1.LabelSelectorOpIn, + Values: nodeNames, + }}, + } - // deletedSnapshots is a map[string]string where key is the configmap key and the value is the marshalled snapshot file - // it will be populated below with snapshots that are either from S3 or on the local node. Notably, deletedSnapshots will - // not contain snapshots that are in the "failed" status - deletedSnapshots := make(map[string]string) - // failedSnapshots is a slice of unmarshaled snapshot files sourced from the configmap - // These are stored unmarshaled so we can sort based on name. - var failedSnapshots []snapshotFile - var failedS3Snapshots []snapshotFile + selector, err := metav1.LabelSelectorAsSelector(labelSelector) + if err != nil { + return err + } - // remove entries for this node and s3 (if S3 is enabled) only - for k, v := range snapshotConfigMap.Data { - var sf snapshotFile - if err := json.Unmarshal([]byte(v), &sf); err != nil { - return err + // List all snapshots matching the selector + snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile() + esfList, err := snapshots.List(metav1.ListOptions{LabelSelector: selector.String()}) + if err != nil { + return err + } + + // If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync. + // If a snapshot from Kubernetes was not found on disk/s3, is is gone and can be removed from Kubernetes. + // The one exception to the last rule is failed snapshots - these must be retained for a period of time. 
+ for _, esf := range esfList.Items { + if _, ok := snapshotFiles[esf.Name]; ok { + // exists in both, don't need to sync + delete(snapshotFiles, esf.Name) + } else { + // doesn't exist on disk - if it's an error that hasn't expired yet, leave it, otherwise remove it + if esf.Status.Error != nil && esf.Status.Error.Time != nil { + expires := esf.Status.Error.Time.Add(errorTTL) + if time.Now().Before(expires) { + continue + } } - if (sf.NodeName == nodeName || (sf.NodeName == "s3" && s3ListSuccessful)) && sf.Status != failedSnapshotStatus { - // Only delete the snapshot if the snapshot was not failed - // sf.Status != FailedSnapshotStatus is intentional, as it is possible we are reconciling snapshots stored from older versions that did not set status - deletedSnapshots[generateSnapshotName(sf)] = v // store a copy of the snapshot - delete(snapshotConfigMap.Data, k) - } else if sf.Status == failedSnapshotStatus && sf.NodeName == nodeName && e.config.EtcdSnapshotRetention >= 1 { - // Handle locally failed snapshots. - failedSnapshots = append(failedSnapshots, sf) - delete(snapshotConfigMap.Data, k) - } else if sf.Status == failedSnapshotStatus && e.config.EtcdS3 && sf.NodeName == "s3" && strings.HasPrefix(sf.Name, e.config.EtcdSnapshotName+"-"+nodeName) && e.config.EtcdSnapshotRetention >= 1 { - // If we're operating against S3, we can clean up failed S3 snapshots that failed on this node. - failedS3Snapshots = append(failedS3Snapshots, sf) - delete(snapshotConfigMap.Data, k) + if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil { + logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err) } } + } - // Apply the failed snapshot retention policy to locally failed snapshots - if len(failedSnapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 { - // sort newest-first so we can record only the retention count - sort.Slice(failedSnapshots, func(i, j int) bool { - return failedSnapshots[j].CreatedAt.Before(failedSnapshots[i].CreatedAt) - }) - - for _, dfs := range failedSnapshots[:e.config.EtcdSnapshotRetention] { - sfKey := generateSnapshotName(dfs) - marshalledSnapshot, err := marshalSnapshotFile(dfs) - if err != nil { - logrus.Errorf("Failed to marshal snapshot to store in configmap %v", err) - } else { - snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot) - } - } + // Any snapshots remaining in the map from disk/s3 were not found in Kubernetes and need to be created + for _, sf := range snapshotFiles { + if err := e.addSnapshotData(sf); err != nil { + logrus.Errorf("Failed to create ETCDSnapshotFile: %v", err) } + } - // Apply the failed snapshot retention policy to the S3 snapshots - if len(failedS3Snapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 { - // sort newest-first so we can record only the retention count - sort.Slice(failedS3Snapshots, func(i, j int) bool { - return failedS3Snapshots[j].CreatedAt.Before(failedS3Snapshots[i].CreatedAt) - }) + // List all snapshots in Kubernetes not stored on S3 or a current etcd node. + // These snapshots are local to a node that no longer runs etcd and cannot be restored. + // If the node rejoins later and has local snapshots, it will reconcile them itself. 
+ labelSelector.MatchExpressions[0].Operator = metav1.LabelSelectorOpNotIn + labelSelector.MatchExpressions[0].Values = []string{"s3"} - for _, dfs := range failedS3Snapshots[:e.config.EtcdSnapshotRetention] { - sfKey := generateSnapshotName(dfs) - marshalledSnapshot, err := marshalSnapshotFile(dfs) - if err != nil { - logrus.Errorf("Failed to marshal snapshot to store in configmap %v", err) - } else { - snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot) - } - } - } + // Get a list of all etcd nodes currently in the cluster and add them to the selector + nodes := e.config.Runtime.Core.Core().V1().Node() + etcdSelector := labels.Set{util.ETCDRoleLabelKey: "true"} + nodeList, err := nodes.List(metav1.ListOptions{LabelSelector: etcdSelector.String()}) + if err != nil { + return err + } - // save the local entries to the ConfigMap if they are still on disk or in S3. - for _, snapshot := range snapshotFiles { - var sf snapshotFile - sfKey := generateSnapshotName(snapshot) - if v, ok := deletedSnapshots[sfKey]; ok { - // use the snapshot file we have from the existing configmap, and unmarshal it so we can manipulate it - if err := json.Unmarshal([]byte(v), &sf); err != nil { - logrus.Errorf("Error unmarshaling snapshot file: %v", err) - // use the snapshot with info we sourced from disk/S3 (will be missing metadata, but something is better than nothing) - sf = snapshot - } - } else { - sf = snapshot - } + for _, node := range nodeList.Items { + labelSelector.MatchExpressions[0].Values = append(labelSelector.MatchExpressions[0].Values, node.Name) + } - sf.Status = successfulSnapshotStatus // if the snapshot is on disk or in S3, it was successful. - marshalledSnapshot, err := marshalSnapshotFile(sf) - if err != nil { - logrus.Warnf("Failed to marshal snapshot metadata %s to store in configmap, received error: %v", sf.Name, err) - } else { - snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot) - } - } + selector, err = metav1.LabelSelectorAsSelector(labelSelector) + if err != nil { + return err + } - // If the configmap update was rejected due to size, drop the oldest entries from the map. - // We will continue to remove an increasing number of old snapshots from the map until the request succeeds, - // or the number we would attempt to remove exceeds the number stored. 
- if isTooLargeError(lastErr) { - logrus.Warnf("Snapshot configmap is too large, attempting to elide %d oldest snapshots from list", pruneCount) - if err := pruneConfigMap(snapshotConfigMap, pruneCount); err != nil { - return err - } - pruneCount += pruneStepSize + // List and remove all snapshots stored on nodes that do not match the selector + esfList, err = snapshots.List(metav1.ListOptions{LabelSelector: selector.String()}) + if err != nil { + return err + } + + for _, esf := range esfList.Items { + if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil { + logrus.Errorf("Failed to delete ETCDSnapshotFile for non-etcd node %s: %v", esf.Spec.NodeName, err) } + } - logrus.Debugf("Updating snapshot ConfigMap (%s) with %d entries", snapshotConfigMapName, len(snapshotConfigMap.Data)) - _, lastErr = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap) - return lastErr - }) + // Update our Node object to note the timestamp of the snapshot storages that have been reconciled + now := time.Now().Round(time.Second).Format(time.RFC3339) + patch := []map[string]string{ + { + "op": "add", + "value": now, + "path": "/metadata/annotations/" + strings.ReplaceAll(annotationLocalReconciled, "/", "~1"), + }, + } + if e.config.EtcdS3 { + patch = append(patch, map[string]string{ + "op": "add", + "value": now, + "path": "/metadata/annotations/" + strings.ReplaceAll(annotationS3Reconciled, "/", "~1"), + }) + } + b, err := json.Marshal(patch) + if err != nil { + return err + } + _, err = nodes.Patch(nodeNames[0], types.JSONPatchType, b) + return err } // setSnapshotFunction schedules snapshots at the configured interval. @@ -871,7 +854,7 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) return nil } - logrus.Infof("Applying local snapshot retention policy: retention: %d, snapshotPrefix: %s, directory: %s", retention, snapshotPrefix, snapshotDir) + logrus.Infof("Applying snapshot retention=%d to local snapshots with prefix %s in %s", retention, snapshotPrefix, snapshotDir) var snapshotFiles []snapshotFile if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error { @@ -914,11 +897,6 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) return nil } -func isTooLargeError(err error) bool { - // There are no helpers for unpacking field validation errors, so we just check for "Too long" in the error string. 
- return apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long")) -} - func isNotExist(err error) bool { if resp := minio.ToErrorResponse(err); resp.StatusCode == http.StatusNotFound || os.IsNotExist(err) { return true @@ -946,3 +924,116 @@ func saveSnapshotMetadata(snapshotPath string, extraMetadata *v1.ConfigMap) erro } return os.WriteFile(metadataPath, m, 0700) } + +func (sf *snapshotFile) fromETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { + if esf == nil { + panic("cannot convert from nil ETCDSnapshotFile") + } + if esf.Status.ReadyToUse != nil && *esf.Status.ReadyToUse { + sf.Status = successfulSnapshotStatus + } else { + sf.Status = failedSnapshotStatus + } + if esf.Status.Size != nil { + sf.Size = esf.Status.Size.Value() + } + sf.Name = path.Base(esf.Spec.Location) + sf.Location = esf.Spec.Location + sf.CreatedAt = esf.Status.CreationTime + sf.nodeSource = esf.Spec.NodeName + + if esf.Status.Error != nil { + if esf.Status.Error.Time != nil { + sf.CreatedAt = esf.Status.Error.Time + } + message := "etcd snapshot failed" + if esf.Status.Error.Message != nil { + message = *esf.Status.Error.Message + } + sf.Message = base64.StdEncoding.EncodeToString([]byte(message)) + } + + if len(esf.Spec.Metadata) > 0 { + if b, err := json.Marshal(esf.Spec.Metadata); err != nil { + logrus.Warnf("Failed to marshal metadata for %s: %v", esf.Name, err) + } else { + sf.Metadata = base64.StdEncoding.EncodeToString(b) + } + } + + if esf.Spec.S3 == nil { + sf.NodeName = esf.Spec.NodeName + } else { + sf.NodeName = "s3" + sf.S3 = &s3Config{ + Endpoint: esf.Spec.S3.Endpoint, + EndpointCA: esf.Spec.S3.EndpointCA, + SkipSSLVerify: esf.Spec.S3.SkipSSLVerify, + Bucket: esf.Spec.S3.Bucket, + Region: esf.Spec.S3.Region, + Folder: esf.Spec.S3.Prefix, + Insecure: esf.Spec.S3.Insecure, + } + } +} + +func (sf *snapshotFile) toETCDSnapshotFile(esf *apisv1.ETCDSnapshotFile) { + if esf == nil { + panic("cannot convert to nil ETCDSnapshotFile") + } + esf.Status.ReadyToUse = pointer.Bool(sf.Status == successfulSnapshotStatus) + esf.Status.Size = resource.NewQuantity(sf.Size, resource.DecimalSI) + esf.Spec.Location = sf.Location + if sf.nodeSource != "" { + esf.Spec.NodeName = sf.nodeSource + } else { + esf.Spec.NodeName = sf.NodeName + } + + if sf.Message == "" { + esf.Status.CreationTime = sf.CreatedAt + } else { + message, err := base64.StdEncoding.DecodeString(sf.Message) + if err != nil { + logrus.Warnf("Failed to decode error message for %s: %v", esf.Name, err) + } else { + esf.Status.Error = &apisv1.ETCDSnapshotError{ + Time: sf.CreatedAt, + Message: pointer.String(string(message)), + } + } + } + + if sf.metadataSource != nil { + esf.Spec.Metadata = sf.metadataSource.Data + } else if sf.Metadata != "" { + metadata, err := base64.StdEncoding.DecodeString(sf.Metadata) + if err != nil { + logrus.Warnf("Failed to decode metadata for %s: %v", esf.Name, err) + } else { + if err := json.Unmarshal(metadata, &esf.Spec.Metadata); err != nil { + logrus.Warnf("Failed to unmarshal metadata for %s: %v", esf.Name, err) + } + } + } + + if esf.ObjectMeta.Labels == nil { + esf.ObjectMeta.Labels = map[string]string{} + } + + if sf.S3 == nil { + esf.ObjectMeta.Labels[labelStorageNode] = esf.Spec.NodeName + } else { + esf.ObjectMeta.Labels[labelStorageNode] = "s3" + esf.Spec.S3 = &apisv1.ETCDSnapshotS3{ + Endpoint: sf.S3.Endpoint, + EndpointCA: sf.S3.EndpointCA, + SkipSSLVerify: sf.S3.SkipSSLVerify, + Bucket: sf.S3.Bucket, + Region: sf.S3.Region, + Prefix: sf.S3.Folder, + 
Insecure: sf.S3.Insecure, + } + } + +} diff --git a/pkg/etcd/snapshot_controller.go b/pkg/etcd/snapshot_controller.go new file mode 100644 index 000000000000..972231b6305c --- /dev/null +++ b/pkg/etcd/snapshot_controller.go @@ -0,0 +1,306 @@ +package etcd + +import ( + "context" + "sort" + "strconv" + "strings" + "time" + + apisv1 "github.com/k3s-io/k3s/pkg/apis/k3s.cattle.io/v1" + controllersv1 "github.com/k3s-io/k3s/pkg/generated/controllers/k3s.cattle.io/v1" + "github.com/k3s-io/k3s/pkg/util" + "github.com/k3s-io/k3s/pkg/version" + "github.com/pkg/errors" + controllerv1 "github.com/rancher/wrangler/v2/pkg/generated/controllers/core/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" + + "github.com/sirupsen/logrus" +) + +const ( + pruneStepSize = 4 + reconcileKey = "_reconcile_" + reconcileInterval = 600 * time.Minute +) + +var ( + snapshotConfigMapName = version.Program + "-etcd-snapshots" +) + +type etcdSnapshotHandler struct { + ctx context.Context + etcd *ETCD + snapshots controllersv1.ETCDSnapshotFileController + configmaps controllerv1.ConfigMapController +} + +func registerSnapshotHandlers(ctx context.Context, etcd *ETCD) { + snapshots := etcd.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile() + e := &etcdSnapshotHandler{ + ctx: ctx, + etcd: etcd, + snapshots: snapshots, + configmaps: etcd.config.Runtime.Core.Core().V1().ConfigMap(), + } + + logrus.Infof("Starting managed etcd snapshot ConfigMap controller") + snapshots.OnChange(ctx, "managed-etcd-snapshots-controller", e.sync) + snapshots.OnRemove(ctx, "managed-etcd-snapshots-controller", e.onRemove) + go wait.JitterUntil(func() { snapshots.Enqueue(reconcileKey) }, reconcileInterval, 0.04, false, ctx.Done()) +} + +func (e *etcdSnapshotHandler) sync(key string, esf *apisv1.ETCDSnapshotFile) (*apisv1.ETCDSnapshotFile, error) { + if key == reconcileKey { + return nil, e.reconcile() + } + if esf == nil || !esf.DeletionTimestamp.IsZero() { + return nil, nil + } + + sf := snapshotFile{} + sf.fromETCDSnapshotFile(esf) + m, err := marshalSnapshotFile(sf) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal snapshot ConfigMap data") + } + marshalledSnapshot := string(m) + + snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return nil, errors.Wrap(err, "failed to get snapshot ConfigMap") + } + snapshotConfigMap = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: snapshotConfigMapName, + Namespace: metav1.NamespaceSystem, + }, + } + } + + if snapshotConfigMap.Data[key] != marshalledSnapshot { + if snapshotConfigMap.Data == nil { + snapshotConfigMap.Data = map[string]string{} + } + snapshotConfigMap.Data[key] = marshalledSnapshot + + // Try to create or update the ConfigMap. If it is too large, prune old entries + // until it fits, or until it cannot be pruned any further. 
+ pruneCount := pruneStepSize + err = retry.OnError(snapshotDataBackoff, isTooLargeError, func() (err error) { + if snapshotConfigMap.CreationTimestamp.IsZero() { + _, err = e.configmaps.Create(snapshotConfigMap) + } else { + _, err = e.configmaps.Update(snapshotConfigMap) + } + + if isTooLargeError(err) { + logrus.Warnf("Snapshot ConfigMap is too large, attempting to elide %d of %d entries to reduce size", pruneCount, len(snapshotConfigMap.Data)) + if perr := pruneConfigMap(snapshotConfigMap, pruneCount); perr != nil { + err = perr + } + // if the entry we're trying to add just got pruned, give up on adding it, + // as it is always going to get pushed off due to being too old to keep. + if _, ok := snapshotConfigMap.Data[key]; !ok { + logrus.Warnf("Snapshot %s has been elided from ConfigMap to reduce size; not requeuing", key) + return nil + } + + pruneCount += pruneStepSize + } + return err + }) + } + + if err != nil { + err = errors.Wrap(err, "failed to sync snapshot to ConfigMap") + } + + return nil, err +} + +func (e *etcdSnapshotHandler) onRemove(key string, esf *apisv1.ETCDSnapshotFile) (*apisv1.ETCDSnapshotFile, error) { + snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, errors.Wrap(err, "failed to get snapshot ConfigMap") + } + + if _, ok := snapshotConfigMap.Data[key]; ok { + delete(snapshotConfigMap.Data, key) + if _, err := e.configmaps.Update(snapshotConfigMap); err != nil { + return nil, errors.Wrap(err, "failed to remove snapshot from ConfigMap") + } + } + + return nil, nil +} + +func (e *etcdSnapshotHandler) reconcile() error { + logrus.Infof("Reconciling snapshot ConfigMap data") + + snapshotConfigMap, err := e.configmaps.Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return errors.Wrap(err, "failed to get snapshot ConfigMap") + } + snapshotConfigMap = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: snapshotConfigMapName, + Namespace: metav1.NamespaceSystem, + }, + } + } + + // Get a list of all etcd nodes currently in the cluster. + // We will use this list to prune local entries for any node that does not exist. + nodes := e.etcd.config.Runtime.Core.Core().V1().Node() + etcdSelector := labels.Set{util.ETCDRoleLabelKey: "true"} + nodeList, err := nodes.List(metav1.ListOptions{LabelSelector: etcdSelector.String()}) + if err != nil { + return err + } + + // Once a node has set the reconcile annotation, it is considered to have + // migrated to using ETCDSnapshotFile resources, and any old configmap + // entries for it can be pruned. Until the annotation is set, we will leave + // its entries alone. 
+	syncedNodes := map[string]bool{}
+	for _, node := range nodeList.Items {
+		if _, ok := node.Annotations[annotationLocalReconciled]; ok {
+			syncedNodes[node.Name] = true
+		}
+		if _, ok := node.Annotations[annotationS3Reconciled]; ok {
+			syncedNodes["s3"] = true
+		}
+	}
+
+	if len(syncedNodes) == 0 {
+		return errors.New("no nodes have reconciled ETCDSnapshotFile resources")
+	}
+
+	// Get a list of existing snapshots
+	snapshotList, err := e.snapshots.List(metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+
+	snapshots := map[string]*apisv1.ETCDSnapshotFile{}
+	for i := range snapshotList.Items {
+		esf := &snapshotList.Items[i]
+		if esf.DeletionTimestamp.IsZero() {
+			snapshots[esf.Name] = esf
+		}
+	}
+
+	// Make a copy of the configmap for change detection
+	existing := snapshotConfigMap.DeepCopyObject()
+
+	// Delete any keys missing from synced storages, or associated with missing nodes
+	for key := range snapshotConfigMap.Data {
+		if strings.HasPrefix(key, "s3-") {
+			// If a node has synced s3 and the key is missing then delete it
+			if syncedNodes["s3"] && snapshots[key] == nil {
+				delete(snapshotConfigMap.Data, key)
+			}
+		} else if s, ok := strings.CutPrefix(key, "local-"); ok {
+			// If a matching node has synced and the key is missing then delete it
+			// If a matching node does not exist, delete the key
+			// A node is considered to match the snapshot if the snapshot name matches the node name
+			// after trimming the leading local- prefix and trailing timestamp and extension.
+			s, _ = strings.CutSuffix(s, ".zip")
+			s = strings.TrimRight(s, "-0123456789")
+			var matchingNode bool
+			for _, node := range nodeList.Items {
+				if strings.HasSuffix(s, node.Name) {
+					if syncedNodes[node.Name] && snapshots[key] == nil {
+						delete(snapshotConfigMap.Data, key)
+					}
+					matchingNode = true
+					break
+				}
+			}
+			if !matchingNode {
+				delete(snapshotConfigMap.Data, key)
+			}
+		}
+	}
+
+	// Ensure keys for existing snapshots
+	for sfKey, esf := range snapshots {
+		sf := snapshotFile{}
+		sf.fromETCDSnapshotFile(esf)
+		m, err := marshalSnapshotFile(sf)
+		if err != nil {
+			logrus.Warnf("Failed to marshal snapshot ConfigMap data for %s", sfKey)
+			continue
+		}
+		marshalledSnapshot := string(m)
+		snapshotConfigMap.Data[sfKey] = marshalledSnapshot
+	}
+
+	// If the configmap didn't change, don't bother updating it
+	if equality.Semantic.DeepEqual(existing, snapshotConfigMap) {
+		return nil
+	}
+
+	// Try to create or update the ConfigMap. If it is too large, prune old entries
+	// until it fits, or until it cannot be pruned any further.
+	pruneCount := pruneStepSize
+	return retry.OnError(snapshotDataBackoff, isTooLargeError, func() (err error) {
+		if snapshotConfigMap.CreationTimestamp.IsZero() {
+			_, err = e.configmaps.Create(snapshotConfigMap)
+		} else {
+			_, err = e.configmaps.Update(snapshotConfigMap)
+		}
+
+		if isTooLargeError(err) {
+			logrus.Warnf("Snapshot ConfigMap is too large, attempting to elide %d of %d entries to reduce size", pruneCount, len(snapshotConfigMap.Data))
+			if perr := pruneConfigMap(snapshotConfigMap, pruneCount); perr != nil {
+				err = perr
+			}
+			pruneCount += pruneStepSize
+		}
+		return err
+	})
+}
+
+// pruneConfigMap drops the oldest entries from the configMap.
+// Note that the actual snapshot files are not removed, just the entries that track them in the configmap.
+func pruneConfigMap(snapshotConfigMap *v1.ConfigMap, pruneCount int) error {
+	if pruneCount >= len(snapshotConfigMap.Data) {
+		return errors.New("unable to reduce snapshot ConfigMap size by eliding old snapshots")
+	}
+
+	var snapshotFiles []snapshotFile
+	retention := len(snapshotConfigMap.Data) - pruneCount
+	for name := range snapshotConfigMap.Data {
+		basename, compressed := strings.CutSuffix(name, compressedExtension)
+		ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
+		snapshotFiles = append(snapshotFiles, snapshotFile{Name: name, CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed})
+	}
+
+	// sort newest-first so we can prune entries past the retention count
+	sort.Slice(snapshotFiles, func(i, j int) bool {
+		return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt)
+	})
+
+	for _, snapshotFile := range snapshotFiles[retention:] {
+		delete(snapshotConfigMap.Data, snapshotFile.Name)
+	}
+	return nil
+}
+
+func isTooLargeError(err error) bool {
+	// There are no helpers for unpacking field validation errors, so we just check for "Too long" in the error string.
+	return apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long"))
+}
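
Not part of the patch: a minimal, self-contained sketch of the ConfigMap key handling that pruneConfigMap and the reconcile loop above rely on. It assumes an entry name of the form local-<snapshot>-<node>-<unix seconds>, optionally ending in .zip for compressed snapshots; the concrete key and node name below are made up for illustration.

package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// Illustrative only: mirrors the string handling shown in the patch, using a
// hypothetical ConfigMap key. The "local-" prefix, ".zip" suffix, and trailing
// "-<unix seconds>" layout are taken from the parsing code above.
func main() {
	key := "local-etcd-snapshot-node-a-1699999999.zip" // hypothetical entry name

	// pruneConfigMap: the creation time is recovered from the digits after the last '-'.
	basename, compressed := strings.CutSuffix(key, ".zip")
	ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
	fmt.Println("created:", time.Unix(ts, 0).UTC(), "compressed:", compressed)

	// reconcile: the owning node is found by stripping the prefix, the extension,
	// and the trailing timestamp digits, then matching the remainder against node names.
	s, _ := strings.CutPrefix(key, "local-")
	s, _ = strings.CutSuffix(s, ".zip")
	s = strings.TrimRight(s, "-0123456789")
	fmt.Println("matches node?", strings.HasSuffix(s, "node-a"))
}

The trailing-digit TrimRight works because snapshot names end in a Unix timestamp; the remainder is then compared against the names of current etcd nodes, which is how the reconcile loop decides whether a legacy entry still belongs to a live node.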