Skip to content

Commit

Permalink
pin: follow async pinner changes, use directly CoreApi.Pin.Verify
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelMure committed Mar 3, 2023
1 parent fea25f7 commit 385e2aa
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 187 deletions.
132 changes: 39 additions & 93 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,13 @@ import (
"os"
"time"

bserv "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
cidenc "github.com/ipfs/go-cidutil/cidenc"
cmds "github.com/ipfs/go-ipfs-cmds"
offline "github.com/ipfs/go-ipfs-exchange-offline"
dag "github.com/ipfs/go-merkledag"
verifcid "github.com/ipfs/go-verifcid"
coreiface "github.com/ipfs/interface-go-ipfs-core"
options "github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/path"

core "github.com/ipfs/kubo/core"
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
e "github.com/ipfs/kubo/core/commands/e"
)
Expand Down Expand Up @@ -502,13 +497,14 @@ func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fun
panic("unhandled pin type")
}

pins, err := api.Pin().Ls(req.Context, opt)
if err != nil {
return err
}
pins := api.Pin().Ls(req.Context, opt)

for p := range pins {
if err := p.Err(); err != nil {
for {
p, err := pins.ReadContext(req.Context)
if err == io.EOF {
break
}
if err != nil {
return err
}
err = emit(&PinLsOutputWrapper{
Expand Down Expand Up @@ -603,7 +599,7 @@ var verifyPinCmd = &cmds.Command{
cmds.BoolOption(pinQuietOptionName, "q", "Write just hashes of broken pins."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
}
Expand All @@ -620,15 +616,38 @@ var verifyPinCmd = &cmds.Command{
return err
}

opts := pinVerifyOpts{
explain: !quiet,
includeOk: verbose,
}
out, err := pinVerify(req.Context, n, opts, enc)
if err != nil {
return err
out := api.Pin().Verify(req.Context, options.Pin.Verify.Explain(!quiet), options.Pin.Verify.IncludeOk(verbose))

for {
s, err := out.Read()
if err == io.EOF {
break
}
if err != nil {
return err
}
status := PinStatus{
Ok: s.Ok(),
}
badNodes := s.BadNodes()
if len(badNodes) > 0 {
status.BadNodes = make([]BadNode, len(badNodes))
for i, node := range s.BadNodes() {
status.BadNodes[i] = BadNode{
Cid: enc.Encode(node.Path().Cid()),
Err: node.Err().Error(),
}
}
}
err = res.Emit(PinVerifyRes{
Cid: enc.Encode(s.Cid()),
PinStatus: PinStatus{},
})
if err != nil {
return err
}
}
return res.Emit(out)
return nil
},
Type: PinVerifyRes{},
Encoders: cmds.EncoderMap{
Expand Down Expand Up @@ -664,79 +683,6 @@ type BadNode struct {
Err string
}

type pinVerifyOpts struct {
explain bool
includeOk bool
}

func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan interface{}, error) {
visited := make(map[cid.Cid]PinStatus)

bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
recPins, err := n.Pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}

var checkPin func(root cid.Cid) PinStatus
checkPin = func(root cid.Cid) PinStatus {
key := root
if status, ok := visited[key]; ok {
return status
}

if err := verifcid.ValidateCid(root); err != nil {
status := PinStatus{Ok: false}
if opts.explain {
status.BadNodes = []BadNode{{Cid: enc.Encode(key), Err: err.Error()}}
}
visited[key] = status
return status
}

links, err := getLinks(ctx, root)
if err != nil {
status := PinStatus{Ok: false}
if opts.explain {
status.BadNodes = []BadNode{{Cid: enc.Encode(key), Err: err.Error()}}
}
visited[key] = status
return status
}

status := PinStatus{Ok: true}
for _, lnk := range links {
res := checkPin(lnk.Cid)
if !res.Ok {
status.Ok = false
status.BadNodes = append(status.BadNodes, res.BadNodes...)
}
}

visited[key] = status
return status
}

out := make(chan interface{})
go func() {
defer close(out)
for _, cid := range recPins {
pinStatus := checkPin(cid)
if !pinStatus.Ok || opts.includeOk {
select {
case out <- &PinVerifyRes{enc.Encode(cid), pinStatus}:
case <-ctx.Done():
return
}
}
}
}()

return out, nil
}

// Format formats PinVerifyRes
func (r PinVerifyRes) Format(out io.Writer) {
if r.Ok {
Expand Down
Loading

0 comments on commit 385e2aa

Please sign in to comment.