From ac092e5f616f351feca83286536abc961fbce8c0 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 22 Jun 2017 18:46:56 -0700 Subject: [PATCH 01/21] nit: document Node --- format.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/format.go b/format.go index 1e2f1f0..353d4ca 100644 --- a/format.go +++ b/format.go @@ -19,6 +19,10 @@ type Resolver interface { Tree(path string, depth int) []string } +// Node is the base interface all IPLD nodes must implement. +// +// Nodes are **Immutable** and all methods defined on the interface are +// **Thread Safe**. type Node interface { blocks.Block Resolver From b3a1f4b945fbb57c0d4439efc9675c9375069105 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 22 Jun 2017 20:21:28 -0700 Subject: [PATCH 02/21] [WIP] [RFC] extract dagservice and friends from go-ipfs This is a WIP/RFC attempt at extracting DAGService from go-ipfs. --- daghelpers.go | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++ format.go | 4 --- merkledag.go | 88 ++++++++++++++++++++++++++++++++++++++++++++++++ promise.go | 88 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 268 insertions(+), 4 deletions(-) create mode 100644 daghelpers.go create mode 100644 merkledag.go create mode 100644 promise.go diff --git a/daghelpers.go b/daghelpers.go new file mode 100644 index 0000000..2328b9e --- /dev/null +++ b/daghelpers.go @@ -0,0 +1,92 @@ +package format + +import ( + "context" + + cid "github.com/ipfs/go-cid" +) + +// FindLinks searches this nodes links for the given key, +// returns the indexes of any links pointing to it +func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int { + var out []int + for i, lnk_c := range links[start:] { + if c.Equals(lnk_c) { + out = append(out, i+start) + } + } + return out +} + +// GetDAG will fill out all of the links of the given Node. +// It returns a channel of nodes, which the caller can receive +// all the child nodes of 'root' on, in proper order. +func GetDAG(ctx context.Context, ds DAGService, root Node) []NodePromise { + var cids []*cid.Cid + for _, lnk := range root.Links() { + cids = append(cids, lnk.Cid) + } + + return GetNodes(ctx, ds, cids) +} + +// GetNodes returns an array of 'FutureNode' promises, with each corresponding +// to the key with the same index as the passed in keys +func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodePromise { + + // Early out if no work to do + if len(keys) == 0 { + return nil + } + + promises := make([]NodePromise, len(keys)) + for i := range keys { + promises[i] = newNodePromise(ctx) + } + + dedupedKeys := dedupeKeys(keys) + go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + nodechan := ds.GetMany(ctx, dedupedKeys) + + for count := 0; count < len(keys); { + select { + case opt, ok := <-nodechan: + if !ok { + for _, p := range promises { + p.Fail(ErrNotFound) + } + return + } + + if opt.Err != nil { + for _, p := range promises { + p.Fail(opt.Err) + } + return + } + + nd := opt.Node + is := FindLinks(keys, nd.Cid(), 0) + for _, i := range is { + count++ + promises[i].Send(nd) + } + case <-ctx.Done(): + return + } + } + }() + return promises +} + +// Remove duplicates from a list of keys +func dedupeKeys(cids []*cid.Cid) []*cid.Cid { + set := cid.NewSet() + for _, c := range cids { + set.Add(c) + } + return set.Keys() +} diff --git a/format.go b/format.go index 353d4ca..699c8c1 100644 --- a/format.go +++ b/format.go @@ -44,10 +44,6 @@ type Node interface { Size() (uint64, error) } -type NodeGetter interface { - Get(context.Context, *cid.Cid) (Node, error) -} - // Link represents an IPFS Merkle DAG Link between Nodes. type Link struct { // utf string name. should be unique per object diff --git a/merkledag.go b/merkledag.go new file mode 100644 index 0000000..b99c8ec --- /dev/null +++ b/merkledag.go @@ -0,0 +1,88 @@ +package format + +import ( + "context" + "fmt" + + cid "github.com/ipfs/go-cid" +) + +var ErrNotFound = fmt.Errorf("merkledag: not found") + +// Either a node or an error. +type NodeOption struct { + Node Node + Err error +} + +// TODO: This name kind of sucks. +// NodeResolver? +// NodeService? +// Just Resolver? +type NodeGetter interface { + Get(context.Context, *cid.Cid) (Node, error) +} + +// DAGService is an IPFS Merkle DAG service. +type DAGService interface { + NodeGetter + + Add(Node) (*cid.Cid, error) + Remove(Node) error + + // TODO: This is returning them in-order?? Why not just use []NodePromise? + // Maybe add a couple of helpers for getting them in-order and as-available? + // GetDAG returns, in order, all the single leve child + // nodes of the passed in node. + GetMany(context.Context, []*cid.Cid) <-chan *NodeOption + + Batch() Batch + + LinkService +} + +// An interface for batch-adding nodes to a DAG. + +// TODO: Is this really the *right* level to do this at? +// Why not just `DAGService.AddMany` + a concrete helper type? +// +// This will be a breaking change *regardless* of what we do as `Batch` *used* +// to be a plain struct (passed around by pointer). I had to change this to +// avoid requiring a `BlockService` (which would introduce the concept of +// exchanges and I really don't want to go down that rabbit hole). +type Batch interface { + Add(nd Node) (*cid.Cid, error) + Commit() error +} + +// TODO: Replace this? I'm really not convinced this interface pulls its weight. +// +// Instead, we could add an `Offline()` function to `NodeGetter` that returns an +// offline `NodeGetter` and then define the following function: +// +// ``` +// func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) { +// if c.Type() == cid.Raw { +// return nil, nil +// } +// node, err := ng.Get(ctx, c) +// if err != nil { +// return nil, err +// } +// return node.Links(), nil +// } +// ``` +// +// Why *not* do this? We might decide to store a light-weight DAG of links +// without actually storing the data. I don't really find that to be a +// convincing argument. +type LinkService interface { + // GetLinks return all links for a node. The complete node does not + // necessarily have to exist locally, or at all. For example, raw + // leaves cannot possibly have links so there is no need to look + // at the node. + // TODO: These *really* should be Cids, not Links + GetLinks(context.Context, *cid.Cid) ([]*Link, error) + + GetOfflineLinkService() LinkService +} diff --git a/promise.go b/promise.go new file mode 100644 index 0000000..b269d16 --- /dev/null +++ b/promise.go @@ -0,0 +1,88 @@ +package format + +import ( + "context" + "sync" +) + +// TODO: I renamed this to NodePromise because: +// 1. NodeGetter is a naming conflict. +// 2. It's a promise... + +// TODO: Should this even be an interface? It seems like a simple struct would +// suffice. + +// NodePromise provides a promise like interface for a dag Node +// the first call to Get will block until the Node is received +// from its internal channels, subsequent calls will return the +// cached node. +type NodePromise interface { + Get(context.Context) (Node, error) + Fail(err error) + Send(Node) +} + +func newNodePromise(ctx context.Context) NodePromise { + return &nodePromise{ + recv: make(chan Node, 1), + ctx: ctx, + err: make(chan error, 1), + } +} + +type nodePromise struct { + cache Node + clk sync.Mutex + recv chan Node + ctx context.Context + err chan error +} + +func (np *nodePromise) Fail(err error) { + np.clk.Lock() + v := np.cache + np.clk.Unlock() + + // if promise has a value, don't fail it + if v != nil { + return + } + + np.err <- err +} + +func (np *nodePromise) Send(nd Node) { + var already bool + np.clk.Lock() + if np.cache != nil { + already = true + } + np.cache = nd + np.clk.Unlock() + + if already { + panic("sending twice to the same promise is an error!") + } + + np.recv <- nd +} + +func (np *nodePromise) Get(ctx context.Context) (Node, error) { + np.clk.Lock() + c := np.cache + np.clk.Unlock() + if c != nil { + return c, nil + } + + select { + case nd := <-np.recv: + return nd, nil + case <-np.ctx.Done(): + return nil, np.ctx.Err() + case <-ctx.Done(): + return nil, ctx.Err() + case err := <-np.err: + return nil, err + } +} From 0f9e9ed70763ea142fc1896f14ebb12bf7f07314 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 26 Jun 2017 16:59:44 -0700 Subject: [PATCH 03/21] Make Batch a helper type/function --- batch.go | 43 +++++++++++++++++++++++++++++++++++++++++++ merkledag.go | 16 +--------------- 2 files changed, 44 insertions(+), 15 deletions(-) create mode 100644 batch.go diff --git a/batch.go b/batch.go new file mode 100644 index 0000000..4dc62b8 --- /dev/null +++ b/batch.go @@ -0,0 +1,43 @@ +package format + +import ( + cid "github.com/ipfs/go-cid" +) + +func Batching(ds DAGService) *Batch { + return &Batch{ + ds: ds, + MaxSize: 8 << 20, + + // By default, only batch up to 128 nodes at a time. + // The current implementation of flatfs opens this many file + // descriptors at the same time for the optimized batch write. + MaxBlocks: 128, + } +} + +type Batch struct { + ds DAGService + + // TODO: try to re-use memory. + nodes []Node + size int + MaxSize int + MaxBlocks int +} + +func (t *Batch) Add(nd Node) (*cid.Cid, error) { + t.nodes = append(t.nodes, nd) + t.size += len(nd.RawData()) + if t.size > t.MaxSize || len(t.nodes) > t.MaxBlocks { + return nd.Cid(), t.Commit() + } + return nd.Cid(), nil +} + +func (t *Batch) Commit() error { + _, err := t.ds.AddMany(t.nodes) + t.nodes = nil + t.size = 0 + return err +} diff --git a/merkledag.go b/merkledag.go index b99c8ec..b093a9f 100644 --- a/merkledag.go +++ b/merkledag.go @@ -36,25 +36,11 @@ type DAGService interface { // nodes of the passed in node. GetMany(context.Context, []*cid.Cid) <-chan *NodeOption - Batch() Batch + AddMany([]Node) ([]*cid.Cid, error) LinkService } -// An interface for batch-adding nodes to a DAG. - -// TODO: Is this really the *right* level to do this at? -// Why not just `DAGService.AddMany` + a concrete helper type? -// -// This will be a breaking change *regardless* of what we do as `Batch` *used* -// to be a plain struct (passed around by pointer). I had to change this to -// avoid requiring a `BlockService` (which would introduce the concept of -// exchanges and I really don't want to go down that rabbit hole). -type Batch interface { - Add(nd Node) (*cid.Cid, error) - Commit() error -} - // TODO: Replace this? I'm really not convinced this interface pulls its weight. // // Instead, we could add an `Offline()` function to `NodeGetter` that returns an From 2696405338a7b25b00e58eadc6d6f04e243d3083 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 26 Jun 2017 17:21:09 -0700 Subject: [PATCH 04/21] replace LinkService with an optional LinkGetter interface This way, not *all* DAGServices need to implement this interface, they can just implement it as an optimization. --- merkledag.go | 59 ++++++++++++++++++++++------------------------------ 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/merkledag.go b/merkledag.go index b093a9f..2ca9c47 100644 --- a/merkledag.go +++ b/merkledag.go @@ -21,6 +21,31 @@ type NodeOption struct { // Just Resolver? type NodeGetter interface { Get(context.Context, *cid.Cid) (Node, error) + + // TODO(ipfs/go-ipfs#4009): Remove this method after fixing. + OfflineNodeGetter() NodeGetter +} + +// NodeGetters can optionally implement this interface to make finding linked +// objects faster. +type LinkGetter interface { + NodeGetter + // TODO(ipfs/go-ipld-format#9): This should return []*cid.Cid + GetLinks(ctx context.Context, nd *cid.Cid) ([]*Link, error) +} + +func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) { + if c.Type() == cid.Raw { + return nil, nil + } + if gl, ok := ng.(LinkGetter); ok { + return gl.GetLinks(ctx, c) + } + node, err := ng.Get(ctx, c) + if err != nil { + return nil, err + } + return node.Links(), nil } // DAGService is an IPFS Merkle DAG service. @@ -37,38 +62,4 @@ type DAGService interface { GetMany(context.Context, []*cid.Cid) <-chan *NodeOption AddMany([]Node) ([]*cid.Cid, error) - - LinkService -} - -// TODO: Replace this? I'm really not convinced this interface pulls its weight. -// -// Instead, we could add an `Offline()` function to `NodeGetter` that returns an -// offline `NodeGetter` and then define the following function: -// -// ``` -// func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) { -// if c.Type() == cid.Raw { -// return nil, nil -// } -// node, err := ng.Get(ctx, c) -// if err != nil { -// return nil, err -// } -// return node.Links(), nil -// } -// ``` -// -// Why *not* do this? We might decide to store a light-weight DAG of links -// without actually storing the data. I don't really find that to be a -// convincing argument. -type LinkService interface { - // GetLinks return all links for a node. The complete node does not - // necessarily have to exist locally, or at all. For example, raw - // leaves cannot possibly have links so there is no need to look - // at the node. - // TODO: These *really* should be Cids, not Links - GetLinks(context.Context, *cid.Cid) ([]*Link, error) - - GetOfflineLinkService() LinkService } From 5837bec53bfa1e6693c1dfe95bec6b0a181e0766 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 26 Jun 2017 17:22:46 -0700 Subject: [PATCH 05/21] add/update comments about issues with DAGService interface --- merkledag.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/merkledag.go b/merkledag.go index 2ca9c47..ff595c0 100644 --- a/merkledag.go +++ b/merkledag.go @@ -53,12 +53,14 @@ type DAGService interface { NodeGetter Add(Node) (*cid.Cid, error) + + // TODO(ipfs/go-ipfs#4010): Change this to take a CID. + // This will require a fair amount of refactoring. Remove(Node) error - // TODO: This is returning them in-order?? Why not just use []NodePromise? - // Maybe add a couple of helpers for getting them in-order and as-available? - // GetDAG returns, in order, all the single leve child - // nodes of the passed in node. + // TODO: Consider using []NodePromise and providing helper functions + // that take []NodePromise and return channels that yield nodes both + // in-order and as-ready. GetMany(context.Context, []*cid.Cid) <-chan *NodeOption AddMany([]Node) ([]*cid.Cid, error) From c0311d7e72eff406aeca77d38d3df2567af3bf1c Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 26 Jun 2017 18:13:24 -0700 Subject: [PATCH 06/21] NodePromise: replace interface with concrete type Also: 1. Specify the threading guarantees. 2. Vastly simplify it to use a single channel for synchronization. --- daghelpers.go | 8 ++--- promise.go | 94 ++++++++++++++++++++------------------------------- 2 files changed, 40 insertions(+), 62 deletions(-) diff --git a/daghelpers.go b/daghelpers.go index 2328b9e..96a3cdd 100644 --- a/daghelpers.go +++ b/daghelpers.go @@ -21,7 +21,7 @@ func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int { // GetDAG will fill out all of the links of the given Node. // It returns a channel of nodes, which the caller can receive // all the child nodes of 'root' on, in proper order. -func GetDAG(ctx context.Context, ds DAGService, root Node) []NodePromise { +func GetDAG(ctx context.Context, ds DAGService, root Node) []*NodePromise { var cids []*cid.Cid for _, lnk := range root.Links() { cids = append(cids, lnk.Cid) @@ -32,16 +32,16 @@ func GetDAG(ctx context.Context, ds DAGService, root Node) []NodePromise { // GetNodes returns an array of 'FutureNode' promises, with each corresponding // to the key with the same index as the passed in keys -func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []NodePromise { +func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []*NodePromise { // Early out if no work to do if len(keys) == 0 { return nil } - promises := make([]NodePromise, len(keys)) + promises := make([]*NodePromise, len(keys)) for i := range keys { - promises[i] = newNodePromise(ctx) + promises[i] = NewNodePromise(ctx) } dedupedKeys := dedupeKeys(keys) diff --git a/promise.go b/promise.go index b269d16..02743b0 100644 --- a/promise.go +++ b/promise.go @@ -2,87 +2,65 @@ package format import ( "context" - "sync" ) -// TODO: I renamed this to NodePromise because: -// 1. NodeGetter is a naming conflict. -// 2. It's a promise... - -// TODO: Should this even be an interface? It seems like a simple struct would -// suffice. - // NodePromise provides a promise like interface for a dag Node // the first call to Get will block until the Node is received // from its internal channels, subsequent calls will return the // cached node. -type NodePromise interface { - Get(context.Context) (Node, error) - Fail(err error) - Send(Node) -} - -func newNodePromise(ctx context.Context) NodePromise { - return &nodePromise{ - recv: make(chan Node, 1), +// +// Thread Safety: This is multiple-consumer/single-producer safe. +func NewNodePromise(ctx context.Context) *NodePromise { + return &NodePromise{ + done: make(chan struct{}), ctx: ctx, - err: make(chan error, 1), } } -type nodePromise struct { - cache Node - clk sync.Mutex - recv chan Node - ctx context.Context - err chan error -} +type NodePromise struct { + value Node + err error + done chan struct{} -func (np *nodePromise) Fail(err error) { - np.clk.Lock() - v := np.cache - np.clk.Unlock() + ctx context.Context +} - // if promise has a value, don't fail it - if v != nil { +// Call this function to fail a promise. +// +// Once a promise has been failed or fulfilled, further attempts to fail it will +// be silently dropped. +func (np *NodePromise) Fail(err error) { + if np.err != nil || np.value != nil { + // Already filled. return } - - np.err <- err + np.err = err + close(np.done) } -func (np *nodePromise) Send(nd Node) { - var already bool - np.clk.Lock() - if np.cache != nil { - already = true - } - np.cache = nd - np.clk.Unlock() - - if already { - panic("sending twice to the same promise is an error!") +// Fulfill this promise. +// +// Once a promise has been fulfilled or failed, calling this function will +// panic. +func (np *NodePromise) Send(nd Node) { + // if promise has a value, don't fail it + if np.err != nil || np.value != nil { + panic("already filled") } - - np.recv <- nd + np.value = nd + close(np.done) } -func (np *nodePromise) Get(ctx context.Context) (Node, error) { - np.clk.Lock() - c := np.cache - np.clk.Unlock() - if c != nil { - return c, nil - } - +// Get the value of this promise. +// +// This function is safe to call concurrently from any number of goroutines. +func (np *NodePromise) Get(ctx context.Context) (Node, error) { select { - case nd := <-np.recv: - return nd, nil + case <-np.done: + return np.value, np.err case <-np.ctx.Done(): return nil, np.ctx.Err() case <-ctx.Done(): return nil, ctx.Err() - case err := <-np.err: - return nil, err } } From 339e9ea9125a9af871e74ff27f42bf022d4a837a Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 26 Jun 2017 18:24:06 -0700 Subject: [PATCH 07/21] remove useless helper function --- daghelpers.go | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/daghelpers.go b/daghelpers.go index 96a3cdd..d57b5d6 100644 --- a/daghelpers.go +++ b/daghelpers.go @@ -6,18 +6,6 @@ import ( cid "github.com/ipfs/go-cid" ) -// FindLinks searches this nodes links for the given key, -// returns the indexes of any links pointing to it -func FindLinks(links []*cid.Cid, c *cid.Cid, start int) []int { - var out []int - for i, lnk_c := range links[start:] { - if c.Equals(lnk_c) { - out = append(out, i+start) - } - } - return out -} - // GetDAG will fill out all of the links of the given Node. // It returns a channel of nodes, which the caller can receive // all the child nodes of 'root' on, in proper order. @@ -69,10 +57,12 @@ func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []*NodePromis } nd := opt.Node - is := FindLinks(keys, nd.Cid(), 0) - for _, i := range is { - count++ - promises[i].Send(nd) + c := nd.Cid() + for i, lnk_c := range keys { + if c.Equals(lnk_c) { + count++ + promises[i].Send(nd) + } } case <-ctx.Done(): return From 5bc8a07cd7e6fef02a5c6ac922eec1c02a9a6133 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 26 Jun 2017 18:41:07 -0700 Subject: [PATCH 08/21] remove TODO for NodeGetter We'll just live with this name. --- merkledag.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/merkledag.go b/merkledag.go index ff595c0..cfda2a0 100644 --- a/merkledag.go +++ b/merkledag.go @@ -15,10 +15,7 @@ type NodeOption struct { Err error } -// TODO: This name kind of sucks. -// NodeResolver? -// NodeService? -// Just Resolver? +// The basic Node resolution service. type NodeGetter interface { Get(context.Context, *cid.Cid) (Node, error) From 30aecc4e70886eecab32d2deec05dcd0008ae186 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 17 Aug 2017 13:52:24 -0700 Subject: [PATCH 09/21] document new methods (and rename Batching to NewBatch for consistency). --- batch.go | 10 +++++++++- daghelpers.go | 4 ++-- merkledag.go | 25 ++++++++++++++++++++++--- 3 files changed, 33 insertions(+), 6 deletions(-) diff --git a/batch.go b/batch.go index 4dc62b8..eadda72 100644 --- a/batch.go +++ b/batch.go @@ -4,7 +4,10 @@ import ( cid "github.com/ipfs/go-cid" ) -func Batching(ds DAGService) *Batch { +// NewBatch returns a node buffer (Batch) that buffers nodes internally and +// commits them to the underlying DAGService in batches. Use this if you intend +// to add a lot of nodes all at once. +func NewBatch(ds DAGService) *Batch { return &Batch{ ds: ds, MaxSize: 8 << 20, @@ -26,6 +29,8 @@ type Batch struct { MaxBlocks int } +// Add a node to this batch of nodes, potentially committing the set of batched +// nodes to the underlying DAGService. func (t *Batch) Add(nd Node) (*cid.Cid, error) { t.nodes = append(t.nodes, nd) t.size += len(nd.RawData()) @@ -35,6 +40,9 @@ func (t *Batch) Add(nd Node) (*cid.Cid, error) { return nd.Cid(), nil } +// Commit commits the buffered of nodes to the underlying DAGService. +// Make sure to call this after you're done adding nodes to the batch to ensure +// that they're actually added to the DAGService. func (t *Batch) Commit() error { _, err := t.ds.AddMany(t.nodes) t.nodes = nil diff --git a/daghelpers.go b/daghelpers.go index d57b5d6..f4a53d6 100644 --- a/daghelpers.go +++ b/daghelpers.go @@ -7,8 +7,8 @@ import ( ) // GetDAG will fill out all of the links of the given Node. -// It returns a channel of nodes, which the caller can receive -// all the child nodes of 'root' on, in proper order. +// It returns an array of NodePromise with the linked nodes all in the proper +// order. func GetDAG(ctx context.Context, ds DAGService, root Node) []*NodePromise { var cids []*cid.Cid for _, lnk := range root.Links() { diff --git a/merkledag.go b/merkledag.go index cfda2a0..7bde383 100644 --- a/merkledag.go +++ b/merkledag.go @@ -17,9 +17,15 @@ type NodeOption struct { // The basic Node resolution service. type NodeGetter interface { + // Get retrieves nodes by CID. Depending on the NodeGetter + // implementation, this may involve fetching the Node from a remote + // machine; consider setting a deadline in the context. Get(context.Context, *cid.Cid) (Node, error) // TODO(ipfs/go-ipfs#4009): Remove this method after fixing. + + // OfflineNodeGetter returns an version of this NodeGetter that will + // make no network requests. OfflineNodeGetter() NodeGetter } @@ -27,10 +33,17 @@ type NodeGetter interface { // objects faster. type LinkGetter interface { NodeGetter + // TODO(ipfs/go-ipld-format#9): This should return []*cid.Cid + + // GetLinks returns the children of the node refered to by the given + // CID. GetLinks(ctx context.Context, nd *cid.Cid) ([]*Link, error) } +// GetLinks returns the CIDs of the children of the given node. Prefer this +// method over looking up the node itself and calling `Links()` on it as this +// method may be able to use a link cache. func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) { if c.Type() == cid.Raw { return nil, nil @@ -49,16 +62,22 @@ func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) { type DAGService interface { NodeGetter + // Add adds a node to this DAG. Add(Node) (*cid.Cid, error) + // Remove removes a node from this DAG. + // + // If the node is not in this DAG, Remove returns ErrNotFound. // TODO(ipfs/go-ipfs#4010): Change this to take a CID. // This will require a fair amount of refactoring. Remove(Node) error - // TODO: Consider using []NodePromise and providing helper functions - // that take []NodePromise and return channels that yield nodes both - // in-order and as-ready. + // GetMany returns a channel of NodeOptions given a set of CIDs. GetMany(context.Context, []*cid.Cid) <-chan *NodeOption + // AddMany adds many nodes to this DAG. + // + // Consider using NewBatch instead of calling this directly if you need + // to add an unbounded number of nodes to avoid buffering too much. AddMany([]Node) ([]*cid.Cid, error) } From 2f52265ee2af7ee4bdd9fb9c9e23e83193fafd4f Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 17 Aug 2017 13:54:15 -0700 Subject: [PATCH 10/21] Make DAGService.Remove take a CID. We're going to want to do this eventually and we have to refactor anyways so we might as well do this now. Fixes ipfs/go-ipfs#4010 --- merkledag.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/merkledag.go b/merkledag.go index 7bde383..ae5e150 100644 --- a/merkledag.go +++ b/merkledag.go @@ -68,9 +68,7 @@ type DAGService interface { // Remove removes a node from this DAG. // // If the node is not in this DAG, Remove returns ErrNotFound. - // TODO(ipfs/go-ipfs#4010): Change this to take a CID. - // This will require a fair amount of refactoring. - Remove(Node) error + Remove(*cid.Cid) error // GetMany returns a channel of NodeOptions given a set of CIDs. GetMany(context.Context, []*cid.Cid) <-chan *NodeOption From 33803890e92705423d0669b48da77a8a016c47f1 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 17 Aug 2017 13:55:42 -0700 Subject: [PATCH 11/21] dag: move GetMany from DAGService to NodeGetter This will allow many consumers of `DAGService` to take `NodeGetter` instead and implementing `GetMany` for all `NodeGetter`s is pretty trivial. --- merkledag.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/merkledag.go b/merkledag.go index ae5e150..17e326c 100644 --- a/merkledag.go +++ b/merkledag.go @@ -22,6 +22,9 @@ type NodeGetter interface { // machine; consider setting a deadline in the context. Get(context.Context, *cid.Cid) (Node, error) + // GetMany returns a channel of NodeOptions given a set of CIDs. + GetMany(context.Context, []*cid.Cid) <-chan *NodeOption + // TODO(ipfs/go-ipfs#4009): Remove this method after fixing. // OfflineNodeGetter returns an version of this NodeGetter that will @@ -70,9 +73,6 @@ type DAGService interface { // If the node is not in this DAG, Remove returns ErrNotFound. Remove(*cid.Cid) error - // GetMany returns a channel of NodeOptions given a set of CIDs. - GetMany(context.Context, []*cid.Cid) <-chan *NodeOption - // AddMany adds many nodes to this DAG. // // Consider using NewBatch instead of calling this directly if you need From f2fc6cece4aa4a6f9756d15739e8b65d931c6ed6 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 17 Aug 2017 14:03:25 -0700 Subject: [PATCH 12/21] helpers: take NodeGetter instead of DAGService. --- daghelpers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daghelpers.go b/daghelpers.go index f4a53d6..7b53a99 100644 --- a/daghelpers.go +++ b/daghelpers.go @@ -9,7 +9,7 @@ import ( // GetDAG will fill out all of the links of the given Node. // It returns an array of NodePromise with the linked nodes all in the proper // order. -func GetDAG(ctx context.Context, ds DAGService, root Node) []*NodePromise { +func GetDAG(ctx context.Context, ds NodeGetter, root Node) []*NodePromise { var cids []*cid.Cid for _, lnk := range root.Links() { cids = append(cids, lnk.Cid) @@ -20,7 +20,7 @@ func GetDAG(ctx context.Context, ds DAGService, root Node) []*NodePromise { // GetNodes returns an array of 'FutureNode' promises, with each corresponding // to the key with the same index as the passed in keys -func GetNodes(ctx context.Context, ds DAGService, keys []*cid.Cid) []*NodePromise { +func GetNodes(ctx context.Context, ds NodeGetter, keys []*cid.Cid) []*NodePromise { // Early out if no work to do if len(keys) == 0 { From 07869d6a2627bbb15702886b7e9f8ebed0949ce8 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 17 Aug 2017 14:05:03 -0700 Subject: [PATCH 13/21] move GetLinks to daghelpers Keep merkledag clean. --- daghelpers.go | 17 +++++++++++++++++ merkledag.go | 17 ----------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/daghelpers.go b/daghelpers.go index 7b53a99..fd72e49 100644 --- a/daghelpers.go +++ b/daghelpers.go @@ -6,6 +6,23 @@ import ( cid "github.com/ipfs/go-cid" ) +// GetLinks returns the CIDs of the children of the given node. Prefer this +// method over looking up the node itself and calling `Links()` on it as this +// method may be able to use a link cache. +func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) { + if c.Type() == cid.Raw { + return nil, nil + } + if gl, ok := ng.(LinkGetter); ok { + return gl.GetLinks(ctx, c) + } + node, err := ng.Get(ctx, c) + if err != nil { + return nil, err + } + return node.Links(), nil +} + // GetDAG will fill out all of the links of the given Node. // It returns an array of NodePromise with the linked nodes all in the proper // order. diff --git a/merkledag.go b/merkledag.go index 17e326c..a6e7048 100644 --- a/merkledag.go +++ b/merkledag.go @@ -44,23 +44,6 @@ type LinkGetter interface { GetLinks(ctx context.Context, nd *cid.Cid) ([]*Link, error) } -// GetLinks returns the CIDs of the children of the given node. Prefer this -// method over looking up the node itself and calling `Links()` on it as this -// method may be able to use a link cache. -func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) { - if c.Type() == cid.Raw { - return nil, nil - } - if gl, ok := ng.(LinkGetter); ok { - return gl.GetLinks(ctx, c) - } - node, err := ng.Get(ctx, c) - if err != nil { - return nil, err - } - return node.Links(), nil -} - // DAGService is an IPFS Merkle DAG service. type DAGService interface { NodeGetter From 52259789c6bcadd1350d117d7fdf8f10dcf95448 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 1 Sep 2017 13:18:21 -0700 Subject: [PATCH 14/21] Get rid of OfflineNodeGetter It really just doesn't fit. We're working on making this method obsolete anyways. --- merkledag.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/merkledag.go b/merkledag.go index a6e7048..50945e9 100644 --- a/merkledag.go +++ b/merkledag.go @@ -24,12 +24,6 @@ type NodeGetter interface { // GetMany returns a channel of NodeOptions given a set of CIDs. GetMany(context.Context, []*cid.Cid) <-chan *NodeOption - - // TODO(ipfs/go-ipfs#4009): Remove this method after fixing. - - // OfflineNodeGetter returns an version of this NodeGetter that will - // make no network requests. - OfflineNodeGetter() NodeGetter } // NodeGetters can optionally implement this interface to make finding linked From 1af7e81da9f453dcf511aae5198820ec5a26855b Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Sun, 15 Oct 2017 17:47:13 -0700 Subject: [PATCH 15/21] port async batch commit code from ipfs (ipfs/go-ipfs#4296) 1. Modern storage devices (i.e., SSDs) tend to be highly parallel. 2. Allows us to read and write at the same time (avoids pausing while flushing). fixes https://github.com/ipfs/go-ipfs/issues/898#issuecomment-331849064 --- batch.go | 102 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 82 insertions(+), 20 deletions(-) diff --git a/batch.go b/batch.go index eadda72..eba48d1 100644 --- a/batch.go +++ b/batch.go @@ -1,51 +1,113 @@ package format import ( + "runtime" + cid "github.com/ipfs/go-cid" ) +// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking. +// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage +// devices, and CPUs to find the right value/formula. +var ParallelBatchCommits = runtime.NumCPU() * 2 + // NewBatch returns a node buffer (Batch) that buffers nodes internally and // commits them to the underlying DAGService in batches. Use this if you intend // to add a lot of nodes all at once. func NewBatch(ds DAGService) *Batch { return &Batch{ - ds: ds, - MaxSize: 8 << 20, + ds: ds, + commitResults: make(chan error, ParallelBatchCommits), + MaxSize: 8 << 20, // By default, only batch up to 128 nodes at a time. // The current implementation of flatfs opens this many file // descriptors at the same time for the optimized batch write. - MaxBlocks: 128, + MaxNodes: 128, } } +// Batch is a buffer for batching adds to a dag. type Batch struct { ds DAGService - // TODO: try to re-use memory. - nodes []Node - size int - MaxSize int - MaxBlocks int + activeCommits int + commitError error + commitResults chan error + + nodes []Node + size int + + MaxSize int + MaxNodes int +} + +func (t *Batch) processResults() { + for t.activeCommits > 0 && t.commitError == nil { + select { + case err := <-t.commitResults: + t.activeCommits-- + if err != nil { + t.commitError = err + } + default: + return + } + } +} + +func (t *Batch) asyncCommit() { + numBlocks := len(t.nodes) + if numBlocks == 0 || t.commitError != nil { + return + } + if t.activeCommits >= ParallelBatchCommits { + err := <-t.commitResults + t.activeCommits-- + + if err != nil { + t.commitError = err + return + } + } + go func(b []Node) { + _, err := t.ds.AddMany(b) + t.commitResults <- err + }(t.nodes) + + t.activeCommits++ + t.nodes = make([]Node, 0, numBlocks) + t.size = 0 + + return } -// Add a node to this batch of nodes, potentially committing the set of batched -// nodes to the underlying DAGService. +// Add adds a node to the batch and commits the batch if necessary. func (t *Batch) Add(nd Node) (*cid.Cid, error) { + // Not strictly necessary but allows us to catch errors early. + t.processResults() + if t.commitError != nil { + return nil, t.commitError + } + t.nodes = append(t.nodes, nd) t.size += len(nd.RawData()) - if t.size > t.MaxSize || len(t.nodes) > t.MaxBlocks { - return nd.Cid(), t.Commit() + if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes { + t.asyncCommit() } - return nd.Cid(), nil + return nd.Cid(), t.commitError } -// Commit commits the buffered of nodes to the underlying DAGService. -// Make sure to call this after you're done adding nodes to the batch to ensure -// that they're actually added to the DAGService. +// Commit commits batched nodes. func (t *Batch) Commit() error { - _, err := t.ds.AddMany(t.nodes) - t.nodes = nil - t.size = 0 - return err + t.asyncCommit() + for t.activeCommits > 0 && t.commitError == nil { + err := <-t.commitResults + t.activeCommits-- + if err != nil { + t.commitError = err + } + } + + return t.commitError } From 0408f8d68412d56f7d0eeb3cce5b2bc177e8d23d Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Sun, 15 Oct 2017 18:35:45 -0700 Subject: [PATCH 16/21] add a basic test for Batch --- batch_test.go | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 batch_test.go diff --git a/batch_test.go b/batch_test.go new file mode 100644 index 0000000..c4328cd --- /dev/null +++ b/batch_test.go @@ -0,0 +1,105 @@ +package format + +import ( + "context" + "sync" + "testing" + + cid "github.com/ipfs/go-cid" +) + +// Test dag +type testDag struct { + mu sync.Mutex + nodes map[string]Node +} + +func newTestDag() *testDag { + return &testDag{nodes: make(map[string]Node)} +} + +func (d *testDag) Get(ctx context.Context, cid *cid.Cid) (Node, error) { + d.mu.Lock() + defer d.mu.Unlock() + if n, ok := d.nodes[cid.KeyString()]; ok { + return n, nil + } + return nil, ErrNotFound +} + +func (d *testDag) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOption { + d.mu.Lock() + defer d.mu.Unlock() + out := make(chan *NodeOption, len(cids)) + for _, c := range cids { + if n, ok := d.nodes[c.KeyString()]; ok { + out <- &NodeOption{Node: n} + } else { + out <- &NodeOption{Err: ErrNotFound} + } + } + return out +} + +func (d *testDag) Add(node Node) (*cid.Cid, error) { + d.mu.Lock() + defer d.mu.Unlock() + c := node.Cid() + d.nodes[c.KeyString()] = node + return c, nil +} + +func (d *testDag) AddMany(nodes []Node) ([]*cid.Cid, error) { + d.mu.Lock() + defer d.mu.Unlock() + cids := make([]*cid.Cid, len(nodes)) + for i, n := range nodes { + c := n.Cid() + d.nodes[c.KeyString()] = n + cids[i] = c + } + return cids, nil +} + +func (d *testDag) Remove(c *cid.Cid) error { + d.mu.Lock() + defer d.mu.Unlock() + key := c.KeyString() + if _, exists := d.nodes[key]; !exists { + return ErrNotFound + } + delete(d.nodes, key) + return nil +} + +var _ DAGService = new(testDag) + +func TestBatch(t *testing.T) { + d := newTestDag() + b := NewBatch(d) + for i := 0; i < 1000; i++ { + // It would be great if we could use *many* different nodes here + // but we can't add any dependencies and I don't feel like adding + // any more testing code. + if _, err := b.Add(new(EmptyNode)); err != nil { + t.Fatal(err) + } + } + if err := b.Commit(); err != nil { + t.Fatal(err) + } + + n, err := d.Get(context.Background(), new(EmptyNode).Cid()) + if err != nil { + t.Fatal(err) + } + switch n.(type) { + case *EmptyNode: + default: + t.Fatal("expected the node to exist in the dag") + } + + if len(d.nodes) != 1 { + t.Fatal("should have one node") + } +} From 6f9115bb766dde187a66fb2d7a4664e0bacd030c Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 15 Nov 2017 09:42:18 -0800 Subject: [PATCH 17/21] Make remove idempotent. 1. Add is already idempotent. This makes remove match. 2. Generally, all we care about is that the node no longer exists in the DAG. If two callers remove a node at the same time, they should both succeed. Currently, we *ignore* the result of Remove in go-ipfs. IMO, it would be better to let errors be *errors* and only return an error if something goes wrong. 3. This can be significantly faster. It allows us to batch/queue removes (as long as we guarantee that they'll eventually happen). 4. This matches how most databases/key-value stores operate. An alternative would be to return `(deleted bool, err error)` but then we don't get the speedup. --- batch_test.go | 6 +----- merkledag.go | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/batch_test.go b/batch_test.go index c4328cd..2ccd162 100644 --- a/batch_test.go +++ b/batch_test.go @@ -64,11 +64,7 @@ func (d *testDag) AddMany(nodes []Node) ([]*cid.Cid, error) { func (d *testDag) Remove(c *cid.Cid) error { d.mu.Lock() defer d.mu.Unlock() - key := c.KeyString() - if _, exists := d.nodes[key]; !exists { - return ErrNotFound - } - delete(d.nodes, key) + delete(d.nodes, c.KeyString()) return nil } diff --git a/merkledag.go b/merkledag.go index 50945e9..b2de21b 100644 --- a/merkledag.go +++ b/merkledag.go @@ -47,7 +47,7 @@ type DAGService interface { // Remove removes a node from this DAG. // - // If the node is not in this DAG, Remove returns ErrNotFound. + // Remove returns no error if the requested node is not present in this DAG. Remove(*cid.Cid) error // AddMany adds many nodes to this DAG. From b4033237b46a7cde5713e5015b076870244e589f Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 15 Nov 2017 22:06:34 -0800 Subject: [PATCH 18/21] don't return CIDs on add The caller can just call `node.Cid()` and returning CIDs from `AddMany` requires allocation. --- batch.go | 2 +- batch_test.go | 18 +++++++----------- merkledag.go | 4 ++-- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/batch.go b/batch.go index eba48d1..cdfd7d5 100644 --- a/batch.go +++ b/batch.go @@ -71,7 +71,7 @@ func (t *Batch) asyncCommit() { } } go func(b []Node) { - _, err := t.ds.AddMany(b) + err := t.ds.AddMany(b) t.commitResults <- err }(t.nodes) diff --git a/batch_test.go b/batch_test.go index 2ccd162..5bb5a33 100644 --- a/batch_test.go +++ b/batch_test.go @@ -41,24 +41,20 @@ func (d *testDag) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOpti return out } -func (d *testDag) Add(node Node) (*cid.Cid, error) { +func (d *testDag) Add(node Node) error { d.mu.Lock() defer d.mu.Unlock() - c := node.Cid() - d.nodes[c.KeyString()] = node - return c, nil + d.nodes[node.Cid().KeyString()] = node + return nil } -func (d *testDag) AddMany(nodes []Node) ([]*cid.Cid, error) { +func (d *testDag) AddMany(nodes []Node) error { d.mu.Lock() defer d.mu.Unlock() - cids := make([]*cid.Cid, len(nodes)) - for i, n := range nodes { - c := n.Cid() - d.nodes[c.KeyString()] = n - cids[i] = c + for _, n := range nodes { + d.nodes[n.Cid().KeyString()] = n } - return cids, nil + return nil } func (d *testDag) Remove(c *cid.Cid) error { diff --git a/merkledag.go b/merkledag.go index b2de21b..efeb249 100644 --- a/merkledag.go +++ b/merkledag.go @@ -43,7 +43,7 @@ type DAGService interface { NodeGetter // Add adds a node to this DAG. - Add(Node) (*cid.Cid, error) + Add(Node) error // Remove removes a node from this DAG. // @@ -54,5 +54,5 @@ type DAGService interface { // // Consider using NewBatch instead of calling this directly if you need // to add an unbounded number of nodes to avoid buffering too much. - AddMany([]Node) ([]*cid.Cid, error) + AddMany([]Node) error } From f10b5dd9ccf77bb0d6949ee7af70f390e3975183 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 15 Nov 2017 22:12:11 -0800 Subject: [PATCH 19/21] Add RemoveMany method --- batch_test.go | 9 +++++++++ merkledag.go | 5 +++++ 2 files changed, 14 insertions(+) diff --git a/batch_test.go b/batch_test.go index 5bb5a33..070b565 100644 --- a/batch_test.go +++ b/batch_test.go @@ -64,6 +64,15 @@ func (d *testDag) Remove(c *cid.Cid) error { return nil } +func (d *testDag) RemoveMany(cids []*cid.Cid) error { + d.mu.Lock() + defer d.mu.Unlock() + for _, c := range cids { + delete(d.nodes, c.KeyString()) + } + return nil +} + var _ DAGService = new(testDag) func TestBatch(t *testing.T) { diff --git a/merkledag.go b/merkledag.go index efeb249..4981ec3 100644 --- a/merkledag.go +++ b/merkledag.go @@ -55,4 +55,9 @@ type DAGService interface { // Consider using NewBatch instead of calling this directly if you need // to add an unbounded number of nodes to avoid buffering too much. AddMany([]Node) error + + // RemoveMany removes many nodes from this DAG. + // + // It returns success even if the nodes were not present in the DAG. + RemoveMany([]*cid.Cid) error } From 44a78014ee9e9b490d2876d8d693731b0558822d Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 15 Nov 2017 22:15:57 -0800 Subject: [PATCH 20/21] add contexts to Add/Remove methods We'll need these for slower/remote datastores. --- batch.go | 3 ++- batch_test.go | 8 ++++---- merkledag.go | 8 ++++---- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/batch.go b/batch.go index cdfd7d5..b0444f0 100644 --- a/batch.go +++ b/batch.go @@ -1,6 +1,7 @@ package format import ( + "context" "runtime" cid "github.com/ipfs/go-cid" @@ -71,7 +72,7 @@ func (t *Batch) asyncCommit() { } } go func(b []Node) { - err := t.ds.AddMany(b) + err := t.ds.AddMany(context.Background(), b) t.commitResults <- err }(t.nodes) diff --git a/batch_test.go b/batch_test.go index 070b565..74cd504 100644 --- a/batch_test.go +++ b/batch_test.go @@ -41,14 +41,14 @@ func (d *testDag) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOpti return out } -func (d *testDag) Add(node Node) error { +func (d *testDag) Add(ctx context.Context, node Node) error { d.mu.Lock() defer d.mu.Unlock() d.nodes[node.Cid().KeyString()] = node return nil } -func (d *testDag) AddMany(nodes []Node) error { +func (d *testDag) AddMany(ctx context.Context, nodes []Node) error { d.mu.Lock() defer d.mu.Unlock() for _, n := range nodes { @@ -57,14 +57,14 @@ func (d *testDag) AddMany(nodes []Node) error { return nil } -func (d *testDag) Remove(c *cid.Cid) error { +func (d *testDag) Remove(ctx context.Context, c *cid.Cid) error { d.mu.Lock() defer d.mu.Unlock() delete(d.nodes, c.KeyString()) return nil } -func (d *testDag) RemoveMany(cids []*cid.Cid) error { +func (d *testDag) RemoveMany(ctx context.Context, cids []*cid.Cid) error { d.mu.Lock() defer d.mu.Unlock() for _, c := range cids { diff --git a/merkledag.go b/merkledag.go index 4981ec3..058fb8f 100644 --- a/merkledag.go +++ b/merkledag.go @@ -43,21 +43,21 @@ type DAGService interface { NodeGetter // Add adds a node to this DAG. - Add(Node) error + Add(context.Context, Node) error // Remove removes a node from this DAG. // // Remove returns no error if the requested node is not present in this DAG. - Remove(*cid.Cid) error + Remove(context.Context, *cid.Cid) error // AddMany adds many nodes to this DAG. // // Consider using NewBatch instead of calling this directly if you need // to add an unbounded number of nodes to avoid buffering too much. - AddMany([]Node) error + AddMany(context.Context, []Node) error // RemoveMany removes many nodes from this DAG. // // It returns success even if the nodes were not present in the DAG. - RemoveMany([]*cid.Cid) error + RemoveMany(context.Context, []*cid.Cid) error } From 6cf32cc871c91b8ed30fc8d314e138d3de572953 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 16 Nov 2017 11:50:20 -0800 Subject: [PATCH 21/21] add context to batch I considered (well, implemented then threw it away) allowing contexts on all calls to Batch (Add, Commit, etc). However, really, you should treat a batch as a single large "operation". I also went down the road of generalizing batches to sessions. However, it became immediately obvious that permitting add *and* remove *and* fetch would require a lot of bookkeeping and that you'd lose a lot of performance. So, we'll do that separately. --- batch.go | 115 ++++++++++++++++++++++++++++++++++++++------------ batch_test.go | 9 ++-- 2 files changed, 95 insertions(+), 29 deletions(-) diff --git a/batch.go b/batch.go index b0444f0..754490b 100644 --- a/batch.go +++ b/batch.go @@ -2,9 +2,8 @@ package format import ( "context" + "errors" "runtime" - - cid "github.com/ipfs/go-cid" ) // ParallelBatchCommits is the number of batch commits that can be in-flight before blocking. @@ -12,12 +11,24 @@ import ( // devices, and CPUs to find the right value/formula. var ParallelBatchCommits = runtime.NumCPU() * 2 +// ErrNotCommited is returned when closing a batch that hasn't been successfully +// committed. +var ErrNotCommited = errors.New("error: batch not commited") + +// ErrClosed is returned when operating on a batch that has already been closed. +var ErrClosed = errors.New("error: batch closed") + // NewBatch returns a node buffer (Batch) that buffers nodes internally and // commits them to the underlying DAGService in batches. Use this if you intend -// to add a lot of nodes all at once. -func NewBatch(ds DAGService) *Batch { +// to add or remove a lot of nodes all at once. +// +// If the passed context is canceled, any in-progress commits are aborted. +func NewBatch(ctx context.Context, ds DAGService) *Batch { + ctx, cancel := context.WithCancel(ctx) return &Batch{ ds: ds, + ctx: ctx, + cancel: cancel, commitResults: make(chan error, ParallelBatchCommits), MaxSize: 8 << 20, @@ -32,8 +43,11 @@ func NewBatch(ds DAGService) *Batch { type Batch struct { ds DAGService + ctx context.Context + cancel func() + activeCommits int - commitError error + err error commitResults chan error nodes []Node @@ -44,12 +58,13 @@ type Batch struct { } func (t *Batch) processResults() { - for t.activeCommits > 0 && t.commitError == nil { + for t.activeCommits > 0 { select { case err := <-t.commitResults: t.activeCommits-- if err != nil { - t.commitError = err + t.setError(err) + return } default: return @@ -59,22 +74,29 @@ func (t *Batch) processResults() { func (t *Batch) asyncCommit() { numBlocks := len(t.nodes) - if numBlocks == 0 || t.commitError != nil { + if numBlocks == 0 { return } if t.activeCommits >= ParallelBatchCommits { - err := <-t.commitResults - t.activeCommits-- + select { + case err := <-t.commitResults: + t.activeCommits-- - if err != nil { - t.commitError = err + if err != nil { + t.setError(err) + return + } + case <-t.ctx.Done(): + t.setError(t.ctx.Err()) return } } - go func(b []Node) { - err := t.ds.AddMany(context.Background(), b) - t.commitResults <- err - }(t.nodes) + go func(ctx context.Context, b []Node, result chan error, ds DAGService) { + select { + case result <- ds.AddMany(ctx, b): + case <-ctx.Done(): + } + }(t.ctx, t.nodes, t.commitResults, t.ds) t.activeCommits++ t.nodes = make([]Node, 0, numBlocks) @@ -84,31 +106,72 @@ func (t *Batch) asyncCommit() { } // Add adds a node to the batch and commits the batch if necessary. -func (t *Batch) Add(nd Node) (*cid.Cid, error) { +func (t *Batch) Add(nd Node) error { + if t.err != nil { + return t.err + } // Not strictly necessary but allows us to catch errors early. t.processResults() - if t.commitError != nil { - return nil, t.commitError + + if t.err != nil { + return t.err } t.nodes = append(t.nodes, nd) t.size += len(nd.RawData()) + if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes { t.asyncCommit() } - return nd.Cid(), t.commitError + return t.err } // Commit commits batched nodes. func (t *Batch) Commit() error { + if t.err != nil { + return t.err + } + t.asyncCommit() - for t.activeCommits > 0 && t.commitError == nil { - err := <-t.commitResults - t.activeCommits-- - if err != nil { - t.commitError = err + +loop: + for t.activeCommits > 0 { + select { + case err := <-t.commitResults: + t.activeCommits-- + if err != nil { + t.setError(err) + break loop + } + case <-t.ctx.Done(): + t.setError(t.ctx.Err()) + break loop + } + } + + return t.err +} + +func (t *Batch) setError(err error) { + t.err = err + + t.cancel() + + // Drain as much as we can without blocking. +loop: + for { + select { + case <-t.commitResults: + default: + break loop } } - return t.commitError + // Be nice and cleanup. These can take a *lot* of memory. + t.commitResults = nil + t.ds = nil + t.ctx = nil + t.nodes = nil + t.size = 0 + t.activeCommits = 0 } diff --git a/batch_test.go b/batch_test.go index 74cd504..71972cb 100644 --- a/batch_test.go +++ b/batch_test.go @@ -76,13 +76,16 @@ func (d *testDag) RemoveMany(ctx context.Context, cids []*cid.Cid) error { var _ DAGService = new(testDag) func TestBatch(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + d := newTestDag() - b := NewBatch(d) + b := NewBatch(ctx, d) for i := 0; i < 1000; i++ { // It would be great if we could use *many* different nodes here // but we can't add any dependencies and I don't feel like adding // any more testing code. - if _, err := b.Add(new(EmptyNode)); err != nil { + if err := b.Add(new(EmptyNode)); err != nil { t.Fatal(err) } } @@ -90,7 +93,7 @@ func TestBatch(t *testing.T) { t.Fatal(err) } - n, err := d.Get(context.Background(), new(EmptyNode).Cid()) + n, err := d.Get(ctx, new(EmptyNode).Cid()) if err != nil { t.Fatal(err) }