diff --git a/cmd/root.go b/cmd/root.go index a09c3142..eb50ead2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -81,6 +81,11 @@ A mutating webhook for Kubernetes, pointing the images to a new location.`, log.Err(err).Str("policy", cfg.ImageCopyPolicy).Msg("parsing image copy policy failed") } + imageCopyDeadline := config.DefaultImageCopyDeadline + if cfg.ImageCopyDeadline != 0 { + imageCopyDeadline = cfg.ImageCopyDeadline + } + imagePullSecretProvider := setupImagePullSecretsProvider() wh, err := webhook.NewImageSwapperWebhookWithOpts( @@ -89,6 +94,7 @@ A mutating webhook for Kubernetes, pointing the images to a new location.`, webhook.ImagePullSecretsProvider(imagePullSecretProvider), webhook.ImageSwapPolicy(imageSwapPolicy), webhook.ImageCopyPolicy(imageCopyPolicy), + webhook.ImageCopyDeadline(imageCopyDeadline), ) if err != nil { log.Err(err).Msg("error creating webhook") diff --git a/docs/configuration.md b/docs/configuration.md index 73136634..52c9f1cb 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -38,9 +38,14 @@ The option `imageSwapPolicy` (default: `exists`) defines the mutation strategy u The option `imageCopyPolicy` (default: `delayed`) defines the image copy strategy used. * `delayed`: Submits the copy job to a process queue and moves on. -* `immediate`: Submits the copy job to a process queue and waits for it to finish (deadline 8s). -* `force`: Attempts to immediately copy the image (deadline 8s). +* `immediate`: Submits the copy job to a process queue and waits for it to finish (deadline defined by `imageCopyDeadline`). +* `force`: Attempts to immediately copy the image (deadline defined by `imageCopyDeadline`). +## ImageCopyDeadline + +The option `imageCopyDeadline` (default: `8s`) defines the duration after which the image copy if aborted. + +This option only applies for `immediate` and `force` image copy strategies. ## Source diff --git a/pkg/config/config.go b/pkg/config/config.go index e74dde05..f210245a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -23,19 +23,24 @@ package config import ( "fmt" + "time" ) +const DefaultImageCopyDeadline = 8 * time.Second + type Config struct { LogLevel string `yaml:"logLevel" validate:"oneof=trace debug info warn error fatal"` LogFormat string `yaml:"logFormat" validate:"oneof=json console"` ListenAddress string - DryRun bool `yaml:"dryRun"` - ImageSwapPolicy string `yaml:"imageSwapPolicy" validate:"oneof=always exists"` - ImageCopyPolicy string `yaml:"imageCopyPolicy" validate:"oneof=delayed immediate force"` - Source Source `yaml:"source"` - Target Target `yaml:"target"` + DryRun bool `yaml:"dryRun"` + ImageSwapPolicy string `yaml:"imageSwapPolicy" validate:"oneof=always exists"` + ImageCopyPolicy string `yaml:"imageCopyPolicy" validate:"oneof=delayed immediate force"` + ImageCopyDeadline time.Duration `yaml:"imageCopyDeadline"` + + Source Source `yaml:"source"` + Target Target `yaml:"target"` TLSCertFile string TLSKeyFile string diff --git a/pkg/registry/client.go b/pkg/registry/client.go index bec081e5..07a5645c 100644 --- a/pkg/registry/client.go +++ b/pkg/registry/client.go @@ -1,13 +1,15 @@ package registry +import "context" + // Client provides methods required to be implemented by the various target registry clients, e.g. ECR, Docker, Quay. type Client interface { - CreateRepository(string) error + CreateRepository(ctx context.Context, name string) error RepositoryExists() bool CopyImage() error PullImage() error PutImage() error - ImageExists(ref string) bool + ImageExists(ctx context.Context, ref string) bool // Endpoint returns the domain of the registry Endpoint() string diff --git a/pkg/registry/ecr.go b/pkg/registry/ecr.go index df9e3cd1..1b550244 100644 --- a/pkg/registry/ecr.go +++ b/pkg/registry/ecr.go @@ -1,6 +1,7 @@ package registry import ( + "context" "encoding/base64" "net/http" "os/exec" @@ -18,8 +19,6 @@ import ( "github.com/rs/zerolog/log" ) -var execCommand = exec.Command - type ECRClient struct { client ecriface.ECRAPI ecrDomain string @@ -36,12 +35,12 @@ func (e *ECRClient) Credentials() string { return string(e.authToken) } -func (e *ECRClient) CreateRepository(name string) error { +func (e *ECRClient) CreateRepository(ctx context.Context, name string) error { if _, found := e.cache.Get(name); found { return nil } - _, err := e.client.CreateRepository(&ecr.CreateRepositoryInput{ + _, err := e.client.CreateRepositoryWithContext(ctx, &ecr.CreateRepositoryInput{ RepositoryName: aws.String(name), ImageScanningConfiguration: &ecr.ImageScanningConfiguration{ ScanOnPush: aws.Bool(true), @@ -68,7 +67,7 @@ func (e *ECRClient) CreateRepository(name string) error { if len(e.accessPolicy) > 0 { log.Debug().Str("repo", name).Str("accessPolicy", e.accessPolicy).Msg("setting access policy on repo") - _, err := e.client.SetRepositoryPolicy(&ecr.SetRepositoryPolicyInput{ + _, err := e.client.SetRepositoryPolicyWithContext(ctx, &ecr.SetRepositoryPolicyInput{ PolicyText: &e.accessPolicy, RegistryId: &e.targetAccount, RepositoryName: aws.String(name), @@ -82,7 +81,7 @@ func (e *ECRClient) CreateRepository(name string) error { if len(e.lifecyclePolicy) > 0 { log.Debug().Str("repo", name).Str("lifecyclePolicy", e.lifecyclePolicy).Msg("setting lifecycle policy on repo") - _, err := e.client.PutLifecyclePolicy(&ecr.PutLifecyclePolicyInput{ + _, err := e.client.PutLifecyclePolicyWithContext(ctx, &ecr.PutLifecyclePolicyInput{ LifecyclePolicyText: &e.lifecyclePolicy, RegistryId: &e.targetAccount, RepositoryName: aws.String(name), @@ -130,7 +129,7 @@ func (e *ECRClient) PutImage() error { panic("implement me") } -func (e *ECRClient) ImageExists(ref string) bool { +func (e *ECRClient) ImageExists(ctx context.Context, ref string) bool { if _, found := e.cache.Get(ref); found { return true } @@ -143,10 +142,8 @@ func (e *ECRClient) ImageExists(ref string) bool { "--creds", e.Credentials(), } - log.Trace().Str("app", app).Strs("args", args).Msg("executing command to inspect image") - cmd := execCommand(app, args...) - - if _, err := cmd.Output(); err != nil { + log.Ctx(ctx).Trace().Str("app", app).Strs("args", args).Msg("executing command to inspect image") + if err := exec.CommandContext(ctx, app, args...).Run(); err != nil { return false } diff --git a/pkg/secrets/dummy.go b/pkg/secrets/dummy.go index 8e35ae82..722b2b04 100644 --- a/pkg/secrets/dummy.go +++ b/pkg/secrets/dummy.go @@ -1,6 +1,10 @@ package secrets -import v1 "k8s.io/api/core/v1" +import ( + "context" + + v1 "k8s.io/api/core/v1" +) // DummyImagePullSecretsProvider does nothing type DummyImagePullSecretsProvider struct { @@ -12,6 +16,6 @@ func NewDummyImagePullSecretsProvider() ImagePullSecretsProvider { } // GetImagePullSecrets returns an empty ImagePullSecretsResult -func (p *DummyImagePullSecretsProvider) GetImagePullSecrets(pod *v1.Pod) (*ImagePullSecretsResult, error) { +func (p *DummyImagePullSecretsProvider) GetImagePullSecrets(ctx context.Context, pod *v1.Pod) (*ImagePullSecretsResult, error) { return NewImagePullSecretsResult(), nil } diff --git a/pkg/secrets/dummy_test.go b/pkg/secrets/dummy_test.go index 88794e25..3e9316c0 100644 --- a/pkg/secrets/dummy_test.go +++ b/pkg/secrets/dummy_test.go @@ -1,6 +1,7 @@ package secrets import ( + "context" "reflect" "testing" @@ -41,7 +42,7 @@ func TestDummyImagePullSecretsProvider_GetImagePullSecrets(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { p := &DummyImagePullSecretsProvider{} - got, err := p.GetImagePullSecrets(tt.args.pod) + got, err := p.GetImagePullSecrets(context.Background(), tt.args.pod) if (err != nil) != tt.wantErr { t.Errorf("GetImagePullSecrets() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/pkg/secrets/kubernetes.go b/pkg/secrets/kubernetes.go index aeb1fb78..ccd1b9ee 100644 --- a/pkg/secrets/kubernetes.go +++ b/pkg/secrets/kubernetes.go @@ -62,7 +62,7 @@ func NewKubernetesImagePullSecretsProvider(clientset kubernetes.Interface) Image } // GetImagePullSecrets returns all secrets with their respective content -func (p *KubernetesImagePullSecretsProvider) GetImagePullSecrets(pod *v1.Pod) (*ImagePullSecretsResult, error) { +func (p *KubernetesImagePullSecretsProvider) GetImagePullSecrets(ctx context.Context, pod *v1.Pod) (*ImagePullSecretsResult, error) { var secrets = make(map[string][]byte) imagePullSecrets := pod.Spec.ImagePullSecrets @@ -70,9 +70,9 @@ func (p *KubernetesImagePullSecretsProvider) GetImagePullSecrets(pod *v1.Pod) (* // retrieve secret names from pod ServiceAccount (spec.imagePullSecrets) serviceAccount, err := p.kubernetesClient.CoreV1(). ServiceAccounts(pod.Namespace). - Get(context.TODO(), pod.Spec.ServiceAccountName, metav1.GetOptions{}) + Get(ctx, pod.Spec.ServiceAccountName, metav1.GetOptions{}) if err != nil { - log.Err(err).Msg("error fetching referenced service account, continue without service account imagePullSecrets") + log.Ctx(ctx).Warn().Msg("error fetching referenced service account, continue without service account imagePullSecrets") } if serviceAccount != nil { @@ -86,9 +86,9 @@ func (p *KubernetesImagePullSecretsProvider) GetImagePullSecrets(pod *v1.Pod) (* continue } - secret, err := p.kubernetesClient.CoreV1().Secrets(pod.Namespace).Get(context.TODO(), imagePullSecret.Name, metav1.GetOptions{}) + secret, err := p.kubernetesClient.CoreV1().Secrets(pod.Namespace).Get(ctx, imagePullSecret.Name, metav1.GetOptions{}) if err != nil { - log.Err(err).Msg("error fetching secret, continue without imagePullSecrets") + log.Ctx(ctx).Err(err).Msg("error fetching secret, continue without imagePullSecrets") } if secret == nil || secret.Type != v1.SecretTypeDockerConfigJson { diff --git a/pkg/secrets/kubernetes_test.go b/pkg/secrets/kubernetes_test.go index 17281874..dae29e6f 100644 --- a/pkg/secrets/kubernetes_test.go +++ b/pkg/secrets/kubernetes_test.go @@ -89,7 +89,7 @@ func TestKubernetesCredentialProvider_GetImagePullSecrets(t *testing.T) { _, _ = clientSet.CoreV1().Secrets("test-ns").Create(context.TODO(), podSecret, metav1.CreateOptions{}) provider := NewKubernetesImagePullSecretsProvider(clientSet) - result, err := provider.GetImagePullSecrets(pod) + result, err := provider.GetImagePullSecrets(context.Background(), pod) assert.NoError(t, err) assert.NotNil(t, result) diff --git a/pkg/secrets/provider.go b/pkg/secrets/provider.go index e8e9ed69..8bb11789 100644 --- a/pkg/secrets/provider.go +++ b/pkg/secrets/provider.go @@ -1,7 +1,11 @@ package secrets -import v1 "k8s.io/api/core/v1" +import ( + "context" + + v1 "k8s.io/api/core/v1" +) type ImagePullSecretsProvider interface { - GetImagePullSecrets(pod *v1.Pod) (*ImagePullSecretsResult, error) + GetImagePullSecrets(ctx context.Context, pod *v1.Pod) (*ImagePullSecretsResult, error) } diff --git a/pkg/webhook/image_copier.go b/pkg/webhook/image_copier.go new file mode 100644 index 00000000..a3b50ff3 --- /dev/null +++ b/pkg/webhook/image_copier.go @@ -0,0 +1,190 @@ +package webhook + +import ( + "context" + "errors" + "fmt" + "os" + "os/exec" + + "github.com/containers/image/v5/docker/reference" + ctypes "github.com/containers/image/v5/types" + "github.com/rs/zerolog/log" + corev1 "k8s.io/api/core/v1" +) + +// struct representing a job of copying an image with its subcontext +type ImageCopier struct { + sourcePod *corev1.Pod + sourceImageRef ctypes.ImageReference + targetImage string + + imagePullPolicy corev1.PullPolicy + imageSwapper *ImageSwapper + + context context.Context + cancelContext context.CancelFunc +} + +type Task struct { + function func() error + description string +} + +var ErrImageAlreadyPresent = errors.New("image already present in target registry") + +// replace the default context with a new one with a timeout +func (ic *ImageCopier) withDeadline() *ImageCopier { + imageCopierContext, imageCopierContextCancel := context.WithTimeout(ic.context, ic.imageSwapper.imageCopyDeadline) + ic.context = imageCopierContext + ic.cancelContext = imageCopierContextCancel + return ic +} + +// start the image copy job +func (ic *ImageCopier) start() { + if _, hasDeadline := ic.context.Deadline(); hasDeadline { + defer ic.cancelContext() + } + + // list of actions to execute in order to copy an image + tasks := []*Task{ + { + function: ic.taskCheckImage, + description: "checking image presence in target registry", + }, + { + function: ic.taskCreateRepository, + description: "creating a new repository in target registry", + }, + { + function: ic.taskCopyImage, + description: "copying image data to target repository", + }, + } + + for _, task := range tasks { + err := ic.run(task.function) + + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + log.Ctx(ic.context).Err(err).Msg("timeout during image copy") + } else if errors.Is(err, ErrImageAlreadyPresent) { + log.Ctx(ic.context).Trace().Msgf("image copy aborted: %s", err.Error()) + } else { + log.Ctx(ic.context).Err(err).Msgf("image copy error while %s", task.description) + } + break + } + } +} + +// run a task function and check for timeout +func (ic *ImageCopier) run(taskFunc func() error) error { + if err := ic.context.Err(); err != nil { + return err + } + + return taskFunc() +} + +func (ic *ImageCopier) taskCheckImage() error { + registryClient := ic.imageSwapper.registryClient + + imageAlreadyExists := registryClient.ImageExists(ic.context, ic.targetImage) && ic.imagePullPolicy != corev1.PullAlways + + if err := ic.context.Err(); err != nil { + return err + } else if imageAlreadyExists { + return ErrImageAlreadyPresent + } + + return nil +} + +func (ic *ImageCopier) taskCreateRepository() error { + createRepoName := reference.TrimNamed(ic.sourceImageRef.DockerReference()).String() + + log.Ctx(ic.context).Debug().Str("repository", createRepoName).Msg("create repository") + + return ic.imageSwapper.registryClient.CreateRepository(ic.context, createRepoName) +} + +func (ic *ImageCopier) taskCopyImage() error { + ctx := ic.context + sourceImage := ic.sourceImageRef.DockerReference().String() + targetImage := ic.targetImage + + // Retrieve secrets and auth credentials + imagePullSecrets, err := ic.imageSwapper.imagePullSecretProvider.GetImagePullSecrets(ctx, ic.sourcePod) + // not possible at the moment + if err != nil { + return err + } + + authFile, err := imagePullSecrets.AuthFile() + if err != nil { + log.Ctx(ctx).Err(err).Msg("failed generating authFile") + } + + defer func() { + if err := os.RemoveAll(authFile.Name()); err != nil { + log.Ctx(ctx).Err(err).Str("file", authFile.Name()).Msg("failed removing auth file") + } + }() + + // Copy image + // TODO: refactor to use structure instead of passing file name / string + // + // or transform registryClient creds into auth compatible form, e.g. + // {"auths":{"aws_account_id.dkr.ecr.region.amazonaws.com":{"username":"AWS","password":"..." }}} + return skopeoCopyImage(ctx, sourceImage, authFile.Name(), targetImage, ic.imageSwapper.registryClient.Credentials()) +} + +func skopeoCopyImage(ctx context.Context, src string, srcCeds string, dest string, destCreds string) error { + if err := ctx.Err(); err != nil { + return err + } + + app := "skopeo" + args := []string{ + "--override-os", "linux", + "copy", + "--all", + "--retry-times", "3", + "docker://" + src, + "docker://" + dest, + } + + if len(srcCeds) > 0 { + args = append(args, "--src-authfile", srcCeds) + } else { + args = append(args, "--src-no-creds") + } + + if len(destCreds) > 0 { + args = append(args, "--dest-creds", destCreds) + } else { + args = append(args, "--dest-no-creds") + } + + log.Ctx(ctx). + Trace(). + Str("app", app). + Strs("args", args). + Msg("execute command to copy image") + + output, cmdErr := exec.CommandContext(ctx, app, args...).CombinedOutput() + + // check if the command timed out during execution for proper logging + if err := ctx.Err(); err != nil { + return err + } + + // enrich error with output from the command which may contain the actual reason + if cmdErr != nil { + return fmt.Errorf("Command error, stderr: %s, stdout: %s", cmdErr.Error(), string(output)) + } + + return nil +} diff --git a/pkg/webhook/image_copier_test.go b/pkg/webhook/image_copier_test.go new file mode 100644 index 00000000..043589e7 --- /dev/null +++ b/pkg/webhook/image_copier_test.go @@ -0,0 +1,116 @@ +package webhook + +import ( + "context" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ecr" + "github.com/containers/image/v5/transports/alltransports" + "github.com/estahn/k8s-image-swapper/pkg/registry" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + corev1 "k8s.io/api/core/v1" +) + +func TestImageCopier_withDeadline(t *testing.T) { + mutator := NewImageSwapperWithOpts( + nil, + ImageCopyDeadline(8*time.Second), + ) + + imageSwapper, _ := mutator.(*ImageSwapper) + + imageCopier := &ImageCopier{ + imageSwapper: imageSwapper, + context: context.Background(), + } + + imageCopier = imageCopier.withDeadline() + deadline, hasDeadline := imageCopier.context.Deadline() + + // test that a deadline has been set + assert.Equal(t, true, hasDeadline) + + // test that the deadline is future + assert.GreaterOrEqual(t, deadline, time.Now()) + + // test that the context can be canceled + assert.NotEqual(t, nil, imageCopier.context.Done()) + + imageCopier.cancelContext() + + _, ok := <-imageCopier.context.Done() + // test that the Done channel is closed, meaning the context is canceled + assert.Equal(t, false, ok) + +} + +func TestImageCopier_tasksTimeout(t *testing.T) { + ecrClient := new(mockECRClient) + ecrClient.On( + "CreateRepositoryWithContext", + mock.AnythingOfType("*context.timerCtx"), + &ecr.CreateRepositoryInput{ + ImageScanningConfiguration: &ecr.ImageScanningConfiguration{ + ScanOnPush: aws.Bool(true), + }, + ImageTagMutability: aws.String("MUTABLE"), + RepositoryName: aws.String("docker.io/library/init-container"), + RegistryId: aws.String("123456789"), + Tags: []*ecr.Tag{ + { + Key: aws.String("CreatedBy"), + Value: aws.String("k8s-image-swapper"), + }, + }, + }).Return(mock.Anything) + + registryClient, _ := registry.NewMockECRClient(ecrClient, "ap-southeast-2", "123456789.dkr.ecr.ap-southeast-2.amazonaws.com", "123456789", "arn:aws:iam::123456789:role/fakerole") + + // image swapper with an instant timeout for testing purpose + mutator := NewImageSwapperWithOpts( + registryClient, + ImageCopyDeadline(0*time.Second), + ) + + imageSwapper, _ := mutator.(*ImageSwapper) + + srcRef, _ := alltransports.ParseImageName("docker://library/init-container:latest") + imageCopier := &ImageCopier{ + imageSwapper: imageSwapper, + context: context.Background(), + sourceImageRef: srcRef, + targetImage: "123456789.dkr.ecr.ap-southeast-2.amazonaws.com/docker.io/library/init-container:latest", + imagePullPolicy: corev1.PullAlways, + sourcePod: &corev1.Pod{ + Spec: corev1.PodSpec{ + ServiceAccountName: "service-account", + ImagePullSecrets: []corev1.LocalObjectReference{}, + }, + }, + } + imageCopier = imageCopier.withDeadline() + + // test that copy steps generate timeout errors + var timeoutError error + + timeoutError = imageCopier.run(imageCopier.taskCheckImage) + assert.Equal(t, context.DeadlineExceeded, timeoutError) + + timeoutError = imageCopier.run(imageCopier.taskCreateRepository) + assert.Equal(t, context.DeadlineExceeded, timeoutError) + + timeoutError = imageCopier.run(imageCopier.taskCopyImage) + assert.Equal(t, context.DeadlineExceeded, timeoutError) + + timeoutError = imageCopier.taskCheckImage() + assert.Equal(t, context.DeadlineExceeded, timeoutError) + + timeoutError = imageCopier.taskCreateRepository() + assert.Equal(t, context.DeadlineExceeded, timeoutError) + + timeoutError = imageCopier.taskCopyImage() + assert.Equal(t, context.DeadlineExceeded, timeoutError) +} diff --git a/pkg/webhook/image_swapper.go b/pkg/webhook/image_swapper.go index fdf8c823..3b2ab7b7 100644 --- a/pkg/webhook/image_swapper.go +++ b/pkg/webhook/image_swapper.go @@ -4,8 +4,7 @@ import ( "context" "encoding/json" "fmt" - "os" - "os/exec" + "time" "github.com/alitto/pond" "github.com/containers/image/v5/docker/reference" @@ -24,8 +23,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var execCommand = exec.Command - // Option represents an option that can be passed when instantiating the image swapper to customize it type Option func(*ImageSwapper) @@ -57,6 +54,13 @@ func ImageCopyPolicy(policy types.ImageCopyPolicy) Option { } } +// ImageCopyDeadline allows to pass the ImageCopyPolicy option +func ImageCopyDeadline(deadline time.Duration) Option { + return func(swapper *ImageSwapper) { + swapper.imageCopyDeadline = deadline + } +} + // Copier allows to pass the copier option func Copier(pool *pond.WorkerPool) Option { return func(swapper *ImageSwapper) { @@ -74,14 +78,15 @@ type ImageSwapper struct { filters []config.JMESPathFilter // copier manages the jobs copying the images to the target registry - copier *pond.WorkerPool + copier *pond.WorkerPool + imageCopyDeadline time.Duration imageSwapPolicy types.ImageSwapPolicy imageCopyPolicy types.ImageCopyPolicy } // NewImageSwapper returns a new ImageSwapper initialized. -func NewImageSwapper(registryClient registry.Client, imagePullSecretProvider secrets.ImagePullSecretsProvider, filters []config.JMESPathFilter, imageSwapPolicy types.ImageSwapPolicy, imageCopyPolicy types.ImageCopyPolicy) kwhmutating.Mutator { +func NewImageSwapper(registryClient registry.Client, imagePullSecretProvider secrets.ImagePullSecretsProvider, filters []config.JMESPathFilter, imageSwapPolicy types.ImageSwapPolicy, imageCopyPolicy types.ImageCopyPolicy, imageCopyDeadline time.Duration) kwhmutating.Mutator { return &ImageSwapper{ registryClient: registryClient, imagePullSecretProvider: imagePullSecretProvider, @@ -89,6 +94,7 @@ func NewImageSwapper(registryClient registry.Client, imagePullSecretProvider sec copier: pond.New(100, 1000), imageSwapPolicy: imageSwapPolicy, imageCopyPolicy: imageCopyPolicy, + imageCopyDeadline: imageCopyDeadline, } } @@ -126,8 +132,8 @@ func NewImageSwapperWebhookWithOpts(registryClient registry.Client, opts ...Opti return kwhmutating.NewWebhook(mcfg) } -func NewImageSwapperWebhook(registryClient registry.Client, imagePullSecretProvider secrets.ImagePullSecretsProvider, filters []config.JMESPathFilter, imageSwapPolicy types.ImageSwapPolicy, imageCopyPolicy types.ImageCopyPolicy) (webhook.Webhook, error) { - imageSwapper := NewImageSwapper(registryClient, imagePullSecretProvider, filters, imageSwapPolicy, imageCopyPolicy) +func NewImageSwapperWebhook(registryClient registry.Client, imagePullSecretProvider secrets.ImagePullSecretsProvider, filters []config.JMESPathFilter, imageSwapPolicy types.ImageSwapPolicy, imageCopyPolicy types.ImageCopyPolicy, imageCopyDeadline time.Duration) (webhook.Webhook, error) { + imageSwapper := NewImageSwapper(registryClient, imagePullSecretProvider, filters, imageSwapPolicy, imageCopyPolicy, imageCopyDeadline) mt := kwhmutating.MutatorFunc(imageSwapper.Mutate) mcfg := kwhmutating.WebhookConfig{ ID: "k8s-image-swapper", @@ -172,8 +178,7 @@ func (p *ImageSwapper) Mutate(ctx context.Context, ar *kwhmodel.AdmissionReview, Str("name", pod.Name). Logger() - lctx := logger. - WithContext(ctx) + lctx := logger.WithContext(context.Background()) containerSets := []*[]corev1.Container{&pod.Spec.Containers, &pod.Spec.InitContainers} for _, containerSet := range containerSets { @@ -204,57 +209,30 @@ func (p *ImageSwapper) Mutate(ctx context.Context, ar *kwhmodel.AdmissionReview, targetImage := p.targetName(srcRef) - copyFn := func() { - // Avoid unnecessary copying by ending early. For images such as :latest we adhere to the - // image pull policy. - if p.registryClient.ImageExists(targetImage) && container.ImagePullPolicy != corev1.PullAlways { - return - } - - // Create repository - createRepoName := reference.TrimNamed(srcRef.DockerReference()).String() - log.Ctx(lctx).Debug().Str("repository", createRepoName).Msg("create repository") - if err := p.registryClient.CreateRepository(createRepoName); err != nil { - log.Err(err).Str("repository", createRepoName).Msg("failed to create repository") - } - - // Retrieve secrets and auth credentials - imagePullSecrets, err := p.imagePullSecretProvider.GetImagePullSecrets(pod) - if err != nil { - log.Err(err).Msg("failed to retrieve image pull secrets from provider") - } - - authFile, err := imagePullSecrets.AuthFile() - if err != nil { - log.Err(err).Msg("failed generating authFile") - } - - defer func() { - if err := os.RemoveAll(authFile.Name()); err != nil { - log.Err(err).Str("file", authFile.Name()).Msg("failed removing auth file") - } - }() - - // Copy image - // TODO: refactor to use structure instead of passing file name / string - // or transform registryClient creds into auth compatible form, e.g. - // {"auths":{"aws_account_id.dkr.ecr.region.amazonaws.com":{"username":"AWS","password":"..." }}} - log.Ctx(lctx).Trace().Str("source", srcRef.DockerReference().String()).Str("target", targetImage).Msg("copy image") - if err := copyImage(srcRef.DockerReference().String(), authFile.Name(), targetImage, p.registryClient.Credentials()); err != nil { - log.Ctx(lctx).Err(err).Str("source", srcRef.DockerReference().String()).Str("target", targetImage).Msg("copying image to target registry failed") - } + imageCopierLogger := logger.With(). + Str("source-image", srcRef.DockerReference().Name()). + Str("target-image", targetImage). + Logger() + + imageCopierContext := imageCopierLogger.WithContext(lctx) + // create an object responsible for the image copy + imageCopier := ImageCopier{ + sourcePod: pod, + sourceImageRef: srcRef, + targetImage: targetImage, + imagePullPolicy: container.ImagePullPolicy, + imageSwapper: p, + context: imageCopierContext, } // imageCopyPolicy switch p.imageCopyPolicy { case types.ImageCopyPolicyDelayed: - p.copier.Submit(copyFn) + p.copier.Submit(imageCopier.start) case types.ImageCopyPolicyImmediate: - // TODO: Implement deadline - p.copier.SubmitAndWait(copyFn) + p.copier.SubmitAndWait(imageCopier.withDeadline().start) case types.ImageCopyPolicyForce: - // TODO: Implement deadline - copyFn() + imageCopier.withDeadline().start() default: panic("unknown imageCopyPolicy") } @@ -265,7 +243,7 @@ func (p *ImageSwapper) Mutate(ctx context.Context, ar *kwhmodel.AdmissionReview, log.Ctx(lctx).Debug().Str("image", targetImage).Msg("set new container image") containers[i].Image = targetImage case types.ImageSwapPolicyExists: - if p.registryClient.ImageExists(targetImage) { + if p.registryClient.ImageExists(lctx, targetImage) { log.Ctx(lctx).Debug().Str("image", targetImage).Msg("set new container image") containers[i].Image = targetImage } else { @@ -341,38 +319,3 @@ func NewFilterContext(request kwhmodel.AdmissionReview, obj metav1.Object, conta return FilterContext{Obj: obj, Container: container} } - -func copyImage(src string, srcCeds string, dest string, destCreds string) error { - app := "skopeo" - args := []string{ - "--override-os", "linux", - "copy", - "--multi-arch", "all", - "--retry-times", "3", - "docker://" + src, - "docker://" + dest, - } - - if len(srcCeds) > 0 { - args = append(args, "--src-authfile", srcCeds) - } else { - args = append(args, "--src-no-creds") - } - - if len(destCreds) > 0 { - args = append(args, "--dest-creds", destCreds) - } else { - args = append(args, "--dest-no-creds") - } - - cmd := execCommand(app, args...) - output, err := cmd.CombinedOutput() - - log.Trace(). - Str("app", app). - Strs("args", args). - Bytes("output", output). - Msg("executed command to copy image") - - return err -} diff --git a/pkg/webhook/image_swapper_test.go b/pkg/webhook/image_swapper_test.go index d68e5859..f313e05b 100644 --- a/pkg/webhook/image_swapper_test.go +++ b/pkg/webhook/image_swapper_test.go @@ -10,6 +10,7 @@ import ( "github.com/alitto/pond" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/ecr" "github.com/aws/aws-sdk-go/service/ecr/ecriface" "github.com/estahn/k8s-image-swapper/pkg/config" @@ -25,6 +26,8 @@ import ( "k8s.io/client-go/kubernetes/fake" ) +var execCommand = exec.Command + //func TestImageSwapperMutator(t *testing.T) { // tests := []struct { // name string @@ -198,8 +201,12 @@ type mockECRClient struct { ecriface.ECRAPI } -func (m *mockECRClient) CreateRepository(createRepositoryInput *ecr.CreateRepositoryInput) (*ecr.CreateRepositoryOutput, error) { - m.Called(createRepositoryInput) +func (m *mockECRClient) CreateRepositoryWithContext(ctx context.Context, createRepositoryInput *ecr.CreateRepositoryInput, opts ...request.Option) (*ecr.CreateRepositoryOutput, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + + m.Called(ctx, createRepositoryInput) return &ecr.CreateRepositoryOutput{}, nil } @@ -238,7 +245,8 @@ func TestImageSwapper_Mutate(t *testing.T) { ecrClient := new(mockECRClient) ecrClient.On( - "CreateRepository", + "CreateRepositoryWithContext", + mock.AnythingOfType("*context.valueCtx"), &ecr.CreateRepositoryInput{ ImageScanningConfiguration: &ecr.ImageScanningConfiguration{ ScanOnPush: aws.Bool(true), @@ -258,7 +266,8 @@ func TestImageSwapper_Mutate(t *testing.T) { }, }).Return(mock.Anything) ecrClient.On( - "CreateRepository", + "CreateRepositoryWithContext", + mock.AnythingOfType("*context.valueCtx"), &ecr.CreateRepositoryInput{ ImageScanningConfiguration: &ecr.ImageScanningConfiguration{ ScanOnPush: aws.Bool(true), @@ -278,7 +287,8 @@ func TestImageSwapper_Mutate(t *testing.T) { }, }).Return(mock.Anything) ecrClient.On( - "CreateRepository", + "CreateRepositoryWithContext", + mock.AnythingOfType("*context.valueCtx"), &ecr.CreateRepositoryInput{ ImageScanningConfiguration: &ecr.ImageScanningConfiguration{ ScanOnPush: aws.Bool(true), @@ -313,7 +323,7 @@ func TestImageSwapper_Mutate(t *testing.T) { assert.NoError(t, err, "NewImageSwapperWebhookWithOpts executed without errors") - resp, err := wh.Review(context.TODO(), admissionReviewModel) + resp, err := wh.Review(context.Background(), admissionReviewModel) expected := `[ {"op":"replace","path":"/spec/initContainers/0/image","value":"123456789.dkr.ecr.ap-southeast-2.amazonaws.com/docker.io/library/init-container:latest"}, @@ -338,7 +348,8 @@ func TestImageSwapper_MutateWithImagePullSecrets(t *testing.T) { ecrClient := new(mockECRClient) ecrClient.On( - "CreateRepository", + "CreateRepositoryWithContext", + mock.AnythingOfType("*context.valueCtx"), &ecr.CreateRepositoryInput{ ImageScanningConfiguration: &ecr.ImageScanningConfiguration{ ScanOnPush: aws.Bool(true), @@ -394,9 +405,9 @@ func TestImageSwapper_MutateWithImagePullSecrets(t *testing.T) { }, } - _, _ = clientSet.CoreV1().ServiceAccounts("test-ns").Create(context.TODO(), svcAccount, metav1.CreateOptions{}) - _, _ = clientSet.CoreV1().Secrets("test-ns").Create(context.TODO(), svcAccountSecret, metav1.CreateOptions{}) - _, _ = clientSet.CoreV1().Secrets("test-ns").Create(context.TODO(), podSecret, metav1.CreateOptions{}) + _, _ = clientSet.CoreV1().ServiceAccounts("test-ns").Create(context.Background(), svcAccount, metav1.CreateOptions{}) + _, _ = clientSet.CoreV1().Secrets("test-ns").Create(context.Background(), svcAccountSecret, metav1.CreateOptions{}) + _, _ = clientSet.CoreV1().Secrets("test-ns").Create(context.Background(), podSecret, metav1.CreateOptions{}) provider := secrets.NewKubernetesImagePullSecretsProvider(clientSet) @@ -411,7 +422,7 @@ func TestImageSwapper_MutateWithImagePullSecrets(t *testing.T) { assert.NoError(t, err, "NewImageSwapperWebhookWithOpts executed without errors") - resp, err := wh.Review(context.TODO(), admissionReviewModel) + resp, err := wh.Review(context.Background(), admissionReviewModel) assert.JSONEq(t, "[{\"op\":\"replace\",\"path\":\"/spec/containers/0/image\",\"value\":\"123456789.dkr.ecr.ap-southeast-2.amazonaws.com/docker.io/library/nginx:latest\"}]", string(resp.(*model.MutatingAdmissionResponse).JSONPatchPatch)) assert.Nil(t, resp.(*model.MutatingAdmissionResponse).Warnings)