diff --git a/cmd/buildkitd/main.go b/cmd/buildkitd/main.go index 67ebdc7791a3..400f922a8936 100644 --- a/cmd/buildkitd/main.go +++ b/cmd/buildkitd/main.go @@ -677,8 +677,8 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err return nil, err } frontends := map[string]frontend.Frontend{} - frontends["dockerfile.v0"] = forwarder.NewGatewayForwarder(wc, dockerfile.Build) - frontends["gateway.v0"] = gateway.NewGatewayFrontend(wc) + frontends["dockerfile.v0"] = forwarder.NewGatewayForwarder(wc.Infos(), dockerfile.Build) + frontends["gateway.v0"] = gateway.NewGatewayFrontend(wc.Infos()) cacheStorage, err := bboltcachestorage.NewStore(filepath.Join(cfg.Root, "cache.db")) if err != nil { diff --git a/executor/executor.go b/executor/executor.go index 741f347cd9ca..69237cbf97ca 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -6,8 +6,9 @@ import ( "net" "syscall" + "github.com/containerd/containerd/mount" + "github.com/docker/docker/pkg/idtools" resourcestypes "github.com/moby/buildkit/executor/resources/types" - "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver/pb" ) @@ -28,8 +29,13 @@ type Meta struct { RemoveMountStubsRecursive bool } +type MountableRef interface { + Mount() ([]mount.Mount, func() error, error) + IdentityMapping() *idtools.IdentityMapping +} + type Mountable interface { - Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) + Mount(ctx context.Context, readonly bool) (MountableRef, error) } type Mount struct { diff --git a/frontend/frontend.go b/frontend/frontend.go index fb89a8414afa..6152ee36b9c1 100644 --- a/frontend/frontend.go +++ b/frontend/frontend.go @@ -4,6 +4,7 @@ import ( "context" "github.com/moby/buildkit/client/llb" + "github.com/moby/buildkit/executor" gw "github.com/moby/buildkit/frontend/gateway/client" "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" @@ -17,7 +18,7 @@ type Result = result.Result[solver.ResultProxy] type Attestation = result.Attestation[solver.ResultProxy] type Frontend interface { - Solve(ctx context.Context, llb FrontendLLBBridge, opt map[string]string, inputs map[string]*pb.Definition, sid string, sm *session.Manager) (*Result, error) + Solve(ctx context.Context, llb FrontendLLBBridge, exec executor.Executor, opt map[string]string, inputs map[string]*pb.Definition, sid string, sm *session.Manager) (*Result, error) } type FrontendLLBBridge interface { diff --git a/frontend/gateway/container/container.go b/frontend/gateway/container/container.go index af6476e7fce2..155d9f4fead0 100644 --- a/frontend/gateway/container/container.go +++ b/frontend/gateway/container/container.go @@ -47,7 +47,7 @@ type Mount struct { WorkerRef *worker.WorkerRef } -func NewContainer(ctx context.Context, w worker.Worker, sm *session.Manager, g session.Group, req NewContainerRequest) (client.Container, error) { +func NewContainer(ctx context.Context, cm cache.Manager, exec executor.Executor, sm *session.Manager, g session.Group, req NewContainerRequest) (client.Container, error) { ctx, cancel := context.WithCancel(ctx) eg, ctx := errgroup.WithContext(ctx) platform := opspb.Platform{ @@ -63,7 +63,7 @@ func NewContainer(ctx context.Context, w worker.Worker, sm *session.Manager, g s hostname: req.Hostname, extraHosts: req.ExtraHosts, platform: platform, - executor: w.Executor(), + executor: exec, sm: sm, group: g, errGroup: eg, @@ -86,9 +86,8 @@ func NewContainer(ctx context.Context, w worker.Worker, sm *session.Manager, g s } name := fmt.Sprintf("container %s", req.ContainerID) - mm := mounts.NewMountManager(name, w.CacheManager(), sm) - p, err := PrepareMounts(ctx, mm, w.CacheManager(), g, "", mnts, refs, func(m *opspb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) { - cm := w.CacheManager() + mm := mounts.NewMountManager(name, cm, sm) + p, err := PrepareMounts(ctx, mm, cm, g, "", mnts, refs, func(m *opspb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) { if m.Input != opspb.Empty { cm = refs[m.Input].Worker.CacheManager() } diff --git a/frontend/gateway/forwarder/forward.go b/frontend/gateway/forwarder/forward.go index 0f4da47cbde6..cc8201c74feb 100644 --- a/frontend/gateway/forwarder/forward.go +++ b/frontend/gateway/forwarder/forward.go @@ -6,6 +6,7 @@ import ( cacheutil "github.com/moby/buildkit/cache/util" "github.com/moby/buildkit/client/llb" + "github.com/moby/buildkit/executor" "github.com/moby/buildkit/frontend" "github.com/moby/buildkit/frontend/gateway/client" "github.com/moby/buildkit/frontend/gateway/container" @@ -26,7 +27,7 @@ import ( "golang.org/x/sync/errgroup" ) -func LLBBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, w worker.Infos, sid string, sm *session.Manager) (*BridgeClient, error) { +func LLBBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, opts map[string]string, inputs map[string]*opspb.Definition, w worker.Infos, sid string, sm *session.Manager) (*BridgeClient, error) { bc := &BridgeClient{ opts: opts, inputs: inputs, @@ -35,6 +36,7 @@ func LLBBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLL sm: sm, workers: w, workerRefByID: make(map[string]*worker.WorkerRef), + executor: exec, } bc.buildOpts = bc.loadBuildOpts() return bc, nil @@ -52,6 +54,7 @@ type BridgeClient struct { workerRefByID map[string]*worker.WorkerRef buildOpts client.BuildOpts ctrs []client.Container + executor executor.Executor } func (c *BridgeClient) Solve(ctx context.Context, req client.SolveRequest) (*client.Result, error) { @@ -293,13 +296,13 @@ func (c *BridgeClient) NewContainer(ctx context.Context, req client.NewContainer return nil, err } - w, err := c.workers.GetDefault() + cm, err := c.workers.DefaultCacheManager() if err != nil { return nil, err } group := session.NewGroup(c.sid) - ctr, err := container.NewContainer(ctx, w, c.sm, group, ctrReq) + ctr, err := container.NewContainer(ctx, cm, c.executor, c.sm, group, ctrReq) if err != nil { return nil, err } diff --git a/frontend/gateway/forwarder/frontend.go b/frontend/gateway/forwarder/frontend.go index ae144162c939..9b6381df517c 100644 --- a/frontend/gateway/forwarder/frontend.go +++ b/frontend/gateway/forwarder/frontend.go @@ -3,6 +3,7 @@ package forwarder import ( "context" + "github.com/moby/buildkit/executor" "github.com/moby/buildkit/frontend" "github.com/moby/buildkit/frontend/gateway/client" "github.com/moby/buildkit/session" @@ -22,8 +23,8 @@ type GatewayForwarder struct { f client.BuildFunc } -func (gf *GatewayForwarder) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*pb.Definition, sid string, sm *session.Manager) (retRes *frontend.Result, retErr error) { - c, err := LLBBridgeToGatewayClient(ctx, llbBridge, opts, inputs, gf.workers, sid, sm) +func (gf *GatewayForwarder) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, opts map[string]string, inputs map[string]*pb.Definition, sid string, sm *session.Manager) (retRes *frontend.Result, retErr error) { + c, err := LLBBridgeToGatewayClient(ctx, llbBridge, exec, opts, inputs, gf.workers, sid, sm) if err != nil { return nil, err } diff --git a/frontend/gateway/gateway.go b/frontend/gateway/gateway.go index 25b875922bd9..9112736325ed 100644 --- a/frontend/gateway/gateway.go +++ b/frontend/gateway/gateway.go @@ -86,7 +86,7 @@ func filterPrefix(opts map[string]string, pfx string) map[string]string { return m } -func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*frontend.Result, error) { +func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, opts map[string]string, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*frontend.Result, error) { source, ok := opts[keySource] if !ok { return nil, errors.Errorf("no source specified for gateway") @@ -141,7 +141,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten } } } else { - c, err := forwarder.LLBBridgeToGatewayClient(ctx, llbBridge, opts, inputs, gf.workers, sid, sm) + c, err := forwarder.LLBBridgeToGatewayClient(ctx, llbBridge, exec, opts, inputs, gf.workers, sid, sm) if err != nil { return nil, err } @@ -281,18 +281,13 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten } } - lbf, ctx, err := serveLLBBridgeForwarder(ctx, llbBridge, gf.workers, inputs, sid, sm) + lbf, ctx, err := serveLLBBridgeForwarder(ctx, llbBridge, exec, gf.workers, inputs, sid, sm) defer lbf.conn.Close() //nolint if err != nil { return nil, err } defer lbf.Discard() - w, err := gf.workers.GetDefault() - if err != nil { - return nil, err - } - mdmnt, release, err := metadataMount(frontendDef) if err != nil { return nil, err @@ -305,7 +300,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten mnts = append(mnts, *mdmnt) } - _, err = w.Executor().Run(ctx, "", container.MountWithSession(rootFS, session.NewGroup(sid)), mnts, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil) + _, err = exec.Run(ctx, "", container.MountWithSession(rootFS, session.NewGroup(sid)), mnts, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil) if err != nil { if errdefs.IsCanceled(ctx, err) && lbf.isErrServerClosed { err = errors.Errorf("frontend grpc server closed unexpectedly") @@ -434,11 +429,11 @@ func (lbf *llbBridgeForwarder) Result() (*frontend.Result, error) { return lbf.result, nil } -func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) LLBBridgeForwarder { - return newBridgeForwarder(ctx, llbBridge, workers, inputs, sid, sm) +func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) LLBBridgeForwarder { + return newBridgeForwarder(ctx, llbBridge, exec, workers, inputs, sid, sm) } -func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) *llbBridgeForwarder { +func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) *llbBridgeForwarder { lbf := &llbBridgeForwarder{ callCtx: ctx, llbBridge: llbBridge, @@ -451,13 +446,14 @@ func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridg sid: sid, sm: sm, ctrs: map[string]gwclient.Container{}, + executor: exec, } return lbf } -func serveLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*llbBridgeForwarder, context.Context, error) { +func serveLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*llbBridgeForwarder, context.Context, error) { ctx, cancel := context.WithCancel(ctx) - lbf := newBridgeForwarder(ctx, llbBridge, workers, inputs, sid, sm) + lbf := newBridgeForwarder(ctx, llbBridge, exec, workers, inputs, sid, sm) server := grpc.NewServer(grpc.UnaryInterceptor(grpcerrors.UnaryServerInterceptor), grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor)) grpc_health_v1.RegisterHealthServer(server, health.NewServer()) pb.RegisterLLBBridgeServer(server, lbf) @@ -552,6 +548,7 @@ type llbBridgeForwarder struct { isErrServerClosed bool sid string sm *session.Manager + executor executor.Executor *pipe ctrs map[string]gwclient.Container ctrsMu sync.Mutex @@ -1042,7 +1039,7 @@ func (lbf *llbBridgeForwarder) NewContainer(ctx context.Context, in *pb.NewConta // and we want the context to live for the duration of the container. group := session.NewGroup(lbf.sid) - w, err := lbf.workers.GetDefault() + cm, err := lbf.workers.DefaultCacheManager() if err != nil { return nil, stack.Enable(err) } @@ -1052,7 +1049,7 @@ func (lbf *llbBridgeForwarder) NewContainer(ctx context.Context, in *pb.NewConta return nil, stack.Enable(err) } - ctr, err := container.NewContainer(context.Background(), w, lbf.sm, group, ctrReq) + ctr, err := container.NewContainer(context.Background(), cm, lbf.executor, lbf.sm, group, ctrReq) if err != nil { return nil, stack.Enable(err) } diff --git a/snapshot/snapshotter.go b/snapshot/snapshotter.go index f5c59f173545..089479991183 100644 --- a/snapshot/snapshotter.go +++ b/snapshot/snapshotter.go @@ -10,14 +10,11 @@ import ( "github.com/containerd/containerd/pkg/userns" "github.com/containerd/containerd/snapshots" "github.com/docker/docker/pkg/idtools" + "github.com/moby/buildkit/executor" "github.com/pkg/errors" ) -type Mountable interface { - // ID() string - Mount() ([]mount.Mount, func() error, error) - IdentityMapping() *idtools.IdentityMapping -} +type Mountable = executor.MountableRef // Snapshotter defines interface that any snapshot implementation should satisfy type Snapshotter interface { diff --git a/solver/llbsolver/bridge.go b/solver/llbsolver/bridge.go index db851a8f1f1a..558a2e3efef5 100644 --- a/solver/llbsolver/bridge.go +++ b/solver/llbsolver/bridge.go @@ -11,6 +11,8 @@ import ( "github.com/moby/buildkit/cache/remotecache" "github.com/moby/buildkit/client" "github.com/moby/buildkit/client/llb" + "github.com/moby/buildkit/executor" + resourcestypes "github.com/moby/buildkit/executor/resources/types" "github.com/moby/buildkit/frontend" gw "github.com/moby/buildkit/frontend/gateway/client" "github.com/moby/buildkit/identity" @@ -39,6 +41,10 @@ type llbBridge struct { cms map[string]solver.CacheManager cmsMu sync.Mutex sm *session.Manager + + executorOnce sync.Once + executorErr error + executor executor.Executor } func (b *llbBridge) Warn(ctx context.Context, dgst digest.Digest, msg string, opts frontend.WarnOpts) error { @@ -159,6 +165,32 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp return res, nil } +func (b *llbBridge) Run(ctx context.Context, id string, rootfs executor.Mount, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (resourcestypes.Recorder, error) { + if err := b.loadExecutor(); err != nil { + return nil, err + } + return b.executor.Run(ctx, id, rootfs, mounts, process, started) +} + +func (b *llbBridge) Exec(ctx context.Context, id string, process executor.ProcessInfo) error { + if err := b.loadExecutor(); err != nil { + return err + } + return b.executor.Exec(ctx, id, process) +} + +func (b *llbBridge) loadExecutor() error { + b.executorOnce.Do(func() { + w, err := b.resolveWorker() + if err != nil { + b.executorErr = err + return + } + b.executor = w.Executor() + }) + return b.executorErr +} + type resultProxy struct { id string b *provenanceBridge diff --git a/solver/llbsolver/provenance.go b/solver/llbsolver/provenance.go index 26abf78d1c6c..665c678adcd2 100644 --- a/solver/llbsolver/provenance.go +++ b/solver/llbsolver/provenance.go @@ -165,7 +165,7 @@ func (b *provenanceBridge) Solve(ctx context.Context, req frontend.SolveRequest, return nil, errors.Errorf("invalid frontend: %s", req.Frontend) } wb := &provenanceBridge{llbBridge: b.llbBridge, req: &req} - res, err = f.Solve(ctx, wb, req.FrontendOpt, req.FrontendInputs, sid, b.llbBridge.sm) + res, err = f.Solve(ctx, wb, b.llbBridge, req.FrontendOpt, req.FrontendInputs, sid, b.llbBridge.sm) if err != nil { return nil, err } diff --git a/solver/llbsolver/solver.go b/solver/llbsolver/solver.go index e342dad8294c..00b88b8159a7 100644 --- a/solver/llbsolver/solver.go +++ b/solver/llbsolver/solver.go @@ -458,7 +458,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro br := s.bridge(j) var fwd gateway.LLBBridgeForwarder if s.gatewayForwarder != nil && req.Definition == nil && req.Frontend == "" { - fwd = gateway.NewBridgeForwarder(ctx, br, s.workerController, req.FrontendInputs, sessionID, s.sm) + fwd = gateway.NewBridgeForwarder(ctx, br, br, s.workerController.Infos(), req.FrontendInputs, sessionID, s.sm) defer fwd.Discard() // Register build before calling s.recordBuildHistory, because // s.recordBuildHistory can block for several seconds on diff --git a/worker/worker.go b/worker/worker.go index d62047e9fb5f..8a12585ed9a9 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -43,6 +43,6 @@ type Worker interface { } type Infos interface { - GetDefault() (Worker, error) + DefaultCacheManager() (cache.Manager, error) WorkerInfos() []client.WorkerInfo } diff --git a/worker/workercontroller.go b/worker/workercontroller.go index e175b4002b4a..150eed352a3a 100644 --- a/worker/workercontroller.go +++ b/worker/workercontroller.go @@ -3,6 +3,7 @@ package worker import ( "github.com/containerd/containerd/filters" "github.com/hashicorp/go-multierror" + "github.com/moby/buildkit/cache" "github.com/moby/buildkit/client" "github.com/pkg/errors" ) @@ -81,3 +82,25 @@ func (c *Controller) WorkerInfos() []client.WorkerInfo { } return out } + +func (c *Controller) Infos() Infos { + return &infosController{c: c} +} + +type infosController struct { + c *Controller +} + +var _ Infos = &infosController{} + +func (c *infosController) DefaultCacheManager() (cache.Manager, error) { + w, err := c.c.GetDefault() + if err != nil { + return nil, err + } + return w.CacheManager(), nil +} + +func (c *infosController) WorkerInfos() []client.WorkerInfo { + return c.c.WorkerInfos() +}