Skip to content

Commit

Permalink
Update LS Command to support streaming
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: hannahhoward <hannah@hannahhoward.net>
  • Loading branch information
hannahhoward committed Oct 18, 2018
1 parent e2d2c55 commit 7dadaa5
Showing 1 changed file with 161 additions and 62 deletions.
223 changes: 161 additions & 62 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,34 @@ import (
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
)

// printable data for a single ipld link in ls output
// LsLink contains printable data for a single ipld link in ls output
type LsLink struct {
Name, Hash string
Size uint64
Type unixfspb.Data_DataType
}

// printable data for single unixfs directory, content hash + all links
// LsObject is an element of LsOutput
// It can represent a whole directory, a directory header, one or more links,
// Or a the end of a directory
type LsObject struct {
Hash string
Links []LsLink
Hash string
Links []LsLink
HasHeader bool
HasLinks bool
HasFooter bool
}

// printable data for multiple unixfs directories
// LsObject is a set of printable data for directories
type LsOutput struct {
Objects []LsObject
MultipleFolders bool
Objects []LsObject
}

const (
lsHeadersOptionNameTime = "headers"
lsResolveTypeOptionName = "resolve-type"
lsStreamOptionName = "stream"
)

var LsCmd = &cmds.Command{
Expand All @@ -63,6 +70,7 @@ The JSON output contains type information.
Options: []cmdkit.Option{
cmdkit.BoolOption(lsHeadersOptionNameTime, "v", "Print table headers (Hash, Size, Name)."),
cmdkit.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true),
cmdkit.BoolOption(lsStreamOptionName, "s", "Stream directory entries as they are found."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
Expand All @@ -75,7 +83,7 @@ The JSON output contains type information.
return err
}

resolve := req.Options[lsResolveTypeOptionName].(bool)
resolve, _ := req.Options[lsResolveTypeOptionName].(bool)
dserv := nd.DAG
if !resolve {
offlineexch := offline.Exchange(nd.Blockstore)
Expand Down Expand Up @@ -103,96 +111,187 @@ The JSON output contains type information.
}
dagnodes = append(dagnodes, dagnode)
}

output := make([]LsObject, len(req.Arguments))
ng := merkledag.NewSession(req.Context, nd.DAG)
ro := merkledag.NewReadOnlyDagService(ng)

stream, _ := req.Options[lsStreamOptionName].(bool)
multipleFolders := len(req.Arguments) > 1
if !stream {
output := make([]LsObject, len(req.Arguments))

for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir {
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
}

var links []*ipld.Link
if dir == nil {
links = dagnode.Links()
} else {
links, err = dir.Links(req.Context)
if err != nil {
return err
}
}
outputLinks := make([]LsLink, len(links))
for j, link := range links {
lsLink, err := makeLsLink(req, dserv, resolve, link)
if err != nil {
return err
}
outputLinks[j] = *lsLink
}
output[i] = newFullDirectoryLsObject(paths[i], outputLinks)
}

return cmds.EmitOnce(res, &LsOutput{multipleFolders, output})
}

for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir {
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
}

var links []*ipld.Link
var linkResults <-chan unixfs.LinkResult
if dir == nil {
links = dagnode.Links()
linkResults, err = makeDagNodeLinkResults(req, dagnode)
} else {
links, err = dir.Links(req.Context)
if err != nil {
return err
}
linkResults, err = dir.EnumLinksAsync(req.Context)
}

output[i] = LsObject{
Hash: paths[i],
Links: make([]LsLink, len(links)),
if err != nil {
return err
}

for j, link := range links {
t := unixfspb.Data_DataType(-1)

switch link.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
t = unixfs.TFile
case cid.DagProtobuf:
linkNode, err := link.GetNode(req.Context, dserv)
if err == ipld.ErrNotFound && !resolve {
// not an error
linkNode = nil
} else if err != nil {
return err
}
output := make([]LsObject, 1)
outputLinks := make([]LsLink, 1)

if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := unixfs.FSNodeFromBytes(pn.Data())
if err != nil {
return err
}
t = d.Type()
}
output[0] = newDirectoryHeaderLsObject(paths[i])
if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil {
return nil
}
for linkResult := range linkResults {
if linkResult.Err != nil {
return linkResult.Err
}
link := linkResult.Link
lsLink, err := makeLsLink(req, dserv, resolve, link)
if err != nil {
return err
}
output[i].Links[j] = LsLink{
Name: link.Name,
Hash: link.Cid.String(),
Size: link.Size,
Type: t,
outputLinks[0] = *lsLink
output[0] = newDirectoryLinksLsObject(outputLinks)
if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil {
return err
}
}
output[0] = newDirectoryFooterLsObject()
if err = res.Emit(&LsOutput{multipleFolders, output}); err != nil {
return err
}
}

return cmds.EmitOnce(res, &LsOutput{output})
return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
headers := req.Options[lsHeadersOptionNameTime].(bool)
headers, _ := req.Options[lsHeadersOptionNameTime].(bool)
output, ok := v.(*LsOutput)
if !ok {
return e.TypeErr(output, v)
}

w = tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
for _, object := range output.Objects {
if len(output.Objects) > 1 {
fmt.Fprintf(w, "%s:\n", object.Hash)
}
if headers {
fmt.Fprintln(w, "Hash\tSize\tName")
if object.HasHeader {
if output.MultipleFolders {
fmt.Fprintf(tw, "%s:\n", object.Hash)
}
if headers {
fmt.Fprintln(tw, "Hash\tSize\tName")
}
}
for _, link := range object.Links {
if link.Type == unixfs.TDirectory {
link.Name += "/"
if object.HasLinks {
for _, link := range object.Links {
if link.Type == unixfs.TDirectory {
link.Name += "/"
}

fmt.Fprintf(tw, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name)
}
fmt.Fprintf(w, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name)
}
if len(output.Objects) > 1 {
fmt.Fprintln(w)
if object.HasFooter {
if output.MultipleFolders {
fmt.Fprintln(tw)
}
}
}

tw.Flush()
return nil
}),
},
Type: LsOutput{},
}

func makeDagNodeLinkResults(req *cmds.Request, dagnode ipld.Node) (<-chan unixfs.LinkResult, error) {
linkResults := make(chan unixfs.LinkResult)
go func() {
defer close(linkResults)
for _, l := range dagnode.Links() {
select {
case linkResults <- unixfs.LinkResult{
Link: l,
Err: nil,
}:
case <-req.Context.Done():
return
}
}
}()
return linkResults, nil
}

func newFullDirectoryLsObject(hash string, links []LsLink) LsObject {
return LsObject{hash, links, true, true, true}
}
func newDirectoryHeaderLsObject(hash string) LsObject {
return LsObject{hash, nil, true, false, false}
}
func newDirectoryLinksLsObject(links []LsLink) LsObject {
return LsObject{"", links, false, true, false}
}
func newDirectoryFooterLsObject() LsObject {
return LsObject{"", nil, false, false, true}
}

func makeLsLink(req *cmds.Request, dserv ipld.DAGService, resolve bool, link *ipld.Link) (*LsLink, error) {
t := unixfspb.Data_DataType(-1)

switch link.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
t = unixfs.TFile
case cid.DagProtobuf:
linkNode, err := link.GetNode(req.Context, dserv)
if err == ipld.ErrNotFound && !resolve {
// not an error
linkNode = nil
} else if err != nil {
return nil, err
}

if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := unixfs.FSNodeFromBytes(pn.Data())
if err != nil {
return nil, err
}
t = d.Type()
}
}
return &LsLink{
Name: link.Name,
Hash: link.Cid.String(),
Size: link.Size,
Type: t,
}, nil
}

0 comments on commit 7dadaa5

Please sign in to comment.