From 5a732268eecddd19034325e631742f3e8d88aebf Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Fri, 19 Aug 2016 15:52:27 -0400 Subject: [PATCH] Add DAGService.GetLinks() method and use it in the GC and elsewhere. This method will use the (also new) LinkService if it is available to retrieve just the links for a MerkleDAG node without necessarily having to retrieve the underlying block. For now the main benefit is that the pinner will not break when a block becomes invalid due to a change in the backing file. This is possible because the metadata for a block (which includes the Links) is stored separately and is thus always available even if the backing file changes. License: MIT Signed-off-by: Kevin Atkinson --- core/builder.go | 10 +++- core/commands/pin.go | 4 +- core/core.go | 17 ++++--- core/corerepo/gc.go | 4 +- core/coreunix/add_test.go | 4 +- filestore/support/linkservice.go | 31 ++++++++++++ merkledag/merkledag.go | 39 ++++++++++++--- merkledag/merkledag_test.go | 4 +- pin/gc/gc.go | 7 +-- pin/pin.go | 12 ++--- test/sharness/t0263-filestore-gc.sh | 76 +++++++++++++++++++++++++++++ 11 files changed, 175 insertions(+), 33 deletions(-) create mode 100644 filestore/support/linkservice.go create mode 100755 test/sharness/t0263-filestore-gc.sh diff --git a/core/builder.go b/core/builder.go index 1b7a0926c25..83c11194fdb 100644 --- a/core/builder.go +++ b/core/builder.go @@ -22,6 +22,9 @@ import ( goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context" ci "gx/ipfs/QmUWER4r4qMvaCnX5zREcfyiWN7cXN9g3a7fkRqNz8qWPP/go-libp2p-crypto" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" + + "github.com/ipfs/go-ipfs/filestore" + "github.com/ipfs/go-ipfs/filestore/support" ) type BuildCfg struct { @@ -180,7 +183,12 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { } n.Blocks = bserv.New(n.Blockstore, n.Exchange) - n.DAG = dag.NewDAGService(n.Blocks) + dag := dag.NewDAGService(n.Blocks) + if fs,ok 
:= n.Repo.DirectMount(fsrepo.FilestoreMount).(*filestore.Datastore); ok { + n.LinkService = filestore_support.NewLinkService(fs) + dag.LinkService = n.LinkService + } + n.DAG = dag n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG) if err != nil { // TODO: we should move towards only running 'NewPinner' explicity on diff --git a/core/commands/pin.go b/core/commands/pin.go index efa6fa1bb3e..f4bfbed8b89 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -326,11 +326,11 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string if typeStr == "indirect" || typeStr == "all" { ks := key.NewKeySet() for _, k := range n.Pinning.RecursiveKeys() { - nd, err := n.DAG.Get(ctx, k) + links, err := n.DAG.GetLinks(ctx, k) if err != nil { return nil, err } - err = dag.EnumerateChildren(n.Context(), n.DAG, nd, ks, false) + err = dag.EnumerateChildren(n.Context(), n.DAG, links, ks, false) if err != nil { return nil, err } diff --git a/core/core.go b/core/core.go index 8240fa20a2e..e1a6f0cda23 100644 --- a/core/core.go +++ b/core/core.go @@ -92,14 +92,15 @@ type IpfsNode struct { PrivateKey ic.PrivKey // the local node's private Key // Services - Peerstore pstore.Peerstore // storage for other Peer instances - Blockstore bstore.MultiBlockstore // the block store (lower level) - Blocks *bserv.BlockService // the block service, get/add blocks. - DAG merkledag.DAGService // the merkle dag service, get/add objects. - Resolver *path.Resolver // the path resolution system - Reporter metrics.Reporter - Discovery discovery.Service - FilesRoot *mfs.Root + Peerstore pstore.Peerstore // storage for other Peer instances + Blockstore bstore.MultiBlockstore // the block store (lower level) + Blocks *bserv.BlockService // the block service, get/add blocks. + DAG merkledag.DAGService // the merkle dag service, get/add objects. 
+ LinkService merkledag.LinkService + Resolver *path.Resolver // the path resolution system + Reporter metrics.Reporter + Discovery discovery.Service + FilesRoot *mfs.Root // Online PeerHost p2phost.Host // the network host (server+client) diff --git a/core/corerepo/gc.go b/core/corerepo/gc.go index bf586620759..fdbdebd2737 100644 --- a/core/corerepo/gc.go +++ b/core/corerepo/gc.go @@ -91,7 +91,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error { if err != nil { return err } - rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots) + rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) if err != nil { return err } @@ -114,7 +114,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo if err != nil { return nil, err } - rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots) + rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) if err != nil { return nil, err } diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index 06f0b2a495a..30e782b68fa 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -96,7 +96,7 @@ func TestAddGCLive(t *testing.T) { gcstarted := make(chan struct{}) go func() { defer close(gcstarted) - gcchan, err := gc.GC(context.Background(), node.Blockstore, node.Pinning, nil) + gcchan, err := gc.GC(context.Background(), node.Blockstore, node.LinkService, node.Pinning, nil) if err != nil { log.Error("GC ERROR:", err) errs <- err @@ -155,7 +155,7 @@ func TestAddGCLive(t *testing.T) { t.Fatal(err) } - err = dag.EnumerateChildren(ctx, node.DAG, root, key.NewKeySet(), false) + err = dag.EnumerateChildren(ctx, node.DAG, root.Links, key.NewKeySet(), false) if err != nil { t.Fatal(err) } diff --git a/filestore/support/linkservice.go b/filestore/support/linkservice.go new file mode 100644 index 00000000000..d2c541b0ed4 --- /dev/null +++ b/filestore/support/linkservice.go @@ -0,0 +1,31 @@ +package filestore_support + +import ( + key 
"github.com/ipfs/go-ipfs/blocks/key" + . "github.com/ipfs/go-ipfs/filestore" + dag "github.com/ipfs/go-ipfs/merkledag" + ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore" +) + +func NewLinkService(fs *Datastore) dag.LinkService { + return &linkservice{fs} +} + +type linkservice struct { + fs *Datastore +} + +func (ls *linkservice) Get(key key.Key) ([]*dag.Link, error) { + dsKey := key.DsKey() + dataObj, err := ls.fs.GetDirect(dsKey) + if err == ds.ErrNotFound { + return nil, dag.ErrNotFound + } else if err != nil { + return nil, err + } + res, err := dag.DecodeProtobuf(dataObj.Data) + if err != nil { + return nil, err + } + return res.Links, nil +} diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 4b1fe181009..f572cdb82c2 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -22,6 +22,10 @@ type DAGService interface { Get(context.Context, key.Key) (*Node, error) Remove(*Node) error + // Return all links for a node, may be more efficient than + // calling Get + GetLinks(context.Context, key.Key) ([]*Link, error) + // GetDAG returns, in order, all the single leve child // nodes of the passed in node. GetMany(context.Context, []key.Key) <-chan *NodeOption @@ -29,8 +33,14 @@ type DAGService interface { Batch() *Batch } -func NewDAGService(bs *bserv.BlockService) DAGService { - return &dagService{bs} +// A LinkService returns the links for a node if they are available +// locally without having to retrieve the block from the datastore. +type LinkService interface { + Get(key.Key) ([]*Link, error) +} + +func NewDAGService(bs *bserv.BlockService) *dagService { + return &dagService{Blocks: bs} } // dagService is an IPFS Merkle DAG service. 
@@ -39,7 +49,8 @@ func NewDAGService(bs *bserv.BlockService) DAGService { // TODO: should cache Nodes that are in memory, and be // able to free some of them when vm pressure is high type dagService struct { - Blocks *bserv.BlockService + Blocks *bserv.BlockService + LinkService LinkService } func createBlock(nd *Node) (*blocks.BasicBlock, error) { @@ -112,6 +123,20 @@ func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) { return res, nil } +func (n *dagService) GetLinks(ctx context.Context, k key.Key) ([]*Link, error) { + if n.LinkService != nil { + links, err := n.LinkService.Get(k) + if err == nil { + return links, nil + } + } + node, err := n.Get(ctx, k) + if err != nil { + return nil, err + } + return node.Links, nil +} + func (n *dagService) Remove(nd *Node) error { k, err := nd.Key() if err != nil { @@ -369,12 +394,12 @@ func (t *Batch) Commit() error { // EnumerateChildren will walk the dag below the given root node and add all // unseen children to the passed in set. // TODO: parallelize to avoid disk latency perf hits? 
-func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet, bestEffort bool) error { - for _, lnk := range root.Links { +func EnumerateChildren(ctx context.Context, ds DAGService, links []*Link, set key.KeySet, bestEffort bool) error { + for _, lnk := range links { k := key.Key(lnk.Hash) if !set.Has(k) { set.Add(k) - child, err := ds.Get(ctx, k) + children, err := ds.GetLinks(ctx, k) if err != nil { if bestEffort && err == ErrNotFound { continue @@ -382,7 +407,7 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K return err } } - err = EnumerateChildren(ctx, ds, child, set, bestEffort) + err = EnumerateChildren(ctx, ds, children, set, bestEffort) if err != nil { return err } diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index dcf9ced1cab..f880dc4f542 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -292,7 +292,7 @@ func TestFetchGraph(t *testing.T) { offline_ds := NewDAGService(bs) ks := key.NewKeySet() - err = EnumerateChildren(context.Background(), offline_ds, root, ks, false) + err = EnumerateChildren(context.Background(), offline_ds, root.Links, ks, false) if err != nil { t.Fatal(err) } @@ -309,7 +309,7 @@ func TestEnumerateChildren(t *testing.T) { } ks := key.NewKeySet() - err = EnumerateChildren(context.Background(), ds, root, ks, false) + err = EnumerateChildren(context.Background(), ds, root.Links, ks, false) if err != nil { t.Fatal(err) } diff --git a/pin/gc/gc.go b/pin/gc/gc.go index bada0bb4486..096cf593255 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -23,11 +23,12 @@ var log = logging.Logger("gc") // // The routine then iterates over every block in the blockstore and // deletes any block that is not found in the marked set. 
-func GC(ctx context.Context, bs bstore.MultiBlockstore, pn pin.Pinner, bestEffortRoots []key.Key) (<-chan key.Key, error) { +func GC(ctx context.Context, bs bstore.MultiBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []key.Key) (<-chan key.Key, error) { unlocker := bs.GCLock() bsrv := bserv.New(bs, offline.Exchange(bs)) ds := dag.NewDAGService(bsrv) + ds.LinkService = ls gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots) if err != nil { @@ -74,13 +75,13 @@ func GC(ctx context.Context, bs bstore.MultiBlockstore, pn pin.Pinner, bestEffor func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []key.Key, bestEffort bool) error { for _, k := range roots { set.Add(k) - nd, err := ds.Get(ctx, k) + links, err := ds.GetLinks(ctx, k) if err != nil { return err } // EnumerateChildren recursively walks the dag and adds the keys to the given set - err = dag.EnumerateChildren(ctx, ds, nd, set, bestEffort) + err = dag.EnumerateChildren(ctx, ds, links, set, bestEffort) if err != nil { return err } diff --git a/pin/pin.go b/pin/pin.go index 49c5a8dd1e0..2e6b81d3bb2 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -249,12 +249,12 @@ func (p *pinner) isPinnedWithType(k key.Key, mode PinMode) (string, bool, error) // Default is Indirect for _, rk := range p.recursePin.GetKeys() { - rnd, err := p.dserv.Get(context.Background(), rk) + links, err := p.dserv.GetLinks(context.Background(), rk) if err != nil { return "", false, err } - has, err := hasChild(p.dserv, rnd, k) + has, err := hasChild(p.dserv, links, k) if err != nil { return "", false, err } @@ -483,19 +483,19 @@ func (p *pinner) PinWithMode(k key.Key, mode PinMode) { } } -func hasChild(ds mdag.DAGService, root *mdag.Node, child key.Key) (bool, error) { - for _, lnk := range root.Links { +func hasChild(ds mdag.DAGService, links []*mdag.Link, child key.Key) (bool, error) { + for _, lnk := range links { k := key.Key(lnk.Hash) if k == child { return true, nil } - nd, err := 
ds.Get(context.Background(), k) + children, err := ds.GetLinks(context.Background(), k) if err != nil { return false, err } - has, err := hasChild(ds, nd, child) + has, err := hasChild(ds, children, child) if err != nil { return false, err } diff --git a/test/sharness/t0263-filestore-gc.sh b/test/sharness/t0263-filestore-gc.sh new file mode 100755 index 00000000000..ede6f450a61 --- /dev/null +++ b/test/sharness/t0263-filestore-gc.sh @@ -0,0 +1,76 @@ +#!/bin/sh +# +# Copyright (c) 2014 Christian Couder +# MIT Licensed; see the LICENSE file in this repository. +# + +test_description="Test filestore" + +. lib/test-lib.sh + +test_init_ipfs + +# add block +# add filestore block / rm file +# make sure gc still works + +FILE1=QmfM2r8seH2GiRaC4esTjeraXEachRt8ZsSeGaWTPLyMoG +test_expect_success "add a pinned file" ' + echo "Hello World!" > file1 && + ipfs add file1 + ipfs cat $FILE1 | cmp file1 +' + +FILE2=QmPrrHqJzto9m7SyiRzarwkqPcCSsKR2EB1AyqJfe8L8tN +test_expect_success "add an unpinned file" ' + echo "Hello Mars!" > file2 + ipfs add --pin=false file2 + ipfs cat $FILE2 | cmp file2 +' + +FILE3=QmeV1kwh3333bsnT6YRfdCRrSgUPngKmAhhTa4RrqYPbKT +test_expect_success "add and pin a directory using the filestore" ' + mkdir adir && + echo "hello world!" > adir/file3 && + echo "hello mars!" > adir/file4 && + ipfs filestore add --logical -r --pin adir && + ipfs cat $FILE3 | cmp adir/file3 +' + +FILE5=QmU5kp3BH3B8tnWUU2Pikdb2maksBNkb92FHRr56hyghh4 +test_expect_success "add a unpinned file to the filestore" ' + echo "Hello Venus!" 
> file5 && + ipfs filestore add --logical --pin=false file5 && + ipfs cat $FILE5 | cmp file5 +' + +test_expect_success "make sure filestore block is really not pinned" ' + test_must_fail ipfs pin ls $FILE5 +' + +test_expect_success "remove one of the backing files" ' + rm adir/file3 && + test_must_fail ipfs cat $FILE3 +' + +test_expect_success "make ipfs pin ls is still okay" ' + ipfs pin ls +' + +test_expect_success "make sure the gc will still run" ' + ipfs repo gc +' + +test_expect_success "make sure pinned block got removed after gc" ' + ipfs cat $FILE1 +' + +test_expect_success "make sure un-pinned block still exists" ' + test_must_fail ipfs cat $FILE2 +' + +test_expect_success "make sure unpinned filestore block did not get removed" ' + ipfs cat $FILE5 +' + +test_done