Skip to content

Commit

Permalink
(bugfix): make garbage collection a runnable (#231)
Browse files Browse the repository at this point in the history
and add it to the controller manager. Make it log errors instead of
exiting. This prevents crashlooping when there are errors in the
garbage collection process.

Signed-off-by: everettraven <everettraven@gmail.com>
  • Loading branch information
everettraven committed Feb 19, 2024
1 parent f549f54 commit 0d305bd
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 39 deletions.
46 changes: 11 additions & 35 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package main

import (
"context"
"flag"
"fmt"
"net/http"
Expand All @@ -31,20 +30,18 @@ import (
"k8s.io/client-go/metadata"
_ "k8s.io/client-go/plugin/pkg/client/auth"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"github.com/go-logr/logr"
"github.com/spf13/pflag"

"github.com/operator-framework/catalogd/internal/garbagecollection"
"github.com/operator-framework/catalogd/internal/source"
"github.com/operator-framework/catalogd/internal/third_party/server"
"github.com/operator-framework/catalogd/internal/version"
Expand Down Expand Up @@ -82,6 +79,7 @@ func main() {
catalogServerAddr string
httpExternalAddr string
cacheDir string
gcInterval time.Duration
)
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
Expand All @@ -94,6 +92,7 @@ func main() {
flag.StringVar(&httpExternalAddr, "http-external-address", "http://catalogd-catalogserver.catalogd-system.svc", "The external address at which the http server is reachable.")
flag.StringVar(&cacheDir, "cache-dir", "/var/cache/", "The directory in the filesystem that catalogd will use for file based caching")
flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit")
flag.DurationVar(&gcInterval, "gc-interval", 12*time.Hour, "interval in which garbage collection should be run against the catalog content cache")
opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -202,8 +201,14 @@ func main() {
}

ctx := ctrl.SetupSignalHandler()
if err := unpackStartupGarbageCollection(ctx, filepath.Join(cacheDir, source.UnpackCacheDir), setupLog, metaClient); err != nil {
setupLog.Error(err, "running garbage collection")
gc := &garbagecollection.GarbageCollector{
CachePath: filepath.Join(cacheDir, source.UnpackCacheDir),
Logger: ctrl.Log.WithName("garbage-collector"),
MetadataClient: metaClient,
Interval: gcInterval,
}
if err := mgr.Add(gc); err != nil {
setupLog.Error(err, "problem adding garbage collector to manager")
os.Exit(1)
}

Expand All @@ -221,32 +226,3 @@ func podNamespace() string {
}
return string(namespace)
}

// unpackStartupGarbageCollection removes every entry under cachePath that
// does not correspond to an existing Catalog resource.
//
// It lists the "catalogs" resource through metaClient to build the set of
// expected names and then reads cachePath. An entry is kept only when it is a
// directory whose name matches an existing Catalog; everything else (stale
// directories and any non-directory entry) is deleted with os.RemoveAll and
// reported through log.
//
// It returns a wrapped error as soon as listing catalogs, reading the cache
// directory, or removing an entry fails. Entries removed before the failure
// stay removed.
func unpackStartupGarbageCollection(ctx context.Context, cachePath string, log logr.Logger, metaClient metadata.Interface) error {
	getter := metaClient.Resource(v1alpha1.GroupVersion.WithResource("catalogs"))
	metaList, err := getter.List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("error listing catalogs: %w", err)
	}

	expectedCatalogs := sets.New[string]()
	for _, meta := range metaList.Items {
		expectedCatalogs.Insert(meta.GetName())
	}

	cacheDirEntries, err := os.ReadDir(cachePath)
	if err != nil {
		return fmt.Errorf("error reading cache directory: %w", err)
	}
	for _, cacheDirEntry := range cacheDirEntries {
		name := cacheDirEntry.Name()
		if cacheDirEntry.IsDir() && expectedCatalogs.Has(name) {
			continue
		}
		if err := os.RemoveAll(filepath.Join(cachePath, name)); err != nil {
			// No trailing whitespace after %w: it would end up in the message.
			return fmt.Errorf("error removing cache directory entry %q: %w", name, err)
		}

		log.Info("deleted unexpected cache directory entry", "path", name, "isDir", cacheDirEntry.IsDir())
	}
	return nil
}
94 changes: 94 additions & 0 deletions internal/garbagecollection/garbage_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package garbagecollection

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/metadata"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/operator-framework/catalogd/api/core/v1alpha1"
)

// Compile-time check that *GarbageCollector satisfies manager.Runnable.
var _ manager.Runnable = (*GarbageCollector)(nil)

// GarbageCollector implements manager.Runnable and periodically cleans the
// Catalog content cache that is served by the catalogd HTTP server. Each run
// deletes cache entries that have no matching Catalog resource. It is meant
// as a safety net for entries that were missed while handling a DELETE event
// on a Catalog resource.
type GarbageCollector struct {
	// CachePath is the directory whose entries are inspected and, when
	// stale, removed.
	CachePath string
	// Logger receives garbage collection errors and the names of removed
	// entries.
	Logger logr.Logger
	// MetadataClient is used to list the Catalog resources that currently
	// exist.
	MetadataClient metadata.Interface
	// Interval is the time to wait between garbage collection runs.
	Interval time.Duration
}

// Start runs the garbage collector until ctx is canceled.
//
// One garbage collection run always happens immediately on startup. After
// that, a new run is triggered every gc.Interval, measured from the end of
// the previous run. Failures of an individual run are logged rather than
// returned, so a failing run never stops the loop.
//
// If gc.Interval is zero or negative, periodic collection is disabled: only
// the startup run happens and Start then blocks until ctx is canceled. This
// avoids a busy loop of back-to-back runs.
//
// Start only returns once ctx is done, and the returned error is ctx.Err().
func (gc *GarbageCollector) Start(ctx context.Context) error {
	// Run once on startup.
	gc.collectOnce(ctx)

	if gc.Interval <= 0 {
		gc.Logger.Info("garbage collection interval is not positive, periodic garbage collection is disabled", "interval", gc.Interval.String())
		<-ctx.Done()
		return ctx.Err()
	}

	// A single reusable timer replaces a time.After call per iteration.
	timer := time.NewTimer(gc.Interval)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			gc.collectOnce(ctx)
			// The timer channel was just drained, so Reset is safe here.
			timer.Reset(gc.Interval)
		}
	}
}

// collectOnce performs a single garbage collection run and logs its outcome:
// an error if the run failed, and the removed entries if there were any.
func (gc *GarbageCollector) collectOnce(ctx context.Context) {
	removed, err := runGarbageCollection(ctx, gc.CachePath, gc.MetadataClient)
	if err != nil {
		gc.Logger.Error(err, "running garbage collection")
	}
	if len(removed) > 0 {
		gc.Logger.Info("removed stale cache entries", "removed entries", removed)
	}
}

// runGarbageCollection removes every entry under cachePath that does not
// correspond to an existing Catalog resource and returns the names of the
// entries it removed.
//
// It lists the "catalogs" resource through metaClient to build the set of
// expected names and then reads cachePath. An entry is kept only when it is a
// directory whose name matches an existing Catalog; everything else (stale
// directories and any non-directory entry) is deleted with os.RemoveAll.
//
// If listing catalogs, reading the cache directory, or removing an entry
// fails, it returns a nil slice and a wrapped error. Entries removed before
// the failure stay removed.
func runGarbageCollection(ctx context.Context, cachePath string, metaClient metadata.Interface) ([]string, error) {
	getter := metaClient.Resource(v1alpha1.GroupVersion.WithResource("catalogs"))
	metaList, err := getter.List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("error listing catalogs: %w", err)
	}

	expectedCatalogs := sets.New[string]()
	for _, meta := range metaList.Items {
		expectedCatalogs.Insert(meta.GetName())
	}

	cacheDirEntries, err := os.ReadDir(cachePath)
	if err != nil {
		return nil, fmt.Errorf("error reading cache directory: %w", err)
	}

	removed := []string{}
	for _, cacheDirEntry := range cacheDirEntries {
		name := cacheDirEntry.Name()
		if cacheDirEntry.IsDir() && expectedCatalogs.Has(name) {
			continue
		}
		if err := os.RemoveAll(filepath.Join(cachePath, name)); err != nil {
			// No trailing whitespace after %w: it would end up in the message.
			return nil, fmt.Errorf("error removing cache directory entry %q: %w", name, err)
		}

		removed = append(removed, name)
	}
	return removed, nil
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package main
package garbagecollection

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -16,7 +15,7 @@ import (
"github.com/operator-framework/catalogd/api/core/v1alpha1"
)

func TestUnpackStartupGarbageCollection(t *testing.T) {
func TestRunGarbageCollection(t *testing.T) {
for _, tt := range []struct {
name string
existCatalogs []*metav1.PartialObjectMetadata
Expand Down Expand Up @@ -76,7 +75,7 @@ func TestUnpackStartupGarbageCollection(t *testing.T) {

metaClient := fake.NewSimpleMetadataClient(scheme, runtimeObjs...)

err := unpackStartupGarbageCollection(ctx, cachePath, logr.Discard(), metaClient)
_, err := runGarbageCollection(ctx, cachePath, metaClient)
if !tt.wantErr {
assert.NoError(t, err)
entries, err := os.ReadDir(cachePath)
Expand Down

0 comments on commit 0d305bd

Please sign in to comment.