Skip to content

Commit

Permalink
client/rpc: bring up to speed with streaming pins
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Jun 2, 2023
1 parent 165473e commit de63278
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 15 deletions.
36 changes: 32 additions & 4 deletions client/rpc/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"context"
"encoding/json"
"io"
"strings"

iface "github.com/ipfs/boxo/coreiface"
Expand Down Expand Up @@ -129,16 +130,21 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt
type pinVerifyRes struct {
ok bool
badNodes []iface.BadPinNode
err error
}

func (r *pinVerifyRes) Ok() bool {
func (r pinVerifyRes) Ok() bool {
return r.ok
}

func (r *pinVerifyRes) BadNodes() []iface.BadPinNode {
func (r pinVerifyRes) BadNodes() []iface.BadPinNode {
return r.badNodes
}

func (r pinVerifyRes) Err() error {
return r.err
}

type badNode struct {
err error
cid cid.Cid
Expand Down Expand Up @@ -169,6 +175,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan iface.PinStatus, error) {
for {
var out struct {
Cid string
Err string
Ok bool

BadNodes []struct {
Expand All @@ -177,7 +184,28 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan iface.PinStatus, error) {
}
}
if err := dec.Decode(&out); err != nil {
return // todo: handle non io.EOF somehow
if err == io.EOF {
return
}
select {
case res <- pinVerifyRes{
err: err,
}:
return
case <-ctx.Done():
return
}
}

if out.Err != "" {
select {
case res <- pinVerifyRes{
err: errors.New(out.Err),
}:
return
case <-ctx.Done():
return
}
}

badNodes := make([]iface.BadPinNode, len(out.BadNodes))
Expand All @@ -201,7 +229,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan iface.PinStatus, error) {
}

select {
case res <- &pinVerifyRes{
case res <- pinVerifyRes{
ok: out.Ok,

badNodes: badNodes,
Expand Down
30 changes: 19 additions & 11 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,13 +648,14 @@ var verifyPinCmd = &cmds.Command{

// PinVerifyRes is the result returned for each pin checked in "pin verify"
type PinVerifyRes struct {
Cid string
Cid string `json:",omitempty"`
Err string `json:",omitempty"`
PinStatus
}

// PinStatus is part of PinVerifyRes, do not use directly
type PinStatus struct {
Ok bool
Ok bool `json:",omitempty"`
BadNodes []BadNode `json:",omitempty"`
}

Expand All @@ -669,7 +670,8 @@ type pinVerifyOpts struct {
includeOk bool
}

func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan interface{}, error) {
// FIXME: this implementation is duplicated sith core/coreapi.PinAPI.Verify, remove this one and exclusively rely on CoreAPI.
func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan any, error) {
visited := make(map[cid.Cid]PinStatus)

bs := n.Blocks.Blockstore()
Expand Down Expand Up @@ -715,18 +717,18 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci
return status
}

out := make(chan interface{})
out := make(chan any)
go func() {
defer close(out)
for p := range n.Pinning.RecursiveKeys(ctx) {
if p.Err != nil {
out <- p.Err
out <- PinVerifyRes{Err: p.Err.Error()}
return
}
pinStatus := checkPin(p.C)
if !pinStatus.Ok || opts.includeOk {
select {
case out <- &PinVerifyRes{enc.Encode(p.C), pinStatus}:
case out <- PinVerifyRes{Cid: enc.Encode(p.C), PinStatus: pinStatus}:
case <-ctx.Done():
return
}
Expand All @@ -739,12 +741,18 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci

// Format formats PinVerifyRes
func (r PinVerifyRes) Format(out io.Writer) {
if r.Err != "" {
fmt.Fprintf(out, "error: %s\n", r.Err)
return
}

if r.Ok {
fmt.Fprintf(out, "%s ok\n", r.Cid)
} else {
fmt.Fprintf(out, "%s broken\n", r.Cid)
for _, e := range r.BadNodes {
fmt.Fprintf(out, " %s: %s\n", e.Cid, e.Err)
}
return
}

fmt.Fprintf(out, "%s broken\n", r.Cid)
for _, e := range r.BadNodes {
fmt.Fprintf(out, " %s: %s\n", e.Cid, e.Err)
}
}

0 comments on commit de63278

Please sign in to comment.