diff --git a/core/commands/filestore.go b/core/commands/filestore.go index 9bff637cf60..70c02a3a41e 100644 --- a/core/commands/filestore.go +++ b/core/commands/filestore.go @@ -1,16 +1,14 @@ package commands import ( - "context" "fmt" "io" + "os" - oldCmds "github.com/ipfs/go-ipfs/commands" - lgc "github.com/ipfs/go-ipfs/commands/legacy" - "github.com/ipfs/go-ipfs/core" + core "github.com/ipfs/go-ipfs/core" cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" e "github.com/ipfs/go-ipfs/core/commands/e" - "github.com/ipfs/go-ipfs/filestore" + filestore "github.com/ipfs/go-ipfs/filestore" cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds" @@ -23,8 +21,8 @@ var FileStoreCmd = &cmds.Command{ }, Subcommands: map[string]*cmds.Command{ "ls": lsFileStore, - "verify": lgc.NewCommand(verifyFileStore), - "dups": lgc.NewCommand(dupsFileStore), + "verify": verifyFileStore, + "dups": dupsFileStore, }, } @@ -59,11 +57,7 @@ The output is: } args := req.Arguments if len(args) > 0 { - out := perKeyActionToChan(req.Context, args, func(c cid.Cid) *filestore.ListRes { - return filestore.List(fs, c) - }) - - return res.Emit(out) + return listByArgs(res, fs, args) } fileOrder, _ := req.Options[fileOrderOptionName].(bool) @@ -72,8 +66,17 @@ The output is: return err } - out := listResToChan(req.Context, next) - return res.Emit(out) + for { + r := next() + if r == nil { + break + } + if err := res.Emit(r); err != nil { + return err + } + } + + return nil }, PostRun: cmds.PostRunMap{ cmds.CLI: streamResult(func(v interface{}, out io.Writer) nonFatalError { @@ -88,7 +91,7 @@ The output is: Type: filestore.ListRes{}, } -var verifyFileStore = &oldCmds.Command{ +var verifyFileStore = &cmds.Command{ Helptext: cmdkit.HelpText{ Tagline: "Verify objects in filestore.", LongDescription: ` @@ -118,96 +121,103 @@ For ERROR entries the error will also be printed to stderr. Options: []cmdkit.Option{ cmdkit.BoolOption(fileOrderOptionName, "verify the objects based on the order of the backing file"), }, - Run: func(req oldCmds.Request, res oldCmds.Response) { - _, fs, err := getFilestore(req.InvocContext()) + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + _, fs, err := getFilestore(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - args := req.Arguments() + args := req.Arguments if len(args) > 0 { - out := perKeyActionToChan(req.Context(), args, func(c cid.Cid) *filestore.ListRes { - return filestore.Verify(fs, c) - }) - res.SetOutput(out) - } else { - fileOrder, _, _ := req.Option(fileOrderOptionName).Bool() - next, err := filestore.VerifyAll(fs, fileOrder) - if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return listByArgs(res, fs, args) + } + + fileOrder, _ := req.Options[fileOrderOptionName].(bool) + next, err := filestore.VerifyAll(fs, fileOrder) + if err != nil { + return err + } + + for { + r := next() + if r == nil { + break + } + if err := res.Emit(r); err != nil { + return err } - out := listResToChan(req.Context(), next) - res.SetOutput(out) } + + return nil }, - Marshalers: oldCmds.MarshalerMap{ - oldCmds.Text: func(res oldCmds.Response) (io.Reader, error) { - v, err := unwrapOutput(res.Output()) - if err != nil { - return nil, err - } + PostRun: cmds.PostRunMap{ + cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error { + for { + v, err := res.Next() + if err != nil { + if err == io.EOF { + return nil + } + return err + } - r, ok := v.(*filestore.ListRes) - if !ok { - return nil, e.TypeErr(r, v) - } + list, ok := v.(*filestore.ListRes) + if !ok { + return e.TypeErr(list, v) + } - if r.Status == filestore.StatusOtherError { - fmt.Fprintf(res.Stderr(), "%s\n", r.ErrorMsg) + if list.Status == filestore.StatusOtherError { + fmt.Fprintf(os.Stderr, "%s\n", list.ErrorMsg) + } + fmt.Fprintf(os.Stdout, "%s %s\n", list.Status.Format(), list.FormatLong()) } - fmt.Fprintf(res.Stdout(), "%s %s\n", r.Status.Format(), r.FormatLong()) - return nil, nil }, }, Type: filestore.ListRes{}, } -var dupsFileStore = &oldCmds.Command{ +var dupsFileStore = &cmds.Command{ Helptext: cmdkit.HelpText{ Tagline: "List blocks that are both in the filestore and standard block storage.", }, - Run: func(req oldCmds.Request, res oldCmds.Response) { - _, fs, err := getFilestore(req.InvocContext()) + Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { + _, fs, err := getFilestore(env) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - ch, err := fs.FileManager().AllKeysChan(req.Context()) + ch, err := fs.FileManager().AllKeysChan(req.Context) if err != nil { - res.SetError(err, cmdkit.ErrNormal) - return + return err } - out := make(chan interface{}, 128) - res.SetOutput((<-chan interface{})(out)) - - go func() { - defer close(out) - for cid := range ch { - have, err := fs.MainBlockstore().Has(cid) - if err != nil { - select { - case out <- &RefWrapper{Err: err.Error()}: - case <-req.Context().Done(): - } - return - } - if have { - select { - case out <- &RefWrapper{Ref: cid.String()}: - case <-req.Context().Done(): - return - } + for cid := range ch { + have, err := fs.MainBlockstore().Has(cid) + if err != nil { + return res.Emit(&RefWrapper{Err: err.Error()}) + } + if have { + if err := res.Emit(&RefWrapper{Ref: cid.String()}); err != nil { + return err } } - }() + } + + return nil }, - Marshalers: refsMarshallerMap, - Type: RefWrapper{}, + Encoders: cmds.EncoderMap{ + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefWrapper) error { + if out.Err != "" { + return fmt.Errorf(out.Err) + } + + fmt.Fprintln(w, out.Ref) + + return nil + }), + }, + Type: RefWrapper{}, } -func getFilestore(env interface{}) (*core.IpfsNode, *filestore.Filestore, error) { +func getFilestore(env cmds.Environment) (*core.IpfsNode, *filestore.Filestore, error) { n, err := cmdenv.GetNode(env) if err != nil { return nil, nil, err @@ -219,49 +229,24 @@ func getFilestore(env interface{}) (*core.IpfsNode, *filestore.Filestore, error) return n, fs, err } -func listResToChan(ctx context.Context, next func() *filestore.ListRes) <-chan interface{} { - out := make(chan interface{}, 128) - go func() { - defer close(out) - for { - r := next() - if r == nil { - return +func listByArgs(res cmds.ResponseEmitter, fs *filestore.Filestore, args []string) error { + for _, arg := range args { + c, err := cid.Decode(arg) + if err != nil { + ret := &filestore.ListRes{ + Status: filestore.StatusOtherError, + ErrorMsg: fmt.Sprintf("%s: %v", arg, err), } - select { - case out <- r: - case <-ctx.Done(): - return + if err := res.Emit(ret); err != nil { + return err } + continue } - }() - return out -} - -func perKeyActionToChan(ctx context.Context, args []string, action func(cid.Cid) *filestore.ListRes) <-chan interface{} { - out := make(chan interface{}, 128) - go func() { - defer close(out) - for _, arg := range args { - c, err := cid.Decode(arg) - if err != nil { - select { - case out <- &filestore.ListRes{ - Status: filestore.StatusOtherError, - ErrorMsg: fmt.Sprintf("%s: %v", arg, err), - }: - case <-ctx.Done(): - } - - continue - } - r := action(c) - select { - case out <- r: - case <-ctx.Done(): - return - } + r := filestore.Verify(fs, c) + if err := res.Emit(r); err != nil { + return err } - }() - return out + } + + return nil }