diff --git a/core/commands/add.go b/core/commands/add.go index 08395ac2120..05484e9ec07 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -6,24 +6,15 @@ import ( "os" "strings" - core "github.com/ipfs/go-ipfs/core" cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" - "github.com/ipfs/go-ipfs/core/coreunix" - filestore "github.com/ipfs/go-ipfs/filestore" - blockservice "gx/ipfs/QmNozJswSuwiZspexEHcQo5GMqpzM5exUGjNW6s4AAipUX/go-blockservice" - dag "gx/ipfs/QmTGpm48qm4fUZ9E5hMXy4ZngJUYCMKu15rTMVR3BSEnPm/go-merkledag" - dagtest "gx/ipfs/QmTGpm48qm4fUZ9E5hMXy4ZngJUYCMKu15rTMVR3BSEnPm/go-merkledag/test" - ft "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs" - - offline "gx/ipfs/QmPXcrGQQEEPswwg6YiE2WLk8qkmvncZ7zphMKKP8bXqY3/go-ipfs-exchange-offline" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash" pb "gx/ipfs/QmPtj12fdwuAqj9sBSTNUxBNu8kCGNp8b3o8yUzMm5GHpq/pb" - cidutil "gx/ipfs/QmQJSeE3CX4zos9qeaG8EhecEK9zvrTEfTG84J8C5NVRwt/go-cidutil" - mfs "gx/ipfs/QmQUjAGdPuNA9tpzrx5osWnPMhht7B5YzJNddjB45DUq2U/go-mfs" cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit" files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files" cmds "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds" - bstore "gx/ipfs/QmfUhZX9KpvJiuiziUzP2cjhRAyqHJURsPgRKn1cdDZMKa/go-ipfs-blockstore" ) // ErrDepthLimitExceeded indicates that the max depth has been exceeded. @@ -148,22 +139,10 @@ You can now check what blocks have been created by: return nil }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) - if err != nil { - return err - } - - cfg, err := n.Repo.Config() + api, err := cmdenv.GetApi(env) if err != nil { return err } - // check if repo will exceed storage limit if added - // TODO: this doesn't handle the case if the hashed file is already in blocks (deduplicated) - // TODO: conditional GC is disabled due to it is somehow not possible to pass the size to the daemon - //if err := corerepo.ConditionalGC(req.Context(), n, uint64(size)); err != nil { - // res.SetError(err, cmdkit.ErrNormal) - // return - //} progress, _ := req.Options[progressOptionName].(bool) trickle, _ := req.Options[trickleOptionName].(bool) @@ -181,131 +160,59 @@ You can now check what blocks have been created by: inline, _ := req.Options[inlineOptionName].(bool) inlineLimit, _ := req.Options[inlineLimitOptionName].(int) pathName, _ := req.Options[stdinPathName].(string) - - // The arguments are subject to the following constraints. - // - // nocopy -> filestoreEnabled - // nocopy -> rawblocks - // (hash != sha2-256) -> cidv1 - - // NOTE: 'rawblocks -> cidv1' is missing. Legacy reasons. - - // nocopy -> filestoreEnabled - if nocopy && !cfg.Experimental.FilestoreEnabled { - return cmdkit.Errorf(cmdkit.ErrClient, filestore.ErrFilestoreNotEnabled.Error()) - } - - // nocopy -> rawblocks - if nocopy && !rawblks { - // fixed? - if rbset { - return fmt.Errorf("nocopy option requires '--raw-leaves' to be enabled as well") - } - // No, satisfy mandatory constraint. - rawblks = true - } - - // (hash != "sha2-256") -> CIDv1 - if hashFunStr != "sha2-256" && cidVer == 0 { - if cidVerSet { - return cmdkit.Errorf(cmdkit.ErrClient, "CIDv0 only supports sha2-256") - } - cidVer = 1 - } - - // cidV1 -> raw blocks (by default) - if cidVer > 0 && !rbset { - rawblks = true - } - - prefix, err := dag.PrefixForCidVersion(cidVer) - if err != nil { - return err - } + local, _ := req.Options["local"].(bool) hashFunCode, ok := mh.Names[strings.ToLower(hashFunStr)] if !ok { return fmt.Errorf("unrecognized hash function: %s", strings.ToLower(hashFunStr)) } - prefix.MhType = hashFunCode - prefix.MhLength = -1 - - if hash { - nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{ - //TODO: need this to be true or all files - // hashed will be stored in memory! - NilRepo: true, - }) - if err != nil { - return err - } - n = nilnode - } + events := make(chan interface{}, adderOutChanSize) - addblockstore := n.Blockstore - if !(fscache || nocopy) { - addblockstore = bstore.NewGCBlockstore(n.BaseBlocks, n.GCLocker) - } + opts := []options.UnixfsAddOption{ + options.Unixfs.Hash(hashFunCode), - exch := n.Exchange - local, _ := req.Options["local"].(bool) - if local { - exch = offline.Exchange(addblockstore) - } + options.Unixfs.Inline(inline), + options.Unixfs.InlineLimit(inlineLimit), - bserv := blockservice.New(addblockstore, exch) // hash security 001 - dserv := dag.NewDAGService(bserv) + options.Unixfs.Chunker(chunker), - outChan := make(chan interface{}, adderOutChanSize) + options.Unixfs.Pin(dopin), + options.Unixfs.HashOnly(hash), + options.Unixfs.Local(local), + options.Unixfs.FsCache(fscache), + options.Unixfs.Nocopy(nocopy), - fileAdder, err := coreunix.NewAdder(req.Context, n.Pinning, n.Blockstore, dserv) - if err != nil { - return err + options.Unixfs.Wrap(wrap), + options.Unixfs.Hidden(hidden), + options.Unixfs.StdinName(pathName), + + options.Unixfs.Progress(progress), + options.Unixfs.Silent(silent), + options.Unixfs.Events(events), } - fileAdder.Out = outChan - fileAdder.Chunker = chunker - fileAdder.Progress = progress - fileAdder.Hidden = hidden - fileAdder.Trickle = trickle - fileAdder.Wrap = wrap - fileAdder.Pin = dopin && !hash - fileAdder.Silent = silent - fileAdder.RawLeaves = rawblks - fileAdder.NoCopy = nocopy - fileAdder.Name = pathName - fileAdder.CidBuilder = prefix - - if inline { - fileAdder.CidBuilder = cidutil.InlineBuilder{ - Builder: fileAdder.CidBuilder, - Limit: inlineLimit, - } + if cidVerSet { + opts = append(opts, options.Unixfs.CidVersion(cidVer)) } - if hash { - md := dagtest.Mock() - emptyDirNode := ft.EmptyDirNode() - // Use the same prefix for the "empty" MFS root as for the file adder. - emptyDirNode.SetCidBuilder(fileAdder.CidBuilder) - mr, err := mfs.NewRoot(req.Context, md, emptyDirNode, nil) - if err != nil { - return err - } + if rbset { + opts = append(opts, options.Unixfs.RawLeaves(rawblks)) + } - fileAdder.SetMfsRoot(mr) + if trickle { + opts = append(opts, options.Unixfs.Layout(options.TrickleLayout)) } errCh := make(chan error) go func() { var err error defer func() { errCh <- err }() - defer close(outChan) - err = fileAdder.AddAllAndPin(req.Files) + defer close(events) + _, err = api.Unixfs().Add(req.Context, req.Files, opts...) }() - err = res.Emit(outChan) + err = res.Emit(events) if err != nil { return err } @@ -371,7 +278,7 @@ You can now check what blocks have been created by: break LOOP } - output := out.(*coreunix.AddedObject) + output := out.(*coreiface.AddEvent) if len(output.Hash) > 0 { lastHash = output.Hash if quieter { @@ -451,5 +358,5 @@ You can now check what blocks have been created by: } }, }, - Type: coreunix.AddedObject{}, + Type: coreiface.AddEvent{}, } diff --git a/core/commands/tar.go b/core/commands/tar.go index 84472e0a543..ddf3b2ff39d 100644 --- a/core/commands/tar.go +++ b/core/commands/tar.go @@ -7,7 +7,7 @@ import ( cmds "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" e "github.com/ipfs/go-ipfs/core/commands/e" - "github.com/ipfs/go-ipfs/core/coreunix" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" tar "github.com/ipfs/go-ipfs/tar" dag "gx/ipfs/QmTGpm48qm4fUZ9E5hMXy4ZngJUYCMKu15rTMVR3BSEnPm/go-merkledag" path "gx/ipfs/QmV4QxScV9Y7LbaWhHazFfRd8uyeUd4pAH8a7fFFbi5odJ/go-path" @@ -60,12 +60,12 @@ represent it. c := node.Cid() fi.FileName() - res.SetOutput(&coreunix.AddedObject{ + res.SetOutput(&coreiface.AddEvent{ Name: fi.FileName(), Hash: c.String(), }) }, - Type: coreunix.AddedObject{}, + Type: coreiface.AddEvent{}, Marshalers: cmds.MarshalerMap{ cmds.Text: func(res cmds.Response) (io.Reader, error) { v, err := unwrapOutput(res.Output()) @@ -73,7 +73,7 @@ represent it. return nil, err } - o, ok := v.(*coreunix.AddedObject) + o, ok := v.(*coreiface.AddEvent) if !ok { return nil, e.TypeErr(o, v) } diff --git a/core/coreapi/interface/options/unixfs.go b/core/coreapi/interface/options/unixfs.go new file mode 100644 index 00000000000..6a54b2d399d --- /dev/null +++ b/core/coreapi/interface/options/unixfs.go @@ -0,0 +1,304 @@ +package options + +import ( + "errors" + "fmt" + + cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" + mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash" + dag "gx/ipfs/QmTGpm48qm4fUZ9E5hMXy4ZngJUYCMKu15rTMVR3BSEnPm/go-merkledag" +) + +type Layout int + +const ( + BalancedLayout Layout = iota + TrickleLayout +) + +type UnixfsAddSettings struct { + CidVersion int + MhType uint64 + + Inline bool + InlineLimit int + RawLeaves bool + RawLeavesSet bool + + Chunker string + Layout Layout + + Pin bool + OnlyHash bool + Local bool + FsCache bool + NoCopy bool + + Wrap bool + Hidden bool + StdinName string + + Events chan<- interface{} + Silent bool + Progress bool +} + +type UnixfsAddOption func(*UnixfsAddSettings) error + +func UnixfsAddOptions(opts ...UnixfsAddOption) (*UnixfsAddSettings, cid.Prefix, error) { + options := &UnixfsAddSettings{ + CidVersion: -1, + MhType: mh.SHA2_256, + + Inline: false, + InlineLimit: 32, + RawLeaves: false, + RawLeavesSet: false, + + Chunker: "size-262144", + Layout: BalancedLayout, + + Pin: false, + OnlyHash: false, + Local: false, + FsCache: false, + NoCopy: false, + + Wrap: false, + Hidden: false, + StdinName: "", + + Events: nil, + Silent: false, + Progress: false, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, cid.Prefix{}, err + } + } + + // nocopy -> rawblocks + if options.NoCopy && !options.RawLeaves { + // fixed? + if options.RawLeavesSet { + return nil, cid.Prefix{}, fmt.Errorf("nocopy option requires '--raw-leaves' to be enabled as well") + } + + // No, satisfy mandatory constraint. + options.RawLeaves = true + } + + // (hash != "sha2-256") -> CIDv1 + if options.MhType != mh.SHA2_256 { + switch options.CidVersion { + case 0: + return nil, cid.Prefix{}, errors.New("CIDv0 only supports sha2-256") + case 1, -1: + options.CidVersion = 1 + default: + return nil, cid.Prefix{}, fmt.Errorf("unknown CID version: %d", options.CidVersion) + } + } else { + if options.CidVersion < 0 { + // Default to CIDv0 + options.CidVersion = 0 + } + } + + // cidV1 -> raw blocks (by default) + if options.CidVersion > 0 && !options.RawLeavesSet { + options.RawLeaves = true + } + + prefix, err := dag.PrefixForCidVersion(options.CidVersion) + if err != nil { + return nil, cid.Prefix{}, err + } + + prefix.MhType = options.MhType + prefix.MhLength = -1 + + return options, prefix, nil +} + +type unixfsOpts struct{} + +var Unixfs unixfsOpts + +// CidVersion specifies which CID version to use. Defaults to 0 unless an option +// that depends on CIDv1 is passed. +func (unixfsOpts) CidVersion(version int) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.CidVersion = version + return nil + } +} + +// Hash function to use. Implies CIDv1 if not set to sha2-256 (default). +// +// Table of functions is declared in https://github.com/multiformats/go-multihash/blob/master/multihash.go +func (unixfsOpts) Hash(mhtype uint64) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.MhType = mhtype + return nil + } +} + +// RawLeaves specifies whether to use raw blocks for leaves (data nodes with no +// links) instead of wrapping them with unixfs structures. +func (unixfsOpts) RawLeaves(enable bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.RawLeaves = enable + settings.RawLeavesSet = true + return nil + } +} + +// Inline tells the adder to inline small blocks into CIDs +func (unixfsOpts) Inline(enable bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Inline = enable + return nil + } +} + +// InlineLimit sets the amount of bytes below which blocks will be encoded +// directly into CID instead of being stored and addressed by it's hash. +// Specifying this option won't enable block inlining. For that use `Inline` +// option. Default: 32 bytes +// +// Note that while there is no hard limit on the number of bytes, it should be +// kept at a reasonably low value, such as 64; implementations may choose to +// reject anything larger. +func (unixfsOpts) InlineLimit(limit int) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.InlineLimit = limit + return nil + } +} + +// Chunker specifies settings for the chunking algorithm to use. +// +// Default: size-262144, formats: +// size-[bytes] - Simple chunker splitting data into blocks of n bytes +// rabin-[min]-[avg]-[max] - Rabin chunker +func (unixfsOpts) Chunker(chunker string) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Chunker = chunker + return nil + } +} + +// Layout tells the adder how to balance data between leaves. +// options.BalancedLayout is the default, it's optimized for static seekable +// files. +// options.TrickleLayout is optimized for streaming data, +func (unixfsOpts) Layout(layout Layout) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Layout = layout + return nil + } +} + +// Pin tells the adder to pin the file root recursively after adding +func (unixfsOpts) Pin(pin bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Pin = pin + return nil + } +} + +// HashOnly will make the adder calculate data hash without storing it in the +// blockstore or announcing it to the network +func (unixfsOpts) HashOnly(hashOnly bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.OnlyHash = hashOnly + return nil + } +} + +// Local will add the data to blockstore without announcing it to the network +// +// Note that this doesn't prevent other nodes from getting this data +func (unixfsOpts) Local(local bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Local = local + return nil + } +} + +// Wrap tells the adder to wrap the added file structure with an additional +// directory. +func (unixfsOpts) Wrap(wrap bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Wrap = wrap + return nil + } +} + +// Hidden enables adding of hidden files (files prefixed with '.') +func (unixfsOpts) Hidden(hidden bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Hidden = hidden + return nil + } +} + +// StdinName is the name set for files which don specify FilePath as +// os.Stdin.Name() +func (unixfsOpts) StdinName(name string) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.StdinName = name + return nil + } +} + +// Events specifies channel which will be used to report events about ongoing +// Add operation. +// +// Note that if this channel blocks it may slowdown the adder +func (unixfsOpts) Events(sink chan<- interface{}) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Events = sink + return nil + } +} + +// Silent reduces event output +func (unixfsOpts) Silent(silent bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Silent = silent + return nil + } +} + +// Progress tells the adder whether to enable progress events +func (unixfsOpts) Progress(enable bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Progress = enable + return nil + } +} + +// FsCache tells the adder to check the filestore for pre-existing blocks +// +// Experimental +func (unixfsOpts) FsCache(enable bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.FsCache = enable + return nil + } +} + +// NoCopy tells the adder to add the files using filestore. Implies RawLeaves. +// +// Experimental +func (unixfsOpts) Nocopy(enable bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.NoCopy = enable + return nil + } +} diff --git a/core/coreapi/interface/unixfs.go b/core/coreapi/interface/unixfs.go index 4a3aff6fc53..c622e210e58 100644 --- a/core/coreapi/interface/unixfs.go +++ b/core/coreapi/interface/unixfs.go @@ -2,17 +2,37 @@ package iface import ( "context" - "io" + options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + + files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files" ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" ) +// TODO: ideas on making this more coreapi-ish without breaking the http API? +type AddEvent struct { + Name string + Hash string `json:",omitempty"` + Bytes int64 `json:",omitempty"` + Size string `json:",omitempty"` +} + // UnixfsAPI is the basic interface to immutable files in IPFS +// NOTE: This API is heavily WIP, things are guaranteed to break frequently type UnixfsAPI interface { // Add imports the data from the reader into merkledag file - Add(context.Context, io.Reader) (ResolvedPath, error) + // + // TODO: a long useful comment on how to use this for many different scenarios + Add(context.Context, files.File, ...options.UnixfsAddOption) (ResolvedPath, error) + + // Get returns a read-only handle to a file tree referenced by a path + // + // Note that some implementations of this API may apply the specified context + // to operations performed on the returned file + Get(context.Context, Path) (files.File, error) // Cat returns a reader for the file + // TODO: Remove in favour of Get (if we use Get on a file we still have reader directly, so..) Cat(context.Context, Path) (Reader, error) // Ls returns the list of links in a directory diff --git a/core/coreapi/name_test.go b/core/coreapi/name_test.go index 419a16781be..41d286ef888 100644 --- a/core/coreapi/name_test.go +++ b/core/coreapi/name_test.go @@ -3,10 +3,12 @@ package coreapi_test import ( "context" "io" + "io/ioutil" "math/rand" "testing" "time" + files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files" ipath "gx/ipfs/QmV4QxScV9Y7LbaWhHazFfRd8uyeUd4pAH8a7fFFbi5odJ/go-path" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" @@ -16,7 +18,7 @@ import ( var rnd = rand.New(rand.NewSource(0x62796532303137)) func addTestObject(ctx context.Context, api coreiface.CoreAPI) (coreiface.Path, error) { - return api.Unixfs().Add(ctx, &io.LimitedReader{R: rnd, N: 4092}) + return api.Unixfs().Add(ctx, files.NewReaderFile("", "", ioutil.NopCloser(&io.LimitedReader{R: rnd, N: 4092}), nil)) } func TestBasicPublishResolve(t *testing.T) { diff --git a/core/coreapi/pin_test.go b/core/coreapi/pin_test.go index 9bbf16c9cfa..a9a7547c3db 100644 --- a/core/coreapi/pin_test.go +++ b/core/coreapi/pin_test.go @@ -15,7 +15,7 @@ func TestPinAdd(t *testing.T) { t.Error(err) } - p, err := api.Unixfs().Add(ctx, strings.NewReader("foo")) + p, err := api.Unixfs().Add(ctx, strFile("foo")()) if err != nil { t.Error(err) } @@ -33,7 +33,7 @@ func TestPinSimple(t *testing.T) { t.Error(err) } - p, err := api.Unixfs().Add(ctx, strings.NewReader("foo")) + p, err := api.Unixfs().Add(ctx, strFile("foo")()) if err != nil { t.Error(err) } @@ -82,12 +82,12 @@ func TestPinRecursive(t *testing.T) { t.Error(err) } - p0, err := api.Unixfs().Add(ctx, strings.NewReader("foo")) + p0, err := api.Unixfs().Add(ctx, strFile("foo")()) if err != nil { t.Error(err) } - p1, err := api.Unixfs().Add(ctx, strings.NewReader("bar")) + p1, err := api.Unixfs().Add(ctx, strFile("bar")()) if err != nil { t.Error(err) } diff --git a/core/coreapi/unixfile.go b/core/coreapi/unixfile.go new file mode 100644 index 00000000000..85935221e14 --- /dev/null +++ b/core/coreapi/unixfile.go @@ -0,0 +1,190 @@ +package coreapi + +import ( + "context" + "errors" + "io" + "os" + gopath "path" + "time" + + files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files" + dag "gx/ipfs/QmTGpm48qm4fUZ9E5hMXy4ZngJUYCMKu15rTMVR3BSEnPm/go-merkledag" + ft "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs" + uio "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs/io" + ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" +) + +// Number to file to prefetch in directories +// TODO: should we allow setting this via context hint? +const prefetchFiles = 4 + +// TODO: this probably belongs in go-unixfs (and could probably replace a chunk of it's interface in the long run) + +type sizeInfo struct { + size int64 + name string + modTime time.Time +} + +func (s *sizeInfo) Name() string { + return s.name +} + +func (s *sizeInfo) Size() int64 { + return s.size +} + +func (s *sizeInfo) Mode() os.FileMode { + return 0444 // all read +} + +func (s *sizeInfo) ModTime() time.Time { + return s.modTime +} + +func (s *sizeInfo) IsDir() bool { + return false +} + +func (s *sizeInfo) Sys() interface{} { + return nil +} + +type ufsDirectory struct { + ctx context.Context + dserv ipld.DAGService + + files chan *ipld.Link + + name string + path string +} + +func (d *ufsDirectory) Close() error { + return files.ErrNotReader +} + +func (d *ufsDirectory) Read(_ []byte) (int, error) { + return 0, files.ErrNotReader +} + +func (d *ufsDirectory) FileName() string { + return d.name +} + +func (d *ufsDirectory) FullPath() string { + return d.path +} + +func (d *ufsDirectory) IsDirectory() bool { + return true +} + +func (d *ufsDirectory) NextFile() (files.File, error) { + l, ok := <-d.files + if !ok { + return nil, io.EOF + } + + nd, err := l.GetNode(d.ctx, d.dserv) + if err != nil { + return nil, err + } + + return newUnixfsFile(d.ctx, d.dserv, nd, l.Name, d) +} + +type ufsFile struct { + uio.DagReader + + name string + path string +} + +func (f *ufsFile) IsDirectory() bool { + return false +} + +func (f *ufsFile) NextFile() (files.File, error) { + return nil, files.ErrNotDirectory +} + +func (f *ufsFile) FileName() string { + return f.name +} + +func (f *ufsFile) FullPath() string { + return f.path +} + +func (f *ufsFile) Size() (int64, error) { + return int64(f.DagReader.Size()), nil +} + +func newUnixfsDir(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name string, path string) (files.File, error) { + dir, err := uio.NewDirectoryFromNode(dserv, nd) + if err != nil { + return nil, err + } + + fileCh := make(chan *ipld.Link, prefetchFiles) + go func() { + dir.ForEachLink(ctx, func(link *ipld.Link) error { + select { + case fileCh <- link: + case <-ctx.Done(): + return ctx.Err() + } + return nil + }) + + close(fileCh) + }() + + return &ufsDirectory{ + ctx: ctx, + dserv: dserv, + + files: fileCh, + + name: name, + path: path, + }, nil +} + +func newUnixfsFile(ctx context.Context, dserv ipld.DAGService, nd ipld.Node, name string, parent files.File) (files.File, error) { + path := name + if parent != nil { + path = gopath.Join(parent.FullPath(), name) + } + + switch dn := nd.(type) { + case *dag.ProtoNode: + fsn, err := ft.FSNodeFromBytes(dn.Data()) + if err != nil { + return nil, err + } + if fsn.IsDir() { + return newUnixfsDir(ctx, dserv, nd, name, path) + } + + case *dag.RawNode: + default: + return nil, errors.New("unknown node type") + } + + dr, err := uio.NewDagReader(ctx, nd, dserv) + if err != nil { + return nil, err + } + + return &ufsFile{ + DagReader: dr, + + name: name, + path: path, + }, nil +} + +var _ os.FileInfo = &sizeInfo{} diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index be80b11dae9..51646e1fcfe 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -2,30 +2,143 @@ package coreapi import ( "context" - "io" + "fmt" + "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/filestore" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" - coreunix "github.com/ipfs/go-ipfs/core/coreunix" - uio "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs/io" + "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + "github.com/ipfs/go-ipfs/core/coreunix" - cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" + blockservice "gx/ipfs/QmNozJswSuwiZspexEHcQo5GMqpzM5exUGjNW6s4AAipUX/go-blockservice" + offline "gx/ipfs/QmPXcrGQQEEPswwg6YiE2WLk8qkmvncZ7zphMKKP8bXqY3/go-ipfs-exchange-offline" + cidutil "gx/ipfs/QmQJSeE3CX4zos9qeaG8EhecEK9zvrTEfTG84J8C5NVRwt/go-cidutil" + mfs "gx/ipfs/QmQUjAGdPuNA9tpzrx5osWnPMhht7B5YzJNddjB45DUq2U/go-mfs" + files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files" + dag "gx/ipfs/QmTGpm48qm4fUZ9E5hMXy4ZngJUYCMKu15rTMVR3BSEnPm/go-merkledag" + dagtest "gx/ipfs/QmTGpm48qm4fUZ9E5hMXy4ZngJUYCMKu15rTMVR3BSEnPm/go-merkledag/test" + ft "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs" + uio "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs/io" ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" + bstore "gx/ipfs/QmfUhZX9KpvJiuiziUzP2cjhRAyqHJURsPgRKn1cdDZMKa/go-ipfs-blockstore" ) type UnixfsAPI CoreAPI // Add builds a merkledag node from a reader, adds it to the blockstore, // and returns the key representing that node. -func (api *UnixfsAPI) Add(ctx context.Context, r io.Reader) (coreiface.ResolvedPath, error) { - k, err := coreunix.AddWithContext(ctx, api.node, r) +func (api *UnixfsAPI) Add(ctx context.Context, files files.File, opts ...options.UnixfsAddOption) (coreiface.ResolvedPath, error) { + settings, prefix, err := options.UnixfsAddOptions(opts...) if err != nil { return nil, err } - c, err := cid.Decode(k) + + n := api.node + + cfg, err := n.Repo.Config() if err != nil { return nil, err } - return coreiface.IpfsPath(c), nil + + // check if repo will exceed storage limit if added + // TODO: this doesn't handle the case if the hashed file is already in blocks (deduplicated) + // TODO: conditional GC is disabled due to it is somehow not possible to pass the size to the daemon + //if err := corerepo.ConditionalGC(req.Context(), n, uint64(size)); err != nil { + // res.SetError(err, cmdkit.ErrNormal) + // return + //} + + if settings.NoCopy && !cfg.Experimental.FilestoreEnabled { + return nil, filestore.ErrFilestoreNotEnabled + } + + if settings.OnlyHash { + nilnode, err := core.NewNode(ctx, &core.BuildCfg{ + //TODO: need this to be true or all files + // hashed will be stored in memory! + NilRepo: true, + }) + if err != nil { + return nil, err + } + n = nilnode + } + + addblockstore := n.Blockstore + if !(settings.FsCache || settings.NoCopy) { + addblockstore = bstore.NewGCBlockstore(n.BaseBlocks, n.GCLocker) + } + + exch := n.Exchange + if settings.Local { + exch = offline.Exchange(addblockstore) + } + + bserv := blockservice.New(addblockstore, exch) // hash security 001 + dserv := dag.NewDAGService(bserv) + + fileAdder, err := coreunix.NewAdder(ctx, n.Pinning, n.Blockstore, dserv) + if err != nil { + return nil, err + } + + fileAdder.Chunker = settings.Chunker + if settings.Events != nil { + fileAdder.Out = settings.Events + fileAdder.Progress = settings.Progress + } + fileAdder.Hidden = settings.Hidden + fileAdder.Wrap = settings.Wrap + fileAdder.Pin = settings.Pin && !settings.OnlyHash + fileAdder.Silent = settings.Silent + fileAdder.RawLeaves = settings.RawLeaves + fileAdder.NoCopy = settings.NoCopy + fileAdder.Name = settings.StdinName + fileAdder.CidBuilder = prefix + + switch settings.Layout { + case options.BalancedLayout: + // Default + case options.TrickleLayout: + fileAdder.Trickle = true + default: + return nil, fmt.Errorf("unknown layout: %d", settings.Layout) + } + + if settings.Inline { + fileAdder.CidBuilder = cidutil.InlineBuilder{ + Builder: fileAdder.CidBuilder, + Limit: settings.InlineLimit, + } + } + + if settings.OnlyHash { + md := dagtest.Mock() + emptyDirNode := ft.EmptyDirNode() + // Use the same prefix for the "empty" MFS root as for the file adder. + emptyDirNode.SetCidBuilder(fileAdder.CidBuilder) + mr, err := mfs.NewRoot(ctx, md, emptyDirNode, nil) + if err != nil { + return nil, err + } + + fileAdder.SetMfsRoot(mr) + } + + nd, err := fileAdder.AddAllAndPin(files) + if err != nil { + return nil, err + } + return coreiface.IpfsPath(nd.Cid()), nil +} + +func (api *UnixfsAPI) Get(ctx context.Context, p coreiface.Path) (files.File, error) { + nd, err := api.core().ResolveNode(ctx, p) + if err != nil { + return nil, err + } + + return newUnixfsFile(ctx, api.node.DAG, nd, "", nil) } // Cat returns the data contained by an IPFS or IPNS object(s) at path `p`. diff --git a/core/coreapi/unixfs_test.go b/core/coreapi/unixfs_test.go index 6281c385619..cabd6775a20 100644 --- a/core/coreapi/unixfs_test.go +++ b/core/coreapi/unixfs_test.go @@ -6,22 +6,28 @@ import ( "encoding/base64" "fmt" "io" + "io/ioutil" "math" + "os" + "strconv" "strings" + "sync" "testing" - core "github.com/ipfs/go-ipfs/core" - coreapi "github.com/ipfs/go-ipfs/core/coreapi" + "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/coreapi" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" - options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - coreunix "github.com/ipfs/go-ipfs/core/coreunix" + "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + "github.com/ipfs/go-ipfs/core/coreunix" mock "github.com/ipfs/go-ipfs/core/mock" - keystore "github.com/ipfs/go-ipfs/keystore" - repo "github.com/ipfs/go-ipfs/repo" + "github.com/ipfs/go-ipfs/keystore" + "github.com/ipfs/go-ipfs/repo" mocknet "gx/ipfs/QmNmj2AeM46ZQqHARnWidb5qqHoZJFeYWzmG65jviJDRQY/go-libp2p/p2p/net/mock" + mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash" ci "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto" pstore "gx/ipfs/QmSJ36wcYQyEViJUWUEhJU81tw1KdakTKqLLHbvYbA9zDv/go-libp2p-peerstore" + files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files" config "gx/ipfs/QmSoYrBMibm2T3LupaLuez7LPGnyrJwdRxvTfPUyCp691u/go-ipfs-config" cbor "gx/ipfs/QmSywXfm2v4Qkp4DcFqo8eehj49dJK3bdUnaLVxrdFLMQn/go-ipld-cbor" mdag "gx/ipfs/QmTGpm48qm4fUZ9E5hMXy4ZngJUYCMKu15rTMVR3BSEnPm/go-merkledag" @@ -125,6 +131,37 @@ func makeAPI(ctx context.Context) (*core.IpfsNode, coreiface.CoreAPI, error) { return nd[0], api[0], nil } +func strFile(data string) func() files.File { + return func() files.File { + return files.NewReaderFile("", "", ioutil.NopCloser(strings.NewReader(data)), nil) + } +} + +func twoLevelDir() func() files.File { + return func() files.File { + return files.NewSliceFile("t", "t", []files.File{ + files.NewSliceFile("t/abc", "t/abc", []files.File{ + files.NewReaderFile("t/abc/def", "t/abc/def", ioutil.NopCloser(strings.NewReader("world")), nil), + }), + files.NewReaderFile("t/bar", "t/bar", ioutil.NopCloser(strings.NewReader("hello2")), nil), + files.NewReaderFile("t/foo", "t/foo", ioutil.NopCloser(strings.NewReader("hello1")), nil), + }) + } +} + +func flatDir() files.File { + return files.NewSliceFile("t", "t", []files.File{ + files.NewReaderFile("t/bar", "t/bar", ioutil.NopCloser(strings.NewReader("hello2")), nil), + files.NewReaderFile("t/foo", "t/foo", ioutil.NopCloser(strings.NewReader("hello1")), nil), + }) +} + +func wrapped(f files.File) files.File { + return files.NewSliceFile("", "", []files.File{ + f, + }) +} + func TestAdd(t *testing.T) { ctx := context.Background() _, api, err := makeAPI(ctx) @@ -132,84 +169,454 @@ func TestAdd(t *testing.T) { t.Error(err) } - str := strings.NewReader(helloStr) - p, err := api.Unixfs().Add(ctx, str) - if err != nil { - t.Error(err) - } + cases := []struct { + name string + data func() files.File + expect func(files.File) files.File - if p.String() != hello { - t.Fatalf("expected path %s, got: %s", hello, p) - } + path string + err string - r, err := api.Unixfs().Cat(ctx, p) - if err != nil { - t.Fatal(err) - } - buf := make([]byte, len(helloStr)) - _, err = io.ReadFull(r, buf) - if err != nil { - t.Error(err) + recursive bool + + events []coreiface.AddEvent + + opts []options.UnixfsAddOption + }{ + // Simple cases + { + name: "simpleAdd", + data: strFile(helloStr), + path: hello, + opts: []options.UnixfsAddOption{}, + }, + { + name: "addEmpty", + data: strFile(""), + path: emptyFile, + }, + // CIDv1 version / rawLeaves + { + name: "addCidV1", + data: strFile(helloStr), + path: "/ipfs/zb2rhdhmJjJZs9qkhQCpCQ7VREFkqWw3h1r8utjVvQugwHPFd", + opts: []options.UnixfsAddOption{options.Unixfs.CidVersion(1)}, + }, + { + name: "addCidV1NoLeaves", + data: strFile(helloStr), + path: "/ipfs/zdj7WY4GbN8NDbTW1dfCShAQNVovams2xhq9hVCx5vXcjvT8g", + opts: []options.UnixfsAddOption{options.Unixfs.CidVersion(1), options.Unixfs.RawLeaves(false)}, + }, + // Non sha256 hash vs CID + { + name: "addCidSha3", + data: strFile(helloStr), + path: "/ipfs/zb2wwnYtXBxpndNABjtYxWAPt3cwWNRnc11iT63fvkYV78iRb", + opts: []options.UnixfsAddOption{options.Unixfs.Hash(mh.SHA3_256)}, + }, + { + name: "addCidSha3Cid0", + data: strFile(helloStr), + err: "CIDv0 only supports sha2-256", + opts: []options.UnixfsAddOption{options.Unixfs.CidVersion(0), options.Unixfs.Hash(mh.SHA3_256)}, + }, + // Inline + { + name: "addInline", + data: strFile(helloStr), + path: "/ipfs/zaYomJdLndMku8P9LHngHB5w2CQ7NenLbv", + opts: []options.UnixfsAddOption{options.Unixfs.Inline(true)}, + }, + { + name: "addInlineLimit", + data: strFile(helloStr), + path: "/ipfs/zaYomJdLndMku8P9LHngHB5w2CQ7NenLbv", + opts: []options.UnixfsAddOption{options.Unixfs.InlineLimit(32), options.Unixfs.Inline(true)}, + }, + { + name: "addInlineZero", + data: strFile(""), + path: "/ipfs/z2yYDV", + opts: []options.UnixfsAddOption{options.Unixfs.InlineLimit(0), options.Unixfs.Inline(true), options.Unixfs.RawLeaves(true)}, + }, + { //TODO: after coreapi add is used in `ipfs add`, consider making this default for inline + name: "addInlineRaw", + data: strFile(helloStr), + path: "/ipfs/zj7Gr8AcBreqGEfrnR5kPFe", + opts: []options.UnixfsAddOption{options.Unixfs.InlineLimit(32), options.Unixfs.Inline(true), options.Unixfs.RawLeaves(true)}, + }, + // Chunker / Layout + { + name: "addChunks", + data: strFile(strings.Repeat("aoeuidhtns", 200)), + path: "/ipfs/QmRo11d4QJrST47aaiGVJYwPhoNA4ihRpJ5WaxBWjWDwbX", + opts: []options.UnixfsAddOption{options.Unixfs.Chunker("size-4")}, + }, + { + name: "addChunksTrickle", + data: strFile(strings.Repeat("aoeuidhtns", 200)), + path: "/ipfs/QmNNhDGttafX3M1wKWixGre6PrLFGjnoPEDXjBYpTv93HP", + opts: []options.UnixfsAddOption{options.Unixfs.Chunker("size-4"), options.Unixfs.Layout(options.TrickleLayout)}, + }, + // Local + { + name: "addLocal", // better cases in sharness + data: strFile(helloStr), + path: hello, + opts: []options.UnixfsAddOption{options.Unixfs.Local(true)}, + }, + { + name: "hashOnly", // test (non)fetchability + data: strFile(helloStr), + path: hello, + opts: []options.UnixfsAddOption{options.Unixfs.HashOnly(true)}, + }, + // multi file + { + name: "simpleDir", + data: flatDir, + recursive: true, + path: "/ipfs/QmRKGpFfR32FVXdvJiHfo4WJ5TDYBsM1P9raAp1p6APWSp", + }, + { + name: "twoLevelDir", + data: twoLevelDir(), + recursive: true, + path: "/ipfs/QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr", + }, + // wrapped + { + name: "addWrapped", + path: "/ipfs/QmVE9rNpj5doj7XHzp5zMUxD7BJgXEqx4pe3xZ3JBReWHE", + data: func() files.File { + return files.NewReaderFile("foo", "foo", ioutil.NopCloser(strings.NewReader(helloStr)), nil) + }, + expect: wrapped, + opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true)}, + }, + { + name: "stdinWrapped", + path: "/ipfs/QmU3r81oZycjHS9oaSHw37ootMFuFUw1DvMLKXPsezdtqU", + data: func() files.File { + return files.NewReaderFile("", os.Stdin.Name(), ioutil.NopCloser(strings.NewReader(helloStr)), nil) + }, + expect: func(files.File) files.File { + return files.NewSliceFile("", "", []files.File{ + files.NewReaderFile("QmQy2Dw4Wk7rdJKjThjYXzfFJNaRKRHhHP5gHHXroJMYxk", "QmQy2Dw4Wk7rdJKjThjYXzfFJNaRKRHhHP5gHHXroJMYxk", ioutil.NopCloser(strings.NewReader(helloStr)), nil), + }) + }, + opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true)}, + }, + { + name: "stdinNamed", + path: "/ipfs/QmQ6cGBmb3ZbdrQW1MRm1RJnYnaxCqfssz7CrTa9NEhQyS", + data: func() files.File { + return files.NewReaderFile("", os.Stdin.Name(), ioutil.NopCloser(strings.NewReader(helloStr)), nil) + }, + expect: func(files.File) files.File { + return files.NewSliceFile("", "", []files.File{ + files.NewReaderFile("test", "test", ioutil.NopCloser(strings.NewReader(helloStr)), nil), + }) + }, + opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true), options.Unixfs.StdinName("test")}, + }, + { + name: "twoLevelDirWrapped", + data: twoLevelDir(), + recursive: true, + expect: wrapped, + path: "/ipfs/QmPwsL3T5sWhDmmAWZHAzyjKtMVDS9a11aHNRqb3xoVnmg", + opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true)}, + }, + { + name: "twoLevelInlineHash", + data: twoLevelDir(), + recursive: true, + expect: wrapped, + path: "/ipfs/zBunoruKoyCHKkALNSWxDvj4L7yuQnMgQ4hUa9j1Z64tVcDEcu6Zdetyu7eeFCxMPfxb7YJvHeFHoFoHMkBUQf6vfdhmi", + opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true), options.Unixfs.Inline(true), options.Unixfs.RawLeaves(true), options.Unixfs.Hash(mh.SHA3)}, + }, + // hidden + { + name: "hiddenFiles", + data: func() files.File { + return files.NewSliceFile("t", "t", []files.File{ + files.NewReaderFile("t/.bar", "t/.bar", ioutil.NopCloser(strings.NewReader("hello2")), nil), + files.NewReaderFile("t/bar", "t/bar", ioutil.NopCloser(strings.NewReader("hello2")), nil), + files.NewReaderFile("t/foo", "t/foo", ioutil.NopCloser(strings.NewReader("hello1")), nil), + }) + }, + recursive: true, + path: "/ipfs/QmehGvpf2hY196MzDFmjL8Wy27S4jbgGDUAhBJyvXAwr3g", + opts: []options.UnixfsAddOption{options.Unixfs.Hidden(true)}, + }, + { + name: "hiddenFileAlwaysAdded", + data: func() files.File { + return files.NewReaderFile(".foo", ".foo", ioutil.NopCloser(strings.NewReader(helloStr)), nil) + }, + recursive: true, + path: hello, + }, + { + name: "hiddenFilesNotAdded", + data: func() files.File { + return files.NewSliceFile("t", "t", []files.File{ + files.NewReaderFile("t/.bar", "t/.bar", ioutil.NopCloser(strings.NewReader("hello2")), nil), + files.NewReaderFile("t/bar", "t/bar", ioutil.NopCloser(strings.NewReader("hello2")), nil), + files.NewReaderFile("t/foo", "t/foo", ioutil.NopCloser(strings.NewReader("hello1")), nil), + }) + }, + expect: func(files.File) files.File { + return flatDir() + }, + recursive: true, + path: "/ipfs/QmRKGpFfR32FVXdvJiHfo4WJ5TDYBsM1P9raAp1p6APWSp", + opts: []options.UnixfsAddOption{options.Unixfs.Hidden(false)}, + }, + // Events / Progress + { + name: "simpleAddEvent", + data: strFile(helloStr), + path: "/ipfs/zb2rhdhmJjJZs9qkhQCpCQ7VREFkqWw3h1r8utjVvQugwHPFd", + events: []coreiface.AddEvent{ + {Name: "zb2rhdhmJjJZs9qkhQCpCQ7VREFkqWw3h1r8utjVvQugwHPFd", Hash: "zb2rhdhmJjJZs9qkhQCpCQ7VREFkqWw3h1r8utjVvQugwHPFd", Size: strconv.Itoa(len(helloStr))}, + }, + opts: []options.UnixfsAddOption{options.Unixfs.RawLeaves(true)}, + }, + { + name: "silentAddEvent", + data: twoLevelDir(), + path: "/ipfs/QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr", + events: []coreiface.AddEvent{ + {Name: "t/abc", Hash: "QmU7nuGs2djqK99UNsNgEPGh6GV4662p6WtsgccBNGTDxt", Size: "62"}, + {Name: "t", Hash: "QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr", Size: "229"}, + }, + recursive: true, + opts: []options.UnixfsAddOption{options.Unixfs.Silent(true)}, + }, + { + name: "dirAddEvents", + data: twoLevelDir(), + path: "/ipfs/QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr", + events: []coreiface.AddEvent{ + {Name: "t/abc/def", Hash: "QmNyJpQkU1cEkBwMDhDNFstr42q55mqG5GE5Mgwug4xyGk", Size: "13"}, + {Name: "t/bar", Hash: "QmS21GuXiRMvJKHos4ZkEmQDmRBqRaF5tQS2CQCu2ne9sY", Size: "14"}, + {Name: "t/foo", Hash: "QmfAjGiVpTN56TXi6SBQtstit5BEw3sijKj1Qkxn6EXKzJ", Size: "14"}, + {Name: "t/abc", Hash: "QmU7nuGs2djqK99UNsNgEPGh6GV4662p6WtsgccBNGTDxt", Size: "62"}, + {Name: "t", Hash: "QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr", Size: "229"}, + }, + recursive: true, + }, + { + name: "progress1M", + data: func() files.File { + r := bytes.NewReader(bytes.Repeat([]byte{0}, 1000000)) + return files.NewReaderFile("", "", ioutil.NopCloser(r), nil) + }, + path: "/ipfs/QmXXNNbwe4zzpdMg62ZXvnX1oU7MwSrQ3vAEtuwFKCm1oD", + events: []coreiface.AddEvent{ + {Name: "", Bytes: 262144}, + {Name: "", Bytes: 524288}, + {Name: "", Bytes: 786432}, + {Name: "", Bytes: 1000000}, + {Name: "QmXXNNbwe4zzpdMg62ZXvnX1oU7MwSrQ3vAEtuwFKCm1oD", Hash: "QmXXNNbwe4zzpdMg62ZXvnX1oU7MwSrQ3vAEtuwFKCm1oD", Size: "1000256"}, + }, + recursive: true, + opts: []options.UnixfsAddOption{options.Unixfs.Progress(true)}, + }, } - if string(buf) != helloStr { - t.Fatalf("expected [%s], got [%s] [err=%s]", helloStr, string(buf), err) + for _, testCase := range cases { + t.Run(testCase.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // recursive logic + + data := testCase.data() + if testCase.recursive { + data = files.NewSliceFile("", "", []files.File{ + data, + }) + } + + // handle events if relevant to test case + + opts := testCase.opts + eventOut := make(chan interface{}) + var evtWg sync.WaitGroup + if len(testCase.events) > 0 { + opts = append(opts, options.Unixfs.Events(eventOut)) + evtWg.Add(1) + + go func() { + defer evtWg.Done() + expected := testCase.events + + for evt := range eventOut { + event, ok := evt.(*coreiface.AddEvent) + if !ok { + t.Fatal("unexpected event type") + } + + if len(expected) < 1 { + t.Fatal("got more events than expected") + } + + if expected[0].Size != event.Size { + t.Errorf("Event.Size didn't match, %s != %s", expected[0].Size, event.Size) + } + + if expected[0].Name != event.Name { + t.Errorf("Event.Name didn't match, %s != %s", expected[0].Name, event.Name) + } + + if expected[0].Hash != event.Hash { + t.Errorf("Event.Hash didn't match, %s != %s", expected[0].Hash, event.Hash) + } + if expected[0].Bytes != event.Bytes { + t.Errorf("Event.Bytes didn't match, %d != %d", expected[0].Bytes, event.Bytes) + } + + expected = expected[1:] + } + + if len(expected) > 0 { + t.Fatalf("%d event(s) didn't arrive", len(expected)) + } + }() + } + + // Add! + + p, err := api.Unixfs().Add(ctx, data, opts...) + close(eventOut) + evtWg.Wait() + if testCase.err != "" { + if err == nil { + t.Fatalf("expected an error: %s", testCase.err) + } + if err.Error() != testCase.err { + t.Fatalf("expected an error: '%s' != '%s'", err.Error(), testCase.err) + } + return + } + if err != nil { + t.Fatal(err) + } + + if p.String() != testCase.path { + t.Errorf("expected path %s, got: %s", testCase.path, p) + } + + // compare file structure with Unixfs().Get + + var cmpFile func(orig files.File, got files.File) + cmpFile = func(orig files.File, got files.File) { + if orig.IsDirectory() != got.IsDirectory() { + t.Fatal("file type mismatch") + } + + if !orig.IsDirectory() { + defer orig.Close() + defer got.Close() + + do, err := ioutil.ReadAll(orig) + if err != nil { + t.Fatal(err) + } + + dg, err := ioutil.ReadAll(got) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(do, dg) { + t.Fatal("data not equal") + } + + return + } + + for { + fo, err := orig.NextFile() + fg, err2 := got.NextFile() + + if err != nil { + if err == io.EOF && err2 == io.EOF { + break + } + t.Fatal(err) + } + if err2 != nil { + t.Fatal(err) + } + + cmpFile(fo, fg) + } + } + + f, err := api.Unixfs().Get(ctx, p) + if err != nil { + t.Fatal(err) + } + + orig := testCase.data() + if testCase.expect != nil { + orig = testCase.expect(orig) + } + + cmpFile(orig, f) + }) } } -func TestAddEmptyFile(t *testing.T) { +func TestAddPinned(t *testing.T) { ctx := context.Background() _, api, err := makeAPI(ctx) if err != nil { t.Error(err) } - str := strings.NewReader("") - p, err := api.Unixfs().Add(ctx, str) + _, err = api.Unixfs().Add(ctx, strFile(helloStr)(), options.Unixfs.Pin(true)) if err != nil { t.Error(err) } - if p.String() != emptyFile { - t.Fatalf("expected path %s, got: %s", hello, p) + pins, err := api.Pin().Ls(ctx) + if len(pins) != 1 { + t.Fatalf("expected 1 pin, got %d", len(pins)) } -} -func TestCatBasic(t *testing.T) { - ctx := context.Background() - node, api, err := makeAPI(ctx) - if err != nil { - t.Fatal(err) + if pins[0].Path().String() != "/ipld/QmQy2Dw4Wk7rdJKjThjYXzfFJNaRKRHhHP5gHHXroJMYxk" { + t.Fatalf("got unexpected pin: %s", pins[0].Path().String()) } +} - hr := strings.NewReader(helloStr) - p, err := coreunix.Add(node, hr) +func TestAddHashOnly(t *testing.T) { + ctx := context.Background() + _, api, err := makeAPI(ctx) if err != nil { - t.Fatal(err) - } - p = "/ipfs/" + p - - if p != hello { - t.Fatalf("expected CID %s, got: %s", hello, p) + t.Error(err) } - helloPath, err := coreiface.ParsePath(hello) + p, err := api.Unixfs().Add(ctx, strFile(helloStr)(), options.Unixfs.HashOnly(true)) if err != nil { - t.Fatal(err) + t.Error(err) } - r, err := api.Unixfs().Cat(ctx, helloPath) - if err != nil { - t.Fatal(err) + if p.String() != hello { + t.Errorf("unxepected path: %s", p.String()) } - buf := make([]byte, len(helloStr)) - _, err = io.ReadFull(r, buf) - if err != nil { - t.Error(err) + _, err = api.Block().Get(ctx, p) + if err == nil { + t.Fatal("expected an error") } - if string(buf) != helloStr { - t.Fatalf("expected [%s], got [%s] [err=%s]", helloStr, string(buf), err) + if err.Error() != "blockservice: key not found" { + t.Errorf("unxepected error: %s", err.Error()) } } diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index 779a64da42f..dfd67690065 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net/http" "net/url" "os" @@ -26,6 +27,7 @@ import ( humanize "gx/ipfs/QmPSBJL4momYnE7DcUyk2DVhD6rH488ZmHBGLbxNdhU44K/go-humanize" cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" routing "gx/ipfs/QmQRfifvvbJ8xTKj4KX1VvGWK26hnPiy8eQvW1hmjc82nD/go-libp2p-routing" + files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files" chunker "gx/ipfs/QmULKgr55cSWR8Kiwy3cVRcAiGVnR6EVSaB7hJcWS4138p/go-ipfs-chunker" ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" multibase "gx/ipfs/QmekxXDhCxCJRNuzmHreuaT3BsuJcsjcXWNrtV9C8DRHtd/go-multibase" @@ -398,7 +400,7 @@ func (i *gatewayHandler) serveFile(w http.ResponseWriter, req *http.Request, nam } func (i *gatewayHandler) postHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { - p, err := i.api.Unixfs().Add(ctx, r.Body) + p, err := i.api.Unixfs().Add(ctx, files.NewReaderFile("", "", ioutil.NopCloser(r.Body), nil)) if err != nil { internalWebError(w, err) return diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 4cf54704264..1e3827bc60a 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -11,19 +11,20 @@ import ( "strconv" core "github.com/ipfs/go-ipfs/core" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" "github.com/ipfs/go-ipfs/pin" - dag "gx/ipfs/QmTGpm48qm4fUZ9E5hMXy4ZngJUYCMKu15rTMVR3BSEnPm/go-merkledag" - unixfs "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs" - balanced "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs/importer/balanced" - ihelper "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs/importer/helpers" - trickle "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs/importer/trickle" posinfo "gx/ipfs/QmPG32VXR5jmpo9q8R9FNdR4Ae97Ky9CiZE6SctJLUB79H/go-ipfs-posinfo" cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" mfs "gx/ipfs/QmQUjAGdPuNA9tpzrx5osWnPMhht7B5YzJNddjB45DUq2U/go-mfs" files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files" + dag "gx/ipfs/QmTGpm48qm4fUZ9E5hMXy4ZngJUYCMKu15rTMVR3BSEnPm/go-merkledag" chunker "gx/ipfs/QmULKgr55cSWR8Kiwy3cVRcAiGVnR6EVSaB7hJcWS4138p/go-ipfs-chunker" logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log" + unixfs "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs" + balanced "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs/importer/balanced" + ihelper "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs/importer/helpers" + trickle "gx/ipfs/QmavvHwEZTkNShKWK1jRejv2Y8oF6ZYxdGxytL3Mwvices/go-unixfs/importer/trickle" ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" bstore "gx/ipfs/QmfUhZX9KpvJiuiziUzP2cjhRAyqHJURsPgRKn1cdDZMKa/go-ipfs-blockstore" ) @@ -46,13 +47,6 @@ type Object struct { Size string } -type AddedObject struct { - Name string - Hash string `json:",omitempty"` - Bytes int64 `json:",omitempty"` - Size string `json:",omitempty"` -} - // NewAdder Returns a new Adder used for a file add operation. func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds ipld.DAGService) (*Adder, error) { return &Adder{ @@ -75,7 +69,7 @@ type Adder struct { pinning pin.Pinner blockstore bstore.GCBlockstore dagService ipld.DAGService - Out chan interface{} + Out chan<- interface{} Progress bool Hidden bool Pin bool @@ -399,7 +393,7 @@ func (adder *Adder) addNode(node ipld.Node, path string) error { } // AddAllAndPin adds the given request's files and pin them. -func (adder *Adder) AddAllAndPin(file files.File) error { +func (adder *Adder) AddAllAndPin(file files.File) (ipld.Node, error) { if adder.Pin { adder.unlocker = adder.blockstore.PinLock() } @@ -415,35 +409,35 @@ func (adder *Adder) AddAllAndPin(file files.File) error { // single files.File f is treated as a directory, affecting hidden file // semantics. for { - file, err := file.NextFile() + f, err := file.NextFile() if err == io.EOF { // Finished the list of files. break } else if err != nil { - return err + return nil, err } - if err := adder.addFile(file); err != nil { - return err + if err := adder.addFile(f); err != nil { + return nil, err } } break default: if err := adder.addFile(file); err != nil { - return err + return nil, err } break } // copy intermediary nodes from editor to our actual dagservice - _, err := adder.Finalize() + nd, err := adder.Finalize() if err != nil { - return err + return nil, err } if !adder.Pin { - return nil + return nd, nil } - return adder.PinRoot() + return nd, adder.PinRoot() } func (adder *Adder) addFile(file files.File) error { @@ -570,7 +564,7 @@ func (adder *Adder) maybePauseForGC() error { } // outputDagnode sends dagnode info over the output channel -func outputDagnode(out chan interface{}, name string, dn ipld.Node) error { +func outputDagnode(out chan<- interface{}, name string, dn ipld.Node) error { if out == nil { return nil } @@ -580,7 +574,7 @@ func outputDagnode(out chan interface{}, name string, dn ipld.Node) error { return err } - out <- &AddedObject{ + out <- &coreiface.AddEvent{ Hash: o.Hash, Name: name, Size: o.Size, @@ -615,7 +609,7 @@ func getOutput(dagnode ipld.Node) (*Object, error) { type progressReader struct { file files.File - out chan interface{} + out chan<- interface{} bytes int64 lastProgress int64 } @@ -626,7 +620,7 @@ func (i *progressReader) Read(p []byte) (int, error) { i.bytes += int64(n) if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF { i.lastProgress = i.bytes - i.out <- &AddedObject{ + i.out <- &coreiface.AddEvent{ Name: i.file.FileName(), Bytes: i.bytes, } diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index bfcf4f5ca2b..de03a601237 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/ipfs/go-ipfs/core" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" "github.com/ipfs/go-ipfs/pin/gc" "github.com/ipfs/go-ipfs/repo" @@ -85,7 +86,7 @@ func TestAddGCLive(t *testing.T) { go func() { defer close(addDone) defer close(out) - err := adder.AddAllAndPin(slf) + _, err := adder.AddAllAndPin(slf) if err != nil { t.Fatal(err) @@ -96,7 +97,7 @@ func TestAddGCLive(t *testing.T) { addedHashes := make(map[string]struct{}) select { case o := <-out: - addedHashes[o.(*AddedObject).Hash] = struct{}{} + addedHashes[o.(*coreiface.AddEvent).Hash] = struct{}{} case <-addDone: t.Fatal("add shouldnt complete yet") } @@ -124,7 +125,7 @@ func TestAddGCLive(t *testing.T) { // receive next object from adder o := <-out - addedHashes[o.(*AddedObject).Hash] = struct{}{} + addedHashes[o.(*coreiface.AddEvent).Hash] = struct{}{} <-gcstarted @@ -140,7 +141,7 @@ func TestAddGCLive(t *testing.T) { var last cid.Cid for a := range out { // wait for it to finish - c, err := cid.Decode(a.(*AddedObject).Hash) + c, err := cid.Decode(a.(*coreiface.AddEvent).Hash) if err != nil { t.Fatal(err) } @@ -178,7 +179,8 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) { if err != nil { t.Fatal(err) } - adder.Out = make(chan interface{}) + out := make(chan interface{}) + adder.Out = out adder.Progress = true adder.RawLeaves = rawLeaves adder.NoCopy = true @@ -191,12 +193,12 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) { go func() { defer close(adder.Out) - err = adder.AddAllAndPin(file) + _, err = adder.AddAllAndPin(file) if err != nil { t.Fatal(err) } }() - for range adder.Out { + for range out { } exp := 0