Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report progress during pin add #3671

Merged
merged 1 commit into from
Mar 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 82 additions & 9 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"time"

cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core"
Expand Down Expand Up @@ -33,6 +34,11 @@ type PinOutput struct {
Pins []*cid.Cid
}

type AddPinOutput struct {
Pins []*cid.Cid
Progress int `json:",omitempty"`
}

var addPinCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Pin objects to local storage.",
Expand All @@ -44,8 +50,9 @@ var addPinCmd = &cmds.Command{
},
Options: []cmds.Option{
cmds.BoolOption("recursive", "r", "Recursively pin the object linked to by the specified object(s).").Default(true),
cmds.BoolOption("progress", "Show progress"),
},
Type: PinOutput{},
Type: AddPinOutput{},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
Expand All @@ -61,22 +68,88 @@ var addPinCmd = &cmds.Command{
res.SetError(err, cmds.ErrNormal)
return
}
showProgress, _, _ := req.Option("progress").Bool()

added, err := corerepo.Pin(n, req.Context(), req.Arguments(), recursive)
if err != nil {
res.SetError(err, cmds.ErrNormal)
if !showProgress {
added, err := corerepo.Pin(n, req.Context(), req.Arguments(), recursive)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
res.SetOutput(&AddPinOutput{Pins: added})
return
}

res.SetOutput(&PinOutput{added})
v := new(dag.ProgressTracker)
ctx := v.DeriveContext(req.Context())

ch := make(chan []*cid.Cid)
go func() {
defer close(ch)
added, err := corerepo.Pin(n, ctx, req.Arguments(), recursive)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
ch <- added
}()
out := make(chan interface{})
res.SetOutput((<-chan interface{})(out))
go func() {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
defer close(out)
for {
select {
case val, ok := <-ch:
if !ok {
// error already set just return
return
}
if pv := v.Value(); pv != 0 {
out <- &AddPinOutput{Progress: v.Value()}
}
out <- &AddPinOutput{Pins: val}
return
case <-ticker.C:
out <- &AddPinOutput{Progress: v.Value()}
case <-ctx.Done():
res.SetError(ctx.Err(), cmds.ErrNormal)
return
}
}
}()
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
added, ok := res.Output().(*PinOutput)
if !ok {
var added []*cid.Cid

switch out := res.Output().(type) {
case *AddPinOutput:
added = out.Pins
case <-chan interface{}:
progressLine := false
for r0 := range out {
r := r0.(*AddPinOutput)
if r.Pins != nil {
added = r.Pins
} else {
if progressLine {
fmt.Fprintf(res.Stderr(), "\r")
}
fmt.Fprintf(res.Stderr(), "Fetched/Processed %d nodes", r.Progress)
progressLine = true
}
}
if progressLine {
fmt.Fprintf(res.Stderr(), "\n")
}
if res.Error() != nil {
return nil, res.Error()
}
default:
return nil, u.ErrCast()
}

var pintype string
rec, found, _ := res.Request().Option("recursive").Bool()
if rec || !found {
Expand All @@ -86,7 +159,7 @@ var addPinCmd = &cmds.Command{
}

buf := new(bytes.Buffer)
for _, k := range added.Pins {
for _, k := range added {
fmt.Fprintf(buf, "pinned %s %s\n", k, pintype)
}
return buf, nil
Expand Down
38 changes: 36 additions & 2 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,21 @@ func (n *dagService) Remove(nd node.Node) error {
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
v, _ := ctx.Value("progress").(*ProgressTracker)
if v == nil {
return EnumerateChildrenAsync(ctx, serv, root, cid.NewSet().Visit)
}
set := cid.NewSet()
visit := func(c *cid.Cid) bool {
if set.Visit(c) {
v.Increment()
return true
} else {
return false
}
}
return EnumerateChildrenAsync(ctx, serv, root, visit)
}

// FindLinks searches this nodes links for the given key,
Expand Down Expand Up @@ -389,6 +402,27 @@ func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit
return nil
}

type ProgressTracker struct {
Total int
lk sync.Mutex
}

func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
return context.WithValue(ctx, "progress", p)
}

func (p *ProgressTracker) Increment() {
p.lk.Lock()
defer p.lk.Unlock()
p.Total++
}

func (p *ProgressTracker) Value() int {
p.lk.Lock()
defer p.lk.Unlock()
return p.Total
}

// FetchGraphConcurrency is total number of concurrent fetches that
// 'fetchNodes' will start at a time
var FetchGraphConcurrency = 8
Expand Down
78 changes: 78 additions & 0 deletions merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -547,3 +548,80 @@ func TestEnumerateAsyncFailsNotFound(t *testing.T) {
t.Fatal("this should have failed")
}
}

func TestProgressIndicator(t *testing.T) {
testProgressIndicator(t, 5)
}

func TestProgressIndicatorNoChildren(t *testing.T) {
testProgressIndicator(t, 0)
}

func testProgressIndicator(t *testing.T, depth int) {
ds := dstest.Mock()

top, numChildren := mkDag(ds, depth)

v := new(ProgressTracker)
ctx := v.DeriveContext(context.Background())

err := FetchGraph(ctx, top, ds)
if err != nil {
t.Fatal(err)
}

if v.Value() != numChildren+1 {
t.Errorf("wrong number of children reported in progress indicator, expected %d, got %d",
numChildren+1, v.Value())
}
}

func mkDag(ds DAGService, depth int) (*cid.Cid, int) {
totalChildren := 0
f := func() *ProtoNode {
p := new(ProtoNode)
buf := make([]byte, 16)
rand.Read(buf)

p.SetData(buf)
_, err := ds.Add(p)
if err != nil {
panic(err)
}
return p
}

for i := 0; i < depth; i++ {
thisf := f
f = func() *ProtoNode {
pn := mkNodeWithChildren(thisf, 10)
_, err := ds.Add(pn)
if err != nil {
panic(err)
}
totalChildren += 10
return pn
}
}

nd := f()
c, err := ds.Add(nd)
if err != nil {
panic(err)
}

return c, totalChildren
}

func mkNodeWithChildren(getChild func() *ProtoNode, width int) *ProtoNode {
cur := new(ProtoNode)

for i := 0; i < width; i++ {
c := getChild()
if err := cur.AddNodeLinkClean(fmt.Sprint(i), c); err != nil {
panic(err)
}
}

return cur
}
50 changes: 46 additions & 4 deletions test/sharness/t0085-pins.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ test_description="Test ipfs pinning operations"


test_pins() {
EXTRA_ARGS=$1

test_expect_success "create some hashes" '
HASH_A=$(echo "A" | ipfs add -q --pin=false) &&
HASH_B=$(echo "B" | ipfs add -q --pin=false) &&
Expand All @@ -30,24 +32,39 @@ test_pins() {
echo $HASH_G >> hashes
'

test_expect_success "pin those hashes via stdin" '
cat hashes | ipfs pin add
test_expect_success "'ipfs pin add $EXTRA_ARGS' via stdin" '
cat hashes | ipfs pin add $EXTRA_ARGS
'

test_expect_success "unpin those hashes" '
cat hashes | ipfs pin rm
'
}

test_pin_dag() {
RANDOM_HASH=Qme8uX5n9hn15pw9p6WcVKoziyyC9LXv4LEgvsmKMULjnV

test_pins_error_reporting() {
EXTRA_ARGS=$1

test_expect_success "'ipfs pin add $EXTRA_ARGS' on non-existent hash should fail" '
test_must_fail ipfs pin add $EXTRA_ARGS $RANDOM_HASH 2> err &&
grep -q "not found" err
'
}

test_pin_dag_init() {
EXTRA_ARGS=$1

test_expect_success "'ipfs add $EXTRA_ARGS --pin=false' 1MB file" '
random 1048576 56 > afile &&
HASH=`ipfs add $EXTRA_ARGS --pin=false -q afile`
'
}

test_expect_success "'ipfs pin add' file" '
test_pin_dag() {
test_pin_dag_init $1

test_expect_success "'ipfs pin add --progress' file" '
ipfs pin add --recursive=true $HASH
'

Expand All @@ -67,20 +84,45 @@ test_pin_dag() {
'
}

test_pin_progress() {
test_pin_dag_init

test_expect_success "'ipfs pin add --progress' file" '
ipfs pin add --progress $HASH 2> err
'

test_expect_success "pin progress reported correctly" '
cat err
grep -q " 5 nodes" err
'
}

test_init_ipfs

test_pins
test_pins --progress

test_pins_error_reporting
test_pins_error_reporting --progress

test_pin_dag
test_pin_dag --raw-leaves

test_pin_progress

test_launch_ipfs_daemon --offline

test_pins
test_pins --progress

test_pins_error_reporting
test_pins_error_reporting --progress

test_pin_dag
test_pin_dag --raw-leaves

test_pin_progress

test_kill_ipfs_daemon

test_done