Skip to content

Commit

Permalink
pinning: fix pin listings
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Jun 2, 2023
1 parent 9b63ab6 commit a2c66ab
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 40 deletions.
23 changes: 13 additions & 10 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,17 @@ Example:
}

// For backward compatibility, we accumulate the pins in the same output type as before.
emit := res.Emit
var emit func(PinLsOutputWrapper) error
lgcList := map[string]PinLsType{}
if !stream {
emit = func(v interface{}) error {
obj := v.(*PinLsOutputWrapper)
lgcList[obj.PinLsObject.Cid] = PinLsType{Type: obj.PinLsObject.Type}
emit = func(v PinLsOutputWrapper) error {
lgcList[v.PinLsObject.Cid] = PinLsType{Type: v.PinLsObject.Type}
return nil
}
} else {
emit = func(v PinLsOutputWrapper) error {
return res.Emit(v)
}
}

if len(req.Arguments) > 0 {
Expand All @@ -371,7 +374,7 @@ Example:
},
Type: &PinLsOutputWrapper{},
Encoders: cmds.EncoderMap{
cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out PinLsOutputWrapper) error {
stream, _ := req.Options[pinStreamOptionName].(bool)

enc := json.NewEncoder(w)
Expand All @@ -382,7 +385,7 @@ Example:

return enc.Encode(out.PinLsList)
}),
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out PinLsOutputWrapper) error {
quiet, _ := req.Options[pinQuietOptionName].(bool)
stream, _ := req.Options[pinStreamOptionName].(bool)

Expand Down Expand Up @@ -432,7 +435,7 @@ type PinLsObject struct {
Type string `json:",omitempty"`
}

func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error {
func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value PinLsOutputWrapper) error) error {
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
Expand Down Expand Up @@ -470,7 +473,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fu
pinType = "indirect through " + pinType
}

err = emit(&PinLsOutputWrapper{
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: pinType,
Cid: enc.Encode(rp.Cid()),
Expand All @@ -484,7 +487,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fu
return nil
}

func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error {
func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value PinLsOutputWrapper) error) error {
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
Expand All @@ -511,7 +514,7 @@ func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fun
if err := p.Err(); err != nil {
return err
}
err = emit(&PinLsOutputWrapper{
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: p.Type(),
Cid: enc.Encode(p.Path().Cid()),
Expand Down
48 changes: 18 additions & 30 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac
out <- &pinInfo{err: err}
return
}
rkeys = append(rkeys, streamedCid.C)
}
}
if typeStr == "direct" || typeStr == "all" {
Expand All @@ -319,27 +320,6 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac
}
}
}
if typeStr == "all" {
walkingSet := cid.NewSet()
for _, k := range rkeys {
err = merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
walkingSet.Visit,
merkledag.SkipRoot(), merkledag.Concurrent(),
)
if err != nil {
out <- &pinInfo{err: err}
return
}
}
err = walkingSet.ForEach(func(c cid.Cid) error {
return AddToResultKeys(c, "indirect")
})
if err != nil {
out <- &pinInfo{err: err}
return
}
}
if typeStr == "indirect" {
// We need to first visit the direct pins that have priority
// without emitting them
Expand All @@ -358,27 +338,35 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac
return
}
emittedSet.Add(streamedCid.C)
rkeys = append(rkeys, streamedCid.C)
}

}
if typeStr == "indirect" || typeStr == "all" {
walkingSet := cid.NewSet()
for _, k := range rkeys {
err = merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
walkingSet.Visit,
func(c cid.Cid) bool {
if !walkingSet.Visit(c) {
return false
}
if emittedSet.Has(c) {
return true // skipped
}
err := AddToResultKeys(c, "indirect")
if err != nil {
out <- &pinInfo{err: err}
return false
}
return true
},
merkledag.SkipRoot(), merkledag.Concurrent(),
)
if err != nil {
out <- &pinInfo{err: err}
return
}
}
err = emittedSet.ForEach(func(c cid.Cid) error {
return AddToResultKeys(c, "indirect")
})
if err != nil {
out <- &pinInfo{err: err}
return
}
}
}()

Expand Down

0 comments on commit a2c66ab

Please sign in to comment.