-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Extract dagservice and friends from go-ipfs #8
Merged
+518
−4
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
ac092e5
nit: document Node
Stebalien b3a1f4b
[WIP] [RFC] extract dagservice and friends from go-ipfs
Stebalien 0f9e9ed
Make Batch a helper type/function
Stebalien 2696405
replace LinkService with an optional LinkGetter interface
Stebalien 5837bec
add/update comments about issues with DAGService interface
Stebalien c0311d7
NodePromise: replace interface with concrete type
Stebalien 339e9ea
remove useless helper function
Stebalien 5bc8a07
remove TODO for NodeGetter
Stebalien 30aecc4
document new methods (and rename Batching to NewBatch for consistency).
Stebalien 2f52265
Make DAGService.Remove take a CID.
Stebalien 3380389
dag: move GetMany from DAGService to NodeGetter
Stebalien f2fc6ce
helpers: take NodeGetter instead of DAGService.
Stebalien 07869d6
move GetLinks to daghelpers
Stebalien 5225978
Get rid of OfflineNodeGetter
Stebalien 1af7e81
port async batch commit code from ipfs
Stebalien 0408f8d
add a basic test for Batch
Stebalien 6f9115b
Make remove idempotent.
Stebalien b403323
don't return CIDs on add
Stebalien f10b5dd
Add RemoveMany method
Stebalien 44a7801
add contexts to Add/Remove methods
Stebalien 6cf32cc
add context to batch
Stebalien File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
package format | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"runtime" | ||
) | ||
|
||
// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking. | ||
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage | ||
// devices, and CPUs to find the right value/formula. | ||
var ParallelBatchCommits = runtime.NumCPU() * 2 | ||
|
||
// ErrNotCommited is returned when closing a batch that hasn't been successfully | ||
// committed. | ||
var ErrNotCommited = errors.New("error: batch not commited") | ||
|
||
// ErrClosed is returned when operating on a batch that has already been closed. | ||
var ErrClosed = errors.New("error: batch closed") | ||
|
||
// NewBatch returns a node buffer (Batch) that buffers nodes internally and | ||
// commits them to the underlying DAGService in batches. Use this if you intend | ||
// to add or remove a lot of nodes all at once. | ||
// | ||
// If the passed context is canceled, any in-progress commits are aborted. | ||
func NewBatch(ctx context.Context, ds DAGService) *Batch { | ||
ctx, cancel := context.WithCancel(ctx) | ||
return &Batch{ | ||
ds: ds, | ||
ctx: ctx, | ||
cancel: cancel, | ||
commitResults: make(chan error, ParallelBatchCommits), | ||
MaxSize: 8 << 20, | ||
|
||
// By default, only batch up to 128 nodes at a time. | ||
// The current implementation of flatfs opens this many file | ||
// descriptors at the same time for the optimized batch write. | ||
MaxNodes: 128, | ||
} | ||
} | ||
|
||
// Batch is a buffer for batching adds to a dag. | ||
type Batch struct { | ||
ds DAGService | ||
|
||
ctx context.Context | ||
cancel func() | ||
|
||
activeCommits int | ||
err error | ||
commitResults chan error | ||
|
||
nodes []Node | ||
size int | ||
|
||
MaxSize int | ||
MaxNodes int | ||
} | ||
|
||
func (t *Batch) processResults() { | ||
for t.activeCommits > 0 { | ||
select { | ||
case err := <-t.commitResults: | ||
t.activeCommits-- | ||
if err != nil { | ||
t.setError(err) | ||
return | ||
} | ||
default: | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (t *Batch) asyncCommit() { | ||
numBlocks := len(t.nodes) | ||
if numBlocks == 0 { | ||
return | ||
} | ||
if t.activeCommits >= ParallelBatchCommits { | ||
select { | ||
case err := <-t.commitResults: | ||
t.activeCommits-- | ||
|
||
if err != nil { | ||
t.setError(err) | ||
return | ||
} | ||
case <-t.ctx.Done(): | ||
t.setError(t.ctx.Err()) | ||
return | ||
} | ||
} | ||
go func(ctx context.Context, b []Node, result chan error, ds DAGService) { | ||
select { | ||
case result <- ds.AddMany(ctx, b): | ||
case <-ctx.Done(): | ||
} | ||
}(t.ctx, t.nodes, t.commitResults, t.ds) | ||
|
||
t.activeCommits++ | ||
t.nodes = make([]Node, 0, numBlocks) | ||
t.size = 0 | ||
|
||
return | ||
} | ||
|
||
// Add adds a node to the batch and commits the batch if necessary. | ||
func (t *Batch) Add(nd Node) error { | ||
if t.err != nil { | ||
return t.err | ||
} | ||
// Not strictly necessary but allows us to catch errors early. | ||
t.processResults() | ||
|
||
if t.err != nil { | ||
return t.err | ||
} | ||
|
||
t.nodes = append(t.nodes, nd) | ||
t.size += len(nd.RawData()) | ||
|
||
if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes { | ||
t.asyncCommit() | ||
} | ||
return t.err | ||
} | ||
|
||
// Commit commits batched nodes. | ||
func (t *Batch) Commit() error { | ||
if t.err != nil { | ||
return t.err | ||
} | ||
|
||
t.asyncCommit() | ||
|
||
loop: | ||
for t.activeCommits > 0 { | ||
select { | ||
case err := <-t.commitResults: | ||
t.activeCommits-- | ||
if err != nil { | ||
t.setError(err) | ||
break loop | ||
} | ||
case <-t.ctx.Done(): | ||
t.setError(t.ctx.Err()) | ||
break loop | ||
} | ||
} | ||
|
||
return t.err | ||
} | ||
|
||
func (t *Batch) setError(err error) { | ||
t.err = err | ||
|
||
t.cancel() | ||
|
||
// Drain as much as we can without blocking. | ||
loop: | ||
for { | ||
select { | ||
case <-t.commitResults: | ||
default: | ||
break loop | ||
} | ||
} | ||
|
||
// Be nice and cleanup. These can take a *lot* of memory. | ||
t.commitResults = nil | ||
t.ds = nil | ||
t.ctx = nil | ||
t.nodes = nil | ||
t.size = 0 | ||
t.activeCommits = 0 | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package format | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
"testing" | ||
|
||
cid "github.com/ipfs/go-cid" | ||
) | ||
|
||
// Test dag | ||
type testDag struct { | ||
mu sync.Mutex | ||
nodes map[string]Node | ||
} | ||
|
||
func newTestDag() *testDag { | ||
return &testDag{nodes: make(map[string]Node)} | ||
} | ||
|
||
func (d *testDag) Get(ctx context.Context, cid *cid.Cid) (Node, error) { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
if n, ok := d.nodes[cid.KeyString()]; ok { | ||
return n, nil | ||
} | ||
return nil, ErrNotFound | ||
} | ||
|
||
func (d *testDag) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOption { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
out := make(chan *NodeOption, len(cids)) | ||
for _, c := range cids { | ||
if n, ok := d.nodes[c.KeyString()]; ok { | ||
out <- &NodeOption{Node: n} | ||
} else { | ||
out <- &NodeOption{Err: ErrNotFound} | ||
} | ||
} | ||
return out | ||
} | ||
|
||
func (d *testDag) Add(ctx context.Context, node Node) error { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
d.nodes[node.Cid().KeyString()] = node | ||
return nil | ||
} | ||
|
||
func (d *testDag) AddMany(ctx context.Context, nodes []Node) error { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
for _, n := range nodes { | ||
d.nodes[n.Cid().KeyString()] = n | ||
} | ||
return nil | ||
} | ||
|
||
func (d *testDag) Remove(ctx context.Context, c *cid.Cid) error { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
delete(d.nodes, c.KeyString()) | ||
return nil | ||
} | ||
|
||
func (d *testDag) RemoveMany(ctx context.Context, cids []*cid.Cid) error { | ||
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
for _, c := range cids { | ||
delete(d.nodes, c.KeyString()) | ||
} | ||
return nil | ||
} | ||
|
||
var _ DAGService = new(testDag) | ||
|
||
func TestBatch(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
d := newTestDag() | ||
b := NewBatch(ctx, d) | ||
for i := 0; i < 1000; i++ { | ||
// It would be great if we could use *many* different nodes here | ||
// but we can't add any dependencies and I don't feel like adding | ||
// any more testing code. | ||
if err := b.Add(new(EmptyNode)); err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
if err := b.Commit(); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
n, err := d.Get(ctx, new(EmptyNode).Cid()) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
switch n.(type) { | ||
case *EmptyNode: | ||
default: | ||
t.Fatal("expected the node to exist in the dag") | ||
} | ||
|
||
if len(d.nodes) != 1 { | ||
t.Fatal("should have one node") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
package format | ||
|
||
import ( | ||
"context" | ||
|
||
cid "github.com/ipfs/go-cid" | ||
) | ||
|
||
// GetLinks returns the CIDs of the children of the given node. Prefer this | ||
// method over looking up the node itself and calling `Links()` on it as this | ||
// method may be able to use a link cache. | ||
func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) { | ||
if c.Type() == cid.Raw { | ||
return nil, nil | ||
} | ||
if gl, ok := ng.(LinkGetter); ok { | ||
return gl.GetLinks(ctx, c) | ||
} | ||
node, err := ng.Get(ctx, c) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return node.Links(), nil | ||
} | ||
|
||
// GetDAG will fill out all of the links of the given Node. | ||
// It returns an array of NodePromise with the linked nodes all in the proper | ||
// order. | ||
func GetDAG(ctx context.Context, ds NodeGetter, root Node) []*NodePromise { | ||
var cids []*cid.Cid | ||
for _, lnk := range root.Links() { | ||
cids = append(cids, lnk.Cid) | ||
} | ||
|
||
return GetNodes(ctx, ds, cids) | ||
} | ||
|
||
// GetNodes returns an array of 'FutureNode' promises, with each corresponding | ||
// to the key with the same index as the passed in keys | ||
func GetNodes(ctx context.Context, ds NodeGetter, keys []*cid.Cid) []*NodePromise { | ||
|
||
// Early out if no work to do | ||
if len(keys) == 0 { | ||
return nil | ||
} | ||
|
||
promises := make([]*NodePromise, len(keys)) | ||
for i := range keys { | ||
promises[i] = NewNodePromise(ctx) | ||
} | ||
|
||
dedupedKeys := dedupeKeys(keys) | ||
go func() { | ||
ctx, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
|
||
nodechan := ds.GetMany(ctx, dedupedKeys) | ||
|
||
for count := 0; count < len(keys); { | ||
select { | ||
case opt, ok := <-nodechan: | ||
if !ok { | ||
for _, p := range promises { | ||
p.Fail(ErrNotFound) | ||
} | ||
return | ||
} | ||
|
||
if opt.Err != nil { | ||
for _, p := range promises { | ||
p.Fail(opt.Err) | ||
} | ||
return | ||
} | ||
|
||
nd := opt.Node | ||
c := nd.Cid() | ||
for i, lnk_c := range keys { | ||
if c.Equals(lnk_c) { | ||
count++ | ||
promises[i].Send(nd) | ||
} | ||
} | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
}() | ||
return promises | ||
} | ||
|
||
// Remove duplicates from a list of keys | ||
func dedupeKeys(cids []*cid.Cid) []*cid.Cid { | ||
set := cid.NewSet() | ||
for _, c := range cids { | ||
set.Add(c) | ||
} | ||
return set.Keys() | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a lot of code for an interface package. Unfortunately, the interface needs this (unless we decide to introduce a
Batch
interface.