Skip to content

Commit

Permalink
Merge pull request #9859 from MichaelMure/streamed-pins2
Browse files Browse the repository at this point in the history
pin: follow async pinner changes
  • Loading branch information
Jorropo committed Jun 2, 2023
2 parents eb265f7 + a2c66ab commit 8f0359a
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 153 deletions.
50 changes: 32 additions & 18 deletions client/rpc/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"context"
"encoding/json"
"io"
"strings"

iface "github.com/ipfs/boxo/coreiface"
Expand Down Expand Up @@ -129,26 +130,31 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt
type pinVerifyRes struct {
ok bool
badNodes []iface.BadPinNode
err error
}

func (r *pinVerifyRes) Ok() bool {
func (r pinVerifyRes) Ok() bool {
return r.ok
}

func (r *pinVerifyRes) BadNodes() []iface.BadPinNode {
func (r pinVerifyRes) BadNodes() []iface.BadPinNode {
return r.badNodes
}

func (r pinVerifyRes) Err() error {
return r.err
}

type badNode struct {
err error
cid cid.Cid
}

func (n *badNode) Path() path.Resolved {
func (n badNode) Path() path.Resolved {
return path.IpldPath(n.cid)
}

func (n *badNode) Err() error {
func (n badNode) Err() error {
return n.err
}

Expand All @@ -169,6 +175,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan iface.PinStatus, error) {
for {
var out struct {
Cid string
Err string
Ok bool

BadNodes []struct {
Expand All @@ -177,35 +184,42 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan iface.PinStatus, error) {
}
}
if err := dec.Decode(&out); err != nil {
return // todo: handle non io.EOF somehow
if err == io.EOF {
return
}
select {
case res <- pinVerifyRes{err: err}:
return
case <-ctx.Done():
return
}
}

if out.Err != "" {
select {
case res <- pinVerifyRes{err: errors.New(out.Err)}:
return
case <-ctx.Done():
return
}
}

badNodes := make([]iface.BadPinNode, len(out.BadNodes))
for i, n := range out.BadNodes {
c, err := cid.Decode(n.Cid)
if err != nil {
badNodes[i] = &badNode{
cid: c,
err: err,
}
badNodes[i] = badNode{cid: c, err: err}
continue
}

if n.Err != "" {
err = errors.New(n.Err)
}
badNodes[i] = &badNode{
cid: c,
err: err,
}
badNodes[i] = badNode{cid: c, err: err}
}

select {
case res <- &pinVerifyRes{
ok: out.Ok,

badNodes: badNodes,
}:
case res <- pinVerifyRes{ok: out.Ok, badNodes: badNodes}:
case <-ctx.Done():
return
}
Expand Down
63 changes: 37 additions & 26 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,17 @@ Example:
}

// For backward compatibility, we accumulate the pins in the same output type as before.
emit := res.Emit
var emit func(PinLsOutputWrapper) error
lgcList := map[string]PinLsType{}
if !stream {
emit = func(v interface{}) error {
obj := v.(*PinLsOutputWrapper)
lgcList[obj.PinLsObject.Cid] = PinLsType{Type: obj.PinLsObject.Type}
emit = func(v PinLsOutputWrapper) error {
lgcList[v.PinLsObject.Cid] = PinLsType{Type: v.PinLsObject.Type}
return nil
}
} else {
emit = func(v PinLsOutputWrapper) error {
return res.Emit(v)
}
}

if len(req.Arguments) > 0 {
Expand All @@ -371,7 +374,7 @@ Example:
},
Type: &PinLsOutputWrapper{},
Encoders: cmds.EncoderMap{
cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out PinLsOutputWrapper) error {
stream, _ := req.Options[pinStreamOptionName].(bool)

enc := json.NewEncoder(w)
Expand All @@ -382,7 +385,7 @@ Example:

return enc.Encode(out.PinLsList)
}),
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error {
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out PinLsOutputWrapper) error {
quiet, _ := req.Options[pinQuietOptionName].(bool)
stream, _ := req.Options[pinStreamOptionName].(bool)

Expand Down Expand Up @@ -432,7 +435,7 @@ type PinLsObject struct {
Type string `json:",omitempty"`
}

func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error {
func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value PinLsOutputWrapper) error) error {
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
Expand Down Expand Up @@ -470,7 +473,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fu
pinType = "indirect through " + pinType
}

err = emit(&PinLsOutputWrapper{
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: pinType,
Cid: enc.Encode(rp.Cid()),
Expand All @@ -484,7 +487,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fu
return nil
}

func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error {
func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value PinLsOutputWrapper) error) error {
enc, err := cmdenv.GetCidEncoder(req)
if err != nil {
return err
Expand All @@ -511,7 +514,7 @@ func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fun
if err := p.Err(); err != nil {
return err
}
err = emit(&PinLsOutputWrapper{
err = emit(PinLsOutputWrapper{
PinLsObject: PinLsObject{
Type: p.Type(),
Cid: enc.Encode(p.Path().Cid()),
Expand Down Expand Up @@ -648,13 +651,14 @@ var verifyPinCmd = &cmds.Command{

// PinVerifyRes is the result returned for each pin checked in "pin verify"
type PinVerifyRes struct {
Cid string
Cid string `json:",omitempty"`
Err string `json:",omitempty"`
PinStatus
}

// PinStatus is part of PinVerifyRes, do not use directly
type PinStatus struct {
Ok bool
Ok bool `json:",omitempty"`
BadNodes []BadNode `json:",omitempty"`
}

Expand All @@ -669,16 +673,13 @@ type pinVerifyOpts struct {
includeOk bool
}

func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan interface{}, error) {
// FIXME: this implementation is duplicated sith core/coreapi.PinAPI.Verify, remove this one and exclusively rely on CoreAPI.
func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan any, error) {
visited := make(map[cid.Cid]PinStatus)

bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
recPins, err := n.Pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}

var checkPin func(root cid.Cid) PinStatus
checkPin = func(root cid.Cid) PinStatus {
Expand Down Expand Up @@ -719,14 +720,18 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci
return status
}

out := make(chan interface{})
out := make(chan any)
go func() {
defer close(out)
for _, cid := range recPins {
pinStatus := checkPin(cid)
for p := range n.Pinning.RecursiveKeys(ctx) {
if p.Err != nil {
out <- PinVerifyRes{Err: p.Err.Error()}
return
}
pinStatus := checkPin(p.C)
if !pinStatus.Ok || opts.includeOk {
select {
case out <- &PinVerifyRes{enc.Encode(cid), pinStatus}:
case out <- PinVerifyRes{Cid: enc.Encode(p.C), PinStatus: pinStatus}:
case <-ctx.Done():
return
}
Expand All @@ -739,12 +744,18 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci

// Format formats PinVerifyRes
func (r PinVerifyRes) Format(out io.Writer) {
if r.Err != "" {
fmt.Fprintf(out, "error: %s\n", r.Err)
return
}

if r.Ok {
fmt.Fprintf(out, "%s ok\n", r.Cid)
} else {
fmt.Fprintf(out, "%s broken\n", r.Cid)
for _, e := range r.BadNodes {
fmt.Fprintf(out, " %s: %s\n", e.Cid, e.Err)
}
return
}

fmt.Fprintf(out, "%s broken\n", r.Cid)
for _, e := range r.BadNodes {
fmt.Fprintf(out, " %s: %s\n", e.Cid, e.Err)
}
}
Loading

0 comments on commit 8f0359a

Please sign in to comment.