Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: handle watch timeouts and streaming #1138

Merged
merged 3 commits into from
Sep 24, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 65 additions & 31 deletions etcdserver/etcdhttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strings"
"time"

"github.com/coreos/etcd/elog"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
Expand All @@ -26,7 +25,11 @@ const (
machinesPrefix = "/v2/machines"
raftPrefix = "/raft"

DefaultTimeout = 500 * time.Millisecond
// time to wait for response from EtcdServer requests
defaultServerTimeout = 500 * time.Millisecond

// time to wait for a Watch request
defaultWatchTimeout = 5 * time.Minute
)

var errClosed = errors.New("etcdhttp: client closed connection")
Expand All @@ -39,7 +42,7 @@ func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Durati
timeout: timeout,
}
if sh.timeout == 0 {
sh.timeout = DefaultTimeout
sh.timeout = defaultServerTimeout
}
mux := http.NewServeMux()
mux.HandleFunc(keysPrefix, sh.serveKeys)
Expand Down Expand Up @@ -89,23 +92,18 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
return
}

var ev *store.Event
switch {
case resp.Event != nil:
ev = resp.Event
case resp.Watcher != nil:
if ev, err = waitForEvent(ctx, w, resp.Watcher); err != nil {
http.Error(w, err.Error(), http.StatusGatewayTimeout)
return
if err := writeEvent(w, resp.Event); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
}
case resp.Watcher != nil:
ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
defer cancel()
handleWatch(ctx, w, resp.Watcher, rr.Stream)
default:
writeError(w, errors.New("received response with no Event/Watcher!"))
return
}

if err = writeEvent(w, ev); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
}
}

Expand Down Expand Up @@ -187,7 +185,7 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
)
}

var rec, sort, wait bool
var rec, sort, wait, stream bool
if rec, err = getBool(r.Form, "recursive"); err != nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
Expand All @@ -206,6 +204,19 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
`invalid value for "wait"`,
)
}
if stream, err = getBool(r.Form, "stream"); err != nil {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`invalid value for "stream"`,
)
}

if wait && r.Method != "GET" {
return emptyReq, etcdErr.NewRequestError(
etcdErr.EcodeInvalidField,
`"wait" can only be used with GET requests`,
)
}

// prevExist is nullable, so leave it null if not specified
var pe *bool
Expand All @@ -231,6 +242,7 @@ func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
Recursive: rec,
Since: wIdx,
Sorted: sort,
Stream: stream,
Wait: wait,
}

Expand Down Expand Up @@ -285,8 +297,9 @@ func writeError(w http.ResponseWriter, err error) {
}
}

// writeEvent serializes the given Event and writes the resulting JSON to the
// given ResponseWriter
// writeEvent serializes a single Event and writes the resulting
// JSON to the given ResponseWriter, along with the appropriate
// headers
func writeEvent(w http.ResponseWriter, ev *store.Event) error {
if ev == nil {
return errors.New("cannot write empty Event!")
Expand All @@ -301,24 +314,45 @@ func writeEvent(w http.ResponseWriter, ev *store.Event) error {
return json.NewEncoder(w).Encode(ev)
}

// waitForEvent waits for a given Watcher to return its associated
// event. It returns a non-nil error if the given Context times out
// or the given ResponseWriter triggers a CloseNotify.
func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher) (*store.Event, error) {
// TODO(bmizerany): support streaming?
func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool) {
defer wa.Remove()
ech := wa.EventChan()
var nch <-chan bool
if x, ok := w.(http.CloseNotifier); ok {
nch = x.CloseNotify()
}
select {
case ev := <-wa.EventChan():
return ev, nil
case <-nch:
elog.TODO()
return nil, errClosed
case <-ctx.Done():
return nil, ctx.Err()

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)

// Ensure headers are flushed early, in case of long polling
w.(http.Flusher).Flush()

for {
select {
case <-nch:
// Client closed connection. Nothing to do.
return
case <-ctx.Done():
// Timed out. net/http will close the connection for us, so nothing to do.
return
case ev, ok := <-ech:
if !ok {
// If the channel is closed this may be an indication of
// that notifications are much more than we are able to
// send to the client in time. Then we simply end streaming.
return
}
if err := json.NewEncoder(w).Encode(ev); err != nil {
// Should never be reached
log.Println("error writing event: %v", err)
return
}
if !stream {
return
}
w.(http.Flusher).Flush()
}
}
}

Expand Down
Loading