Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pin cmd: stream recursive pins #5005

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 103 additions & 72 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"time"

bserv "github.com/ipfs/go-ipfs/blockservice"
cmds "github.com/ipfs/go-ipfs/commands"
oldcmds "github.com/ipfs/go-ipfs/commands"
lgc "github.com/ipfs/go-ipfs/commands/legacy"
core "github.com/ipfs/go-ipfs/core"
e "github.com/ipfs/go-ipfs/core/commands/e"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
Expand All @@ -21,6 +22,7 @@ import (

u "gx/ipfs/QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A/go-ipfs-util"
offline "gx/ipfs/QmPf114DXfa6TqGKYhBGR7EtXRho4rCJgwyA1xkuMY5vwF/go-ipfs-exchange-offline"
cmds "gx/ipfs/QmaFrNcnXHp579hUixbcTH1TNtNwsMogtBCwUUUwzBwYoM/go-ipfs-cmds"
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
"gx/ipfs/QmdE4gMduCKCGAcczM2F5ioYDfdeKuPix138wrES1YSr7f/go-ipfs-cmdkit"
)
Expand All @@ -31,11 +33,11 @@ var PinCmd = &cmds.Command{
},

Subcommands: map[string]*cmds.Command{
"add": addPinCmd,
"rm": rmPinCmd,
"add": lgc.NewCommand(addPinCmd),
"rm": lgc.NewCommand(rmPinCmd),
"ls": listPinCmd,
"verify": verifyPinCmd,
"update": updatePinCmd,
"verify": lgc.NewCommand(verifyPinCmd),
"update": lgc.NewCommand(updatePinCmd),
},
}

Expand All @@ -48,7 +50,7 @@ type AddPinOutput struct {
Progress int `json:",omitempty"`
}

var addPinCmd = &cmds.Command{
var addPinCmd = &oldcmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Pin objects to local storage.",
ShortDescription: "Stores an IPFS object(s) from a given path locally to disk.",
Expand All @@ -62,7 +64,7 @@ var addPinCmd = &cmds.Command{
cmdkit.BoolOption("progress", "Show progress"),
},
Type: AddPinOutput{},
Run: func(req cmds.Request, res cmds.Response) {
Run: func(req oldcmds.Request, res oldcmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
Expand Down Expand Up @@ -129,8 +131,8 @@ var addPinCmd = &cmds.Command{
}
}
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
Marshalers: oldcmds.MarshalerMap{
oldcmds.Text: func(res oldcmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
Expand Down Expand Up @@ -171,7 +173,7 @@ var addPinCmd = &cmds.Command{
},
}

var rmPinCmd = &cmds.Command{
var rmPinCmd = &oldcmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Remove pinned objects from local storage.",
ShortDescription: `
Expand All @@ -187,7 +189,7 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)
cmdkit.BoolOption("recursive", "r", "Recursively unpin the object linked to by the specified object(s).").WithDefault(true),
},
Type: PinOutput{},
Run: func(req cmds.Request, res cmds.Response) {
Run: func(req oldcmds.Request, res oldcmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
Expand All @@ -209,8 +211,8 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)

res.SetOutput(&PinOutput{cidsToStrings(removed)})
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
Marshalers: oldcmds.MarshalerMap{
oldcmds.Text: func(res oldcmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
Expand Down Expand Up @@ -280,19 +282,17 @@ Example:
Options: []cmdkit.Option{
cmdkit.StringOption("type", "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\".").WithDefault("all"),
cmdkit.BoolOption("quiet", "q", "Write just hashes of objects."),
cmdkit.BoolOption("stream", "Don't buffer pins before sending."),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) {
n, err := GetNode(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

typeStr, _, err := req.Option("type").String()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
typeStr, _ := req.Options["type"].(string)
stream, _ := req.Options["stream"].(bool)

switch typeStr {
case "all", "direct", "indirect", "recursive":
Expand All @@ -302,51 +302,67 @@ Example:
return
}

var keys map[string]RefKeyObject
emit := res.Emit
lgcList := map[string]RefObject{}
if !stream {
emit = func(v interface{}) error {
obj := v.(*RefKeyObject)
lgcList[obj.Cid] = RefObject{Type: obj.Type}
return nil
}
}

if len(req.Arguments()) > 0 {
keys, err = pinLsKeys(req.Context(), req.Arguments(), typeStr, n)
if len(req.Arguments) > 0 {
err = pinLsKeys(req.Context, req.Arguments, typeStr, n, emit)
} else {
keys, err = pinLsAll(req.Context(), typeStr, n)
err = pinLsAll(req.Context, typeStr, n, emit)
}

if !stream {
res.Emit(&RefKeyList{lgcList})
}

if err != nil {
res.SetError(err, cmdkit.ErrNormal)
} else {
res.SetOutput(&RefKeyList{Keys: keys})
}
},
Type: RefKeyList{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}

quiet, _, err := res.Request().Option("quiet").Bool()
if err != nil {
return nil, err
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
quiet, _ := req.Options["quiet"].(bool)
stream, _ := req.Options["stream"].(bool)

if stream {
obj, ok := v.(*RefKeyObject)
if !ok {
return e.TypeErr(obj, v)
}
if quiet {
fmt.Fprintf(w, "%s\n", obj.Cid)
} else {
fmt.Fprintf(w, "%s %s\n", obj.Cid, obj.Type)
}
return nil
}

keys, ok := v.(*RefKeyList)
if !ok {
return nil, e.TypeErr(keys, v)
return e.TypeErr(keys, v)
}
out := new(bytes.Buffer)

for k, v := range keys.Keys {
if quiet {
fmt.Fprintf(out, "%s\n", k)
fmt.Fprintf(w, "%s\n", k)
} else {
fmt.Fprintf(out, "%s %s\n", k, v.Type)
fmt.Fprintf(w, "%s %s\n", k, v.Type)
}
}
return out, nil
},

return nil
}),
},
}

var updatePinCmd = &cmds.Command{
var updatePinCmd = &oldcmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Update a recursive pin",
ShortDescription: `
Expand All @@ -364,7 +380,7 @@ new pin and removing the old one.
cmdkit.BoolOption("unpin", "Remove the old pin.").WithDefault(true),
},
Type: PinOutput{},
Run: func(req cmds.Request, res cmds.Response) {
Run: func(req oldcmds.Request, res oldcmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
Expand Down Expand Up @@ -414,8 +430,8 @@ new pin and removing the old one.

res.SetOutput(&PinOutput{Pins: []string{from.String(), to.String()}})
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
Marshalers: oldcmds.MarshalerMap{
oldcmds.Text: func(res oldcmds.Response) (io.Reader, error) {
added, ok := res.Output().(*PinOutput)
if !ok {
return nil, u.ErrCast()
Expand All @@ -428,15 +444,15 @@ new pin and removing the old one.
},
}

var verifyPinCmd = &cmds.Command{
var verifyPinCmd = &oldcmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Verify that recursive pins are complete.",
},
Options: []cmdkit.Option{
cmdkit.BoolOption("verbose", "Also write the hashes of non-broken pins."),
cmdkit.BoolOption("quiet", "q", "Write just hashes of broken pins."),
},
Run: func(req cmds.Request, res cmds.Response) {
Run: func(req oldcmds.Request, res oldcmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
Expand All @@ -459,8 +475,8 @@ var verifyPinCmd = &cmds.Command{
res.SetOutput(out)
},
Type: PinVerifyRes{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
Marshalers: oldcmds.MarshalerMap{
oldcmds.Text: func(res oldcmds.Response) (io.Reader, error) {
quiet, _, _ := res.Request().Option("quiet").Bool()

out, err := unwrapOutput(res.Output())
Expand All @@ -485,22 +501,25 @@ var verifyPinCmd = &cmds.Command{
}

type RefKeyObject struct {
Cid string
Type string
}

type RefObject struct {
Type string
}

type RefKeyList struct {
Keys map[string]RefKeyObject
Keys map[string]RefObject
}

func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsNode) (map[string]RefKeyObject, error) {
func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error {

mode, ok := pin.StringToMode(typeStr)
if !ok {
return nil, fmt.Errorf("invalid pin mode '%s'", typeStr)
return fmt.Errorf("invalid pin mode '%s'", typeStr)
}

keys := make(map[string]RefKeyObject)

r := &resolver.Resolver{
DAG: n.DAG,
ResolveOnce: uio.ResolveUnixfsOnce,
Expand All @@ -509,44 +528,49 @@ func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsN
for _, p := range args {
pth, err := path.ParsePath(p)
if err != nil {
return nil, err
return err
}

c, err := core.ResolveToCid(ctx, n.Namesys, r, pth)
if err != nil {
return nil, err
return err
}

pinType, pinned, err := n.Pinning.IsPinnedWithType(c, mode)
if err != nil {
return nil, err
return err
}

if !pinned {
return nil, fmt.Errorf("path '%s' is not pinned", p)
return fmt.Errorf("path '%s' is not pinned", p)
}

switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
}
keys[c.String()] = RefKeyObject{

emit(&RefKeyObject{
Type: pinType,
}
Cid: c.String(),
})
}

return keys, nil
return nil
}

func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[string]RefKeyObject, error) {
func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode, emit func(value interface{}) error) error {

keys := make(map[string]RefKeyObject)
keys := cid.NewSet()

AddToResultKeys := func(keyList []*cid.Cid, typeStr string) {
for _, c := range keyList {
keys[c.String()] = RefKeyObject{
Type: typeStr,
if keys.Visit(c) {
emit(&RefKeyObject{
Type: typeStr,
Cid: c.String(),
})
}
}
}
Expand All @@ -555,20 +579,27 @@ func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[string
AddToResultKeys(n.Pinning.DirectKeys(), "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, set.Visit)
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, func(c *cid.Cid) bool {
r := keys.Visit(c)
if r {
emit(&RefKeyObject{
Type: "indirect",
Cid: c.String(),
})
}
return r
})
if err != nil {
return nil, err
return err
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
}

return keys, nil
return nil
}

// PinVerifyRes is the result returned for each pin checked in "pin verify"
Expand Down
2 changes: 1 addition & 1 deletion core/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ var rootSubcommands = map[string]*cmds.Command{
"mount": lgc.NewCommand(MountCmd),
"name": lgc.NewCommand(NameCmd),
"object": ocmd.ObjectCmd,
"pin": lgc.NewCommand(PinCmd),
"pin": PinCmd,
"ping": lgc.NewCommand(PingCmd),
"p2p": lgc.NewCommand(P2PCmd),
"refs": lgc.NewCommand(RefsCmd),
Expand Down