diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 274c1ee7b4f..8c971d89a47 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -21,7 +21,9 @@ import ( var log = logging.Logger("blockstore") // BlockPrefix namespaces blockstore datastores -var BlockPrefix = ds.NewKey("blocks") +const DefaultPrefix = "/blocks" + +var blockPrefix = ds.NewKey(DefaultPrefix) var ValueTypeMismatch = errors.New("the retrieved value is not a Block") var ErrHashMismatch = errors.New("block in storage has different hash than requested") @@ -71,20 +73,23 @@ type gcBlockstore struct { } func NewBlockstore(d ds.Batching) *blockstore { + return NewBlockstoreWPrefix(d, DefaultPrefix) +} + +func NewBlockstoreWPrefix(d ds.Batching, prefix string) *blockstore { var dsb ds.Batching - dd := dsns.Wrap(d, BlockPrefix) + prefixKey := ds.NewKey(prefix) + dd := dsns.Wrap(d, prefixKey) dsb = dd return &blockstore{ datastore: dsb, + prefix: prefixKey, } } type blockstore struct { datastore ds.Batching - - lk sync.RWMutex - gcreq int32 - gcreqlk sync.Mutex + prefix ds.Key rehash bool } @@ -130,11 +135,8 @@ func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) { func (bs *blockstore) Put(block blocks.Block) error { k := dshelp.CidToDsKey(block.Cid()) - // Has is cheaper than Put, so see if we already have it - exists, err := bs.datastore.Has(k) - if err == nil && exists { - return nil // already stored. - } + // Note: The Has Check is now done by the MultiBlockstore + return bs.datastore.Put(k, block.RawData()) } @@ -145,11 +147,6 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error { } for _, b := range blocks { k := dshelp.CidToDsKey(b.Cid()) - exists, err := bs.datastore.Has(k) - if err == nil && exists { - continue - } - err = t.Put(k, b.RawData()) if err != nil { return err @@ -175,7 +172,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) // KeysOnly, because that would be _a lot_ of data. q := dsq.Query{KeysOnly: true} // datastore/namespace does *NOT* fix up Query.Prefix - q.Prefix = BlockPrefix.String() + q.Prefix = bs.prefix.String() res, err := bs.datastore.Query(q) if err != nil { return nil, err diff --git a/blocks/blockstore/blockstore_test.go b/blocks/blockstore/blockstore_test.go index abe8a1a72d5..22c15d0004b 100644 --- a/blocks/blockstore/blockstore_test.go +++ b/blocks/blockstore/blockstore_test.go @@ -170,7 +170,7 @@ func TestAllKeysRespectsContext(t *testing.T) { default: } - e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()} + e := dsq.Entry{Key: blockPrefix.ChildString("foo").String()} resultChan <- dsq.Result{Entry: e} // let it go. close(resultChan) <-done // should be done now. @@ -190,7 +190,7 @@ func TestValueTypeMismatch(t *testing.T) { block := blocks.NewBlock([]byte("some data")) datastore := ds.NewMapDatastore() - k := BlockPrefix.Child(dshelp.CidToDsKey(block.Cid())) + k := blockPrefix.Child(dshelp.CidToDsKey(block.Cid())) datastore.Put(k, "data that isn't a block!") blockstore := NewBlockstore(ds_sync.MutexWrap(datastore)) diff --git a/blocks/blockstore/bloom_cache_test.go b/blocks/blockstore/bloom_cache_test.go index 72223cd44e0..0ee3a557a5c 100644 --- a/blocks/blockstore/bloom_cache_test.go +++ b/blocks/blockstore/bloom_cache_test.go @@ -104,11 +104,11 @@ func TestHasIsBloomCached(t *testing.T) { block := blocks.NewBlock([]byte("newBlock")) cachedbs.PutMany([]blocks.Block{block}) - if cacheFails != 2 { - t.Fatalf("expected two datastore hits: %d", cacheFails) + if cacheFails != 1 { + t.Fatalf("expected datastore hits: %d", cacheFails) } cachedbs.Put(block) - if cacheFails != 3 { + if cacheFails != 2 { t.Fatalf("expected datastore hit: %d", cacheFails) } diff --git a/blocks/blockstore/multi.go b/blocks/blockstore/multi.go new file mode 100644 index 00000000000..2d260b72ff6 --- /dev/null +++ b/blocks/blockstore/multi.go @@ -0,0 +1,158 @@ +package blockstore + +// A very simple multi-blockstore that analogous to a unionfs Put and +// DeleteBlock only go to the first blockstore all others are +// considered readonly. + +import ( + //"errors" + "context" + + blocks "github.com/ipfs/go-ipfs/blocks" + cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid" + dsq "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query" +) + +type LocateInfo struct { + Prefix string + Error error +} + +type MultiBlockstore interface { + Blockstore + GCLocker + FirstMount() Blockstore + Mounts() []string + Mount(prefix string) Blockstore + Locate(*cid.Cid) []LocateInfo +} + +type Mount struct { + Prefix string + Blocks Blockstore +} + +func NewMultiBlockstore(mounts ...Mount) *multiblockstore { + return &multiblockstore{ + mounts: mounts, + } +} + +type multiblockstore struct { + mounts []Mount + gclocker +} + +func (bs *multiblockstore) FirstMount() Blockstore { + return bs.mounts[0].Blocks +} + +func (bs *multiblockstore) Mounts() []string { + mounts := make([]string, 0, len(bs.mounts)) + for _, mnt := range bs.mounts { + mounts = append(mounts, mnt.Prefix) + } + return mounts +} + +func (bs *multiblockstore) Mount(prefix string) Blockstore { + for _, m := range bs.mounts { + if m.Prefix == prefix { + return m.Blocks + } + } + return nil +} + +func (bs *multiblockstore) DeleteBlock(key *cid.Cid) error { + return bs.mounts[0].Blocks.DeleteBlock(key) +} + +func (bs *multiblockstore) Has(c *cid.Cid) (bool, error) { + var firstErr error + for _, m := range bs.mounts { + have, err := m.Blocks.Has(c) + if have && err == nil { + return have, nil + } + if err != nil && firstErr == nil { + firstErr = err + } + } + return false, firstErr +} + +func (bs *multiblockstore) Get(c *cid.Cid) (blocks.Block, error) { + var firstErr error + for _, m := range bs.mounts { + blk, err := m.Blocks.Get(c) + if err == nil { + return blk, nil + } + if firstErr == nil || firstErr == ErrNotFound { + firstErr = err + } + } + return nil, firstErr +} + +func (bs *multiblockstore) Locate(c *cid.Cid) []LocateInfo { + res := make([]LocateInfo, 0, len(bs.mounts)) + for _, m := range bs.mounts { + _, err := m.Blocks.Get(c) + res = append(res, LocateInfo{m.Prefix, err}) + } + return res +} + +func (bs *multiblockstore) Put(blk blocks.Block) error { + // First call Has() to make sure the block doesn't exist in any of + // the sub-blockstores, otherwise we could end with data being + // duplicated in two blockstores. + exists, err := bs.Has(blk.Cid()) + if err == nil && exists { + return nil // already stored + } + return bs.mounts[0].Blocks.Put(blk) +} + +func (bs *multiblockstore) PutMany(blks []blocks.Block) error { + stilladd := make([]blocks.Block, 0, len(blks)) + // First call Has() to make sure the block doesn't exist in any of + // the sub-blockstores, otherwise we could end with data being + // duplicated in two blockstores. + for _, blk := range blks { + exists, err := bs.Has(blk.Cid()) + if err == nil && exists { + continue // already stored + } + stilladd = append(stilladd, blk) + } + if len(stilladd) == 0 { + return nil + } + return bs.mounts[0].Blocks.PutMany(stilladd) +} + +func (bs *multiblockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) { + //return bs.mounts[0].Blocks.AllKeysChan(ctx) + //return nil, errors.New("Unimplemented") + in := make([]<-chan *cid.Cid, 0, len(bs.mounts)) + for _, m := range bs.mounts { + ch, err := m.Blocks.AllKeysChan(ctx) + if err != nil { + return nil, err + } + in = append(in, ch) + } + out := make(chan *cid.Cid, dsq.KeysOnlyBufSize) + go func() { + defer close(out) + for _, in0 := range in { + for key := range in0 { + out <- key + } + } + }() + return out, nil +} diff --git a/blocks/blockstore/util/remove.go b/blocks/blockstore/util/remove.go index 01f2ce44e31..7c72c29d9d3 100644 --- a/blocks/blockstore/util/remove.go +++ b/blocks/blockstore/util/remove.go @@ -27,14 +27,23 @@ type RmBlocksOpts struct { Force bool } -func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, opts RmBlocksOpts) error { +func RmBlocks(mbs bs.MultiBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, opts RmBlocksOpts) error { + prefix := opts.Prefix + if prefix == "" { + prefix = mbs.Mounts()[0] + } + blocks := mbs.Mount(prefix) + if blocks == nil { + return fmt.Errorf("Could not find blockstore: %s\n", prefix) + } + go func() { defer close(out) - unlocker := blocks.GCLock() + unlocker := mbs.GCLock() defer unlocker.Unlock() - stillOkay := FilterPinned(pins, out, cids) + stillOkay := FilterPinned(mbs, pins, out, cids, prefix) for _, c := range stillOkay { err := blocks.DeleteBlock(c) @@ -50,7 +59,7 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, c return nil } -func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*cid.Cid { +func FilterPinned(mbs bs.MultiBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, prefix string) []*cid.Cid { stillOkay := make([]*cid.Cid, 0, len(cids)) res, err := pins.CheckIfPinned(cids...) if err != nil { @@ -58,7 +67,7 @@ func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*c return nil } for _, r := range res { - if !r.Pinned() { + if !r.Pinned() || AvailableElsewhere(mbs, prefix, r.Key) { stillOkay = append(stillOkay, r.Key) } else { out <- &RemovedBlock{ @@ -70,6 +79,16 @@ func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*c return stillOkay } +func AvailableElsewhere(mbs bs.MultiBlockstore, prefix string, c *cid.Cid) bool { + locations := mbs.Locate(c) + for _, loc := range locations { + if loc.Error == nil && loc.Prefix != prefix { + return true + } + } + return false +} + func ProcRmOutput(in <-chan interface{}, sout io.Writer, serr io.Writer) error { someFailed := false for res := range in { diff --git a/core/builder.go b/core/builder.go index baef82ed06d..898d72440c8 100644 --- a/core/builder.go +++ b/core/builder.go @@ -16,6 +16,7 @@ import ( pin "github.com/ipfs/go-ipfs/pin" repo "github.com/ipfs/go-ipfs/repo" cfg "github.com/ipfs/go-ipfs/repo/config" + fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" context "context" retry "gx/ipfs/QmPF5kxTYFkzhaY5LmkExood7aTTZBHWQC6cjdDQBuGrjp/retry-datastore" @@ -167,7 +168,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { } var err error - bs := bstore.NewBlockstore(rds) + bs := bstore.NewBlockstoreWPrefix(rds, fsrepo.CacheMount) opts := bstore.DefaultCacheOpts() conf, err := n.Repo.Config() if err != nil { @@ -184,7 +185,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { return err } - n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker()) + mounts := []bstore.Mount{{fsrepo.CacheMount, cbs}} + + n.Blockstore = bstore.NewMultiBlockstore(mounts...) rcfg, err := n.Repo.Config() if err != nil { diff --git a/core/commands/add.go b/core/commands/add.go index 52613ca4cf1..957bbd67ea7 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -7,7 +7,8 @@ import ( "github.com/ipfs/go-ipfs/core/coreunix" "gx/ipfs/QmeWjRodbcZFKe5tMN7poEx3izym6osrLSnTLf9UjJZBbs/pb" - blockservice "github.com/ipfs/go-ipfs/blockservice" + bs "github.com/ipfs/go-ipfs/blocks/blockstore" + bserv "github.com/ipfs/go-ipfs/blockservice" cmds "github.com/ipfs/go-ipfs/commands" files "github.com/ipfs/go-ipfs/commands/files" core "github.com/ipfs/go-ipfs/core" @@ -33,6 +34,7 @@ const ( chunkerOptionName = "chunker" pinOptionName = "pin" rawLeavesOptionName = "raw-leaves" + allowDupName = "allow-dup" ) var AddCmd = &cmds.Command{ @@ -80,6 +82,7 @@ You can now refer to the added file in a gateway, like so: cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."), cmds.BoolOption(pinOptionName, "Pin this object when adding.").Default(true), cmds.BoolOption(rawLeavesOptionName, "Use raw blocks for leaf nodes. (experimental)"), + cmds.BoolOption(allowDupName, "Add even if blocks are in non-cache blockstore.").Default(false), }, PreRun: func(req cmds.Request) error { if quiet, _, _ := req.Option(quietOptionName).Bool(); quiet { @@ -138,6 +141,7 @@ You can now refer to the added file in a gateway, like so: chunker, _, _ := req.Option(chunkerOptionName).String() dopin, _, _ := req.Option(pinOptionName).Bool() rawblks, _, _ := req.Option(rawLeavesOptionName).Bool() + allowDup, _, _ := req.Option(allowDupName).Bool() if hash { nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{ @@ -152,18 +156,30 @@ You can now refer to the added file in a gateway, like so: n = nilnode } - dserv := n.DAG + exchange := n.Exchange local, _, _ := req.Option("local").Bool() if local { - offlineexch := offline.Exchange(n.Blockstore) - bserv := blockservice.New(n.Blockstore, offlineexch) - dserv = dag.NewDAGService(bserv) + exchange = offline.Exchange(n.Blockstore) } outChan := make(chan interface{}, 8) res.SetOutput((<-chan interface{})(outChan)) - fileAdder, err := coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, dserv) + var fileAdder *coreunix.Adder + if allowDup { + // add directly to the first mount bypassing + // the Has() check of the multi-blockstore + blockstore := bs.NewGCBlockstore(n.Blockstore.FirstMount(), n.Blockstore) + blockService := bserv.NewWriteThrough(blockstore, exchange) + dagService := dag.NewDAGService(blockService) + fileAdder, err = coreunix.NewAdder(req.Context(), n.Pinning, blockstore, dagService) + } else if exchange != n.Exchange { + blockService := bserv.New(n.Blockstore, exchange) + dagService := dag.NewDAGService(blockService) + fileAdder, err = coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, dagService) + } else { + fileAdder, err = coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, n.DAG) + } if err != nil { res.SetError(err, cmds.ErrNormal) return diff --git a/core/commands/block.go b/core/commands/block.go index 54b447d2ee1..12e40aa95ff 100644 --- a/core/commands/block.go +++ b/core/commands/block.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/ipfs/go-ipfs/blocks" + bs "github.com/ipfs/go-ipfs/blocks/blockstore" util "github.com/ipfs/go-ipfs/blocks/blockstore/util" cmds "github.com/ipfs/go-ipfs/commands" @@ -36,10 +37,11 @@ multihash. }, Subcommands: map[string]*cmds.Command{ - "stat": blockStatCmd, - "get": blockGetCmd, - "put": blockPutCmd, - "rm": blockRmCmd, + "stat": blockStatCmd, + "get": blockGetCmd, + "put": blockPutCmd, + "rm": blockRmCmd, + "locate": blockLocateCmd, }, } @@ -238,39 +240,104 @@ It takes a list of base58 encoded multihashs to remove. cmds.BoolOption("quiet", "q", "Write minimal output.").Default(false), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() + blockRmRun(req, res, "") + }, + PostRun: func(req cmds.Request, res cmds.Response) { + if res.Error() != nil { + return + } + outChan, ok := res.Output().(<-chan interface{}) + if !ok { + res.SetError(u.ErrCast(), cmds.ErrNormal) + return + } + res.SetOutput(nil) + + err := util.ProcRmOutput(outChan, res.Stdout(), res.Stderr()) if err != nil { res.SetError(err, cmds.ErrNormal) - return } - hashes := req.Arguments() - force, _, _ := req.Option("force").Bool() - quiet, _, _ := req.Option("quiet").Bool() - cids := make([]*cid.Cid, 0, len(hashes)) - for _, hash := range hashes { - c, err := cid.Decode(hash) - if err != nil { - res.SetError(fmt.Errorf("invalid content id: %s (%s)", hash, err), cmds.ErrNormal) - return - } + }, + Type: util.RemovedBlock{}, +} - cids = append(cids, c) +func blockRmRun(req cmds.Request, res cmds.Response, prefix string) { + n, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + hashes := req.Arguments() + force, _, _ := req.Option("force").Bool() + quiet, _, _ := req.Option("quiet").Bool() + cids := make([]*cid.Cid, 0, len(hashes)) + for _, hash := range hashes { + c, err := cid.Decode(hash) + if err != nil { + res.SetError(fmt.Errorf("invalid content id: %s (%s)", hash, err), cmds.ErrNormal) + return } - outChan := make(chan interface{}) - err = util.RmBlocks(n.Blockstore, n.Pinning, outChan, cids, util.RmBlocksOpts{ - Quiet: quiet, - Force: force, - }) + cids = append(cids, c) + } + outChan := make(chan interface{}) + err = util.RmBlocks(n.Blockstore, n.Pinning, outChan, cids, util.RmBlocksOpts{ + Quiet: quiet, + Force: force, + Prefix: prefix, + }) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + res.SetOutput((<-chan interface{})(outChan)) +} + +type BlockLocateRes struct { + Key string + Res []bs.LocateInfo +} + +var blockLocateCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Locate an IPFS block.", + ShortDescription: ` +'ipfs block rm' is a plumbing command for locating which +sub-datastores block(s) are located in. +`, + }, + Arguments: []cmds.Argument{ + cmds.StringArg("hash", true, true, "Bash58 encoded multihash of block(s) to check."), + }, + Options: []cmds.Option{ + cmds.BoolOption("quiet", "q", "Write minimal output.").Default(false), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := req.InvocContext().GetNode() if err != nil { res.SetError(err, cmds.ErrNormal) return } + hashes := req.Arguments() + outChan := make(chan interface{}) res.SetOutput((<-chan interface{})(outChan)) + go func() { + defer close(outChan) + for _, hash := range hashes { + key, err := cid.Decode(hash) + if err != nil { + panic(err) // FIXME + } + ret := n.Blockstore.Locate(key) + outChan <- &BlockLocateRes{hash, ret} + } + }() + return }, PostRun: func(req cmds.Request, res cmds.Response) { if res.Error() != nil { return } + quiet, _, _ := req.Option("quiet").Bool() outChan, ok := res.Output().(<-chan interface{}) if !ok { res.SetError(u.ErrCast(), cmds.ErrNormal) @@ -278,10 +345,18 @@ It takes a list of base58 encoded multihashs to remove. } res.SetOutput(nil) - err := util.ProcRmOutput(outChan, res.Stdout(), res.Stderr()) - if err != nil { - res.SetError(err, cmds.ErrNormal) + for out := range outChan { + ret := out.(*BlockLocateRes) + for _, inf := range ret.Res { + if quiet && inf.Error == nil { + fmt.Fprintf(res.Stdout(), "%s %s\n", ret.Key, inf.Prefix) + } else if !quiet && inf.Error == nil { + fmt.Fprintf(res.Stdout(), "%s %s found\n", ret.Key, inf.Prefix) + } else if !quiet { + fmt.Fprintf(res.Stdout(), "%s %s error %s\n", ret.Key, inf.Prefix, inf.Error.Error()) + } + } } }, - Type: util.RemovedBlock{}, + Type: BlockLocateRes{}, } diff --git a/core/core.go b/core/core.go index 231b91ac6b0..16bd44689ac 100644 --- a/core/core.go +++ b/core/core.go @@ -95,11 +95,11 @@ type IpfsNode struct { PrivateKey ic.PrivKey // the local node's private Key // Services - Peerstore pstore.Peerstore // storage for other Peer instances - Blockstore bstore.GCBlockstore // the block store (lower level) - Blocks bserv.BlockService // the block service, get/add blocks. - DAG merkledag.DAGService // the merkle dag service, get/add objects. - Resolver *path.Resolver // the path resolution system + Peerstore pstore.Peerstore // storage for other Peer instances + Blockstore bstore.MultiBlockstore // the block store (lower level) + Blocks bserv.BlockService // the block service, get/add blocks. + DAG merkledag.DAGService // the merkle dag service, get/add objects. + Resolver *path.Resolver // the path resolution system Reporter metrics.Reporter Discovery discovery.Service FilesRoot *mfs.Root diff --git a/pin/gc/gc.go b/pin/gc/gc.go index d2607bdbef5..286f4e0f2d4 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -22,7 +22,7 @@ var log = logging.Logger("gc") // // The routine then iterates over every block in the blockstore and // deletes any block that is not found in the marked set. -func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan *cid.Cid, error) { +func GC(ctx context.Context, bs bstore.MultiBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan *cid.Cid, error) { unlocker := bs.GCLock() ls = ls.GetOfflineLinkService() @@ -32,7 +32,8 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin. return nil, err } - keychan, err := bs.AllKeysChan(ctx) + // only delete blocks in the first (cache) mount + keychan, err := bs.FirstMount().AllKeysChan(ctx) if err != nil { return nil, err } diff --git a/repo/fsrepo/defaultds.go b/repo/fsrepo/defaultds.go index ed8fbafe702..c6691b77263 100644 --- a/repo/fsrepo/defaultds.go +++ b/repo/fsrepo/defaultds.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-ipfs/thirdparty/dir" "gx/ipfs/QmU4VzzKNLJXJ72SedXBQKyf5Jo8W89iWpbWQjHn9qef8N/go-ds-flatfs" levelds "gx/ipfs/QmUHmMGmcwCrjHQHcYhBnqGCSWs5pBSMbGZmfwavETR1gg/go-ds-leveldb" + //multi "github.com/ipfs/go-ipfs/repo/multi" ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt" ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" mount "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/syncmount" @@ -20,7 +21,13 @@ const ( flatfsDirectory = "blocks" ) -func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { +const ( + RootMount = "/" + CacheMount = "/blocks" // needs to be the same as blockstore.DefaultPrefix + FilestoreMount = "/filestore" +) + +func openDefaultDatastore(r *FSRepo) (repo.Datastore, []Mount, error) { leveldbPath := path.Join(r.path, leveldbDirectory) // save leveldb reference so it can be neatly closed afterward @@ -28,7 +35,7 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { Compression: ldbopts.NoCompression, }) if err != nil { - return nil, fmt.Errorf("unable to open leveldb datastore: %v", err) + return nil, nil, fmt.Errorf("unable to open leveldb datastore: %v", err) } syncfs := !r.config.Datastore.NoSync @@ -36,7 +43,7 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { // by the Qm prefix. Leaving us with 9 bits, or 512 way sharding blocksDS, err := flatfs.New(path.Join(r.path, flatfsDirectory), 5, syncfs) if err != nil { - return nil, fmt.Errorf("unable to open flatfs datastore: %v", err) + return nil, nil, fmt.Errorf("unable to open flatfs datastore: %v", err) } // Add our PeerID to metrics paths to keep them unique @@ -51,18 +58,24 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { prefix := "fsrepo." + id + ".datastore." metricsBlocks := measure.New(prefix+"blocks", blocksDS) metricsLevelDB := measure.New(prefix+"leveldb", leveldbDS) - mountDS := mount.New([]mount.Mount{ - { - Prefix: ds.NewKey("/blocks"), - Datastore: metricsBlocks, - }, - { - Prefix: ds.NewKey("/"), - Datastore: metricsLevelDB, - }, + + var mounts []mount.Mount + var directMounts []Mount + + mounts = append(mounts, mount.Mount{ + Prefix: ds.NewKey(CacheMount), + Datastore: metricsBlocks, + }) + directMounts = append(directMounts, Mount{CacheMount, blocksDS}) + mounts = append(mounts, mount.Mount{ + Prefix: ds.NewKey(RootMount), + Datastore: metricsLevelDB, }) + directMounts = append(directMounts, Mount{RootMount, leveldbDS}) + + mountDS := mount.New(mounts) - return mountDS, nil + return mountDS, directMounts, nil } func initDefaultDatastore(repoPath string, conf *config.Config) error { diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 03ac313e63b..847ee7777c6 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -20,6 +20,7 @@ import ( dir "github.com/ipfs/go-ipfs/thirdparty/dir" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" util "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" + ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" "gx/ipfs/QmeqtHtxGfcsfXiou7wqHJARWPKUTUcPdtSfSYYHp48dtQ/go-ds-measure" ) @@ -93,6 +94,12 @@ type FSRepo struct { lockfile io.Closer config *config.Config ds repo.Datastore + mounts []Mount +} + +type Mount struct { + prefix string + dstore ds.Datastore } var _ repo.Repo = (*FSRepo)(nil) @@ -331,11 +338,12 @@ func (r *FSRepo) openConfig() error { func (r *FSRepo) openDatastore() error { switch r.config.Datastore.Type { case "default", "leveldb", "": - d, err := openDefaultDatastore(r) + d, m, err := openDefaultDatastore(r) if err != nil { return err } r.ds = d + r.mounts = m default: return fmt.Errorf("unknown datastore type: %s", r.config.Datastore.Type) } @@ -557,6 +565,27 @@ func (r *FSRepo) Datastore() repo.Datastore { return d } +func (r *FSRepo) DirectMount(prefix string) ds.Datastore { + packageLock.Lock() + defer packageLock.Unlock() + for _, m := range r.mounts { + if prefix == m.prefix { + return m.dstore + } + } + return nil +} + +func (r *FSRepo) Mounts() []string { + packageLock.Lock() + mounts := make([]string, 0, len(r.mounts)) + for _, m := range r.mounts { + mounts = append(mounts, m.prefix) + } + packageLock.Unlock() + return mounts +} + // GetStorageUsage computes the storage space taken by the repo in bytes func (r *FSRepo) GetStorageUsage() (uint64, error) { pth, err := config.PathRoot() diff --git a/repo/mock.go b/repo/mock.go index 8190a0bda1b..f68e078cfbc 100644 --- a/repo/mock.go +++ b/repo/mock.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/ipfs/go-ipfs/repo/config" + ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" ) var errTODO = errors.New("TODO: mock repo") @@ -33,6 +34,18 @@ func (m *Mock) GetConfigKey(key string) (interface{}, error) { func (m *Mock) Datastore() Datastore { return m.D } +func (m *Mock) DirectMount(prefix string) ds.Datastore { + if prefix == "/" { + return m.D + } else { + return nil + } +} + +func (m *Mock) Mounts() []string { + return []string{"/"} +} + func (m *Mock) GetStorageUsage() (uint64, error) { return 0, nil } func (m *Mock) Close() error { return errTODO } diff --git a/repo/repo.go b/repo/repo.go index d95af0446dd..633ff57114b 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -22,6 +22,14 @@ type Repo interface { Datastore() Datastore GetStorageUsage() (uint64, error) + // DirectMount provides direct access to a datastore mounted + // under prefix in order to perform low-level operations. The + // datastore returned is guaranteed not be a proxy (such as a + // go-datastore/measure) normal operations should go through + // Datastore() + DirectMount(prefix string) ds.Datastore + Mounts() []string + // SetAPIAddr sets the API address in the repo. SetAPIAddr(addr string) error