Skip to content

Commit

Permalink
Merge pull request #1138 from jonboulle/1138_timeout
Browse files Browse the repository at this point in the history
etcdserver: handle watch timeouts and streaming
  • Loading branch information
jonboulle committed Sep 24, 2014
2 parents 7aaaf49 + a9caa24 commit ec1df42
Show file tree
Hide file tree
Showing 5 changed files with 355 additions and 146 deletions.
96 changes: 65 additions & 31 deletions etcdserver/etcdhttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strings"
"time"

"github.com/coreos/etcd/elog"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
Expand All @@ -26,7 +25,11 @@ const (
machinesPrefix = "/v2/machines"
raftPrefix = "/raft"

DefaultTimeout = 500 * time.Millisecond
// time to wait for response from EtcdServer requests
defaultServerTimeout = 500 * time.Millisecond

// time to wait for a Watch request
defaultWatchTimeout = 5 * time.Minute
)

var errClosed = errors.New("etcdhttp: client closed connection")
Expand All @@ -39,7 +42,7 @@ func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Durati
timeout: timeout,
}
if sh.timeout == 0 {
sh.timeout = DefaultTimeout
sh.timeout = defaultServerTimeout
}
mux := http.NewServeMux()
mux.HandleFunc(keysPrefix, sh.serveKeys)
Expand Down Expand Up @@ -89,23 +92,18 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
return
}

var ev *store.Event
switch {
case resp.Event != nil:
ev = resp.Event
case resp.Watcher != nil:
if ev, err = waitForEvent(ctx, w, resp.Watcher); err != nil {
http.Error(w, err.Error(), http.StatusGatewayTimeout)
return
if err := writeEvent(w, resp.Event); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
}
case resp.Watcher != nil:
ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
defer cancel()
handleWatch(ctx, w, resp.Watcher, rr.Stream)
default:
writeError(w, errors.New("received response with no Event/Watcher!"))
return
}

if err = writeEvent(w, ev); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
}
}

Expand Down Expand Up @@ -187,7 +185,7 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
)
}

var rec, sort, wait bool
var rec, sort, wait, stream bool
if rec, err = getBool(r.Form, "recursive"); err != nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
Expand All @@ -206,6 +204,19 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
`invalid value for "wait"`,
)
}
if stream, err = getBool(r.Form, "stream"); err != nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`invalid value for "stream"`,
)
}

if wait && r.Method != "GET" {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`"wait" can only be used with GET requests`,
)
}

// prevExist is nullable, so leave it null if not specified
var pe *bool
Expand All @@ -231,6 +242,7 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
Recursive: rec,
Since: wIdx,
Sorted: sort,
Stream: stream,
Wait: wait,
}

Expand Down Expand Up @@ -285,8 +297,9 @@ func writeError(w http.ResponseWriter, err error) {
}
}

// writeEvent serializes the given Event and writes the resulting JSON to the
// given ResponseWriter
// writeEvent serializes a single Event and writes the resulting
// JSON to the given ResponseWriter, along with the appropriate
// headers
func writeEvent(w http.ResponseWriter, ev *store.Event) error {
if ev == nil {
return errors.New("cannot write empty Event!")
Expand All @@ -301,24 +314,45 @@ func writeEvent(w http.ResponseWriter, ev *store.Event) error {
return json.NewEncoder(w).Encode(ev)
}

// waitForEvent waits for a given Watcher to return its associated
// event. It returns a non-nil error if the given Context times out
// or the given ResponseWriter triggers a CloseNotify.
func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher) (*store.Event, error) {
// TODO(bmizerany): support streaming?
func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool) {
defer wa.Remove()
ech := wa.EventChan()
var nch <-chan bool
if x, ok := w.(http.CloseNotifier); ok {
nch = x.CloseNotify()
}
select {
case ev := <-wa.EventChan():
return ev, nil
case <-nch:
elog.TODO()
return nil, errClosed
case <-ctx.Done():
return nil, ctx.Err()

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

// Ensure headers are flushed early, in case of long polling
w.(http.Flusher).Flush()

for {
select {
case <-nch:
// Client closed connection. Nothing to do.
return
case <-ctx.Done():
// Timed out. net/http will close the connection for us, so nothing to do.
return
case ev, ok := <-ech:
if !ok {
// If the channel is closed this may be an indication of
// that notifications are much more than we are able to
// send to the client in time. Then we simply end streaming.
return
}
if err := json.NewEncoder(w).Encode(ev); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
return
}
if !stream {
return
}
w.(http.Flusher).Flush()
}
}
}

Expand Down
Loading

0 comments on commit ec1df42

Please sign in to comment.