diff --git a/pkg/cmd/infra/pusher/pusher.go b/pkg/cmd/infra/pusher/pusher.go new file mode 100644 index 000000000000..275deb33f191 --- /dev/null +++ b/pkg/cmd/infra/pusher/pusher.go @@ -0,0 +1,389 @@ +package pusher + +import ( + "fmt" + "io" + "net/http" + "os" + "strings" + + "github.com/docker/distribution" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/reference" + "github.com/docker/distribution/registry/client" + "github.com/docker/distribution/registry/client/auth" + "github.com/golang/glog" + "github.com/spf13/cobra" + + kerrors "k8s.io/apimachinery/pkg/util/errors" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/kubernetes/pkg/kubectl/cmd/templates" + kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" + + "github.com/docker/distribution/manifest/schema2" + imageapi "github.com/openshift/origin/pkg/image/apis/image" + "github.com/openshift/origin/pkg/image/importer" +) + +var ( + longDesc = templates.LongDesc(` + Push an image to a new location + + Accepts a list of arguments defining source images that should be pushed to the provided + destination image tag. Each argument is of the form "SRC=DST", where both parts must be + valid image references ([registry[:port]/]repository[:tag|@digest]). +`) +) + +type Mapping struct { + Source imageapi.DockerImageReference + Destination imageapi.DockerImageReference +} + +type pushOptions struct { + Out, ErrOut io.Writer + + Mappings []Mapping + + Insecure bool + SkipMount bool + Force bool +} + +// NewCommandPusher helps to push and pull images. +func NewCommandPusher(name string) *cobra.Command { + o := &pushOptions{} + + cmd := &cobra.Command{ + Use: fmt.Sprintf("%s SRC=DST[,...]", name), + Short: "Push images to an image registry", + Long: longDesc, + Run: func(c *cobra.Command, args []string) { + o.Out = os.Stdout + o.ErrOut = c.OutOrStderr() + kcmdutil.CheckErr(o.Complete(args)) + kcmdutil.CheckErr(o.Run()) + }, + } + + flag := cmd.Flags() + flag.BoolVar(&o.Insecure, "insecure", o.Insecure, "If true, connections may be made over HTTP") + flag.BoolVar(&o.SkipMount, "skip-mount", o.SkipMount, "If true, always push layers instead of cross-mounting them") + flag.BoolVar(&o.Force, "force", o.Force, "If true, attempt to write all contents.") + + return cmd +} + +func (o *pushOptions) Complete(args []string) error { + var remainingArgs []string + overlap := make(map[string]string) + for _, s := range args { + parts := strings.SplitN(s, "=", 2) + if len(parts) != 2 { + remainingArgs = append(remainingArgs, s) + continue + } + if len(parts[0]) == 0 || len(parts[1]) == 0 { + return fmt.Errorf("all arguments must be valid SRC=DST mappings") + } + + src, err := imageapi.ParseDockerImageReference(parts[0]) + if err != nil { + return fmt.Errorf("%q is not a valid image reference: %v", parts[0], err) + } + if len(src.Tag) == 0 && len(src.ID) == 0 { + return fmt.Errorf("you must specify a tag or digest for SRC") + } + dst, err := imageapi.ParseDockerImageReference(parts[1]) + if err != nil { + return fmt.Errorf("%q is not a valid image reference: %v", parts[0], err) + } + if len(dst.Tag) == 0 || len(dst.ID) != 0 { + return fmt.Errorf("you must specify a tag for DST") + } + if _, ok := overlap[dst.String()]; ok { + return fmt.Errorf("each destination tag may only be specified once: %s", dst.String()) + } + overlap[dst.String()] = src.String() + + o.Mappings = append(o.Mappings, Mapping{Source: src, Destination: dst}) + } + if len(remainingArgs) > 0 { + return fmt.Errorf("all arguments must be valid SRC=DST mappings") + } + if len(o.Mappings) == 0 { + return fmt.Errorf("you must specify at least one source image to pull and the destination to push to as SRC=DST") + } + return nil +} + +type key struct { + registry string + repository string +} + +type destination struct { + ref imageapi.DockerImageReference + tags []string +} + +type pushTargets map[key]destination + +type destinations struct { + ref imageapi.DockerImageReference + tags map[string]pushTargets + digests map[string]pushTargets +} + +func (d destinations) mergeIntoDigests(srcDigest digest.Digest, target pushTargets) { + srcKey := srcDigest.String() + current, ok := d.digests[srcKey] + if !ok { + d.digests[srcKey] = target + return + } + for repo, dst := range target { + existing, ok := current[repo] + if !ok { + current[repo] = dst + continue + } + existing.tags = append(existing.tags, dst.tags...) + } +} + +type targetTree map[key]destinations + +func buildTargetTree(mappings []Mapping) targetTree { + tree := make(targetTree) + for _, m := range mappings { + srcKey := key{registry: m.Source.Registry, repository: m.Source.RepositoryName()} + dstKey := key{registry: m.Destination.Registry, repository: m.Destination.RepositoryName()} + + src, ok := tree[srcKey] + if !ok { + src.ref = m.Source.AsRepository() + src.digests = make(map[string]pushTargets) + src.tags = make(map[string]pushTargets) + tree[srcKey] = src + } + + var current pushTargets + if tag := m.Source.Tag; len(tag) != 0 { + current = src.tags[tag] + if current == nil { + current = make(pushTargets) + src.tags[tag] = current + } + } else { + current = src.digests[m.Source.ID] + if current == nil { + current = make(pushTargets) + src.digests[m.Source.ID] = current + } + } + + dst, ok := current[dstKey] + if !ok { + dst.ref = m.Destination.AsRepository() + } + dst.tags = append(dst.tags, m.Destination.Tag) + current[dstKey] = dst + } + return tree +} + +type retrieverError struct { + src, dst imageapi.DockerImageReference + err error +} + +func (e retrieverError) Error() string { + return e.err.Error() +} + +func (o *pushOptions) Run() error { + tree := buildTargetTree(o.Mappings) + + creds := importer.NewLocalCredentials() + ctx := apirequest.NewContext() + + srcClient := importer.NewContext(http.DefaultTransport, http.DefaultTransport).WithCredentials(creds) + toContext := importer.NewContext(http.DefaultTransport, http.DefaultTransport).WithActions("pull", "push") + + var errs []error + for _, src := range tree { + srcRepo, err := srcClient.Repository(ctx, src.ref.DockerClientDefaults().RegistryURL(), src.ref.RepositoryName(), o.Insecure) + if err != nil { + errs = append(errs, retrieverError{err: fmt.Errorf("unable to connect to %s: %v", src.ref, err), src: src.ref}) + continue + } + + manifests, err := srcRepo.Manifests(ctx) + if err != nil { + errs = append(errs, retrieverError{src: src.ref, err: fmt.Errorf("unable to access source image %s manifests: %v", src.ref, err)}) + continue + } + + var tagErrs []retrieverError + var digestErrs []retrieverError + + // convert source tags to digests + for srcTag, pushTargets := range src.tags { + desc, err := srcRepo.Tags(ctx).Get(ctx, srcTag) + if err != nil { + tagErrs = append(tagErrs, retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s by tag: %v", src.ref, err)}) + continue + } + srcDigest := desc.Digest + fmt.Fprintf(o.Out, "Resolved source image tag %s to %s\n", src.ref, srcDigest) + src.mergeIntoDigests(srcDigest, pushTargets) + } + + canonicalFrom := srcRepo.Named() + + for srcDigestString, pushTargets := range src.digests { + // load the manifest + srcDigest := digest.Digest(srcDigestString) + var contentDigest digest.Digest + srcManifest, err := manifests.Get(ctx, digest.Digest(srcDigest), client.ReturnContentDigest(&contentDigest)) + if err != nil { + digestErrs = append(digestErrs, retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s manifest: %v", src.ref, err)}) + continue + } + + for _, dst := range pushTargets { + // if we are going to be using cross repository mount, get a token that covers the src + if src.ref.Registry == dst.ref.Registry { + toContext = toContext.WithScopes(auth.RepositoryScope{Repository: src.ref.RepositoryName(), Actions: []string{"pull"}}) + } + toClient := toContext.WithCredentials(creds) + + toRepo, err := toClient.Repository(ctx, dst.ref.DockerClientDefaults().RegistryURL(), dst.ref.RepositoryName(), o.Insecure) + if err != nil { + digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to connect to %s: %v", dst.ref, err)}) + continue + } + + canonicalTo := toRepo.Named() + fmt.Fprintf(o.Out, "Connecting to %s for %s\n", canonicalFrom, canonicalTo) + + toManifests, err := toRepo.Manifests(ctx) + if err != nil { + digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to access destination image %s manifests: %v", src.ref, err)}) + continue + } + + // if the destination tag already has this manifest, do nothing + var mustCopyLayers bool + if o.Force { + mustCopyLayers = true + } else { + if _, err := toManifests.Get(ctx, srcDigest); err != nil { + mustCopyLayers = true + } else { + glog.V(4).Infof("Manifest exists in %s, no need to copy layers without --force", dst.ref) + } + } + if mustCopyLayers { + fmt.Fprintf(o.Out, "Copying %s to %s (%d references)\n", src.ref, dst.ref, len(srcManifest.References())) + + // upload all the blobs + toBlobs := toRepo.Blobs(ctx) + srcBlobs := srcRepo.Blobs(ctx) + + // upload the config + switch t := srcManifest.(type) { + case *schema2.DeserializedManifest: + contents, err := srcBlobs.Get(ctx, t.Config.Digest) + if err != nil { + digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unreadable image config %s: %v", t.Config.Digest, err)}) + continue + } + desc, err := toBlobs.Put(ctx, t.Config.MediaType, contents) + if err != nil { + digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to upload manifest config to %s: %v", dst.ref, err)}) + continue + } + if desc.Digest != t.Config.Digest { + digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("the digest changed from %s to %s", contentDigest, desc.Digest)}) + continue + } + } + + for _, blob := range srcManifest.References() { + // tagging within the same registry is a no-op + if src.ref.Registry == dst.ref.Registry && canonicalFrom.String() == canonicalTo.String() { + continue + } + + var options []distribution.BlobCreateOption + blobSource, err := reference.WithDigest(canonicalFrom, blob.Digest) + if err != nil { + digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unexpected error building named digest: %v", err)}) + continue + } + if !o.SkipMount { + options = append(options, client.WithMountFrom(blobSource)) + } + + w, err := toBlobs.Create(ctx, options...) + if ebm, ok := err.(distribution.ErrBlobMounted); ok { + glog.V(5).Infof("Blob mounted %#v", blob) + if ebm.From.Digest() != blob.Digest { + digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to push %s: tried to mount blob %s src source and got back a different digest %s", src.ref, blob.Digest, ebm.From.Digest())}) + continue + } + break + } + if err != nil { + digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to upload blob %s to %s: %v", blob.Digest, dst.ref, err)}) + break + } + err = func() error { + glog.V(5).Infof("Uploading blob %s", blob.Digest) + defer w.Cancel(ctx) + r, err := srcBlobs.Open(ctx, blob.Digest) + if err != nil { + return fmt.Errorf("unable to open source layer %s to copy to %s: %v", blob.Digest, dst.ref, err) + } + defer r.Close() + fmt.Fprintf(o.Out, "Copying to %s (%d bytes)\n", blob.Digest, blob.Size) + n, err := w.ReadFrom(r) + if err != nil { + return fmt.Errorf("unable to copy layer %s to %s: %v", blob.Digest, dst.ref, err) + } + if n != blob.Size { + fmt.Fprintf(o.ErrOut, "warning: Layer size mismatch for %s: had %d, wrote %d\n", blob.Digest, blob.Size, n) + } + _, err = w.Commit(ctx, blob) + return err + }() + if err != nil { + digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: err}) + break + } + } + } + + if len(digestErrs) > 0 { + continue + } + + // upload and tag the manifest + for _, tag := range dst.tags { + toDigest, err := toManifests.Put(ctx, srcManifest, distribution.WithTag(tag)) + if err != nil { + digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to push manifest to %s: %v", dst.ref, err)}) + continue + } + fmt.Fprintf(o.Out, "Pushed to %s:%s as %s\n", dst.ref, tag, toDigest) + } + } + } + for _, err := range append(tagErrs, digestErrs...) { + errs = append(errs, err) + } + } + return kerrors.NewAggregate(errs) +} diff --git a/pkg/cmd/openshift/openshift.go b/pkg/cmd/openshift/openshift.go index 49c95630af7c..5078847990ee 100644 --- a/pkg/cmd/openshift/openshift.go +++ b/pkg/cmd/openshift/openshift.go @@ -16,6 +16,7 @@ import ( "github.com/openshift/origin/pkg/cmd/flagtypes" "github.com/openshift/origin/pkg/cmd/infra/builder" "github.com/openshift/origin/pkg/cmd/infra/deployer" + "github.com/openshift/origin/pkg/cmd/infra/pusher" irouter "github.com/openshift/origin/pkg/cmd/infra/router" "github.com/openshift/origin/pkg/cmd/recycle" "github.com/openshift/origin/pkg/cmd/server/start" @@ -145,6 +146,7 @@ func NewCommandOpenShift(name string) *cobra.Command { builder.NewCommandDockerBuilder("docker-build"), diagnostics.NewCommandPodDiagnostics("diagnostic-pod", out), diagnostics.NewCommandNetworkPodDiagnostics("network-diagnostic-pod", out), + pusher.NewCommandPusher("push"), ) root.AddCommand(infra) diff --git a/pkg/image/importer/client.go b/pkg/image/importer/client.go index 8bf0a2205eb2..80cdee42dc25 100644 --- a/pkg/image/importer/client.go +++ b/pkg/image/importer/client.go @@ -119,6 +119,9 @@ func (r *repositoryRetriever) Repository(ctx gocontext.Context, registry *url.UR t = r.context.InsecureTransport } src := *registry + if len(src.Scheme) == 0 { + src.Scheme = "https" + } // ping the registry to get challenge headers if err, ok := r.pings[src]; ok { if err != nil {