From 43b05cf68e08da52d7fd121d7a9dc22a2528d42a Mon Sep 17 00:00:00 2001 From: Edgar Lee Date: Mon, 1 Apr 2019 14:31:37 -0700 Subject: [PATCH] Recursively pin and unpin data for ipfs + containerd 2 layer GC --- cmd/convert/main.go | 7 ------- converter.go | 3 ++- ingester.go | 38 ++++++++++++++++++++++---------------- manager.go | 12 ++++++++++-- 4 files changed, 34 insertions(+), 26 deletions(-) diff --git a/cmd/convert/main.go b/cmd/convert/main.go index a9329c8..10ff798 100644 --- a/cmd/convert/main.go +++ b/cmd/convert/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/json" "io" "io/ioutil" "log" @@ -171,12 +170,6 @@ func PullByDescriptor(ctx context.Context, ipfsCln iface.CoreAPI, ctrdCln *conta return errors.Wrap(err, "failed to get manifest") } - mfstJSON, err := json.MarshalIndent(mfst, "", " ") - if err != nil { - return errors.Wrap(err, "failed to marshal manifest JSON") - } - log.Printf("Pulled Manifest [%d]:\n%s", len(mfstJSON), mfstJSON) - err = contentutil.Copy(ctx, ingester, provider, mfst.Config) if err != nil { return errors.Wrapf(err, "failed to ingest manifest config blob %q", mfst.Config.Digest) diff --git a/converter.go b/converter.go index a19b024..6768647 100644 --- a/converter.go +++ b/converter.go @@ -11,6 +11,7 @@ import ( "github.com/hinshun/ipcs/digestconv" files "github.com/ipfs/go-ipfs-files" iface "github.com/ipfs/interface-go-ipfs-core" + "github.com/ipfs/interface-go-ipfs-core/options" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -91,7 +92,7 @@ func copyFile(ctx context.Context, cln iface.CoreAPI, provider content.Provider, } func addFile(ctx context.Context, cln iface.CoreAPI, n files.Node) (digest.Digest, error) { - p, err := cln.Unixfs().Add(ctx, n) + p, err := cln.Unixfs().Add(ctx, n, options.Unixfs.Pin(true)) if err != nil { return "", errors.Wrap(err, "failed to put blob to ipfs") } diff --git a/ingester.go b/ingester.go index 094cee2..e557692 100644 --- a/ingester.go +++ b/ingester.go @@ -10,6 +10,7 @@ import ( "github.com/hinshun/ipcs/digestconv" files "github.com/ipfs/go-ipfs-files" iface "github.com/ipfs/interface-go-ipfs-core" + "github.com/ipfs/interface-go-ipfs-core/options" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) @@ -35,11 +36,10 @@ func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content. } w := &writer{ - ctx: ctx, - cln: s.cln, - ref: wOpts.Ref, - total: wOpts.Desc.Size, - expected: wOpts.Desc.Digest, + ctx: ctx, + cln: s.cln, + ref: wOpts.Ref, + total: wOpts.Desc.Size, } err := w.Truncate(0) @@ -56,8 +56,7 @@ type writer struct { ref string offset int64 total int64 - committed bool - expected digest.Digest + dgst digest.Digest startedAt time.Time updatedAt time.Time pw io.Writer @@ -92,11 +91,7 @@ func (w *writer) Close() error { // Digest may return empty digest or panics until committed. func (w *writer) Digest() digest.Digest { - if w.committed { - return w.expected - } - - return "" + return w.dgst } // Commit commits the blob (but no roll-back is guaranteed on an error). @@ -108,11 +103,10 @@ func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, return errors.Wrapf(errdefs.ErrFailedPrecondition, "unexpected commit size %d, expected %d", w.offset, size) } - if expected != "" && expected != w.expected { - return errors.Wrapf(errdefs.ErrFailedPrecondition, "unexpected commit digest %s, expected %s", w.expected, expected) + if expected != "" && expected != w.dgst { + return errors.Wrapf(errdefs.ErrFailedPrecondition, "unexpected commit digest %s, expected %s", w.dgst, expected) } - w.committed = true return w.Close() } @@ -145,7 +139,19 @@ func (w *writer) Truncate(size int64) error { ctx, cancel := context.WithCancel(w.ctx) w.ipfsErr = nil go func() { - _, w.ipfsErr = w.cln.Unixfs().Add(ctx, files.NewReaderFile(r)) + p, err := w.cln.Unixfs().Add(ctx, files.NewReaderFile(r), options.Unixfs.Pin(true)) + if err != nil { + w.ipfsErr = err + return + } + + dgst, err := digestconv.CidToDigest(p.Cid()) + if err != nil { + w.ipfsErr = err + return + } + + w.dgst = dgst }() w.cancel = func() error { diff --git a/manager.go b/manager.go index 2a9cb07..7602203 100644 --- a/manager.go +++ b/manager.go @@ -8,6 +8,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/hinshun/ipcs/digestconv" iface "github.com/ipfs/interface-go-ipfs-core" + "github.com/ipfs/interface-go-ipfs-core/options" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) @@ -53,7 +54,12 @@ func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...str // match the provided filters. If no filters are given all // items will be walked. func (s *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error { - pins, err := s.cln.Pin().Ls(ctx) + // TODO: Filters are also not supported in containerd's local store. + // Since we replace the local store, and filters are implemented in the boltdb + // metadata that wraps the local store, we can wait until upstream supports + // it too. + + pins, err := s.cln.Pin().Ls(ctx, options.Pin.Type.All()) if err != nil { return errors.Wrap(err, "failed to list ipfs pins") } @@ -86,7 +92,9 @@ func (s *store) Delete(ctx context.Context, dgst digest.Digest) error { return errors.Wrap(err, "failed to convert digest") } - err = s.cln.Pin().Rm(ctx, iface.IpfsPath(c)) + // Recursively removing a pin will not remove shared chunks because IPFS has + // its internal refcounting. This will expose the unpinned blobs to IPFS GC. + err = s.cln.Pin().Rm(ctx, iface.IpfsPath(c), options.Pin.RmRecursive(true)) if err != nil { return errors.Wrap(err, "failed to remove pin") }