From 4de853a87cb236e3ef3c274e9ba0b095c6c50425 Mon Sep 17 00:00:00 2001 From: Cezar Sa Espinola Date: Fri, 8 Jan 2021 16:11:53 -0300 Subject: [PATCH] Initial containerd sidecar implementation --- .gitignore | 1 + deploy.go | 5 +- filesystem.go | 3 + go.mod | 4 + go.sum | 2 - integration_test.go | 2 +- internal/containerd/executor.go | 99 +++++++++++ internal/containerd/push.go | 157 +++++++++++++++++ internal/containerd/sidecar.go | 262 ++++++++++++++++++++++++++++ internal/docker/integration_test.go | 8 +- internal/docker/sidecar.go | 2 +- internal/sidecar/sidecar.go | 2 +- main.go | 10 +- 13 files changed, 544 insertions(+), 13 deletions(-) create mode 100644 .gitignore create mode 100644 internal/containerd/executor.go create mode 100644 internal/containerd/push.go create mode 100644 internal/containerd/sidecar.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d844c97 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +deploy-agent diff --git a/deploy.go b/deploy.go index 5c27e32..a99b9a2 100644 --- a/deploy.go +++ b/deploy.go @@ -70,11 +70,12 @@ func pushSidecar(ctx context.Context, sc sidecar.Sidecar, config Config, regConf return nil } fmt.Fprintln(w, "---- Building application image ----") - imgID, err := sc.Commit(ctx, config.DestinationImages[0]) + _, err := sc.Commit(ctx, config.DestinationImages[0]) if err != nil { return fmt.Errorf("failed to commit main container: %v", err) } - return sc.TagAndPush(ctx, imgID, config.DestinationImages, regConfig, w) + fmt.Fprintln(w, "---- Pushing application image ----") + return sc.TagAndPush(ctx, config.DestinationImages[0], config.DestinationImages, regConfig, w) } func inspect(ctx context.Context, sc sidecar.Sidecar, image string, filesystem Filesystem, w io.Writer, errW io.Writer) error { diff --git a/filesystem.go b/filesystem.go index 4060029..ea9fef7 100644 --- a/filesystem.go +++ b/filesystem.go @@ -83,6 +83,9 @@ func (f *executorFS) CheckFile(name string) (bool, error) { if strings.Contains(strings.ToLower(errOut), "no such") { return false, nil } + if strings.Contains(strings.ToLower(err.Error()), "no such file") { + return false, nil + } return false, fmt.Errorf("error checking file %v: %v. Output: %v", name, err, errOut) } return true, nil diff --git a/go.mod b/go.mod index 76b952c..7af6d22 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,12 @@ require ( github.com/fsouza/go-dockerclient v1.7.0 github.com/ghodss/yaml v1.0.0 github.com/kelseyhightower/envconfig v1.3.0 + github.com/opencontainers/image-spec v1.0.1 + github.com/opencontainers/runtime-spec v1.0.3-0.20200728170252-4d89ac9fbff6 + github.com/pkg/errors v0.9.1 github.com/tsuru/commandmocker v0.0.0-20160909010208-e1d28f4f616a // indirect github.com/tsuru/tsuru v0.0.0-20171023121507-c91725578089 + golang.org/x/sync v0.0.0-20201207232520-09787c993a3a gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f ) diff --git a/go.sum b/go.sum index 0abc1a7..25acf6d 100644 --- a/go.sum +++ b/go.sum @@ -339,8 +339,6 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fsouza/go-dockerclient v1.7.0 h1:Ie1/8pAnBHNyCbSIDnYKBdXUEobk4AeJhWZz7k6rWfc= -github.com/fsouza/go-dockerclient v1.7.0/go.mod h1:Ny0LfP7OOsYu9nAi4339E4Ifor6nGBFO2M8lnd2nR+c= github.com/fullsailor/pkcs7 v0.0.0-20190404230743-d7302db945fa/go.mod h1:KnogPXtdwXqoenmZCw6S+25EAm2MkxbG0deNDu4cbSA= github.com/garyburd/redigo v0.0.0-20150301180006-535138d7bcd7/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= diff --git a/integration_test.go b/integration_test.go index c9b6629..7bd792c 100644 --- a/integration_test.go +++ b/integration_test.go @@ -41,7 +41,7 @@ hooks: - ps ` - executor := sc.Executor() + executor := sc.Executor(context.Background()) err = executor.Execute(exec.ExecuteOptions{ Cmd: "/bin/sh", diff --git a/internal/containerd/executor.go b/internal/containerd/executor.go new file mode 100644 index 0000000..128b1b6 --- /dev/null +++ b/internal/containerd/executor.go @@ -0,0 +1,99 @@ +package containerd + +import ( + "context" + "crypto" + "crypto/rand" + "fmt" + "io" + "io/ioutil" + + "github.com/containerd/containerd/cio" + "github.com/opencontainers/runtime-spec/specs-go" + "github.com/tsuru/tsuru/exec" +) + +var _ exec.Executor = &containerdExecutor{} + +type containerdExecutor struct { + sidecar *containerdSidecar + ctx context.Context +} + +func (e *containerdExecutor) Execute(opts exec.ExecuteOptions) error { + return e.ExecuteAsUser(e.sidecar.user, opts) +} + +func (e *containerdExecutor) IsRemote() bool { + return true +} + +func (e *containerdExecutor) ExecuteAsUser(user string, opts exec.ExecuteOptions) error { + fullCmd := append([]string{opts.Cmd}, opts.Args...) + + container, err := e.sidecar.client.LoadContainer(e.ctx, e.sidecar.primaryContainerID) + if err != nil { + return err + } + spec, err := container.Spec(e.ctx) + if err != nil { + return err + } + pspec := spec.Process + pspec.Args = fullCmd + if user != "" { + pspec.User = specs.User{ + Username: user, + } + } + pspec.Env = append(pspec.Env, opts.Envs...) + if opts.Dir != "" { + pspec.Cwd = opts.Dir + } + + task, err := container.Task(e.ctx, nil) + if err != nil { + return err + } + + if opts.Stdout == nil { + opts.Stdout = ioutil.Discard + } + if opts.Stderr == nil { + opts.Stderr = ioutil.Discard + } + ioCreator := cio.NewCreator(cio.WithStreams(opts.Stdin, opts.Stdout, opts.Stderr)) + + execID := "exec-" + randID() + process, err := task.Exec(e.ctx, execID, pspec, ioCreator) + if err != nil { + return err + } + defer process.Delete(e.ctx) + + statusCh, err := process.Wait(e.ctx) + if err != nil { + return err + } + + err = process.Start(e.ctx) + if err != nil { + return err + } + + select { + case status := <-statusCh: + if status.ExitCode() != 0 { + return fmt.Errorf("unexpected exit code %#+v while running %v", status.ExitCode(), fullCmd) + } + case <-e.ctx.Done(): + return e.ctx.Err() + } + return nil +} + +func randID() string { + h := crypto.SHA1.New() + io.CopyN(h, rand.Reader, 10) + return fmt.Sprintf("%x", h.Sum(nil))[:20] +} diff --git a/internal/containerd/push.go b/internal/containerd/push.go new file mode 100644 index 0000000..4e79ffd --- /dev/null +++ b/internal/containerd/push.go @@ -0,0 +1,157 @@ +/* + Copyright The containerd Authors. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package containerd + +import ( + "context" + "encoding/json" + "io" + "sync" + "time" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "golang.org/x/sync/errgroup" +) + +// This file mostly contains unexported code copied from containerd/containerd. +// Ref: +// https://github.com/containerd/containerd/blob/a4f4a4311022e9dddeff5ad6d633847aaa32d4c4/cmd/ctr/commands/images/push.go +// The copyright of this file belongs to containerd authors as stated on the +// header above. The copied code is useful for displaying progress information +// during image push operations. + +func pushWithProgress(ctx context.Context, client *containerd.Client, ref string, target ocispec.Descriptor, tracker docker.StatusTracker, resolver remotes.Resolver, w io.Writer) error { + ongoing := newPushJobs(tracker) + + eg, ctx := errgroup.WithContext(ctx) + + // used to notify the progress writer + doneCh := make(chan struct{}) + + eg.Go(func() error { + defer close(doneCh) + + jobHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + ongoing.add(remotes.MakeRefKey(ctx, desc)) + return nil, nil + }) + + return client.Push(ctx, ref, target, + containerd.WithResolver(resolver), + containerd.WithImageHandler(jobHandler), + ) + }) + + eg.Go(func() error { + var ( + ticker = time.NewTicker(100 * time.Millisecond) + fw = json.NewEncoder(w) + done bool + ) + + defer ticker.Stop() + + for { + select { + case <-ticker.C: + for _, st := range ongoing.status() { + fw.Encode(st) + } + + if done { + return nil + } + case <-doneCh: + done = true + case <-ctx.Done(): + done = true // allow ui to update once more + } + } + }) + + return eg.Wait() +} + +type pushjobs struct { + jobs map[string]struct{} + ordered []string + tracker docker.StatusTracker + mu sync.Mutex +} + +func newPushJobs(tracker docker.StatusTracker) *pushjobs { + return &pushjobs{ + jobs: make(map[string]struct{}), + tracker: tracker, + } +} + +func (j *pushjobs) add(ref string) { + j.mu.Lock() + defer j.mu.Unlock() + + if _, ok := j.jobs[ref]; ok { + return + } + j.ordered = append(j.ordered, ref) + j.jobs[ref] = struct{}{} +} + +type progressDetail struct { + Current int64 `json:"current"` + Total int64 `json:"total"` +} + +type imageProgess struct { + Status string `json:"status"` + ProgressDetail progressDetail `json:"progressDetail"` + Progress string `json:"progress"` + ID string `json:"id"` +} + +func (j *pushjobs) status() []imageProgess { + j.mu.Lock() + defer j.mu.Unlock() + + statuses := make([]imageProgess, 0, len(j.jobs)) + for _, name := range j.ordered { + si := imageProgess{ + ID: name, + } + + status, err := j.tracker.GetStatus(name) + if err != nil { + si.Status = "waiting" + } else { + si.ProgressDetail.Total = status.Total + si.ProgressDetail.Current = status.Offset + if status.Offset >= status.Total { + if status.UploadUUID == "" { + si.Status = "done" + } else { + si.Status = "committing" + } + } else { + si.Status = "uploading" + } + } + statuses = append(statuses, si) + } + + return statuses +} diff --git a/internal/containerd/sidecar.go b/internal/containerd/sidecar.go new file mode 100644 index 0000000..636c202 --- /dev/null +++ b/internal/containerd/sidecar.go @@ -0,0 +1,262 @@ +package containerd + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strings" + "time" + + "github.com/AkihiroSuda/nerdctl/pkg/imgutil/commit" + "github.com/containerd/containerd" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/mount" + refDocker "github.com/containerd/containerd/reference/docker" + remoteDocker "github.com/containerd/containerd/remotes/docker" + "github.com/pkg/errors" + "github.com/tsuru/deploy-agent/internal/sidecar" + "github.com/tsuru/tsuru/exec" +) + +const defaultContainerdAddress = "/run/containerd/containerd.sock" + +var _ sidecar.Sidecar = &containerdSidecar{} + +var pushHTTPClient = &http.Client{ + Transport: http.DefaultTransport, + Timeout: 10 * time.Minute, +} + +type containerdSidecar struct { + client *containerd.Client + primaryContainerID string + user string +} + +func NewSidecar(ctx context.Context, address string, user string) (sidecar.Sidecar, error) { + if address == "" { + address = defaultContainerdAddress + } + client, err := containerd.New(address, + containerd.WithDefaultNamespace("k8s.io"), + containerd.WithTimeout(10*time.Minute), + ) + if err != nil { + return nil, err + } + sc := containerdSidecar{ + client: client, + user: user, + } + if err = sc.setup(ctx); err != nil { + return nil, err + } + return &sc, nil +} + +func (s *containerdSidecar) setup(ctx context.Context) error { + hostname, err := os.Hostname() + if err != nil { + return errors.Wrap(err, "failed to get hostname") + } + + filter := fmt.Sprintf("labels.io.kubernetes.container.name==%s,labels.io.kubernetes.pod.name==%s", hostname, hostname) + + for { + conts, err := s.client.Containers(ctx, filter) + if err != nil { + return errors.Wrap(err, "failed to get containers") + } + + conts, err = s.filterRunningContainers(ctx, conts) + if err != nil { + return errors.Wrap(err, "failed to filter containers") + } + + if len(conts) == 1 { + s.primaryContainerID = conts[0].ID() + return nil + } + if len(conts) > 1 { + return errors.Errorf("too many containers matching filters: %d", len(conts)) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(time.Second * 1): + } + } +} + +func (s *containerdSidecar) filterRunningContainers(ctx context.Context, conts []containerd.Container) ([]containerd.Container, error) { + var result []containerd.Container + for _, c := range conts { + task, err := c.Task(ctx, nil) + if err != nil { + continue + } + status, err := task.Status(ctx) + if err != nil { + continue + } + if status.Status == containerd.Running { + result = append(result, c) + } + } + return result, nil +} + +func (s *containerdSidecar) Commit(ctx context.Context, image string) (string, error) { + imageRef, err := refDocker.ParseDockerRef(image) + if err != nil { + return "", err + } + digest, err := commit.Commit(ctx, s.client, s.primaryContainerID, &commit.Opts{ + Ref: imageRef.String(), + }) + if err != nil { + return "", err + } + return digest.String(), nil +} + +func (s *containerdSidecar) Upload(ctx context.Context, fileName string) error { + container, err := s.client.LoadContainer(ctx, s.primaryContainerID) + if err != nil { + return err + } + info, err := container.Info(ctx) + if err != nil { + return err + } + mounts, err := s.client.SnapshotService(info.Snapshotter).Mounts(ctx, s.primaryContainerID) + if err != nil { + return err + } + srcFile, err := os.Open(fileName) + if err != nil { + return err + } + defer srcFile.Close() + err = mount.WithTempMount(ctx, mounts, func(root string) error { + dstFileName := filepath.Join(root, srcFile.Name()) + var dstFile *os.File + dstFile, err = os.Create(dstFileName) + if err != nil { + return err + } + _, err = io.Copy(dstFile, srcFile) + if err != nil { + return err + } + return dstFile.Close() + }) + if err != nil { + return errors.Wrapf(err, "unable to mount %#v", mounts) + } + return nil +} + +func (s *containerdSidecar) BuildImage(ctx context.Context, fileName, image string) error { + // TODO(cezarsa): build must work, requires running buildkit daemon + return errors.New("build not supported yet") +} + +func (s *containerdSidecar) TagAndPush(ctx context.Context, baseImage string, destinationImages []string, reg sidecar.RegistryConfig, w io.Writer) error { + baseRef, err := refDocker.ParseDockerRef(baseImage) + if err != nil { + return err + } + image, err := s.client.ImageService().Get(ctx, baseRef.String()) + if err != nil { + return err + } + + authorizer := remoteDocker.NewDockerAuthorizer( + remoteDocker.WithAuthClient(pushHTTPClient), + remoteDocker.WithAuthCreds(func(string) (string, string, error) { + return reg.RegistryAuthUser, reg.RegistryAuthPass, nil + })) + registryHosts := remoteDocker.ConfigureDefaultRegistries( + remoteDocker.WithAuthorizer(authorizer), + remoteDocker.WithClient(pushHTTPClient), + remoteDocker.WithPlainHTTP(func(host string) (ret bool, err error) { + local, err := remoteDocker.MatchLocalhost(host) + if local { + return local, err + } + return strings.HasPrefix(host, "192.168."), nil + }), + ) + + tracker := remoteDocker.NewInMemoryTracker() + + resolver := remoteDocker.NewResolver(remoteDocker.ResolverOptions{ + Hosts: registryHosts, + Tracker: tracker, + }) + + for _, dstImgName := range destinationImages { + dstRef, err := refDocker.ParseDockerRef(dstImgName) + if err != nil { + return err + } + image.Name = dstRef.String() + _, err = s.client.ImageService().Create(ctx, image) + if err != nil && !errdefs.IsAlreadyExists(err) { + return err + } + err = pushWithProgress(ctx, s.client, dstRef.String(), image.Target, tracker, resolver, w) + if err != nil { + return err + } + } + return nil +} + +func (s *containerdSidecar) Inspect(ctx context.Context, imageName string) (*sidecar.ImageInspect, error) { + imageRef, err := refDocker.ParseDockerRef(imageName) + if err != nil { + return nil, err + } + image, err := s.client.GetImage(ctx, imageRef.String()) + if err != nil { + return nil, err + } + imgConfig, err := readImageConfig(ctx, image) + if err != nil { + return nil, err + } + if imgConfig.ID == "" { + imgConfig.ID = image.Name() + } + return imgConfig, nil +} + +func readImageConfig(ctx context.Context, img containerd.Image) (*sidecar.ImageInspect, error) { + configDesc, err := img.Config(ctx) + if err != nil { + return nil, err + } + + p, err := content.ReadBlob(ctx, img.ContentStore(), configDesc) + if err != nil { + return nil, err + } + + var config sidecar.ImageInspect + if err := json.Unmarshal(p, &config); err != nil { + return nil, err + } + return &config, nil +} + +func (s *containerdSidecar) Executor(ctx context.Context) exec.Executor { + return &containerdExecutor{sidecar: s, ctx: ctx} +} diff --git a/internal/docker/integration_test.go b/internal/docker/integration_test.go index 5fcc1ed..54aac94 100644 --- a/internal/docker/integration_test.go +++ b/internal/docker/integration_test.go @@ -31,7 +31,7 @@ func (s *S) TestSidecarUploadToPrimaryContainerIntegration(c *check.C) { outBuff := new(bytes.Buffer) errBuff := new(bytes.Buffer) - err = sidecar.Executor().Execute(exec.ExecuteOptions{ + err = sidecar.Executor(context.Background()).Execute(exec.ExecuteOptions{ Cmd: "/bin/sh", Args: []string{"-lc", "cat /testdata/file.txt"}, Stdout: outBuff, @@ -53,7 +53,7 @@ func (s *S) TestSidecarExecuteIntegration(c *check.C) { sidecar, err := NewSidecar("", "") c.Assert(err, check.IsNil) - err = sidecar.Executor().Execute(exec.ExecuteOptions{ + err = sidecar.Executor(context.Background()).Execute(exec.ExecuteOptions{ Dir: "/", Cmd: "/bin/sh", Args: []string{"-c", `echo '#!/bin/bash -el' >/tmp/myscript; echo 'echo hey; exit 0; echo done' >>/tmp/myscript; chmod +x /tmp/myscript`}, @@ -153,7 +153,7 @@ func (s *S) TestSidecarExecuteIntegration(c *check.C) { for _, t := range tt { outBuff := new(bytes.Buffer) errBuff := new(bytes.Buffer) - err = sidecar.Executor().Execute(exec.ExecuteOptions{ + err = sidecar.Executor(context.Background()).Execute(exec.ExecuteOptions{ Cmd: t.Cmd, Args: t.Args, Envs: t.Envs, @@ -196,7 +196,7 @@ func (s *S) TestSidecarExecuteAsUserIntegration(c *check.C) { outBuff := new(bytes.Buffer) errBuff := new(bytes.Buffer) - executor := sidecar.Executor() + executor := sidecar.Executor(context.Background()) asUserExec, ok := executor.(interface { ExecuteAsUser(string, exec.ExecuteOptions) error }) diff --git a/internal/docker/sidecar.go b/internal/docker/sidecar.go index 34d3cc5..44366f8 100644 --- a/internal/docker/sidecar.go +++ b/internal/docker/sidecar.go @@ -46,7 +46,7 @@ func NewSidecar(dockerHost string, user string) (sidecar.Sidecar, error) { return &sc, nil } -func (s *dockerSidecar) Executor() exec.Executor { +func (s *dockerSidecar) Executor(ctx context.Context) exec.Executor { return &s.executor } diff --git a/internal/sidecar/sidecar.go b/internal/sidecar/sidecar.go index 7ebb6a5..4b14621 100644 --- a/internal/sidecar/sidecar.go +++ b/internal/sidecar/sidecar.go @@ -29,5 +29,5 @@ type Sidecar interface { BuildImage(ctx context.Context, fileName, image string) error TagAndPush(ctx context.Context, baseImage string, destinationImages []string, reg RegistryConfig, w io.Writer) error Inspect(ctx context.Context, image string) (*ImageInspect, error) - Executor() exec.Executor + Executor(ctx context.Context) exec.Executor } diff --git a/main.go b/main.go index 13ec8f4..4680eb6 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "os" "github.com/kelseyhightower/envconfig" + "github.com/tsuru/deploy-agent/internal/containerd" "github.com/tsuru/deploy-agent/internal/docker" "github.com/tsuru/deploy-agent/internal/sidecar" "github.com/tsuru/deploy-agent/internal/tsuru" @@ -22,6 +23,7 @@ const version = "0.8.4" type Config struct { DockerHost string `envconfig:"DOCKER_HOST"` + ContainerdAddress string `envconfig:"CONTAINERD_ADDRESS"` RunAsSidecar bool `split_words:"true"` DestinationImages []string `split_words:"true"` SourceImage string `split_words:"true"` @@ -76,7 +78,11 @@ func runAgent() error { if config.RunAsSidecar { sc, err = docker.NewSidecar(config.DockerHost, config.RunAsUser) if err != nil { - return fmt.Errorf("failed to setup sidecar: %v", err) + var containerdErr error + sc, containerdErr = containerd.NewSidecar(ctx, config.ContainerdAddress, config.RunAsUser) + if containerdErr != nil { + return fmt.Errorf("failed to initialize both docker and containerd: docker error: %v, containerd error: %v", err, containerdErr) + } } if config.DockerfileBuild { @@ -92,7 +98,7 @@ func runAgent() error { } } - executor = sc.Executor() + executor = sc.Executor(ctx) filesystem = &executorFS{executor: executor} if config.SourceImage != "" {