Skip to content

Commit

Permalink
Dag import/export final-touches-WiP
Browse files Browse the repository at this point in the history
  • Loading branch information
ribasushi committed Mar 19, 2020
1 parent c4b515e commit c6f9e3b
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 118 deletions.
285 changes: 175 additions & 110 deletions core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,29 @@ import (
"fmt"
"io"
"math"
"os"
"strings"

"github.com/davecgh/go-spew/spew"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"
"github.com/ipfs/go-ipfs/core/coredag"

gocar "github.com/ipfs/go-car"
blk "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
cidenc "github.com/ipfs/go-cidutil/cidenc"
cmds "github.com/ipfs/go-ipfs-cmds"
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
mdag "github.com/ipfs/go-merkledag"
ipfspath "github.com/ipfs/go-path"
"github.com/ipfs/interface-go-ipfs-core/options"
path "github.com/ipfs/interface-go-ipfs-core/path"
mh "github.com/multiformats/go-multihash"

gocar "github.com/ipld/go-car"
gipfree "github.com/ipld/go-ipld-prime/impl/free"
gipselector "github.com/ipld/go-ipld-prime/traversal/selector"
gipselectorbuilder "github.com/ipld/go-ipld-prime/traversal/selector/builder"
)

const (
Expand All @@ -44,6 +52,7 @@ to deprecate and replace the existing 'ipfs object' command moving forward.
"get": DagGetCmd,
"resolve": DagResolveCmd,
"import": DagImportCmd,
"export": DagExportCmd,
},
}

Expand Down Expand Up @@ -265,97 +274,63 @@ var DagImportCmd = &cmds.Command{
cmds.FileArg("path", true, true, "The path of a .car file.").EnableStdin(),
},
Options: []cmds.Option{
// FIXME - need to copy the progress bar from core/commands/add.go
// Base it on a proxy-reader
//cmds.BoolOption(progressOptionName, "p", "Stream progress data.").WithDefault(true),
//cmds.BoolOption(quietOptionName, "q", "Write minimal output."),
cmds.BoolOption(silentOptionName, "Write no output."),
cmds.BoolOption(pinRootsOptionName, "Atomically pin (optional) roots listed in the .car headers after importing.").WithDefault(true),
},
// FIXME - progress bar option disabled above
// PreRun: func(req *cmds.Request, env cmds.Environment) error {
// quiet, _ := req.Options[quietOptionName].(bool)
// silent, _ := req.Options[silentOptionName].(bool)

// if quiet || silent {
// return nil
// }

// // ipfs cli progress bar defaults to true unless quiet or silent is used
// _, found := req.Options[progressOptionName].(bool)
// if !found {
// req.Options[progressOptionName] = true
// }

// return nil
// },
Type: OutputObject{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {

node, err := cmdenv.GetNode(env)
if err != nil {
return err
}

api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
}

// on import ensure we do not reach out to the network for any reason
api, _ = api.WithOptions(options.Api.Offline(true))

doPinRoots, _ := req.Options[pinRootsOptionName].(bool)
silent, _ := req.Options[silentOptionName].(bool)

keepTrackOfRoots := doPinRoots || !silent

// It is not guaranteed that a root in a header is actually present in the same ( or any .car )
// It is not guaranteed that a root in a header is actually present in the same ( or any )
// .car file. This is the case in version 1, and ideally in other versions too
// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
// We will pin *only* at the end in case the entire operation is successful
// Q: - way to do above better?
// Q: - can I *not* keep a full copy of the node and still pin it anyway?
var expectRoots map[cid.Cid]*ipld.Node

if keepTrackOfRoots {
expectRoots = make(map[cid.Cid]*ipld.Node, 32)
}

// If more than one file - parse all headers first
// ( due to misdesign we do "reparse" each car's header twice, but that's cheap )
// Q: there seems to be no management of file closures... ANYWHERE
argCount, _ := req.Files.Size()
if keepTrackOfRoots && argCount > 1 {

it := req.Files.Entries()
for it.Next() {

file := files.FileFromEntry(it)
if file == nil {
return errors.New("expected a file")
}

buf := make([]byte, 1)
file.Read(buf)
file.Seek(0, 0)
spew.Dump(buf)

continue

car, err := gocar.NewCarReader(file)
if err != nil {
return err
}

for _, cid := range car.Header.Roots {
if _, exists := expectRoots[cid]; !exists {
expectRoots[cid] = nil
}
}
}
}

// Q: How do I combine this with pinnig?
adder := api.Dag()
b := ipld.NewBatch(req.Context, adder)
// We will attempt a pin *only* at the end in case all car files were well formed
//
// The boolean value indicates whether we have encountered the root within the car file's
expectedRootsSeen := make(map[cid.Cid]bool)

// grab a gc lock so that regardless of the size of the streamed in cars
// nothing will disappear on us before we had a chance to pin the roots
// at the very end
// This is especially important for use cases like dagger:
// ipfs dag import $( ... | ipfs-dagger --stdout=carfifos )

// FIXME - none of these locker options work!
//
// This version deadlocks within this function, once you get to
// api.Pin().Add
//unlocker := node.Blockstore.GCLock()
//
// This version works in isolation but deadlocks with a concurrent GC run
// if I insert a time.Sleep(15 * time.Second) before the pin
unlocker := node.Blockstore.PinLock()
defer func() { unlocker.Unlock() }()

// this is *not* a transaction
// it is simply a way to relieve pressure on the blockstore
batch := ipld.NewBatch(req.Context, api.Dag())

it := req.Files.Entries()
for it.Next() {

file := files.FileFromEntry(it)
if file == nil {
return errors.New("expected a file")
return errors.New("expected a file handle")
}
defer func() { file.Close() }()

Expand All @@ -364,16 +339,14 @@ var DagImportCmd = &cmds.Command{
return err
}

// Be explicit here, until the spec is finished
if car.Header.Version != 1 {
return errors.New("only car files version 1 supported at present")
}

// duplicate of a block from above: in case we were streaming and never pre-read the file
if keepTrackOfRoots {
for _, cid := range car.Header.Roots {
if _, exists := expectRoots[cid]; !exists {
expectRoots[cid] = nil
}
for _, c := range car.Header.Roots {
if _, exists := expectedRootsSeen[c]; !exists {
expectedRootsSeen[c] = false
}
}

Expand All @@ -385,28 +358,22 @@ var DagImportCmd = &cmds.Command{
break
}

// Q: this dual-pass is so odd, but nothing woks with blocks.Block...
nd, err := ipld.Decode(block)
if err != nil {
return err
}

if err := b.Add(req.Context, nd); err != nil {
if err := batch.Add(req.Context, nd); err != nil {
return err
}

if keepTrackOfRoots {
val, exists := expectRoots[nd.Cid()]

// encountered something known to be a root, for the first time
if exists && val == nil {
expectRoots[nd.Cid()] = &nd
// encountered something known to be a root, for the first time
if seen, exists := expectedRootsSeen[nd.Cid()]; exists && !seen {
expectedRootsSeen[nd.Cid()] = true

if !silent {
// Q: everything makes thing async - should I?
if err := res.Emit(&OutputObject{Cid: nd.Cid()}); err != nil {
return err
}
if !silent {
if err := res.Emit(&OutputObject{Cid: nd.Cid()}); err != nil {
return err
}
}
}
Expand All @@ -417,26 +384,29 @@ var DagImportCmd = &cmds.Command{
return err
}

if doPinRoots && len(expectRoots) > 0 {
// run through everything making sure we found all roots
toPin := make([]ipld.Node, 0, len(expectRoots))
for cid, nd := range expectRoots {
if nd == nil {
return fmt.Errorf("the CID '%s' named as root, but never encountered in any .car", cid)
}
toPin = append(toPin, *nd)
}

// all found - pin them down
// ( this does in effect call b.Commit()... it seems )
if err := api.Dag().Pinning().AddMany(req.Context, toPin); err != nil {
return err
}
if err := batch.Commit(); err != nil {
return err
}

// well... here goes nothing
if err := b.Commit(); err != nil {
return err
if doPinRoots {
for c, seen := range expectedRootsSeen {
if !seen {
os.Stderr.WriteString("replace with seenfail emit\n")
}
// not ideal, but the pinning api takes only paths :(
rp := path.NewResolvedPath(
ipfspath.FromCid(c),
c,
c,
"",
)

if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil {
os.Stderr.WriteString("replace with pin fail error\n")
} else {
os.Stderr.WriteString("replace with pin success emit\n")
}
}
}

return nil
Expand All @@ -452,3 +422,98 @@ var DagImportCmd = &cmds.Command{
}),
},
}

var DagExportCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Streams the selected DAG as a .car stream on stdout.",
ShortDescription: `
'ipfs dag export' fetches a dag and streams it out as a well-formed .car file.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("selector", true, false, "Selector representing the dag to export").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {

c, err := cid.Decode(req.Arguments[0])
if err != nil {
return fmt.Errorf(
"unable to parse selector (currently only bare CIDs are supported): %s",
err,
)
}

// The current interface of go-car is rather suboptimal as it
// only takes a blockstore, instead of accepting a dagservice,
// and leveraging parallel-fetch capabilities
//
// Until the above is fixed, pre-warm the blockstore before doing
// anything else. We explicitly *DO NOT* take a lock during this
// operation: even if we lose some of the blocks we just received
// due to a conflicting GC: we will just re-retrieve everything when
// the car is being streamed out
node, err := cmdenv.GetNode(env)
if err != nil {
return err
}

if err := mdag.FetchGraph(req.Context, c, node.DAG); err != nil {
if !node.IsOnline {
err = fmt.Errorf("%s (currently offline, perhaps retry after attaching to the network)", err)
}
return err
}

sess := blockservice.NewSession(
req.Context,
node.Blocks,
)

// The second part of the above - make a super-thin wrapper around
// a blockservice session, translating Session.GetBlock() to Blockstore.Get()
var wrapper getBlockFromSessionWrapper = func(c cid.Cid) (blk.Block, error) {
return sess.GetBlock(req.Context, c)
}
sb := gipselectorbuilder.NewSelectorSpecBuilder(gipfree.NodeBuilder())
car := gocar.NewSelectiveCar(
req.Context,
&wrapper,
[]gocar.Dag{gocar.Dag{
Root: c,
Selector: sb.ExploreRecursive(
gipselector.RecursionLimitNone(),
sb.ExploreAll(sb.ExploreRecursiveEdge()),
).Node(),
}},
)

pipeR, pipeW := io.Pipe()

errCh := make(chan error, 2)
go func() {
defer func() {
if err := pipeW.Close(); err != nil {
errCh <- fmt.Errorf("stream flush failed: %s", err)
}
close(errCh)
}()

if err := car.Write(pipeW); err != nil {
errCh <- err
}
}()

if err := res.Emit(pipeR); err != nil {
pipeW.Close() // ignore errors if any
return err
}

return <-errCh
},
}

type getBlockFromSessionWrapper func(cid.Cid) (blk.Block, error)

func (w *getBlockFromSessionWrapper) Get(c cid.Cid) (blk.Block, error) {
return (*w)(c)
}
Loading

0 comments on commit c6f9e3b

Please sign in to comment.