Skip to content

Commit

Permalink
Merge pull request #499 from jbenet/command-channels
Browse files Browse the repository at this point in the history
Command channel output
  • Loading branch information
jbenet committed Jan 7, 2015
2 parents 0395a7a + 2816ed0 commit 9bd2f42
Show file tree
Hide file tree
Showing 27 changed files with 380 additions and 185 deletions.
34 changes: 34 additions & 0 deletions commands/channelmarshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package commands

import "io"

type ChannelMarshaler struct {
Channel <-chan interface{}
Marshaler func(interface{}) (io.Reader, error)

reader io.Reader
}

func (cr *ChannelMarshaler) Read(p []byte) (int, error) {
if cr.reader == nil {
val, more := <-cr.Channel
if !more {
return 0, io.EOF
}

r, err := cr.Marshaler(val)
if err != nil {
return 0, err
}
cr.reader = r
}

n, err := cr.reader.Read(p)
if err != nil && err != io.EOF {
return n, err
}
if n == 0 {
cr.reader = nil
}
return n, nil
}
31 changes: 17 additions & 14 deletions commands/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commands
import (
"errors"
"fmt"
"io"
"reflect"
"strings"

Expand All @@ -15,9 +16,9 @@ var log = u.Logger("command")
// It reads from the Request, and writes results to the Response.
type Function func(Request) (interface{}, error)

// Marshaler is a function that takes in a Response, and returns a marshalled []byte
// Marshaler is a function that takes in a Response, and returns an io.Reader
// (or an error on failure)
type Marshaler func(Response) ([]byte, error)
type Marshaler func(Response) (io.Reader, error)

// MarshalerMap is a map of Marshaler functions, keyed by EncodingType
// (or an error on failure)
Expand Down Expand Up @@ -113,25 +114,27 @@ func (c *Command) Call(req Request) Response {
return res
}

isChan := false
actualType := reflect.TypeOf(output)
if actualType != nil {
if actualType.Kind() == reflect.Ptr {
actualType = actualType.Elem()
}

// test if output is a channel
isChan = actualType.Kind() == reflect.Chan
}

// If the command specified an output type, ensure the actual value returned is of that type
if cmd.Type != nil {
definedType := reflect.ValueOf(cmd.Type).Type()
actualType := reflect.ValueOf(output).Type()
if cmd.Type != nil && !isChan {
expectedType := reflect.TypeOf(cmd.Type)

if definedType != actualType {
if actualType != expectedType {
res.SetError(ErrIncorrectType, ErrNormal)
return res
}
}

// clean up the request (close the readers, e.g. fileargs)
// NOTE: this means commands can't expect to keep reading after cmd.Run returns (in a goroutine)
err = req.Cleanup()
if err != nil {
res.SetError(err, ErrNormal)
return res
}

res.SetOutput(output)
return res
}
Expand Down
35 changes: 33 additions & 2 deletions commands/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net/http"
"net/url"
"reflect"
"strings"

cmds "github.com/jbenet/go-ipfs/commands"
Expand Down Expand Up @@ -42,6 +43,9 @@ func (c *client) Send(req cmds.Request) (cmds.Response, error) {
// override with json to send to server
req.SetOption(cmds.EncShort, cmds.JSON)

// stream channel output
req.SetOption(cmds.ChanOpt, "true")

query, err := getQuery(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -137,8 +141,34 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
contentType = strings.Split(contentType, ";")[0]

if len(httpRes.Header.Get(streamHeader)) > 0 {
// if output is a stream, we can just use the body reader
res.SetOutput(httpRes.Body)
return res, nil

} else if len(httpRes.Header.Get(channelHeader)) > 0 {
// if output is coming from a channel, decode each chunk
outChan := make(chan interface{})
go func() {
dec := json.NewDecoder(httpRes.Body)
outputType := reflect.TypeOf(req.Command().Type)

for {
v := reflect.New(outputType).Interface()
err := dec.Decode(v)
if err != nil && err != io.EOF {
fmt.Println(err.Error())
return
}
if err == io.EOF {
close(outChan)
return
}
outChan <- v
}
}()

res.SetOutput(outChan)
return res, nil
}

dec := json.NewDecoder(httpRes.Body)
Expand Down Expand Up @@ -169,8 +199,9 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
res.SetError(e, e.Code)

} else {
v := req.Command().Type
err = dec.Decode(&v)
outputType := reflect.TypeOf(req.Command().Type)
v := reflect.New(outputType).Interface()
err = dec.Decode(v)
if err != nil && err != io.EOF {
return nil, err
}
Expand Down
70 changes: 68 additions & 2 deletions commands/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"errors"
"fmt"
"io"
"net/http"

Expand All @@ -20,8 +21,11 @@ type Handler struct {
var ErrNotFound = errors.New("404 page not found")

const (
streamHeader = "X-Stream-Output"
contentTypeHeader = "Content-Type"
streamHeader = "X-Stream-Output"
channelHeader = "X-Chunked-Output"
contentTypeHeader = "Content-Type"
contentLengthHeader = "Content-Length"
transferEncodingHeader = "Transfer-Encoding"
)

var mimeTypes = map[string]string{
Expand Down Expand Up @@ -97,5 +101,67 @@ func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

// if output is a channel and user requested streaming channels,
// use chunk copier for the output
_, isChan := res.Output().(chan interface{})
streamChans, _, _ := req.Option("stream-channels").Bool()
if isChan && streamChans {
err = copyChunks(w, out)
if err != nil {
log.Error(err)
}
return
}

io.Copy(w, out)
}

// Copies from an io.Reader to a http.ResponseWriter.
// Flushes chunks over HTTP stream as they are read (if supported by transport).
func copyChunks(w http.ResponseWriter, out io.Reader) error {
hijacker, ok := w.(http.Hijacker)
if !ok {
return errors.New("Could not create hijacker")
}
conn, writer, err := hijacker.Hijack()
if err != nil {
return err
}
defer conn.Close()

writer.WriteString("HTTP/1.1 200 OK\r\n")
writer.WriteString(contentTypeHeader + ": application/json\r\n")
writer.WriteString(transferEncodingHeader + ": chunked\r\n")
writer.WriteString(channelHeader + ": 1\r\n\r\n")

buf := make([]byte, 32*1024)

for {
n, err := out.Read(buf)

if n > 0 {
length := fmt.Sprintf("%x\r\n", n)
writer.WriteString(length)

_, err := writer.Write(buf[0:n])
if err != nil {
return err
}

writer.WriteString("\r\n")
writer.Flush()
}

if err != nil && err != io.EOF {
return err
}
if err == io.EOF {
break
}
}

writer.WriteString("0\r\n\r\n")
writer.Flush()

return nil
}
3 changes: 3 additions & 0 deletions commands/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,18 @@ const (
EncLong = "encoding"
RecShort = "r"
RecLong = "recursive"
ChanOpt = "stream-channels"
)

// options that are used by this package
var OptionEncodingType = StringOption(EncShort, EncLong, "The encoding type the output should be encoded with (json, xml, or text)")
var OptionRecursivePath = BoolOption(RecShort, RecLong, "Add directory paths recursively")
var OptionStreamChannels = BoolOption(ChanOpt, "Stream channel output")

// global options, added to every command
var globalOptions = []Option{
OptionEncodingType,
OptionStreamChannels,
}

// the above array of Options, wrapped in a Command
Expand Down
6 changes: 0 additions & 6 deletions commands/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type Request interface {
Context() *Context
SetContext(Context)
Command() *Command
Cleanup() error

ConvertOptions() error
}
Expand Down Expand Up @@ -174,11 +173,6 @@ func (r *request) Command() *Command {
return r.cmd
}

func (r *request) Cleanup() error {
// TODO
return nil
}

type converter func(string) (interface{}, error)

var converters = map[reflect.Kind]converter{
Expand Down
Loading

0 comments on commit 9bd2f42

Please sign in to comment.