Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement basic filestore 'no-copy' functionality #3629

Merged
merged 10 commits into from
Mar 6, 2017
6 changes: 5 additions & 1 deletion blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ func (bs *blockstore) Has(k *cid.Cid) (bool, error) {
}

func (s *blockstore) DeleteBlock(k *cid.Cid) error {
return s.datastore.Delete(dshelp.CidToDsKey(k))
err := s.datastore.Delete(dshelp.CidToDsKey(k))
if err == ds.ErrNotFound {
return ErrNotFound
}
return err
}

// AllKeysChan runs a query for keys from the blockstore.
Expand Down
6 changes: 3 additions & 3 deletions commands/files/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ type File interface {
// they are not directories
io.ReadCloser

// FileName returns a filename path associated with this file
// FileName returns a filename associated with this file
FileName() string

// FullPath returns the full path in the os associated with this file
// FullPath returns the full path used when adding this file
FullPath() string

// IsDirectory returns true if the File is a directory (and therefore
Expand Down Expand Up @@ -57,6 +57,6 @@ type SizeFile interface {
}

type FileInfo interface {
FullPath() string
AbsPath() string
Stat() os.FileInfo
}
11 changes: 10 additions & 1 deletion commands/files/multipartfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (

applicationDirectory = "application/x-directory"
applicationSymlink = "application/symlink"
applicationFile = "application/octet-stream"

contentTypeHeader = "Content-Type"
)
Expand All @@ -34,7 +35,8 @@ func NewFileFromPart(part *multipart.Part) (File, error) {
}

contentType := part.Header.Get(contentTypeHeader)
if contentType == applicationSymlink {
switch contentType {
case applicationSymlink:
out, err := ioutil.ReadAll(part)
if err != nil {
return nil, err
Expand All @@ -44,6 +46,13 @@ func NewFileFromPart(part *multipart.Part) (File, error) {
Target: string(out),
name: f.FileName(),
}, nil
case applicationFile:
return &ReaderFile{
reader: part,
filename: f.FileName(),
abspath: part.Header.Get("abspath"),
fullpath: f.FullPath(),
}, nil
}

var err error
Expand Down
17 changes: 16 additions & 1 deletion commands/files/readerfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,30 @@ import (
"errors"
"io"
"os"
"path/filepath"
)

// ReaderFile is a implementation of File created from an `io.Reader`.
// ReaderFiles are never directories, and can be read from and closed.
type ReaderFile struct {
filename string
fullpath string
abspath string
reader io.ReadCloser
stat os.FileInfo
}

func NewReaderFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) *ReaderFile {
return &ReaderFile{filename, path, reader, stat}
return &ReaderFile{filename, path, path, reader, stat}
}

func NewReaderPathFile(filename, path string, reader io.ReadCloser, stat os.FileInfo) (*ReaderFile, error) {
abspath, err := filepath.Abs(path)
if err != nil {
return nil, err
}

return &ReaderFile{filename, path, abspath, reader, stat}, nil
}

func (f *ReaderFile) IsDirectory() bool {
Expand All @@ -35,6 +46,10 @@ func (f *ReaderFile) FullPath() string {
return f.fullpath
}

func (f *ReaderFile) AbsPath() string {
return f.abspath
}

func (f *ReaderFile) Read(p []byte) (int, error) {
return f.reader.Read(p)
}
Expand Down
3 changes: 2 additions & 1 deletion commands/files/serialfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ type serialFile struct {
}

func NewSerialFile(name, path string, hidden bool, stat os.FileInfo) (File, error) {

switch mode := stat.Mode(); {
case mode.IsRegular():
file, err := os.Open(path)
if err != nil {
return nil, err
}
return NewReaderFile(name, path, file, stat), nil
return NewReaderPathFile(name, path, file, stat)
case mode.IsDir():
// for directories, stat all of the contents first, so we know what files to
// open when NextFile() is called
Expand Down
3 changes: 3 additions & 0 deletions commands/http/multifilereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func (mfr *MultiFileReader) Read(buf []byte) (written int, err error) {
header.Set("Content-Disposition", fmt.Sprintf("file; filename=\"%s\"", filename))

header.Set("Content-Type", contentType)
if rf, ok := file.(*files.ReaderFile); ok {
header.Set("abspath", rf.AbsPath())
}

_, err := mfr.mpWriter.CreatePart(header)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
filestore "github.com/ipfs/go-ipfs/filestore"
dag "github.com/ipfs/go-ipfs/merkledag"
path "github.com/ipfs/go-ipfs/path"
pin "github.com/ipfs/go-ipfs/pin"
Expand Down Expand Up @@ -166,8 +167,8 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
TempErrFunc: isTooManyFDError,
}

var err error
bs := bstore.NewBlockstore(rds)

opts := bstore.DefaultCacheOpts()
conf, err := n.Repo.Config()
if err != nil {
Expand All @@ -184,7 +185,14 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
return err
}

n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker())
n.BaseBlocks = cbs
n.GCLocker = bstore.NewGCLocker()
n.Blockstore = bstore.NewGCBlockstore(cbs, n.GCLocker)

if conf.Experimental.FilestoreEnabled {
n.Filestore = filestore.NewFilestore(bs, n.Repo.FileManager())
n.Blockstore = bstore.NewGCBlockstore(n.Filestore, n.GCLocker)
}

rcfg, err := n.Repo.Config()
if err != nil {
Expand Down
53 changes: 38 additions & 15 deletions core/commands/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/ipfs/go-ipfs/core/coreunix"
"gx/ipfs/QmeWjRodbcZFKe5tMN7poEx3izym6osrLSnTLf9UjJZBbs/pb"

bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
blockservice "github.com/ipfs/go-ipfs/blockservice"
cmds "github.com/ipfs/go-ipfs/commands"
files "github.com/ipfs/go-ipfs/commands/files"
Expand All @@ -23,16 +24,18 @@ import (
var ErrDepthLimitExceeded = fmt.Errorf("depth limit exceeded")

const (
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
rawLeavesOptionName = "raw-leaves"
quietOptionName = "quiet"
silentOptionName = "silent"
progressOptionName = "progress"
trickleOptionName = "trickle"
wrapOptionName = "wrap-with-directory"
hiddenOptionName = "hidden"
onlyHashOptionName = "only-hash"
chunkerOptionName = "chunker"
pinOptionName = "pin"
rawLeavesOptionName = "raw-leaves"
noCopyOptionName = "nocopy"
fstoreCacheOptionName = "fscache"
)

var AddCmd = &cmds.Command{
Expand Down Expand Up @@ -78,6 +81,8 @@ You can now refer to the added file in a gateway, like so:
cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."),
cmds.BoolOption(pinOptionName, "Pin this object when adding.").Default(true),
cmds.BoolOption(rawLeavesOptionName, "Use raw blocks for leaf nodes. (experimental)"),
cmds.BoolOption(noCopyOptionName, "Add the file using filestore. (experimental)"),
cmds.BoolOption(fstoreCacheOptionName, "Check the filestore for pre-existing blocks. (experimental)"),
},
PreRun: func(req cmds.Request) error {
quiet, _, _ := req.Option(quietOptionName).Bool()
Expand Down Expand Up @@ -139,7 +144,18 @@ You can now refer to the added file in a gateway, like so:
silent, _, _ := req.Option(silentOptionName).Bool()
chunker, _, _ := req.Option(chunkerOptionName).String()
dopin, _, _ := req.Option(pinOptionName).Bool()
rawblks, _, _ := req.Option(rawLeavesOptionName).Bool()
rawblks, rbset, _ := req.Option(rawLeavesOptionName).Bool()
nocopy, _, _ := req.Option(noCopyOptionName).Bool()
fscache, _, _ := req.Option(fstoreCacheOptionName).Bool()

if nocopy && !rbset {
rawblks = true
}

if nocopy && !rawblks {
res.SetError(fmt.Errorf("nocopy option requires '--raw-leaves' to be enabled as well"), cmds.ErrNormal)
return
}

if hash {
nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{
Expand All @@ -154,14 +170,20 @@ You can now refer to the added file in a gateway, like so:
n = nilnode
}

dserv := n.DAG
addblockstore := n.Blockstore
if !(fscache || nocopy) {
addblockstore = bstore.NewGCBlockstore(n.BaseBlocks, n.GCLocker)
}

exch := n.Exchange
local, _, _ := req.Option("local").Bool()
if local {
offlineexch := offline.Exchange(n.Blockstore)
bserv := blockservice.New(n.Blockstore, offlineexch)
dserv = dag.NewDAGService(bserv)
exch = offline.Exchange(addblockstore)
}

bserv := blockservice.New(addblockstore, exch)
dserv := dag.NewDAGService(bserv)

outChan := make(chan interface{}, 8)
res.SetOutput((<-chan interface{})(outChan))

Expand All @@ -180,6 +202,7 @@ You can now refer to the added file in a gateway, like so:
fileAdder.Pin = dopin
fileAdder.Silent = silent
fileAdder.RawLeaves = rawblks
fileAdder.NoCopy = nocopy

if hash {
md := dagtest.Mock()
Expand Down
4 changes: 4 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
filestore "github.com/ipfs/go-ipfs/filestore"
mount "github.com/ipfs/go-ipfs/fuse/mount"
merkledag "github.com/ipfs/go-ipfs/merkledag"
mfs "github.com/ipfs/go-ipfs/mfs"
Expand Down Expand Up @@ -110,6 +111,9 @@ type IpfsNode struct {
// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Filestore *filestore.Filestore // the filestore blockstore
BaseBlocks bstore.Blockstore // the raw blockstore, no filestore wrapping
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Expand Down
2 changes: 2 additions & 0 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type Adder struct {
RawLeaves bool
Silent bool
Wrap bool
NoCopy bool
Chunker string
root node.Node
mr *mfs.Root
Expand All @@ -124,6 +125,7 @@ func (adder Adder) add(reader io.Reader) (node.Node, error) {
Dagserv: adder.dagService,
RawLeaves: adder.RawLeaves,
Maxlinks: ihelper.DefaultLinksPerBlock,
NoCopy: adder.NoCopy,
}

if adder.Trickle {
Expand Down
15 changes: 11 additions & 4 deletions core/coreunix/add_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) {
adder.Out = make(chan interface{})
adder.Progress = true
adder.RawLeaves = rawLeaves
adder.NoCopy = true

data := make([]byte, 5*1024*1024)
rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error
Expand All @@ -210,12 +211,18 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) {
for _ = range adder.Out {
}

if bs.countAtOffsetZero != 2 {
t.Fatal("expected 2 blocks with an offset at zero (one root and one leafh), got", bs.countAtOffsetZero)
exp := 0
nonOffZero := 0
if rawLeaves {
exp = 1
nonOffZero = 19
}
if bs.countAtOffsetNonZero != 19 {
if bs.countAtOffsetZero != exp {
t.Fatalf("expected %d blocks with an offset at zero (one root and one leafh), got %d", exp, bs.countAtOffsetZero)
}
if bs.countAtOffsetNonZero != nonOffZero {
// note: the exact number will depend on the size and the sharding algo. used
t.Fatal("expected 19 blocks with an offset > 0, got", bs.countAtOffsetNonZero)
t.Fatalf("expected %d blocks with an offset > 0, got %d", nonOffZero, bs.countAtOffsetNonZero)
}
}

Expand Down
Loading