Skip to content

Commit

Permalink
Recursively pin and unpin data for ipfs + containerd 2 layer GC
Browse files Browse the repository at this point in the history
  • Loading branch information
hinshun committed Apr 1, 2019
1 parent c39ec5b commit 43b05cf
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 26 deletions.
7 changes: 0 additions & 7 deletions cmd/convert/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"encoding/json"
"io"
"io/ioutil"
"log"
Expand Down Expand Up @@ -171,12 +170,6 @@ func PullByDescriptor(ctx context.Context, ipfsCln iface.CoreAPI, ctrdCln *conta
return errors.Wrap(err, "failed to get manifest")
}

mfstJSON, err := json.MarshalIndent(mfst, "", " ")
if err != nil {
return errors.Wrap(err, "failed to marshal manifest JSON")
}
log.Printf("Pulled Manifest [%d]:\n%s", len(mfstJSON), mfstJSON)

err = contentutil.Copy(ctx, ingester, provider, mfst.Config)
if err != nil {
return errors.Wrapf(err, "failed to ingest manifest config blob %q", mfst.Config.Digest)
Expand Down
3 changes: 2 additions & 1 deletion converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hinshun/ipcs/digestconv"
files "github.com/ipfs/go-ipfs-files"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
Expand Down Expand Up @@ -91,7 +92,7 @@ func copyFile(ctx context.Context, cln iface.CoreAPI, provider content.Provider,
}

func addFile(ctx context.Context, cln iface.CoreAPI, n files.Node) (digest.Digest, error) {
p, err := cln.Unixfs().Add(ctx, n)
p, err := cln.Unixfs().Add(ctx, n, options.Unixfs.Pin(true))
if err != nil {
return "", errors.Wrap(err, "failed to put blob to ipfs")
}
Expand Down
38 changes: 22 additions & 16 deletions ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/hinshun/ipcs/digestconv"
files "github.com/ipfs/go-ipfs-files"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
Expand All @@ -35,11 +36,10 @@ func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.
}

w := &writer{
ctx: ctx,
cln: s.cln,
ref: wOpts.Ref,
total: wOpts.Desc.Size,
expected: wOpts.Desc.Digest,
ctx: ctx,
cln: s.cln,
ref: wOpts.Ref,
total: wOpts.Desc.Size,
}

err := w.Truncate(0)
Expand All @@ -56,8 +56,7 @@ type writer struct {
ref string
offset int64
total int64
committed bool
expected digest.Digest
dgst digest.Digest
startedAt time.Time
updatedAt time.Time
pw io.Writer
Expand Down Expand Up @@ -92,11 +91,7 @@ func (w *writer) Close() error {

// Digest may return empty digest or panics until committed.
func (w *writer) Digest() digest.Digest {
if w.committed {
return w.expected
}

return ""
return w.dgst
}

// Commit commits the blob (but no roll-back is guaranteed on an error).
Expand All @@ -108,11 +103,10 @@ func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest,
return errors.Wrapf(errdefs.ErrFailedPrecondition, "unexpected commit size %d, expected %d", w.offset, size)
}

if expected != "" && expected != w.expected {
return errors.Wrapf(errdefs.ErrFailedPrecondition, "unexpected commit digest %s, expected %s", w.expected, expected)
if expected != "" && expected != w.dgst {
return errors.Wrapf(errdefs.ErrFailedPrecondition, "unexpected commit digest %s, expected %s", w.dgst, expected)
}

w.committed = true
return w.Close()
}

Expand Down Expand Up @@ -145,7 +139,19 @@ func (w *writer) Truncate(size int64) error {
ctx, cancel := context.WithCancel(w.ctx)
w.ipfsErr = nil
go func() {
_, w.ipfsErr = w.cln.Unixfs().Add(ctx, files.NewReaderFile(r))
p, err := w.cln.Unixfs().Add(ctx, files.NewReaderFile(r), options.Unixfs.Pin(true))
if err != nil {
w.ipfsErr = err
return
}

dgst, err := digestconv.CidToDigest(p.Cid())
if err != nil {
w.ipfsErr = err
return
}

w.dgst = dgst
}()

w.cancel = func() error {
Expand Down
12 changes: 10 additions & 2 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/hinshun/ipcs/digestconv"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -53,7 +54,12 @@ func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...str
// match the provided filters. If no filters are given all
// items will be walked.
func (s *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
pins, err := s.cln.Pin().Ls(ctx)
// TODO: Filters are also not supported in containerd's local store.
// Since we replace the local store, and filters are implemented in the boltdb
// metadata that wraps the local store, we can wait until upstream supports
// it too.

pins, err := s.cln.Pin().Ls(ctx, options.Pin.Type.All())
if err != nil {
return errors.Wrap(err, "failed to list ipfs pins")
}
Expand Down Expand Up @@ -86,7 +92,9 @@ func (s *store) Delete(ctx context.Context, dgst digest.Digest) error {
return errors.Wrap(err, "failed to convert digest")
}

err = s.cln.Pin().Rm(ctx, iface.IpfsPath(c))
// Recursively removing a pin will not remove shared chunks because IPFS has
// its internal refcounting. This will expose the unpinned blobs to IPFS GC.
err = s.cln.Pin().Rm(ctx, iface.IpfsPath(c), options.Pin.RmRecursive(true))
if err != nil {
return errors.Wrap(err, "failed to remove pin")
}
Expand Down

0 comments on commit 43b05cf

Please sign in to comment.