From a2c66abc52864bcd7bbb099f5959ec66557c73cd Mon Sep 17 00:00:00 2001 From: Jorropo Date: Fri, 2 Jun 2023 18:16:46 +0200 Subject: [PATCH] pinning: fix pin listings --- core/commands/pin/pin.go | 23 ++++++++++--------- core/coreapi/pin.go | 48 +++++++++++++++------------------------- 2 files changed, 31 insertions(+), 40 deletions(-) diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go index 626c536dbd2..8312c1fb4ce 100644 --- a/core/commands/pin/pin.go +++ b/core/commands/pin/pin.go @@ -342,14 +342,17 @@ Example: } // For backward compatibility, we accumulate the pins in the same output type as before. - emit := res.Emit + var emit func(PinLsOutputWrapper) error lgcList := map[string]PinLsType{} if !stream { - emit = func(v interface{}) error { - obj := v.(*PinLsOutputWrapper) - lgcList[obj.PinLsObject.Cid] = PinLsType{Type: obj.PinLsObject.Type} + emit = func(v PinLsOutputWrapper) error { + lgcList[v.PinLsObject.Cid] = PinLsType{Type: v.PinLsObject.Type} return nil } + } else { + emit = func(v PinLsOutputWrapper) error { + return res.Emit(v) + } } if len(req.Arguments) > 0 { @@ -371,7 +374,7 @@ Example: }, Type: &PinLsOutputWrapper{}, Encoders: cmds.EncoderMap{ - cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error { + cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out PinLsOutputWrapper) error { stream, _ := req.Options[pinStreamOptionName].(bool) enc := json.NewEncoder(w) @@ -382,7 +385,7 @@ Example: return enc.Encode(out.PinLsList) }), - cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error { + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out PinLsOutputWrapper) error { quiet, _ := req.Options[pinQuietOptionName].(bool) stream, _ := req.Options[pinStreamOptionName].(bool) @@ -432,7 +435,7 @@ type PinLsObject struct { Type string `json:",omitempty"` } -func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error { +func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value PinLsOutputWrapper) error) error { enc, err := cmdenv.GetCidEncoder(req) if err != nil { return err @@ -470,7 +473,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fu pinType = "indirect through " + pinType } - err = emit(&PinLsOutputWrapper{ + err = emit(PinLsOutputWrapper{ PinLsObject: PinLsObject{ Type: pinType, Cid: enc.Encode(rp.Cid()), @@ -484,7 +487,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fu return nil } -func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error { +func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value PinLsOutputWrapper) error) error { enc, err := cmdenv.GetCidEncoder(req) if err != nil { return err @@ -511,7 +514,7 @@ func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fun if err := p.Err(); err != nil { return err } - err = emit(&PinLsOutputWrapper{ + err = emit(PinLsOutputWrapper{ PinLsObject: PinLsObject{ Type: p.Type(), Cid: enc.Encode(p.Path().Cid()), diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 98be600b2e4..ec2cedb83fd 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -305,6 +305,7 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac out <- &pinInfo{err: err} return } + rkeys = append(rkeys, streamedCid.C) } } if typeStr == "direct" || typeStr == "all" { @@ -319,27 +320,6 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac } } } - if typeStr == "all" { - walkingSet := cid.NewSet() - for _, k := range rkeys { - err = merkledag.Walk( - ctx, merkledag.GetLinksWithDAG(api.dag), k, - walkingSet.Visit, - merkledag.SkipRoot(), merkledag.Concurrent(), - ) - if err != nil { - out <- &pinInfo{err: err} - return - } - } - err = walkingSet.ForEach(func(c cid.Cid) error { - return AddToResultKeys(c, "indirect") - }) - if err != nil { - out <- &pinInfo{err: err} - return - } - } if typeStr == "indirect" { // We need to first visit the direct pins that have priority // without emitting them @@ -358,13 +338,28 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac return } emittedSet.Add(streamedCid.C) + rkeys = append(rkeys, streamedCid.C) } - + } + if typeStr == "indirect" || typeStr == "all" { walkingSet := cid.NewSet() for _, k := range rkeys { err = merkledag.Walk( ctx, merkledag.GetLinksWithDAG(api.dag), k, - walkingSet.Visit, + func(c cid.Cid) bool { + if !walkingSet.Visit(c) { + return false + } + if emittedSet.Has(c) { + return true // skipped + } + err := AddToResultKeys(c, "indirect") + if err != nil { + out <- &pinInfo{err: err} + return false + } + return true + }, merkledag.SkipRoot(), merkledag.Concurrent(), ) if err != nil { @@ -372,13 +367,6 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac return } } - err = emittedSet.ForEach(func(c cid.Cid) error { - return AddToResultKeys(c, "indirect") - }) - if err != nil { - out <- &pinInfo{err: err} - return - } } }()