Skip to content

Commit

Permalink
Add DAGService.GetLinks() method and use it in the GC and elsewhere.
Browse files Browse the repository at this point in the history
This method will use the (also new) LinkService if it is available to
retrieving just the links for a MerkleDAG without necessary having to
retrieve the underlying block.

For now the main benefit is that the pinner will not break when a block
becomes invalid due to a change in the backing file.  This is possible
because the metadata for a block (that includes the Links) is stored
separately and thus always available even if the backing file changes.

License: MIT
Signed-off-by: Kevin Atkinson <k@kevina.org>
  • Loading branch information
kevina committed Aug 20, 2016
1 parent a482ee1 commit 5a73226
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 33 deletions.
10 changes: 9 additions & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
ci "gx/ipfs/QmUWER4r4qMvaCnX5zREcfyiWN7cXN9g3a7fkRqNz8qWPP/go-libp2p-crypto"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"

"github.com/ipfs/go-ipfs/filestore"
"github.com/ipfs/go-ipfs/filestore/support"
)

type BuildCfg struct {
Expand Down Expand Up @@ -180,7 +183,12 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}

n.Blocks = bserv.New(n.Blockstore, n.Exchange)
n.DAG = dag.NewDAGService(n.Blocks)
dag := dag.NewDAGService(n.Blocks)
if fs,ok := n.Repo.DirectMount(fsrepo.FilestoreMount).(*filestore.Datastore); ok {
n.LinkService = filestore_support.NewLinkService(fs)
dag.LinkService = n.LinkService
}
n.DAG = dag
n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG)
if err != nil {
// TODO: we should move towards only running 'NewPinner' explicity on
Expand Down
4 changes: 2 additions & 2 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,11 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string
if typeStr == "indirect" || typeStr == "all" {
ks := key.NewKeySet()
for _, k := range n.Pinning.RecursiveKeys() {
nd, err := n.DAG.Get(ctx, k)
links, err := n.DAG.GetLinks(ctx, k)
if err != nil {
return nil, err
}
err = dag.EnumerateChildren(n.Context(), n.DAG, nd, ks, false)
err = dag.EnumerateChildren(n.Context(), n.DAG, links, ks, false)
if err != nil {
return nil, err
}
Expand Down
17 changes: 9 additions & 8 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ type IpfsNode struct {
PrivateKey ic.PrivKey // the local node's private Key

// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.MultiBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Discovery discovery.Service
FilesRoot *mfs.Root
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.MultiBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
LinkService merkledag.LinkService
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Discovery discovery.Service
FilesRoot *mfs.Root

// Online
PeerHost p2phost.Host // the network host (server+client)
Expand Down
4 changes: 2 additions & 2 deletions core/corerepo/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
if err != nil {
return err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots)
rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots)
if err != nil {
return err
}
Expand All @@ -114,7 +114,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo
if err != nil {
return nil, err
}
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots)
rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestAddGCLive(t *testing.T) {
gcstarted := make(chan struct{})
go func() {
defer close(gcstarted)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.Pinning, nil)
gcchan, err := gc.GC(context.Background(), node.Blockstore, node.LinkService, node.Pinning, nil)
if err != nil {
log.Error("GC ERROR:", err)
errs <- err
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestAddGCLive(t *testing.T) {
t.Fatal(err)
}

err = dag.EnumerateChildren(ctx, node.DAG, root, key.NewKeySet(), false)
err = dag.EnumerateChildren(ctx, node.DAG, root.Links, key.NewKeySet(), false)
if err != nil {
t.Fatal(err)
}
Expand Down
31 changes: 31 additions & 0 deletions filestore/support/linkservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package filestore_support

import (
key "github.com/ipfs/go-ipfs/blocks/key"
. "github.com/ipfs/go-ipfs/filestore"
dag "github.com/ipfs/go-ipfs/merkledag"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
)

func NewLinkService(fs *Datastore) dag.LinkService {
return &linkservice{fs}
}

type linkservice struct {
fs *Datastore
}

func (ls *linkservice) Get(key key.Key) ([]*dag.Link, error) {
dsKey := key.DsKey()
dataObj, err := ls.fs.GetDirect(dsKey)
if err == ds.ErrNotFound {
return nil, dag.ErrNotFound
} else if err != nil {
return nil, err
}
res, err := dag.DecodeProtobuf(dataObj.Data)
if err != nil {
return nil, err
}
return res.Links, nil
}
39 changes: 32 additions & 7 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,25 @@ type DAGService interface {
Get(context.Context, key.Key) (*Node, error)
Remove(*Node) error

// Return all links for a node, may be more effect than
// calling Get
GetLinks(context.Context, key.Key) ([]*Link, error)

// GetDAG returns, in order, all the single leve child
// nodes of the passed in node.
GetMany(context.Context, []key.Key) <-chan *NodeOption

Batch() *Batch
}

func NewDAGService(bs *bserv.BlockService) DAGService {
return &dagService{bs}
// A LinkService returns the links for a node if they are available
// locally without having to retrieve the block from the datastore.
type LinkService interface {
Get(key.Key) ([]*Link, error)
}

func NewDAGService(bs *bserv.BlockService) *dagService {
return &dagService{Blocks: bs}
}

// dagService is an IPFS Merkle DAG service.
Expand All @@ -39,7 +49,8 @@ func NewDAGService(bs *bserv.BlockService) DAGService {
// TODO: should cache Nodes that are in memory, and be
// able to free some of them when vm pressure is high
type dagService struct {
Blocks *bserv.BlockService
Blocks *bserv.BlockService
LinkService LinkService
}

func createBlock(nd *Node) (*blocks.BasicBlock, error) {
Expand Down Expand Up @@ -112,6 +123,20 @@ func (n *dagService) Get(ctx context.Context, k key.Key) (*Node, error) {
return res, nil
}

func (n *dagService) GetLinks(ctx context.Context, k key.Key) ([]*Link, error) {
if n.LinkService != nil {
links, err := n.LinkService.Get(k)
if err == nil {
return links, nil
}
}
node, err := n.Get(ctx, k)
if err != nil {
return nil, err
}
return node.Links, nil
}

func (n *dagService) Remove(nd *Node) error {
k, err := nd.Key()
if err != nil {
Expand Down Expand Up @@ -369,20 +394,20 @@ func (t *Batch) Commit() error {
// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet, bestEffort bool) error {
for _, lnk := range root.Links {
func EnumerateChildren(ctx context.Context, ds DAGService, links []*Link, set key.KeySet, bestEffort bool) error {
for _, lnk := range links {
k := key.Key(lnk.Hash)
if !set.Has(k) {
set.Add(k)
child, err := ds.Get(ctx, k)
children, err := ds.GetLinks(ctx, k)
if err != nil {
if bestEffort && err == ErrNotFound {
continue
} else {
return err
}
}
err = EnumerateChildren(ctx, ds, child, set, bestEffort)
err = EnumerateChildren(ctx, ds, children, set, bestEffort)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func TestFetchGraph(t *testing.T) {
offline_ds := NewDAGService(bs)
ks := key.NewKeySet()

err = EnumerateChildren(context.Background(), offline_ds, root, ks, false)
err = EnumerateChildren(context.Background(), offline_ds, root.Links, ks, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -309,7 +309,7 @@ func TestEnumerateChildren(t *testing.T) {
}

ks := key.NewKeySet()
err = EnumerateChildren(context.Background(), ds, root, ks, false)
err = EnumerateChildren(context.Background(), ds, root.Links, ks, false)
if err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 4 additions & 3 deletions pin/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ var log = logging.Logger("gc")
//
// The routine then iterates over every block in the blockstore and
// deletes any block that is not found in the marked set.
func GC(ctx context.Context, bs bstore.MultiBlockstore, pn pin.Pinner, bestEffortRoots []key.Key) (<-chan key.Key, error) {
func GC(ctx context.Context, bs bstore.MultiBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []key.Key) (<-chan key.Key, error) {
unlocker := bs.GCLock()

bsrv := bserv.New(bs, offline.Exchange(bs))
ds := dag.NewDAGService(bsrv)
ds.LinkService = ls

gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots)
if err != nil {
Expand Down Expand Up @@ -74,13 +75,13 @@ func GC(ctx context.Context, bs bstore.MultiBlockstore, pn pin.Pinner, bestEffor
func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []key.Key, bestEffort bool) error {
for _, k := range roots {
set.Add(k)
nd, err := ds.Get(ctx, k)
links, err := ds.GetLinks(ctx, k)
if err != nil {
return err
}

// EnumerateChildren recursively walks the dag and adds the keys to the given set
err = dag.EnumerateChildren(ctx, ds, nd, set, bestEffort)
err = dag.EnumerateChildren(ctx, ds, links, set, bestEffort)
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,12 @@ func (p *pinner) isPinnedWithType(k key.Key, mode PinMode) (string, bool, error)

// Default is Indirect
for _, rk := range p.recursePin.GetKeys() {
rnd, err := p.dserv.Get(context.Background(), rk)
links, err := p.dserv.GetLinks(context.Background(), rk)
if err != nil {
return "", false, err
}

has, err := hasChild(p.dserv, rnd, k)
has, err := hasChild(p.dserv, links, k)
if err != nil {
return "", false, err
}
Expand Down Expand Up @@ -483,19 +483,19 @@ func (p *pinner) PinWithMode(k key.Key, mode PinMode) {
}
}

func hasChild(ds mdag.DAGService, root *mdag.Node, child key.Key) (bool, error) {
for _, lnk := range root.Links {
func hasChild(ds mdag.DAGService, links []*mdag.Link, child key.Key) (bool, error) {
for _, lnk := range links {
k := key.Key(lnk.Hash)
if k == child {
return true, nil
}

nd, err := ds.Get(context.Background(), k)
children, err := ds.GetLinks(context.Background(), k)
if err != nil {
return false, err
}

has, err := hasChild(ds, nd, child)
has, err := hasChild(ds, children, child)
if err != nil {
return false, err
}
Expand Down
76 changes: 76 additions & 0 deletions test/sharness/t0263-filestore-gc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/bin/sh
#
# Copyright (c) 2014 Christian Couder
# MIT Licensed; see the LICENSE file in this repository.
#

test_description="Test filestore"

. lib/test-lib.sh

test_init_ipfs

# add block
# add filestore block / rm file
# make sure gc still words

FILE1=QmfM2r8seH2GiRaC4esTjeraXEachRt8ZsSeGaWTPLyMoG
test_expect_success "add a pinned file" '
echo "Hello World!" > file1 &&
ipfs add file1
ipfs cat $FILE1 | cmp file1
'

FILE2=QmPrrHqJzto9m7SyiRzarwkqPcCSsKR2EB1AyqJfe8L8tN
test_expect_success "add an unpinned file" '
echo "Hello Mars!" > file2
ipfs add --pin=false file2
ipfs cat $FILE2 | cmp file2
'

FILE3=QmeV1kwh3333bsnT6YRfdCRrSgUPngKmAhhTa4RrqYPbKT
test_expect_success "add and pin a directory using the filestore" '
mkdir adir &&
echo "hello world!" > adir/file3 &&
echo "hello mars!" > adir/file4 &&
ipfs filestore add --logical -r --pin adir &&
ipfs cat $FILE3 | cmp adir/file3
'

FILE5=QmU5kp3BH3B8tnWUU2Pikdb2maksBNkb92FHRr56hyghh4
test_expect_success "add a unpinned file to the filestore" '
echo "Hello Venus!" > file5 &&
ipfs filestore add --logical --pin=false file5 &&
ipfs cat $FILE5 | cmp file5
'

test_expect_success "make sure filestore block is really not pinned" '
test_must_fail ipfs pin ls $FILE5
'

test_expect_success "remove one of the backing files" '
rm adir/file3 &&
test_must_fail ipfs cat $FILE3
'

test_expect_success "make ipfs pin ls is still okay" '
ipfs pin ls
'

test_expect_success "make sure the gc will still run" '
ipfs repo gc
'

test_expect_success "make sure pinned block got removed after gc" '
ipfs cat $FILE1
'

test_expect_success "make sure un-pinned block still exists" '
test_must_fail ipfs cat $FILE2
'

test_expect_success "make sure unpinned filestore block did not get removed" '
ipfs cat $FILE5
'

test_done

0 comments on commit 5a73226

Please sign in to comment.