Skip to content

Commit

Permalink
etcdhttp: time out watch requests after 5 minutes
Browse files Browse the repository at this point in the history
  • Loading branch information
jonboulle committed Sep 23, 2014
1 parent b754406 commit e2e05b7
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 21 deletions.
113 changes: 93 additions & 20 deletions etcdserver/etcdhttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"log"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
Expand All @@ -26,7 +27,11 @@ const (
machinesPrefix = "/v2/machines"
raftPrefix = "/raft"

DefaultTimeout = 500 * time.Millisecond
// time to wait for response from EtcdServer requests
defaultServerTimeout = 500 * time.Millisecond

// time to wait for a Watch request
defaultWatchTimeout = 5 * time.Minute
)

var errClosed = errors.New("etcdhttp: client closed connection")
Expand All @@ -39,7 +44,7 @@ func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Durati
timeout: timeout,
}
if sh.timeout == 0 {
sh.timeout = DefaultTimeout
sh.timeout = defaultServerTimeout
}
mux := http.NewServeMux()
mux.HandleFunc(keysPrefix, sh.serveKeys)
Expand Down Expand Up @@ -77,7 +82,7 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()

rr, err := parseRequest(r, etcdserver.GenID())
rr, stream, err := parseRequest(r, etcdserver.GenID())
if err != nil {
writeError(w, err)
return
Expand All @@ -89,23 +94,16 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
return
}

var ev *store.Event
switch {
case resp.Event != nil:
ev = resp.Event
case resp.Watcher != nil:
if ev, err = waitForEvent(ctx, w, resp.Watcher); err != nil {
http.Error(w, err.Error(), http.StatusGatewayTimeout)
return
if err := writeEvent(w, resp.Event); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
}
case resp.Watcher != nil:
handleWatch(w, resp.Watcher, rr.Stream)
default:
writeError(w, errors.New("received response with no Event/Watcher!"))
return
}

if err = writeEvent(w, ev); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
}
}

Expand Down Expand Up @@ -187,7 +185,7 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
)
}

var rec, sort, wait bool
var rec, sort, wait, stream bool
if rec, err = getBool(r.Form, "recursive"); err != nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
Expand All @@ -206,6 +204,19 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
`invalid value for "wait"`,
)
}
if stream, err = getBool(r.Form, "stream"); err != nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`invalid value for "stream"`,
)
}

if wait && r.Method != "GET" {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`"wait" can only be used with GET requests`,
)
}

// prevExist is nullable, so leave it null if not specified
var pe *bool
Expand All @@ -231,6 +242,7 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
Recursive: rec,
Since: wIdx,
Sorted: sort,
Stream: stream,
Wait: wait,
}

Expand Down Expand Up @@ -285,8 +297,9 @@ func writeError(w http.ResponseWriter, err error) {
}
}

// writeEvent serializes the given Event and writes the resulting JSON to the
// given ResponseWriter
// writeEvent serializes a single Event and writes the resulting
// JSON to the given ResponseWriter, along with the appropriate
// headers
func writeEvent(w http.ResponseWriter, ev *store.Event) error {
if ev == nil {
return errors.New("cannot write empty Event!")
Expand All @@ -305,7 +318,6 @@ func writeEvent(w http.ResponseWriter, ev *store.Event) error {
// event. It returns a non-nil error if the given Context times out
// or the given ResponseWriter triggers a CloseNotify.
func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher) (*store.Event, error) {
// TODO(bmizerany): support streaming?
defer wa.Remove()
var nch <-chan bool
if x, ok := w.(http.CloseNotifier); ok {
Expand All @@ -318,8 +330,69 @@ func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher)
elog.TODO()
return nil, errClosed
case <-ctx.Done():
return nil, ctx.Err()
return nil, errors.New("timed out waiting for watch")
}
}

// handleWatch
func handleWatch(w http.ResponseWriter, wa store.Watcher, stream bool) {
defer wa.Remove()
ech := wa.EventChan()
tch := time.After(defaultWatchTimeout)
var nch <-chan bool
if x, ok := w.(http.CloseNotifier); ok {
nch = x.CloseNotify()
}

w.Header().Set("Content-Type", "application/json")
// WriteHeader will implicitly write a Transfer-Encoding: chunked header, so no need to do it explicitly
w.WriteHeader(http.StatusOK)

// Ensure headers are flushed early, in case of long polling
w.(http.Flusher).Flush()

cw := httputil.NewChunkedWriter(w)

if !stream {
select {
case <-nch:
// Client closed connection. Nothing to do.
case <-tch:
cw.Close()
case ev := <-ech:
if err := json.NewEncoder(cw).Encode(ev); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
}
}
return
}

// streaming loop
for {
select {
case <-nch:
// Client closed connection. Nothing to do.
return
case <-tch:
cw.Close()
return
case ev, ok := <-ech:
if !ok {
// If the channel is closed this may be an indication of
// that notifications are much more than we are able to
// send to the client in time. Then we simply end streaming.
return
}
if err := json.NewEncoder(cw).Encode(ev); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
return
}
w.(http.Flusher).Flush()
}
}

}

// allowMethod verifies that the given method is one of the allowed methods,
Expand Down
1 change: 1 addition & 0 deletions etcdserver/etcdserverpb/etcdserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ message Request {
required bool sorted = 13 [(gogoproto.nullable) = false];
required bool quorum = 14 [(gogoproto.nullable) = false];
required int64 time = 15 [(gogoproto.nullable) = false];
required bool stream = 16 [(gogoproto.nullable) = false];
}
2 changes: 1 addition & 1 deletion etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
case "GET":
switch {
case r.Wait:
wc, err := s.Store.Watch(r.Path, r.Recursive, false, r.Since)
wc, err := s.Store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
if err != nil {
return Response{}, err
}
Expand Down

0 comments on commit e2e05b7

Please sign in to comment.