Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Command channel output #499

Merged
merged 16 commits into from
Jan 7, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions commands/channelmarshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package commands

import "io"

type ChannelMarshaler struct {
Channel <-chan interface{}
Marshaler func(interface{}) (io.Reader, error)

reader io.Reader
}

func (cr *ChannelMarshaler) Read(p []byte) (int, error) {
if cr.reader == nil {
val, more := <-cr.Channel
if !more {
return 0, io.EOF
}

r, err := cr.Marshaler(val)
if err != nil {
return 0, err
}
cr.reader = r
}

n, err := cr.reader.Read(p)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

io.ReadFull just in case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure? It looks like that would error if len(p) is greater than the length of one of the readers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll stop on io.EOF and return io.ErrUnexpectedEOF (which we can ignore). The thing i'm worried about is that Read call not reading all it's supposed to. there's no guarantee the network won't just give it a few bytes and return.

Looking closer at this function, looks like if there are any bytes read, we keep the reader, so we just read again. if so then that may be fine (i thought we lost the reader right away)

if err != nil && err != io.EOF {
return n, err
}
if n == 0 {
cr.reader = nil
}
return n, nil
}
31 changes: 17 additions & 14 deletions commands/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commands
import (
"errors"
"fmt"
"io"
"reflect"
"strings"

Expand All @@ -15,9 +16,9 @@ var log = u.Logger("command")
// It reads from the Request, and writes results to the Response.
type Function func(Request) (interface{}, error)

// Marshaler is a function that takes in a Response, and returns a marshalled []byte
// Marshaler is a function that takes in a Response, and returns an io.Reader
// (or an error on failure)
type Marshaler func(Response) ([]byte, error)
type Marshaler func(Response) (io.Reader, error)

// MarshalerMap is a map of Marshaler functions, keyed by EncodingType
// (or an error on failure)
Expand Down Expand Up @@ -113,25 +114,27 @@ func (c *Command) Call(req Request) Response {
return res
}

isChan := false
actualType := reflect.TypeOf(output)
if actualType != nil {
if actualType.Kind() == reflect.Ptr {
actualType = actualType.Elem()
}

// test if output is a channel
isChan = actualType.Kind() == reflect.Chan
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool idea, fancy!


// If the command specified an output type, ensure the actual value returned is of that type
if cmd.Type != nil {
definedType := reflect.ValueOf(cmd.Type).Type()
actualType := reflect.ValueOf(output).Type()
if cmd.Type != nil && !isChan {
expectedType := reflect.TypeOf(cmd.Type)

if definedType != actualType {
if actualType != expectedType {
res.SetError(ErrIncorrectType, ErrNormal)
return res
}
}

// clean up the request (close the readers, e.g. fileargs)
// NOTE: this means commands can't expect to keep reading after cmd.Run returns (in a goroutine)
err = req.Cleanup()
if err != nil {
res.SetError(err, ErrNormal)
return res
}

res.SetOutput(output)
return res
}
Expand Down
35 changes: 33 additions & 2 deletions commands/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net/http"
"net/url"
"reflect"
"strings"

cmds "github.com/jbenet/go-ipfs/commands"
Expand Down Expand Up @@ -42,6 +43,9 @@ func (c *client) Send(req cmds.Request) (cmds.Response, error) {
// override with json to send to server
req.SetOption(cmds.EncShort, cmds.JSON)

// stream channel output
req.SetOption(cmds.ChanOpt, "true")

query, err := getQuery(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -137,8 +141,34 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
contentType = strings.Split(contentType, ";")[0]

if len(httpRes.Header.Get(streamHeader)) > 0 {
// if output is a stream, we can just use the body reader
res.SetOutput(httpRes.Body)
return res, nil

} else if len(httpRes.Header.Get(channelHeader)) > 0 {
// if output is coming from a channel, decode each chunk
outChan := make(chan interface{})
go func() {
dec := json.NewDecoder(httpRes.Body)
outputType := reflect.TypeOf(req.Command().Type)

for {
v := reflect.New(outputType).Interface()
err := dec.Decode(v)
if err != nil && err != io.EOF {
fmt.Println(err.Error())
return
}
if err == io.EOF {
close(outChan)
return
}
outChan <- v
}
}()

res.SetOutput(outChan)
return res, nil
}

dec := json.NewDecoder(httpRes.Body)
Expand Down Expand Up @@ -169,8 +199,9 @@ func getResponse(httpRes *http.Response, req cmds.Request) (cmds.Response, error
res.SetError(e, e.Code)

} else {
v := req.Command().Type
err = dec.Decode(&v)
outputType := reflect.TypeOf(req.Command().Type)
v := reflect.New(outputType).Interface()
err = dec.Decode(v)
if err != nil && err != io.EOF {
return nil, err
}
Expand Down
70 changes: 68 additions & 2 deletions commands/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"errors"
"fmt"
"io"
"net/http"

Expand All @@ -20,8 +21,11 @@ type Handler struct {
var ErrNotFound = errors.New("404 page not found")

// HTTP header names used when writing command responses.
const (
	streamHeader           = "X-Stream-Output"  // body is a raw byte stream
	channelHeader          = "X-Chunked-Output" // body is one chunk per channel value
	contentTypeHeader      = "Content-Type"
	contentLengthHeader    = "Content-Length"
	transferEncodingHeader = "Transfer-Encoding"
)

var mimeTypes = map[string]string{
Expand Down Expand Up @@ -97,5 +101,67 @@ func (i Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

// if output is a channel and user requested streaming channels,
// use chunk copier for the output
_, isChan := res.Output().(chan interface{})
streamChans, _, _ := req.Option("stream-channels").Bool()
if isChan && streamChans {
err = copyChunks(w, out)
if err != nil {
log.Error(err)
}
return
}

io.Copy(w, out)
}

// Copies from an io.Reader to a http.ResponseWriter.
// Flushes chunks over HTTP stream as they are read (if supported by transport).
func copyChunks(w http.ResponseWriter, out io.Reader) error {
hijacker, ok := w.(http.Hijacker)
if !ok {
return errors.New("Could not create hijacker")
}
conn, writer, err := hijacker.Hijack()
if err != nil {
return err
}
defer conn.Close()

writer.WriteString("HTTP/1.1 200 OK\r\n")
writer.WriteString(contentTypeHeader + ": application/json\r\n")
writer.WriteString(transferEncodingHeader + ": chunked\r\n")
writer.WriteString(channelHeader + ": 1\r\n\r\n")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice :) love how simple http is
didn't know about \r\n. guess that makes sense for the time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was probably chosen just so Tim Berners-Lee could test out his server via telnet :P

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. he didn't have protobufs. :]


buf := make([]byte, 32*1024)

for {
n, err := out.Read(buf)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think so, but just to check: so you mean to just read whatever n chars? or for it to be atomic object-by-object (io.ReadFull then)? (dont think so, but if so, your buffer may run out of space)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, it can be any n chars. (But the current reader implementation will typically end up being one chunk per object).


if n > 0 {
length := fmt.Sprintf("%x\r\n", n)
writer.WriteString(length)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this just http-specific framing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to do this as a hack around the stdlib's http.ResponseWriter :( The built-in HTTP code doesn't let us start writing the response while the client is still writing the request (which is necessary for add, where the client is writing files as we stream the output).

So to get around it we have to hijack the underlying TCP connection for the request and handle the HTTP stuff manually.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The built-in HTTP code doesn't let us start writing the response while the client is still writing the request

Sad. Yeah makes sense. I was just wondering whether the length there was some chunk framing part of http, or framing on our end.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that's HTTP chunked encoding, there's a hex length before each chunk.


_, err := writer.Write(buf[0:n])
if err != nil {
return err
}

writer.WriteString("\r\n")
writer.Flush()
}

if err != nil && err != io.EOF {
return err
}
if err == io.EOF {
break
}
}

writer.WriteString("0\r\n\r\n")
writer.Flush()

return nil
}
3 changes: 3 additions & 0 deletions commands/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,18 @@ const (
EncLong = "encoding"
RecShort = "r"
RecLong = "recursive"
ChanOpt = "stream-channels"
)

// Options defined and consumed by this package itself (rather than by
// individual commands).
var OptionEncodingType = StringOption(EncShort, EncLong, "The encoding type the output should be encoded with (json, xml, or text)")
var OptionRecursivePath = BoolOption(RecShort, RecLong, "Add directory paths recursively")
var OptionStreamChannels = BoolOption(ChanOpt, "Stream channel output")

// globalOptions is added to every command, so all commands accept an
// encoding type and the channel-streaming flag.
var globalOptions = []Option{
	OptionEncodingType,
	OptionStreamChannels,
}

// the above array of Options, wrapped in a Command
Expand Down
6 changes: 0 additions & 6 deletions commands/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type Request interface {
Context() *Context
SetContext(Context)
Command() *Command
Cleanup() error

ConvertOptions() error
}
Expand Down Expand Up @@ -174,11 +173,6 @@ func (r *request) Command() *Command {
return r.cmd
}

func (r *request) Cleanup() error {
// TODO
return nil
}

type converter func(string) (interface{}, error)

var converters = map[reflect.Kind]converter{
Expand Down
Loading