Skip to content
This repository has been archived by the owner on Aug 12, 2024. It is now read-only.

Commit

Permalink
address more PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Lanford <joe.lanford@gmail.com>
  • Loading branch information
joelanford committed Aug 3, 2022
1 parent 97c1329 commit 8f0ada7
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 21 deletions.
18 changes: 9 additions & 9 deletions cmd/rukpakctl/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
Expand Down Expand Up @@ -101,12 +101,17 @@ one version to the next.
}
fmt.Printf("bundledeployment.core.rukpak.io %q applied\n", bundleDeploymentName)

rukpakCAs, err := rukpakctl.GetClusterCA(ctx, cl, systemNamespace, caSecretName)
rukpakCAs, err := rukpakctl.GetClusterCA(ctx, cl, types.NamespacedName{Namespace: systemNamespace, Name: caSecretName})
if err != nil {
log.Fatal(err)
}

bundleName, err := getBundleName(ctx, cfg, bundleLabels)
dynCl, err := dynamic.NewForConfig(cfg)
if err != nil {
log.Fatalf("build dynamic client: %v", err)
}

bundleName, err := getBundleName(ctx, dynCl, bundleLabels)
if err != nil {
log.Fatalf("failed to get bundle name: %v", err)
}
Expand Down Expand Up @@ -166,12 +171,7 @@ func buildBundleDeployment(bdName string, bundleLabels map[string]string, biPCN,
}}
}

func getBundleName(ctx context.Context, cfg *rest.Config, bundleLabels map[string]string) (string, error) {
dynCl, err := dynamic.NewForConfig(cfg)
if err != nil {
return "", fmt.Errorf("build dynamic client: %v", err)
}

func getBundleName(ctx context.Context, dynCl dynamic.Interface, bundleLabels map[string]string) (string, error) {
watch, err := dynCl.Resource(rukpakv1alpha1.GroupVersion.WithResource("bundles")).Watch(ctx, metav1.ListOptions{Watch: true, LabelSelector: labels.FormatLabels(bundleLabels)})
if err != nil {
return "", fmt.Errorf("watch bundles: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/rukpakctl/ca.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (

// GetClusterCA returns an x509.CertPool by reading the contents of a Kubernetes Secret. It uses the provided
// client to get the requested secret and then loads the contents of the secret's "ca.crt" key into the cert pool.
func GetClusterCA(ctx context.Context, cl client.Reader, ns, secretName string) (*x509.CertPool, error) {
func GetClusterCA(ctx context.Context, cl client.Reader, secretKey types.NamespacedName) (*x509.CertPool, error) {
caSecret := &corev1.Secret{}
if err := cl.Get(ctx, types.NamespacedName{Namespace: ns, Name: secretName}, caSecret); err != nil {
if err := cl.Get(ctx, secretKey, caSecret); err != nil {
return nil, fmt.Errorf("get rukpak certificate authority: %v", err)
}
certPool := x509.NewCertPool()
Expand Down
11 changes: 11 additions & 0 deletions internal/rukpakctl/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,23 @@ func (bu *BundleUploader) Upload(ctx context.Context, bundleName string, bundleF
return false, err
}

// cancel is called by the upload goroutine after the upload completes,
// thus ensuring the port-forwarding goroutine exits, which allows the
// errgroup.Wait() call to unblock.
ctx, cancel := context.WithCancel(ctx)

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return pf.Start(ctx)
})

// Create a pipe to conserve memory. We don't need to buffer the entire bundle
// tar.gz prior to sending it. To use a pipe, we start a writer goroutine and
// a reader goroutine such that the reader reads as soon as the writer writes.
// The reader continues reading until it receives io.EOF, so we need to close
// writer (triggering the io.EOF) as soon as we finish writing. We close the
// writer with `bundleWriter.CloseWithError` so that an error encountered
// writing the FS to a tar.gz stream can be processed by the reader.
bundleReader, bundleWriter := io.Pipe()
eg.Go(func() error {
return bundleWriter.CloseWithError(util.FSToTarGZ(bundleWriter, bundleFS))
Expand Down
7 changes: 6 additions & 1 deletion internal/source/unpacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
rukpakv1alpha1 "github.com/operator-framework/rukpak/api/v1alpha1"
)

const (
// uploadClientTimeout is the timeout to be used with http connections to upload manager.
uploadClientTimeout = time.Second * 10
)

// Unpacker unpacks bundle content, either synchronously or asynchronously and
// returns a Result, which conveys information about the progress of unpacking
// the bundle content.
Expand Down Expand Up @@ -126,7 +131,7 @@ func NewDefaultUnpacker(mgr ctrl.Manager, namespace, provisionerName, unpackImag
rukpakv1alpha1.SourceTypeUpload: &Upload{
baseDownloadURL: baseUploadManagerURL,
bearerToken: mgr.GetConfig().BearerToken,
client: http.Client{Timeout: 10 * time.Second, Transport: httpTransport},
client: http.Client{Timeout: uploadClientTimeout, Transport: httpTransport},
},
}), nil
}
15 changes: 6 additions & 9 deletions test/e2e/plain_provisioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ var _ = Describe("plain provisioner bundle", func() {
err := c.Create(ctx, bundle)
Expect(err).To(BeNil())

rootCAs, err := rukpakctl.GetClusterCA(ctx, c, defaultSystemNamespace, "rukpak-ca")
rootCAs, err := rukpakctl.GetClusterCA(ctx, c, types.NamespacedName{Namespace: defaultSystemNamespace, Name: "rukpak-ca"})
Expect(err).To(BeNil())

bu := rukpakctl.BundleUploader{
Expand All @@ -1004,15 +1004,12 @@ var _ = Describe("plain provisioner bundle", func() {
})

It("can unpack the bundle successfully", func() {
Eventually(func() error {
Eventually(func() (*rukpakv1alpha1.Bundle, error) {
if err := c.Get(ctx, client.ObjectKeyFromObject(bundle), bundle); err != nil {
return err
}
if bundle.Status.Phase != rukpakv1alpha1.PhaseUnpacked {
return errors.New("bundle is not unpacked")
return nil, err
}
return nil
}).Should(BeNil())
return bundle, nil
}).Should(WithTransform(func(b *rukpakv1alpha1.Bundle) string { return b.Status.Phase }, Equal(rukpakv1alpha1.PhaseUnpacked)))
})
})

Expand Down Expand Up @@ -1044,7 +1041,7 @@ var _ = Describe("plain provisioner bundle", func() {
err := c.Create(ctx, bundle)
Expect(err).To(BeNil())

rootCAs, err := rukpakctl.GetClusterCA(ctx, c, defaultSystemNamespace, "rukpak-ca")
rootCAs, err := rukpakctl.GetClusterCA(ctx, c, types.NamespacedName{Namespace: defaultSystemNamespace, Name: "rukpak-ca"})
Expect(err).To(BeNil())

bu := rukpakctl.BundleUploader{
Expand Down

0 comments on commit 8f0ada7

Please sign in to comment.