Extract dagservice and friends from go-ipfs #8

Merged
merged 21 commits on Dec 11, 2017
Changes from all commits
177 changes: 177 additions & 0 deletions batch.go
@@ -0,0 +1,177 @@
package format
Member Author
This is a lot of code for an interface package. Unfortunately, the interface needs this (unless we decide to introduce a Batch interface).
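For context, a rough sketch of what such a Batch interface could look like if we went that route (purely illustrative; the interface and its name are not part of this change):

// Batcher is a hypothetical interface that a DAGService implementation could
// satisfy to supply its own batching strategy. Illustrative only.
type Batcher interface {
    Add(nd Node) error
    Commit() error
}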


import (
    "context"
    "errors"
    "runtime"
)

// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2

// ErrNotCommited is returned when closing a batch that hasn't been successfully
// committed.
var ErrNotCommited = errors.New("error: batch not commited")

// ErrClosed is returned when operating on a batch that has already been closed.
var ErrClosed = errors.New("error: batch closed")

// NewBatch returns a node buffer (Batch) that buffers nodes internally and
// commits them to the underlying DAGService in batches. Use this if you intend
// to add or remove a lot of nodes all at once.
//
// If the passed context is canceled, any in-progress commits are aborted.
func NewBatch(ctx context.Context, ds DAGService) *Batch {
    ctx, cancel := context.WithCancel(ctx)
    return &Batch{
        ds:            ds,
        ctx:           ctx,
        cancel:        cancel,
        commitResults: make(chan error, ParallelBatchCommits),
        MaxSize:       8 << 20,

        // By default, only batch up to 128 nodes at a time.
        // The current implementation of flatfs opens this many file
        // descriptors at the same time for the optimized batch write.
        MaxNodes: 128,
    }
}

// Batch is a buffer for batching adds to a dag.
type Batch struct {
    ds DAGService

    ctx    context.Context
    cancel func()

    activeCommits int
    err           error
    commitResults chan error

    nodes []Node
    size  int

    MaxSize  int
    MaxNodes int
}

func (t *Batch) processResults() {
    for t.activeCommits > 0 {
        select {
        case err := <-t.commitResults:
            t.activeCommits--
            if err != nil {
                t.setError(err)
                return
            }
        default:
            return
        }
    }
}

func (t *Batch) asyncCommit() {
    numBlocks := len(t.nodes)
    if numBlocks == 0 {
        return
    }
    if t.activeCommits >= ParallelBatchCommits {
        select {
        case err := <-t.commitResults:
            t.activeCommits--

            if err != nil {
                t.setError(err)
                return
            }
        case <-t.ctx.Done():
            t.setError(t.ctx.Err())
            return
        }
    }
    go func(ctx context.Context, b []Node, result chan error, ds DAGService) {
        select {
        case result <- ds.AddMany(ctx, b):
        case <-ctx.Done():
        }
    }(t.ctx, t.nodes, t.commitResults, t.ds)

    t.activeCommits++
    t.nodes = make([]Node, 0, numBlocks)
    t.size = 0

    return
}

// Add adds a node to the batch and commits the batch if necessary.
func (t *Batch) Add(nd Node) error {
    if t.err != nil {
        return t.err
    }
    // Not strictly necessary but allows us to catch errors early.
    t.processResults()

    if t.err != nil {
        return t.err
    }

    t.nodes = append(t.nodes, nd)
    t.size += len(nd.RawData())

    if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes {
        t.asyncCommit()
    }
    return t.err
}

// Commit commits batched nodes.
func (t *Batch) Commit() error {
    if t.err != nil {
        return t.err
    }

    t.asyncCommit()

loop:
    for t.activeCommits > 0 {
        select {
        case err := <-t.commitResults:
            t.activeCommits--
            if err != nil {
                t.setError(err)
                break loop
            }
        case <-t.ctx.Done():
            t.setError(t.ctx.Err())
            break loop
        }
    }

    return t.err
}

func (t *Batch) setError(err error) {
    t.err = err

    t.cancel()

    // Drain as much as we can without blocking.
loop:
    for {
        select {
        case <-t.commitResults:
        default:
            break loop
        }
    }

    // Be nice and cleanup. These can take a *lot* of memory.
    t.commitResults = nil
    t.ds = nil
    t.ctx = nil
    t.nodes = nil
    t.size = 0
    t.activeCommits = 0
}
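For readers of this PR, here is a minimal usage sketch of the Batch API above. It assumes some DAGService implementation `ds` and a slice of nodes built elsewhere; the helper name is illustrative, not part of this change.

// addAll buffers many adds and flushes them with a single Commit.
func addAll(ctx context.Context, ds DAGService, nodes []Node) error {
    b := NewBatch(ctx, ds)
    for _, nd := range nodes {
        if err := b.Add(nd); err != nil {
            return err
        }
    }
    // Commit flushes any buffered nodes and waits for in-flight AddMany calls.
    return b.Commit()
}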
109 changes: 109 additions & 0 deletions batch_test.go
@@ -0,0 +1,109 @@
package format

import (
    "context"
    "sync"
    "testing"

    cid "github.com/ipfs/go-cid"
)

// Test dag
type testDag struct {
    mu    sync.Mutex
    nodes map[string]Node
}

func newTestDag() *testDag {
    return &testDag{nodes: make(map[string]Node)}
}

func (d *testDag) Get(ctx context.Context, cid *cid.Cid) (Node, error) {
    d.mu.Lock()
    defer d.mu.Unlock()
    if n, ok := d.nodes[cid.KeyString()]; ok {
        return n, nil
    }
    return nil, ErrNotFound
}

func (d *testDag) GetMany(ctx context.Context, cids []*cid.Cid) <-chan *NodeOption {
    d.mu.Lock()
    defer d.mu.Unlock()
    out := make(chan *NodeOption, len(cids))
    for _, c := range cids {
        if n, ok := d.nodes[c.KeyString()]; ok {
            out <- &NodeOption{Node: n}
        } else {
            out <- &NodeOption{Err: ErrNotFound}
        }
    }
    return out
}

func (d *testDag) Add(ctx context.Context, node Node) error {
    d.mu.Lock()
    defer d.mu.Unlock()
    d.nodes[node.Cid().KeyString()] = node
    return nil
}

func (d *testDag) AddMany(ctx context.Context, nodes []Node) error {
    d.mu.Lock()
    defer d.mu.Unlock()
    for _, n := range nodes {
        d.nodes[n.Cid().KeyString()] = n
    }
    return nil
}

func (d *testDag) Remove(ctx context.Context, c *cid.Cid) error {
    d.mu.Lock()
    defer d.mu.Unlock()
    delete(d.nodes, c.KeyString())
    return nil
}

func (d *testDag) RemoveMany(ctx context.Context, cids []*cid.Cid) error {
    d.mu.Lock()
    defer d.mu.Unlock()
    for _, c := range cids {
        delete(d.nodes, c.KeyString())
    }
    return nil
}

var _ DAGService = new(testDag)

func TestBatch(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    d := newTestDag()
    b := NewBatch(ctx, d)
    for i := 0; i < 1000; i++ {
        // It would be great if we could use *many* different nodes here
        // but we can't add any dependencies and I don't feel like adding
        // any more testing code.
        if err := b.Add(new(EmptyNode)); err != nil {
            t.Fatal(err)
        }
    }
    if err := b.Commit(); err != nil {
        t.Fatal(err)
    }

    n, err := d.Get(ctx, new(EmptyNode).Cid())
    if err != nil {
        t.Fatal(err)
    }
    switch n.(type) {
    case *EmptyNode:
    default:
        t.Fatal("expected the node to exist in the dag")
    }

    if len(d.nodes) != 1 {
        t.Fatal("should have one node")
    }
}
99 changes: 99 additions & 0 deletions daghelpers.go
@@ -0,0 +1,99 @@
package format

import (
    "context"

    cid "github.com/ipfs/go-cid"
)

// GetLinks returns the CIDs of the children of the given node. Prefer this
// method over looking up the node itself and calling `Links()` on it as this
// method may be able to use a link cache.
func GetLinks(ctx context.Context, ng NodeGetter, c *cid.Cid) ([]*Link, error) {
    if c.Type() == cid.Raw {
        return nil, nil
    }
    if gl, ok := ng.(LinkGetter); ok {
        return gl.GetLinks(ctx, c)
    }
    node, err := ng.Get(ctx, c)
    if err != nil {
        return nil, err
    }
    return node.Links(), nil
}

// GetDAG will fill out all of the links of the given Node.
// It returns an array of NodePromise with the linked nodes all in the proper
// order.
func GetDAG(ctx context.Context, ds NodeGetter, root Node) []*NodePromise {
    var cids []*cid.Cid
    for _, lnk := range root.Links() {
        cids = append(cids, lnk.Cid)
    }

    return GetNodes(ctx, ds, cids)
}

// GetNodes returns an array of NodePromises, each corresponding to the key
// at the same index in the passed-in keys.
func GetNodes(ctx context.Context, ds NodeGetter, keys []*cid.Cid) []*NodePromise {

    // Early out if no work to do
    if len(keys) == 0 {
        return nil
    }

    promises := make([]*NodePromise, len(keys))
    for i := range keys {
        promises[i] = NewNodePromise(ctx)
    }

    dedupedKeys := dedupeKeys(keys)
    go func() {
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()

        nodechan := ds.GetMany(ctx, dedupedKeys)

        for count := 0; count < len(keys); {
            select {
            case opt, ok := <-nodechan:
                if !ok {
                    for _, p := range promises {
                        p.Fail(ErrNotFound)
                    }
                    return
                }

                if opt.Err != nil {
                    for _, p := range promises {
                        p.Fail(opt.Err)
                    }
                    return
                }

                nd := opt.Node
                c := nd.Cid()
                for i, lnk_c := range keys {
                    if c.Equals(lnk_c) {
                        count++
                        promises[i].Send(nd)
                    }
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return promises
}

// Remove duplicates from a list of keys
func dedupeKeys(cids []*cid.Cid) []*cid.Cid {
    set := cid.NewSet()
    for _, c := range cids {
        set.Add(c)
    }
    return set.Keys()
}
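And a short sketch of how the promises returned by GetDAG/GetNodes might be consumed. This assumes NodePromise exposes a blocking Get(ctx) accessor, as it does in the go-ipfs merkledag code this file is extracted from; the helper name is illustrative only.

// getChildren resolves all children of root in link order, waiting on each promise.
func getChildren(ctx context.Context, ng NodeGetter, root Node) ([]Node, error) {
    promises := GetDAG(ctx, ng, root)
    out := make([]Node, 0, len(promises))
    for _, p := range promises {
        nd, err := p.Get(ctx) // blocks until the node arrives or ctx is canceled
        if err != nil {
            return nil, err
        }
        out = append(out, nd)
    }
    return out, nil
}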