Skip to content

Commit

Permalink
Implement basic filestore 'no-copy' functionality
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
  • Loading branch information
whyrusleeping committed Jan 24, 2017
1 parent 416f025 commit f49f52e
Show file tree
Hide file tree
Showing 26 changed files with 769 additions and 36 deletions.
6 changes: 5 additions & 1 deletion blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ func (bs *blockstore) Has(k *cid.Cid) (bool, error) {
}

func (s *blockstore) DeleteBlock(k *cid.Cid) error {
return s.datastore.Delete(dshelp.CidToDsKey(k))
err := s.datastore.Delete(dshelp.CidToDsKey(k))
if err == ds.ErrNotFound {
return ErrNotFound
}
return err
}

// AllKeysChan runs a query for keys from the blockstore.
Expand Down
6 changes: 3 additions & 3 deletions commands/files/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ type File interface {
// they are not directories
io.ReadCloser

// FileName returns a filename path associated with this file
// FileName returns a filename associated with this file
FileName() string

// FullPath returns the full path in the os associated with this file
// FullPath returns the full path used when adding this file
FullPath() string

// IsDirectory returns true if the File is a directory (and therefore
Expand Down Expand Up @@ -57,6 +57,6 @@ type SizeFile interface {
}

type FileInfo interface {
FullPath() string
AbsPath() string
Stat() os.FileInfo
}
11 changes: 10 additions & 1 deletion commands/files/multipartfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (

applicationDirectory = "application/x-directory"
applicationSymlink = "application/symlink"
applicationFile = "application/octet-stream"

contentTypeHeader = "Content-Type"
)
Expand All @@ -34,7 +35,8 @@ func NewFileFromPart(part *multipart.Part) (File, error) {
}

contentType := part.Header.Get(contentTypeHeader)
if contentType == applicationSymlink {
switch contentType {
case applicationSymlink:
out, err := ioutil.ReadAll(part)
if err != nil {
return nil, err
Expand All @@ -44,6 +46,13 @@ func NewFileFromPart(part *multipart.Part) (File, error) {
Target: string(out),
name: f.FileName(),
}, nil
case applicationFile:
return &ReaderFile{
reader: part,
filename: f.FileName(),
abspath: part.Header.Get("abspath"),
fullpath: f.FullPath(),
}, nil
}

var err error
Expand Down
17 changes: 16 additions & 1 deletion commands/files/readerfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,30 @@ import (
"errors"
"io"
"os"
"path/filepath"
)

// ReaderFile is a implementation of File created from an `io.Reader`.
// ReaderFiles are never directories, and can be read from and closed.
type ReaderFile struct {
filename string
fullpath string
abspath string
reader io.ReadCloser
stat os.FileInfo
}

func NewReaderFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) *ReaderFile {
return &ReaderFile{filename, path, reader, stat}
return &ReaderFile{filename, path, path, reader, stat}
}

func NewReaderPathFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) (*ReaderFile, error) {
abspath, err := filepath.Abs(path)
if err != nil {
return nil, err
}

return &ReaderFile{filename, path, abspath, reader, stat}, nil
}

func (f *ReaderFile) IsDirectory() bool {
Expand All @@ -35,6 +46,10 @@ func (f *ReaderFile) FullPath() string {
return f.fullpath
}

func (f *ReaderFile) AbsPath() string {
return f.abspath
}

func (f *ReaderFile) Read(p []byte) (int, error) {
return f.reader.Read(p)
}
Expand Down
3 changes: 2 additions & 1 deletion commands/files/serialfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ type serialFile struct {
}

func NewSerialFile(name, path string, hidden bool, stat os.FileInfo) (File, error) {

switch mode := stat.Mode(); {
case mode.IsRegular():
file, err := os.Open(path)
if err != nil {
return nil, err
}
return NewReaderFile(name, path, file, stat), nil
return NewReaderPathFile(name, path, file, stat)
case mode.IsDir():
// for directories, stat all of the contents first, so we know what files to
// open when NextFile() is called
Expand Down
3 changes: 3 additions & 0 deletions commands/http/multifilereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func (mfr *MultiFileReader) Read(buf []byte) (written int, err error) {
header.Set("Content-Disposition", fmt.Sprintf("file; filename=\"%s\"", filename))

header.Set("Content-Type", contentType)
if rf, ok := file.(*files.ReaderFile); ok {
header.Set("abspath", rf.AbsPath())
}

_, err := mfr.mpWriter.CreatePart(header)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
filestore "github.com/ipfs/go-ipfs/filestore"
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
Expand Down Expand Up @@ -166,8 +167,8 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
TempErrFunc: isTooManyFDError,
}

var err error
bs := bstore.NewBlockstore(rds)

opts := bstore.DefaultCacheOpts()
conf, err := n.Repo.Config()
if err != nil {
Expand All @@ -184,7 +185,14 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
return err
}

n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker())
n.BaseBlocks = cbs
n.GCLocker = bstore.NewGCLocker()
n.Blockstore = bstore.NewGCBlockstore(cbs, n.GCLocker)

if conf.Experimental.FilestoreEnabled {
n.Filestore = filestore.NewFilestore(bs, n.Repo.FileManager())
n.Blockstore = bstore.NewGCBlockstore(n.Filestore, n.GCLocker)
}

rcfg, err := n.Repo.Config()
if err != nil {
Expand Down
47 changes: 33 additions & 14 deletions core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ipfs/go-ipfs/core/coreunix"
"gx/ipfs/QmeWjRodbcZFKe5tMN7poEx3izym6osrLSnTLf9UjJZBbs/pb"

bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
blockservice "github.com/ipfs/go-ipfs/blockservice"
cmds "github.com/ipfs/go-ipfs/commands"
files "github.com/ipfs/go-ipfs/commands/files"
Expand All @@ -23,16 +24,18 @@ import (
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")

const (
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
rawLeavesOptionName = "raw-leaves"
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
rawLeavesOptionName = "raw-leaves"
noCopyOptionName = "nocopy"
fstoreCacheOptionName = "fscache"
)

var AddCmd = &cmds.Command{
Expand Down Expand Up @@ -78,6 +81,8 @@ You can now refer to the added file in a gateway, like so:
cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."),
cmds.BoolOption(pinOptionName, "Pin this object when adding.").Default(true),
cmds.BoolOption(rawLeavesOptionName, "Use raw blocks for leaf nodes. (experimental)"),
cmds.BoolOption(noCopyOptionName, "Add the file using filestore. (experimental)"),
cmds.BoolOption(fstoreCacheOptionName, "Check the filestore for pre-existing blocks. (experimental)"),
},
PreRun: func(req cmds.Request) error {
quiet, _, _ := req.Option(quietOptionName).Bool()
Expand Down Expand Up @@ -140,6 +145,13 @@ You can now refer to the added file in a gateway, like so:
chunker, _, _ := req.Option(chunkerOptionName).String()
dopin, _, _ := req.Option(pinOptionName).Bool()
rawblks, _, _ := req.Option(rawLeavesOptionName).Bool()
nocopy, _, _ := req.Option(noCopyOptionName).Bool()
fscache, _, _ := req.Option(fstoreCacheOptionName).Bool()

if nocopy && !rawblks {
res.SetError(fmt.Errorf("nocopy option requires '--raw-leaves' to be enabled as well"), cmds.ErrNormal)
return
}

if hash {
nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{
Expand All @@ -154,14 +166,20 @@ You can now refer to the added file in a gateway, like so:
n = nilnode
}

dserv := n.DAG
addblockstore := n.Blockstore
if !fscache && !nocopy {
addblockstore = bstore.NewGCBlockstore(n.BaseBlocks, n.GCLocker)
}

exch := n.Exchange
local, _, _ := req.Option("local").Bool()
if local {
offlineexch := offline.Exchange(n.Blockstore)
bserv := blockservice.New(n.Blockstore, offlineexch)
dserv = dag.NewDAGService(bserv)
exch = offline.Exchange(addblockstore)
}

bserv := blockservice.New(addblockstore, exch)
dserv := dag.NewDAGService(bserv)

outChan := make(chan interface{}, 8)
res.SetOutput((<-chan interface{})(outChan))

Expand All @@ -180,6 +198,7 @@ You can now refer to the added file in a gateway, like so:
fileAdder.Pin = dopin
fileAdder.Silent = silent
fileAdder.RawLeaves = rawblks
fileAdder.NoCopy = nocopy

if hash {
md := dagtest.Mock()
Expand Down
4 changes: 4 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
filestore "github.com/ipfs/go-ipfs/filestore"
mount "github.com/ipfs/go-ipfs/fuse/mount"
merkledag "github.com/ipfs/go-ipfs/merkledag"
mfs "github.com/ipfs/go-ipfs/mfs"
Expand Down Expand Up @@ -107,6 +108,9 @@ type IpfsNode struct {
// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Filestore *filestore.Filestore // the filestore blockstore
BaseBlocks bstore.Blockstore // the raw blockstore, no filestore wrapping
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Expand Down
2 changes: 2 additions & 0 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type Adder struct {
RawLeaves bool
Silent bool
Wrap bool
NoCopy bool
Chunker string
root node.Node
mr *mfs.Root
Expand All @@ -124,6 +125,7 @@ func (adder Adder) add(reader io.Reader) (node.Node, error) {
Dagserv: adder.dagService,
RawLeaves: adder.RawLeaves,
Maxlinks: ihelper.DefaultLinksPerBlock,
NoCopy: adder.NoCopy,
}

if adder.Trickle {
Expand Down
15 changes: 11 additions & 4 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) {
adder.Out = make(chan interface{})
adder.Progress = true
adder.RawLeaves = rawLeaves
adder.NoCopy = true

data := make([]byte, 5*1024*1024)
rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error
Expand All @@ -210,12 +211,18 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) {
for _ = range adder.Out {
}

if bs.countAtOffsetZero != 2 {
t.Fatal("expected 2 blocks with an offset at zero (one root and one leafh), got", bs.countAtOffsetZero)
exp := 0
nonOffZero := 0
if rawLeaves {
exp = 1
nonOffZero = 19
}
if bs.countAtOffsetNonZero != 19 {
if bs.countAtOffsetZero != exp {
t.Fatalf("expected %d blocks with an offset at zero (one root and one leafh), got %d", exp, bs.countAtOffsetZero)
}
if bs.countAtOffsetNonZero != nonOffZero {
// note: the exact number will depend on the size and the sharding algo. used
t.Fatal("expected 19 blocks with an offset > 0, got", bs.countAtOffsetNonZero)
t.Fatalf("expected %d blocks with an offset > 0, got %d", nonOffZero, bs.countAtOffsetNonZero)
}
}

Expand Down
Loading

0 comments on commit f49f52e

Please sign in to comment.