From 0d305bdab9147da1d151c83fb765acd5a9070f93 Mon Sep 17 00:00:00 2001 From: Bryce Palmer Date: Mon, 19 Feb 2024 15:18:41 -0500 Subject: [PATCH] (bugfix): make garbage collection a runnable (#231) and add it to the controller manager. Make it log errors instead of exiting. This prevents crashlooping when there are errors in the garbage collection process. Signed-off-by: everettraven --- cmd/manager/main.go | 46 +++------ .../garbagecollection/garbage_collector.go | 94 +++++++++++++++++++ .../garbage_collector_test.go | 7 +- 3 files changed, 108 insertions(+), 39 deletions(-) create mode 100644 internal/garbagecollection/garbage_collector.go rename cmd/manager/main_test.go => internal/garbagecollection/garbage_collector_test.go (92%) diff --git a/cmd/manager/main.go b/cmd/manager/main.go index d3d0ffc2..e4b8b08b 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -17,7 +17,6 @@ limitations under the License. package main import ( - "context" "flag" "fmt" "net/http" @@ -31,10 +30,8 @@ import ( "k8s.io/client-go/metadata" _ "k8s.io/client-go/plugin/pkg/client/auth" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/sets" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -42,9 +39,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/metrics" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - "github.com/go-logr/logr" "github.com/spf13/pflag" + "github.com/operator-framework/catalogd/internal/garbagecollection" "github.com/operator-framework/catalogd/internal/source" "github.com/operator-framework/catalogd/internal/third_party/server" "github.com/operator-framework/catalogd/internal/version" @@ -82,6 +79,7 @@ func main() { catalogServerAddr string httpExternalAddr string cacheDir string + gcInterval time.Duration ) flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -94,6 +92,7 @@ func main() { flag.StringVar(&httpExternalAddr, "http-external-address", "http://catalogd-catalogserver.catalogd-system.svc", "The external address at which the http server is reachable.") flag.StringVar(&cacheDir, "cache-dir", "/var/cache/", "The directory in the filesystem that catalogd will use for file based caching") flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit") + flag.DurationVar(&gcInterval, "gc-interval", 12*time.Hour, "interval in which garbage collection should be run against the catalog content cache") opts := zap.Options{ Development: true, } @@ -202,8 +201,14 @@ func main() { } ctx := ctrl.SetupSignalHandler() - if err := unpackStartupGarbageCollection(ctx, filepath.Join(cacheDir, source.UnpackCacheDir), setupLog, metaClient); err != nil { - setupLog.Error(err, "running garbage collection") + gc := &garbagecollection.GarbageCollector{ + CachePath: filepath.Join(cacheDir, source.UnpackCacheDir), + Logger: ctrl.Log.WithName("garbage-collector"), + MetadataClient: metaClient, + Interval: gcInterval, + } + if err := mgr.Add(gc); err != nil { + setupLog.Error(err, "problem adding garbage collector to manager") os.Exit(1) } @@ -221,32 +226,3 @@ func podNamespace() string { } return string(namespace) } - -func unpackStartupGarbageCollection(ctx context.Context, cachePath string, log logr.Logger, metaClient metadata.Interface) error { - getter := metaClient.Resource(v1alpha1.GroupVersion.WithResource("catalogs")) - metaList, err := getter.List(ctx, metav1.ListOptions{}) - if err != nil { - return fmt.Errorf("error listing catalogs: %w", err) - } - - expectedCatalogs := sets.New[string]() - for _, meta := range metaList.Items { - expectedCatalogs.Insert(meta.GetName()) - } - - cacheDirEntries, err := os.ReadDir(cachePath) - if err != nil { - return fmt.Errorf("error reading cache directory: %w", err) - } - for _, cacheDirEntry := range cacheDirEntries { - if cacheDirEntry.IsDir() && expectedCatalogs.Has(cacheDirEntry.Name()) { - continue - } - if err := os.RemoveAll(filepath.Join(cachePath, cacheDirEntry.Name())); err != nil { - return fmt.Errorf("error removing cache directory entry %q: %w ", cacheDirEntry.Name(), err) - } - - log.Info("deleted unexpected cache directory entry", "path", cacheDirEntry.Name(), "isDir", cacheDirEntry.IsDir()) - } - return nil -} diff --git a/internal/garbagecollection/garbage_collector.go b/internal/garbagecollection/garbage_collector.go new file mode 100644 index 00000000..0ebb3abc --- /dev/null +++ b/internal/garbagecollection/garbage_collector.go @@ -0,0 +1,94 @@ +package garbagecollection + +import ( + "context" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/go-logr/logr" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/metadata" + "sigs.k8s.io/controller-runtime/pkg/manager" + + "github.com/operator-framework/catalogd/api/core/v1alpha1" +) + +var _ manager.Runnable = (*GarbageCollector)(nil) + +// GarbageCollector is an implementation of the manager.Runnable +// interface for running garbage collection on the Catalog content +// cache that is served by the catalogd HTTP server. It runs in a loop +// and will ensure that no cache entries exist for Catalog resources +// that no longer exist. This should only clean up cache entries that +// were missed by the handling of a DELETE event on a Catalog resource. +type GarbageCollector struct { + CachePath string + Logger logr.Logger + MetadataClient metadata.Interface + Interval time.Duration +} + +// Start will start the garbage collector. It will always run once on startup +// and loop until context is canceled after an initial garbage collection run. +// Garbage collection will run again every X amount of time, where X is the +// supplied garbage collection interval. +func (gc *GarbageCollector) Start(ctx context.Context) error { + // Run once on startup + removed, err := runGarbageCollection(ctx, gc.CachePath, gc.MetadataClient) + if err != nil { + gc.Logger.Error(err, "running garbage collection") + } + if len(removed) > 0 { + gc.Logger.Info("removed stale cache entries", "removed entries", removed) + } + + // Loop until context is canceled, running garbage collection + // at the configured interval + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(gc.Interval): + removed, err := runGarbageCollection(ctx, gc.CachePath, gc.MetadataClient) + if err != nil { + gc.Logger.Error(err, "running garbage collection") + } + if len(removed) > 0 { + gc.Logger.Info("removed stale cache entries", "removed entries", removed) + } + } + } +} + +func runGarbageCollection(ctx context.Context, cachePath string, metaClient metadata.Interface) ([]string, error) { + getter := metaClient.Resource(v1alpha1.GroupVersion.WithResource("catalogs")) + metaList, err := getter.List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("error listing catalogs: %w", err) + } + + expectedCatalogs := sets.New[string]() + for _, meta := range metaList.Items { + expectedCatalogs.Insert(meta.GetName()) + } + + cacheDirEntries, err := os.ReadDir(cachePath) + if err != nil { + return nil, fmt.Errorf("error reading cache directory: %w", err) + } + removed := []string{} + for _, cacheDirEntry := range cacheDirEntries { + if cacheDirEntry.IsDir() && expectedCatalogs.Has(cacheDirEntry.Name()) { + continue + } + if err := os.RemoveAll(filepath.Join(cachePath, cacheDirEntry.Name())); err != nil { + return nil, fmt.Errorf("error removing cache directory entry %q: %w ", cacheDirEntry.Name(), err) + } + + removed = append(removed, cacheDirEntry.Name()) + } + return removed, nil +} diff --git a/cmd/manager/main_test.go b/internal/garbagecollection/garbage_collector_test.go similarity index 92% rename from cmd/manager/main_test.go rename to internal/garbagecollection/garbage_collector_test.go index d1619d19..c2e324ec 100644 --- a/cmd/manager/main_test.go +++ b/internal/garbagecollection/garbage_collector_test.go @@ -1,4 +1,4 @@ -package main +package garbagecollection import ( "context" @@ -6,7 +6,6 @@ import ( "path/filepath" "testing" - "github.com/go-logr/logr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -16,7 +15,7 @@ import ( "github.com/operator-framework/catalogd/api/core/v1alpha1" ) -func TestUnpackStartupGarbageCollection(t *testing.T) { +func TestRunGarbageCollection(t *testing.T) { for _, tt := range []struct { name string existCatalogs []*metav1.PartialObjectMetadata @@ -76,7 +75,7 @@ func TestUnpackStartupGarbageCollection(t *testing.T) { metaClient := fake.NewSimpleMetadataClient(scheme, runtimeObjs...) - err := unpackStartupGarbageCollection(ctx, cachePath, logr.Discard(), metaClient) + _, err := runGarbageCollection(ctx, cachePath, metaClient) if !tt.wantErr { assert.NoError(t, err) entries, err := os.ReadDir(cachePath)