Skip to content

Commit

Permalink
(feature): add direct image registry client Unpacker implementation (
Browse files Browse the repository at this point in the history
…#145)

(feat): direct image registry client unpacker

Signed-off-by: Bryce Palmer <bpalmer@redhat.com>
  • Loading branch information
everettraven authored Oct 10, 2023
1 parent 8a90d8d commit 9f3ba06
Show file tree
Hide file tree
Showing 22 changed files with 2,056 additions and 327 deletions.
15 changes: 8 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ CATALOGD_NAMESPACE ?= catalogd-system

# E2E configuration
TESTDATA_DIR ?= testdata
CONTAINER_RUNTIME ?= docker


##@ General

Expand Down Expand Up @@ -67,7 +67,11 @@ test-e2e: $(GINKGO) ## Run the e2e tests
$(GINKGO) --tags $(GO_BUILD_TAGS) $(E2E_FLAGS) -trace -progress $(FOCUS) test/e2e

e2e: KIND_CLUSTER_NAME=catalogd-e2e
e2e: run kind-load-test-artifacts test-e2e kind-cluster-cleanup ## Run e2e test suite on local kind cluster
e2e: DEPLOY_TARGET=e2e
e2e: kind-cluster image-registry install test-e2e kind-cluster-cleanup ## Run e2e test suite on local kind cluster

image-registry: ## Setup in-cluster image registry
./test/tools/imageregistry/registry.sh

.PHONY: tidy
tidy: ## Update dependencies
Expand Down Expand Up @@ -147,17 +151,14 @@ kind-load: $(KIND) ## Load the built images onto the local cluster
$(KIND) export kubeconfig --name $(KIND_CLUSTER_NAME)
$(KIND) load docker-image $(IMAGE) --name $(KIND_CLUSTER_NAME)

kind-load-test-artifacts: $(KIND) ## Load the e2e testdata container images into a kind cluster
$(CONTAINER_RUNTIME) build $(TESTDATA_DIR)/catalogs -f $(TESTDATA_DIR)/catalogs/test-catalog.Dockerfile -t localhost/testdata/catalogs/test-catalog:e2e
$(KIND) load docker-image localhost/testdata/catalogs/test-catalog:e2e --name $(KIND_CLUSTER_NAME)

.PHONY: install
install: build-container kind-load deploy wait ## Install local catalogd

DEPLOY_TARGET ?= default
.PHONY: deploy
deploy: $(KUSTOMIZE) ## Deploy Catalogd to the K8s cluster specified in ~/.kube/config.
cd config/manager && $(KUSTOMIZE) edit set image controller=$(IMAGE)
$(KUSTOMIZE) build config/default | kubectl apply -f -
$(KUSTOMIZE) build config/${DEPLOY_TARGET} | kubectl apply -f -

.PHONY: undeploy
undeploy: $(KUSTOMIZE) ## Undeploy Catalogd from the K8s cluster specified in ~/.kube/config.
Expand Down
75 changes: 64 additions & 11 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,31 @@ limitations under the License.
package main

import (
"context"
"flag"
"fmt"
"net/http"
"net/url"
"os"
"path/filepath"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
"k8s.io/client-go/metadata"
_ "k8s.io/client-go/plugin/pkg/client/auth"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"

"github.com/go-logr/logr"
"github.com/spf13/pflag"

"github.com/operator-framework/catalogd/internal/source"
Expand All @@ -56,6 +62,8 @@ var (
setupLog = ctrl.Log.WithName("setup")
)

const storageDir = "catalogs"

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))

Expand All @@ -68,25 +76,22 @@ func main() {
metricsAddr string
enableLeaderElection bool
probeAddr string
unpackImage string
profiling bool
catalogdVersion bool
systemNamespace string
storageDir string
catalogServerAddr string
httpExternalAddr string
cacheDir string
)
flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
// TODO: should we move the unpacker to some common place? Or... hear me out... should catalogd just be a rukpak provisioner?
flag.StringVar(&unpackImage, "unpack-image", "quay.io/operator-framework/rukpak:v0.12.0", "The unpack image to use when unpacking catalog images")
flag.StringVar(&systemNamespace, "system-namespace", "", "The namespace catalogd uses for internal state, configuration, and workloads")
flag.StringVar(&storageDir, "catalogs-storage-dir", "/var/cache/catalogs", "The directory in the filesystem where unpacked catalog content will be stored and served from")
flag.StringVar(&catalogServerAddr, "catalogs-server-addr", ":8083", "The address where the unpacked catalogs' content will be accessible")
flag.StringVar(&httpExternalAddr, "http-external-address", "http://catalogd-catalogserver.catalogd-system.svc", "The external address at which the http server is reachable.")
flag.StringVar(&cacheDir, "cache-dir", "/var/cache/", "The directory in the filesystem that catalogd will use for file based caching")
flag.BoolVar(&profiling, "profiling", false, "enable profiling endpoints to allow for using pprof")
flag.BoolVar(&catalogdVersion, "version", false, "print the catalogd version and exit")
opts := zap.Options{
Expand All @@ -105,8 +110,8 @@ func main() {
}

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
cfg := ctrl.GetConfigOrDie()
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
Expand All @@ -123,7 +128,12 @@ func main() {
systemNamespace = podNamespace()
}

unpacker, err := source.NewDefaultUnpacker(mgr, systemNamespace, unpackImage)
if err := os.MkdirAll(cacheDir, 0700); err != nil {
setupLog.Error(err, "unable to create cache directory")
os.Exit(1)
}

unpacker, err := source.NewDefaultUnpacker(systemNamespace, cacheDir)
if err != nil {
setupLog.Error(err, "unable to create unpacker")
os.Exit(1)
Expand All @@ -133,7 +143,8 @@ func main() {
if features.CatalogdFeatureGate.Enabled(features.HTTPServer) {
metrics.Registry.MustRegister(catalogdmetrics.RequestDurationMetric)

if err := os.MkdirAll(storageDir, 0700); err != nil {
storeDir := filepath.Join(cacheDir, storageDir)
if err := os.MkdirAll(storeDir, 0700); err != nil {
setupLog.Error(err, "unable to create storage directory for catalogs")
os.Exit(1)
}
Expand All @@ -143,7 +154,8 @@ func main() {
setupLog.Error(err, "unable to create base storage URL")
os.Exit(1)
}
localStorage = storage.LocalDir{RootDir: storageDir, BaseURL: baseStorageURL}

localStorage = storage.LocalDir{RootDir: storeDir, BaseURL: baseStorageURL}
shutdownTimeout := 30 * time.Second
catalogServer := server.Server{
Kind: "catalogs",
Expand Down Expand Up @@ -189,8 +201,20 @@ func main() {
}
}

metaClient, err := metadata.NewForConfig(cfg)
if err != nil {
setupLog.Error(err, "unable to setup client for garbage collection")
os.Exit(1)
}

ctx := ctrl.SetupSignalHandler()
if err := unpackStartupGarbageCollection(ctx, filepath.Join(cacheDir, source.UnpackCacheDir), setupLog, metaClient); err != nil {
setupLog.Error(err, "running garbage collection")
os.Exit(1)
}

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
Expand All @@ -203,3 +227,32 @@ func podNamespace() string {
}
return string(namespace)
}

func unpackStartupGarbageCollection(ctx context.Context, cachePath string, log logr.Logger, metaClient metadata.Interface) error {
getter := metaClient.Resource(v1alpha1.GroupVersion.WithResource("catalogs"))
metaList, err := getter.List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("error listing catalogs: %w", err)
}

expectedCatalogs := sets.New[string]()
for _, meta := range metaList.Items {
expectedCatalogs.Insert(meta.GetName())
}

cacheDirEntries, err := os.ReadDir(cachePath)
if err != nil {
return fmt.Errorf("error reading cache directory: %w", err)
}
for _, cacheDirEntry := range cacheDirEntries {
if cacheDirEntry.IsDir() && expectedCatalogs.Has(cacheDirEntry.Name()) {
continue
}
if err := os.RemoveAll(filepath.Join(cachePath, cacheDirEntry.Name())); err != nil {
return fmt.Errorf("error removing cache directory entry %q: %w ", cacheDirEntry.Name(), err)
}

log.Info("deleted unexpected cache directory entry", "path", cacheDirEntry.Name(), "isDir", cacheDirEntry.IsDir())
}
return nil
}
97 changes: 97 additions & 0 deletions cmd/manager/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package main

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/metadata/fake"

"github.com/operator-framework/catalogd/api/core/v1alpha1"
)

func TestUnpackStartupGarbageCollection(t *testing.T) {
for _, tt := range []struct {
name string
existCatalogs []*metav1.PartialObjectMetadata
notExistCatalogs []*metav1.PartialObjectMetadata
wantErr bool
}{
{
name: "successful garbage collection",
existCatalogs: []*metav1.PartialObjectMetadata{
{
TypeMeta: metav1.TypeMeta{
Kind: "Catalog",
APIVersion: v1alpha1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "one",
},
},
{
TypeMeta: metav1.TypeMeta{
Kind: "Catalog",
APIVersion: v1alpha1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "two",
},
},
},
notExistCatalogs: []*metav1.PartialObjectMetadata{
{
TypeMeta: metav1.TypeMeta{
Kind: "Catalog",
APIVersion: v1alpha1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "three",
},
},
},
},
} {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
cachePath := t.TempDir()
scheme := runtime.NewScheme()
require.NoError(t, metav1.AddMetaToScheme(scheme))

allCatalogs := append(tt.existCatalogs, tt.notExistCatalogs...)
for _, catalog := range allCatalogs {
require.NoError(t, os.MkdirAll(filepath.Join(cachePath, catalog.Name, "fakedigest"), os.ModePerm))
}

runtimeObjs := []runtime.Object{}
for _, catalog := range tt.existCatalogs {
runtimeObjs = append(runtimeObjs, catalog)
}

metaClient := fake.NewSimpleMetadataClient(scheme, runtimeObjs...)

err := unpackStartupGarbageCollection(ctx, cachePath, logr.Discard(), metaClient)
if !tt.wantErr {
assert.NoError(t, err)
entries, err := os.ReadDir(cachePath)
require.NoError(t, err)
assert.Len(t, entries, len(tt.existCatalogs))
for _, catalog := range tt.existCatalogs {
assert.DirExists(t, filepath.Join(cachePath, catalog.Name))
}

for _, catalog := range tt.notExistCatalogs {
assert.NoDirExists(t, filepath.Join(cachePath, catalog.Name))
}
} else {
assert.Error(t, err)
}
})
}
}
30 changes: 30 additions & 0 deletions config/e2e/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- ../default

patches:
- patch: |-
apiVersion: apps/v1
kind: Deployment
metadata:
name: controller-manager
namespace: system
spec:
template:
spec:
containers:
- name: manager
volumeMounts:
- mountPath: /etc/ssl/certs/
name: ca-certs
readOnly: true
volumes:
- name: ca-certs
configMap:
name: docker-registry.catalogd-e2e.svc
defaultMode: 0644
optional: false
items:
- key: ca-certificates.crt
path: ca-certificates.crt
8 changes: 3 additions & 5 deletions config/manager/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,14 @@ spec:
- ./manager
args:
- --leader-elect
- --health-probe-bind-address=:8081
- --metrics-bind-address=127.0.0.1:8080
- --catalogs-storage-dir=/var/cache/catalogs
- --feature-gates=HTTPServer=true
- --http-external-address=http://catalogd-catalogserver.catalogd-system.svc
image: controller:latest
name: manager
volumeMounts:
- name: catalog-cache
mountPath: /var/cache/catalogs
- name: cache
mountPath: /var/cache/
securityContext:
allowPrivilegeEscalation: false
capabilities:
Expand All @@ -108,5 +106,5 @@ spec:
serviceAccountName: controller-manager
terminationGracePeriodSeconds: 10
volumes:
- name: catalog-cache
- name: cache
emptyDir: {}
13 changes: 13 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,16 @@ rules:
- get
- list
- watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: manager-role
namespace: system
rules:
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
Loading

0 comments on commit 9f3ba06

Please sign in to comment.