Skip to content

Commit

Permalink
etcdserver: correct timeout and streaming handling
Browse files Browse the repository at this point in the history
This reintroduces the 'stream' parameter to support long-lived watch
sessions. These sessions respect a server timeout (set to 5 minutes by
default).
  • Loading branch information
jonboulle committed Sep 23, 2014
1 parent b754406 commit 561e797
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 164 deletions.
100 changes: 69 additions & 31 deletions etcdserver/etcdhttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"io/ioutil"
"log"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"time"

"github.com/coreos/etcd/elog"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
Expand All @@ -26,7 +26,11 @@ const (
machinesPrefix = "/v2/machines"
raftPrefix = "/raft"

DefaultTimeout = 500 * time.Millisecond
// time to wait for response from EtcdServer requests
defaultServerTimeout = 500 * time.Millisecond

// time to wait for a Watch request
defaultWatchTimeout = 5 * time.Minute
)

var errClosed = errors.New("etcdhttp: client closed connection")
Expand All @@ -39,7 +43,7 @@ func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Durati
timeout: timeout,
}
if sh.timeout == 0 {
sh.timeout = DefaultTimeout
sh.timeout = defaultServerTimeout
}
mux := http.NewServeMux()
mux.HandleFunc(keysPrefix, sh.serveKeys)
Expand Down Expand Up @@ -89,23 +93,16 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
return
}

var ev *store.Event
switch {
case resp.Event != nil:
ev = resp.Event
case resp.Watcher != nil:
if ev, err = waitForEvent(ctx, w, resp.Watcher); err != nil {
http.Error(w, err.Error(), http.StatusGatewayTimeout)
return
if err := writeEvent(w, resp.Event); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
}
case resp.Watcher != nil:
handleWatch(w, resp.Watcher, rr.Stream)
default:
writeError(w, errors.New("received response with no Event/Watcher!"))
return
}

if err = writeEvent(w, ev); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
}
}

Expand Down Expand Up @@ -187,7 +184,7 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
)
}

var rec, sort, wait bool
var rec, sort, wait, stream bool
if rec, err = getBool(r.Form, "recursive"); err != nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
Expand All @@ -206,6 +203,19 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
`invalid value for "wait"`,
)
}
if stream, err = getBool(r.Form, "stream"); err != nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`invalid value for "stream"`,
)
}

if wait && r.Method != "GET" {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`"wait" can only be used with GET requests`,
)
}

// prevExist is nullable, so leave it null if not specified
var pe *bool
Expand All @@ -231,6 +241,7 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
Recursive: rec,
Since: wIdx,
Sorted: sort,
Stream: stream,
Wait: wait,
}

Expand Down Expand Up @@ -285,8 +296,9 @@ func writeError(w http.ResponseWriter, err error) {
}
}

// writeEvent serializes the given Event and writes the resulting JSON to the
// given ResponseWriter
// writeEvent serializes a single Event and writes the resulting
// JSON to the given ResponseWriter, along with the appropriate
// headers
func writeEvent(w http.ResponseWriter, ev *store.Event) error {
if ev == nil {
return errors.New("cannot write empty Event!")
Expand All @@ -301,25 +313,51 @@ func writeEvent(w http.ResponseWriter, ev *store.Event) error {
return json.NewEncoder(w).Encode(ev)
}

// waitForEvent waits for a given Watcher to return its associated
// event. It returns a non-nil error if the given Context times out
// or the given ResponseWriter triggers a CloseNotify.
func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher) (*store.Event, error) {
// TODO(bmizerany): support streaming?
func handleWatch(w http.ResponseWriter, wa store.Watcher, stream bool) {
defer wa.Remove()
ech := wa.EventChan()
tch := time.After(defaultWatchTimeout)
var nch <-chan bool
if x, ok := w.(http.CloseNotifier); ok {
nch = x.CloseNotify()
}
select {
case ev := <-wa.EventChan():
return ev, nil
case <-nch:
elog.TODO()
return nil, errClosed
case <-ctx.Done():
return nil, ctx.Err()

w.Header().Set("Content-Type", "application/json")
// WriteHeader will implicitly write a Transfer-Encoding: chunked header, so no need to do it explicitly
w.WriteHeader(http.StatusOK)

// Ensure headers are flushed early, in case of long polling
w.(http.Flusher).Flush()

cw := httputil.NewChunkedWriter(w)

for {
select {
case <-nch:
// Client closed connection. Nothing to do.
return
case <-tch:
cw.Close()
return
case ev, ok := <-ech:
if !ok {
// If the channel is closed this may be an indication of
// that notifications are much more than we are able to
// send to the client in time. Then we simply end streaming.
return
}
if err := json.NewEncoder(cw).Encode(ev); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
return
}
if !stream {
return
}
w.(http.Flusher).Flush()
}
}

}

// allowMethod verifies that the given method is one of the allowed methods,
Expand Down
137 changes: 23 additions & 114 deletions etcdserver/etcdhttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"path"
"reflect"
"strings"
"sync"
"testing"
"time"

Expand All @@ -36,8 +36,12 @@ func mustNewURL(t *testing.T, s string) *url.URL {
// mustNewRequest takes a path, appends it to the standard keysPrefix, and constructs
// a GET *http.Request referencing the resulting URL
func mustNewRequest(t *testing.T, p string) *http.Request {
return mustNewMethodRequest(t, "GET", p)
}

func mustNewMethodRequest(t *testing.T, m, p string) *http.Request {
return &http.Request{
Method: "GET",
Method: m,
URL: mustNewURL(t, path.Join(keysPrefix, p)),
}
}
Expand Down Expand Up @@ -99,7 +103,7 @@ func TestBadParseRequest(t *testing.T) {
mustNewForm(t, "foo", url.Values{"ttl": []string{"-1"}}),
etcdErr.EcodeTTLNaN,
},
// bad values for recursive, sorted, wait, prevExist
// bad values for recursive, sorted, wait, prevExist, stream
{
mustNewForm(t, "foo", url.Values{"recursive": []string{"hahaha"}}),
etcdErr.EcodeInvalidField,
Expand Down Expand Up @@ -136,6 +140,19 @@ func TestBadParseRequest(t *testing.T) {
mustNewForm(t, "foo", url.Values{"prevExist": []string{"#2"}}),
etcdErr.EcodeInvalidField,
},
{
mustNewForm(t, "foo", url.Values{"stream": []string{"zzz"}}),
etcdErr.EcodeInvalidField,
},
{
mustNewForm(t, "foo", url.Values{"stream": []string{"something"}}),
etcdErr.EcodeInvalidField,
},
// wait is only valid with GET requests
{
mustNewMethodRequest(t, "HEAD", "foo?wait=true"),
etcdErr.EcodeInvalidField,
},
// query values are considered
{
mustNewRequest(t, "foo?prevExist=wrong"),
Expand Down Expand Up @@ -256,14 +273,10 @@ func TestGoodParseRequest(t *testing.T) {
},
{
// wait specified
mustNewForm(
t,
"foo",
url.Values{"wait": []string{"true"}},
),
mustNewRequest(t, "foo?wait=true"),
etcdserverpb.Request{
Id: 1234,
Method: "PUT",
Method: "GET",
Wait: true,
Path: "/foo",
},
Expand Down Expand Up @@ -492,100 +505,6 @@ func (w *dummyWatcher) EventChan() chan *store.Event {
}
func (w *dummyWatcher) Remove() {}

type dummyResponseWriter struct {
cnchan chan bool
http.ResponseWriter
}

func (rw *dummyResponseWriter) CloseNotify() <-chan bool {
return rw.cnchan
}

func TestWaitForEventChan(t *testing.T) {
ctx := context.Background()
ec := make(chan *store.Event)
dw := &dummyWatcher{
echan: ec,
}
w := httptest.NewRecorder()
var wg sync.WaitGroup
var ev *store.Event
var err error
wg.Add(1)
go func() {
ev, err = waitForEvent(ctx, w, dw)
wg.Done()
}()
ec <- &store.Event{
Action: store.Get,
Node: &store.NodeExtern{
Key: "/foo/bar",
ModifiedIndex: 12345,
},
}
wg.Wait()
want := &store.Event{
Action: store.Get,
Node: &store.NodeExtern{
Key: "/foo/bar",
ModifiedIndex: 12345,
},
}
if !reflect.DeepEqual(ev, want) {
t.Fatalf("bad event: got %#v, want %#v", ev, want)
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
}

func TestWaitForEventCloseNotify(t *testing.T) {
ctx := context.Background()
dw := &dummyWatcher{}
cnchan := make(chan bool)
w := &dummyResponseWriter{
cnchan: cnchan,
}
var wg sync.WaitGroup
var ev *store.Event
var err error
wg.Add(1)
go func() {
ev, err = waitForEvent(ctx, w, dw)
wg.Done()
}()
close(cnchan)
wg.Wait()
if ev != nil {
t.Fatalf("non-nil Event returned with CloseNotifier: %v", ev)
}
if err == nil {
t.Fatalf("nil err returned with CloseNotifier!")
}
}

func TestWaitForEventCancelledContext(t *testing.T) {
cctx, cancel := context.WithCancel(context.Background())
dw := &dummyWatcher{}
w := httptest.NewRecorder()
var wg sync.WaitGroup
var ev *store.Event
var err error
wg.Add(1)
go func() {
ev, err = waitForEvent(cctx, w, dw)
wg.Done()
}()
cancel()
wg.Wait()
if ev != nil {
t.Fatalf("non-nil Event returned with cancelled context: %v", ev)
}
if err == nil {
t.Fatalf("nil err returned with cancelled context!")
}
}

func TestV2MachinesEndpoint(t *testing.T) {
tests := []struct {
method string
Expand Down Expand Up @@ -950,17 +869,6 @@ func TestBadServeKeys(t *testing.T) {

http.StatusInternalServerError,
},
{
// timeout waiting for event (watcher never returns)
mustNewRequest(t, "foo"),
&resServer{
etcdserver.Response{
Watcher: &dummyWatcher{},
},
},

http.StatusGatewayTimeout,
},
{
// non-event/watcher response from etcdserver.Server
mustNewRequest(t, "foo"),
Expand Down Expand Up @@ -1056,6 +964,7 @@ func TestServeKeysWatch(t *testing.T) {
Node: &store.NodeExtern{},
},
)
wbody = fmt.Sprintf("%x\r\n%s\r\n", len(wbody), wbody)

if rw.Code != wcode {
t.Errorf("got code=%d, want %d", rw.Code, wcode)
Expand Down
Loading

0 comments on commit 561e797

Please sign in to comment.