Skip to content

Commit

Permalink
cmds/pin: use PostRun in pin add
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Overbool <overbool.xu@gmail.com>
  • Loading branch information
overbool committed Nov 7, 2018
1 parent 5ed708f commit d856824
Showing 1 changed file with 65 additions and 66 deletions.
131 changes: 65 additions & 66 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"fmt"
"io"
"os"
"time"

core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
iface "github.com/ipfs/go-ipfs/core/coreapi/interface"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
Expand All @@ -19,7 +21,7 @@ import (
"gx/ipfs/QmYMQuypUbgsdNHmuCBSUJV6wdQVsBHRivNAp3efHJwZJD/go-verifcid"
cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds"
dag "gx/ipfs/QmaDBne4KeY3UepeqSVKYpSmQGa3q9zP6x3LfVF2UjF3Hc/go-merkledag"
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
)

var PinCmd = &cmds.Command{
Expand Down Expand Up @@ -65,11 +67,6 @@ var addPinCmd = &cmds.Command{
},
Type: AddPinOutput{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
err := req.ParseBodyArgs()
if err != nil {
return err
}

n, err := cmdenv.GetNode(env)
if err != nil {
return err
Expand All @@ -86,6 +83,10 @@ var addPinCmd = &cmds.Command{
recursive, _ := req.Options[pinRecursiveOptionName].(bool)
showProgress, _ := req.Options[pinProgressOptionName].(bool)

if err := req.ParseBodyArgs(); err != nil {
return err
}

if !showProgress {
added, err := corerepo.Pin(n, api, req.Context, req.Arguments, recursive)
if err != nil {
Expand All @@ -94,84 +95,83 @@ var addPinCmd = &cmds.Command{
return cmds.EmitOnce(res, &AddPinOutput{Pins: cidsToStrings(added)})
}

out := make(chan interface{})

v := new(dag.ProgressTracker)
ctx := v.DeriveContext(req.Context)

type pinResult struct {
pins []cid.Cid
err error
}

ch := make(chan pinResult, 1)
go func() {
added, err := corerepo.Pin(n, api, ctx, req.Arguments, recursive)
ch <- pinResult{pins: added, err: err}
}()

errC := make(chan error)
go func() {
var err error
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
defer func() { errC <- err }()
defer close(out)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()

for {
select {
case val := <-ch:
if val.err != nil {
err = val.err
return
}
for {
select {
case val := <-ch:
if val.err != nil {
return val.err
}

if pv := v.Value(); pv != 0 {
out <- &AddPinOutput{Progress: v.Value()}
if pv := v.Value(); pv != 0 {
if err := res.Emit(&AddPinOutput{Progress: v.Value()}); err != nil {
return err
}
out <- &AddPinOutput{Pins: cidsToStrings(val.pins)}
return
case <-ticker.C:
out <- &AddPinOutput{Progress: v.Value()}
case <-ctx.Done():
log.Error(ctx.Err())
err = ctx.Err()
return
}
return res.Emit(&AddPinOutput{Pins: cidsToStrings(val.pins)})
case <-ticker.C:
if err := res.Emit(&AddPinOutput{Progress: v.Value()}); err != nil {
return err
}
case <-ctx.Done():
log.Error(ctx.Err())
return ctx.Err()
}
}()

err = res.Emit(out)
if err != nil {
return err
}

return <-errC
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *AddPinOutput) error {
var added []string

if out.Pins != nil {
added = out.Pins
} else {
// this can only happen if the progress option is set
return fmt.Errorf("Fetched/Processed %d nodes\r", out.Progress)
}
PostRun: cmds.PostRunMap{
cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error {
for {
v, err := res.Next()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

var pintype string
rec, found := req.Options["recursive"].(bool)
if rec || !found {
pintype = "recursively"
} else {
pintype = "directly"
}
out, ok := v.(*AddPinOutput)
if !ok {
return e.TypeErr(out, v)
}
var added []string

for _, k := range added {
fmt.Fprintf(w, "pinned %s %s\n", k, pintype)
}
if out.Pins != nil {
added = out.Pins
} else {
// this can only happen if the progress option is set
fmt.Fprintf(os.Stderr, "Fetched/Processed %d nodes\r", out.Progress)
}

return nil
}),
var pintype string
rec, found := res.Request().Options["recursive"].(bool)
if rec || !found {
pintype = "recursively"
} else {
pintype = "directly"
}

for _, k := range added {
fmt.Fprintf(os.Stdout, "pinned %s %s\n", k, pintype)
}
}
},
},
}

Expand All @@ -192,11 +192,6 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)
},
Type: PinOutput{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
err := req.ParseBodyArgs()
if err != nil {
return err
}

n, err := cmdenv.GetNode(env)
if err != nil {
return err
Expand All @@ -210,6 +205,10 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)
// set recursive flag
recursive, _ := req.Options[pinRecursiveOptionName].(bool)

if err := req.ParseBodyArgs(); err != nil {
return err
}

removed, err := corerepo.Unpin(n, api, req.Context, req.Arguments, recursive)
if err != nil {
return err
Expand Down

0 comments on commit d856824

Please sign in to comment.