From 61227d7477f684567d58baa43ca71cb0149bb8f0 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 11 Dec 2013 21:40:55 -0700 Subject: [PATCH 1/2] mod/leader --- mod/leader/v2/delete_handler.go | 49 ++++++++++++++++ mod/leader/v2/get_handler.go | 29 ++++++++++ mod/leader/v2/handler.go | 34 +++++++++++ mod/leader/v2/set_handler.go | 63 ++++++++++++++++++++ mod/leader/v2/tests/mod_leader_test.go | 80 ++++++++++++++++++++++++++ mod/lock/v2/acquire_handler.go | 18 ++++-- mod/lock/v2/lock_nodes.go | 8 +-- mod/lock/v2/release_handler.go | 2 +- mod/lock/v2/renew_handler.go | 2 +- mod/mod.go | 3 +- 10 files changed, 275 insertions(+), 13 deletions(-) create mode 100644 mod/leader/v2/delete_handler.go create mode 100644 mod/leader/v2/get_handler.go create mode 100644 mod/leader/v2/handler.go create mode 100644 mod/leader/v2/set_handler.go create mode 100644 mod/leader/v2/tests/mod_leader_test.go diff --git a/mod/leader/v2/delete_handler.go b/mod/leader/v2/delete_handler.go new file mode 100644 index 00000000000..266a9637937 --- /dev/null +++ b/mod/leader/v2/delete_handler.go @@ -0,0 +1,49 @@ +package v2 + +import ( + "fmt" + "io" + "net/http" + "net/url" + + "github.com/gorilla/mux" +) + +// deleteHandler remove a given leader leader. +func (h *handler) deleteHandler(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + name := req.FormValue("name") + if name == "" { + http.Error(w, "leader name required", http.StatusInternalServerError) + return + } + + // Proxy the request to the the lock service. + u, err := url.Parse(fmt.Sprintf("%s/mod/v2/lock/%s", h.addr, vars["key"])) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + q := u.Query() + q.Set("value", name) + u.RawQuery = q.Encode() + + r, err := http.NewRequest("DELETE", u.String(), nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Read from the leader lock. + resp, err := h.client.Do(r) + if err != nil { + http.Error(w, "delete leader http error: " + err.Error(), http.StatusInternalServerError) + return + } + defer resp.Body.Close() + w.WriteHeader(resp.StatusCode) + if resp.StatusCode != http.StatusOK { + w.Write([]byte("delete leader error: ")) + } + io.Copy(w, resp.Body) +} diff --git a/mod/leader/v2/get_handler.go b/mod/leader/v2/get_handler.go new file mode 100644 index 00000000000..7914eb65a09 --- /dev/null +++ b/mod/leader/v2/get_handler.go @@ -0,0 +1,29 @@ +package v2 + +import ( + "fmt" + "io" + "net/http" + + "github.com/gorilla/mux" +) + +// getHandler retrieves the current leader. +func (h *handler) getHandler(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + + // Proxy the request to the lock service. + url := fmt.Sprintf("%s/mod/v2/lock/%s?field=value", h.addr, vars["key"]) + resp, err := h.client.Get(url) + if err != nil { + http.Error(w, "read leader error: " + err.Error(), http.StatusInternalServerError) + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + w.Write([]byte("get leader error: ")) + } + w.WriteHeader(resp.StatusCode) + io.Copy(w, resp.Body) +} diff --git a/mod/leader/v2/handler.go b/mod/leader/v2/handler.go new file mode 100644 index 00000000000..3c88278fb5d --- /dev/null +++ b/mod/leader/v2/handler.go @@ -0,0 +1,34 @@ +package v2 + +import ( + "net/http" + + "github.com/gorilla/mux" +) + +// prefix is appended to the lock's prefix since the leader mod uses the lock mod. +const prefix = "/_mod/leader" + +// handler manages the leader HTTP request. +type handler struct { + *mux.Router + client *http.Client + transport *http.Transport + addr string +} + +// NewHandler creates an HTTP handler that can be registered on a router. +func NewHandler(addr string) (http.Handler) { + transport := &http.Transport{DisableKeepAlives: false} + h := &handler{ + Router: mux.NewRouter(), + client: &http.Client{Transport: transport}, + transport: transport, + addr: addr, + } + h.StrictSlash(false) + h.HandleFunc("/{key:.*}", h.getHandler).Methods("GET") + h.HandleFunc("/{key:.*}", h.setHandler).Methods("PUT") + h.HandleFunc("/{key:.*}", h.deleteHandler).Methods("DELETE") + return h +} diff --git a/mod/leader/v2/set_handler.go b/mod/leader/v2/set_handler.go new file mode 100644 index 00000000000..c517c7cc90d --- /dev/null +++ b/mod/leader/v2/set_handler.go @@ -0,0 +1,63 @@ +package v2 + +import ( + "fmt" + "io" + "net/http" + "net/url" + + "github.com/gorilla/mux" +) + +// setHandler attempts to set the current leader. +func (h *handler) setHandler(w http.ResponseWriter, req *http.Request) { + vars := mux.Vars(req) + name := req.FormValue("name") + if name == "" { + http.Error(w, "leader name required", http.StatusInternalServerError) + return + } + + // Proxy the request to the the lock service. + u, err := url.Parse(fmt.Sprintf("%s/mod/v2/lock/%s", h.addr, vars["key"])) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + q := u.Query() + q.Set("value", name) + q.Set("ttl", req.FormValue("ttl")) + q.Set("timeout", req.FormValue("timeout")) + u.RawQuery = q.Encode() + + r, err := http.NewRequest("POST", u.String(), nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Close request if this connection disconnects. + closeNotifier, _ := w.(http.CloseNotifier) + stopChan := make(chan bool) + defer close(stopChan) + go func() { + select { + case <-closeNotifier.CloseNotify(): + h.transport.CancelRequest(r) + case <-stopChan: + } + }() + + // Read from the leader lock. + resp, err := h.client.Do(r) + if err != nil { + http.Error(w, "set leader http error: " + err.Error(), http.StatusInternalServerError) + return + } + defer resp.Body.Close() + w.WriteHeader(resp.StatusCode) + if resp.StatusCode != http.StatusOK { + w.Write([]byte("set leader error: ")) + } + io.Copy(w, resp.Body) +} diff --git a/mod/leader/v2/tests/mod_leader_test.go b/mod/leader/v2/tests/mod_leader_test.go new file mode 100644 index 00000000000..71f3fc1ac0b --- /dev/null +++ b/mod/leader/v2/tests/mod_leader_test.go @@ -0,0 +1,80 @@ +package leader + +import ( + "fmt" + "testing" + "time" + + "github.com/coreos/etcd/server" + "github.com/coreos/etcd/tests" + "github.com/stretchr/testify/assert" +) + +// Ensure that a leader can be set and read. +func TestModLeaderSet(t *testing.T) { + tests.RunServer(func(s *server.Server) { + // Set leader. + body, err := testSetLeader(s, "foo", "xxx", 10) + assert.NoError(t, err) + assert.Equal(t, body, "2") + + // Check that the leader is set. + body, err = testGetLeader(s, "foo") + assert.NoError(t, err) + assert.Equal(t, body, "xxx") + + // Delete leader. + body, err = testDeleteLeader(s, "foo", "xxx") + assert.NoError(t, err) + assert.Equal(t, body, "") + + // Check that the leader is removed. + body, err = testGetLeader(s, "foo") + assert.NoError(t, err) + assert.Equal(t, body, "") + }) +} + +// Ensure that a leader can be renewed. +func TestModLeaderRenew(t *testing.T) { + tests.RunServer(func(s *server.Server) { + // Set leader. + body, err := testSetLeader(s, "foo", "xxx", 2) + assert.NoError(t, err) + assert.Equal(t, body, "2") + + time.Sleep(1 * time.Second) + + // Renew leader. + body, err = testSetLeader(s, "foo", "xxx", 3) + assert.NoError(t, err) + assert.Equal(t, body, "2") + + time.Sleep(2 * time.Second) + + // Check that the leader is set. + body, err = testGetLeader(s, "foo") + assert.NoError(t, err) + assert.Equal(t, body, "xxx") + }) +} + + + +func testSetLeader(s *server.Server, key string, name string, ttl int) (string, error) { + resp, err := tests.PutForm(fmt.Sprintf("%s/mod/v2/leader/%s?name=%s&ttl=%d", s.URL(), key, name, ttl), nil) + ret := tests.ReadBody(resp) + return string(ret), err +} + +func testGetLeader(s *server.Server, key string) (string, error) { + resp, err := tests.Get(fmt.Sprintf("%s/mod/v2/leader/%s", s.URL(), key)) + ret := tests.ReadBody(resp) + return string(ret), err +} + +func testDeleteLeader(s *server.Server, key string, name string) (string, error) { + resp, err := tests.DeleteForm(fmt.Sprintf("%s/mod/v2/leader/%s?name=%s", s.URL(), key, name), nil) + ret := tests.ReadBody(resp) + return string(ret), err +} diff --git a/mod/lock/v2/acquire_handler.go b/mod/lock/v2/acquire_handler.go index 6da62f6cc33..09b50633cb8 100644 --- a/mod/lock/v2/acquire_handler.go +++ b/mod/lock/v2/acquire_handler.go @@ -49,9 +49,15 @@ func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) { } // If node exists then just watch it. Otherwise create the node and watch it. - index := h.findExistingNode(keypath, value) + node, index, pos := h.findExistingNode(keypath, value) if index > 0 { - err = h.watch(keypath, index, nil) + if pos == 0 { + // If lock is already acquired then update the TTL. + h.client.Update(node.Key, node.Value, uint64(ttl)) + } else { + // Otherwise watch until it becomes acquired (or errors). + err = h.watch(keypath, index, nil) + } } else { index, err = h.createNode(keypath, value, ttl, closeChan, stopChan) } @@ -108,18 +114,18 @@ func (h *handler) createNode(keypath string, value string, ttl int, closeChan <- } // findExistingNode search for a node on the lock with the given value. -func (h *handler) findExistingNode(keypath string, value string) int { +func (h *handler) findExistingNode(keypath string, value string) (*etcd.Node, int, int) { if len(value) > 0 { resp, err := h.client.Get(keypath, true, true) if err == nil { nodes := lockNodes{resp.Node.Nodes} - if node := nodes.FindByValue(value); node != nil { + if node, pos := nodes.FindByValue(value); node != nil { index, _ := strconv.Atoi(path.Base(node.Key)) - return index + return node, index, pos } } } - return 0 + return nil, 0, 0 } // ttlKeepAlive continues to update a key's TTL until the stop channel is closed. diff --git a/mod/lock/v2/lock_nodes.go b/mod/lock/v2/lock_nodes.go index 92446ee3be6..c5ae9836f2b 100644 --- a/mod/lock/v2/lock_nodes.go +++ b/mod/lock/v2/lock_nodes.go @@ -30,15 +30,15 @@ func (s lockNodes) First() *etcd.Node { } // Retrieves the first node with a given value. -func (s lockNodes) FindByValue(value string) *etcd.Node { +func (s lockNodes) FindByValue(value string) (*etcd.Node, int) { sort.Sort(s) - for _, node := range s.Nodes { + for i, node := range s.Nodes { if node.Value == value { - return &node + return &node, i } } - return nil + return nil, 0 } // Retrieves the index that occurs before a given index. diff --git a/mod/lock/v2/release_handler.go b/mod/lock/v2/release_handler.go index f67a769d132..b3e8344db6a 100644 --- a/mod/lock/v2/release_handler.go +++ b/mod/lock/v2/release_handler.go @@ -33,7 +33,7 @@ func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) { return } nodes := lockNodes{resp.Node.Nodes} - node := nodes.FindByValue(value) + node, _ := nodes.FindByValue(value) if node == nil { http.Error(w, "release lock error: cannot find: " + value, http.StatusInternalServerError) return diff --git a/mod/lock/v2/renew_handler.go b/mod/lock/v2/renew_handler.go index 951b52c3294..9d209d5829d 100644 --- a/mod/lock/v2/renew_handler.go +++ b/mod/lock/v2/renew_handler.go @@ -41,7 +41,7 @@ func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) { return } nodes := lockNodes{resp.Node.Nodes} - node := nodes.FindByValue(value) + node, _ := nodes.FindByValue(value) if node == nil { http.Error(w, "renew lock error: cannot find: " + value, http.StatusInternalServerError) return diff --git a/mod/mod.go b/mod/mod.go index 34a380689f8..14fe61c98e3 100644 --- a/mod/mod.go +++ b/mod/mod.go @@ -7,6 +7,7 @@ import ( "github.com/coreos/etcd/mod/dashboard" lock2 "github.com/coreos/etcd/mod/lock/v2" + leader2 "github.com/coreos/etcd/mod/leader/v2" "github.com/gorilla/mux" ) @@ -22,7 +23,7 @@ func HttpHandler(addr string) http.Handler { r.HandleFunc("/dashboard", addSlash) r.PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler())) - // TODO: Use correct addr. r.PathPrefix("/v2/lock").Handler(http.StripPrefix("/v2/lock", lock2.NewHandler(addr))) + r.PathPrefix("/v2/leader").Handler(http.StripPrefix("/v2/leader", leader2.NewHandler(addr))) return r } From 296eaf7b3411dbda0b445553840df441ae9a3802 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 16 Dec 2013 08:00:16 -0700 Subject: [PATCH 2/2] Add leader module to README. --- README.md | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/README.md b/README.md index acbb7262d7a..b883f06b809 100644 --- a/README.md +++ b/README.md @@ -926,6 +926,41 @@ curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?index=customer1 curl -X DELETE http://127.0.0.1:4001/mod/v2/lock/customer1?name=bar ``` + +### Leader Election + +The Leader Election module wraps the Lock module to allow clients to come to consensus on a single value. +This is useful when you want one server to process at a time but allow other servers to fail over. +The API is similar to the Lock module but is limited to simple strings values. + +Here's the API: + +**Attempt to set a value for the "order_processing" leader key:** + +```sh +curl -X POST http://127.0.0.1:4001/mod/v2/leader/order_processing?ttl=60 -d name=myserver1.foo.com +``` + +**Retrieve the current value for the "order_processing" leader key:** + +```sh +curl http://127.0.0.1:4001/mod/v2/leader/order_processing +myserver1.foo.com +``` + +**Remove a value from the "order_processing" leader key:** + +```sh +curl -X POST http://127.0.0.1:4001/mod/v2/leader/order_processing?name=myserver1.foo.com +``` + +If multiple clients attempt to set the value for a key then only one will succeed. +The other clients will hang until the current value is removed because of TTL or because of a `DELETE` operation. +Multiple clients can submit the same value and will all be notified when that value succeeds. + +To update the TTL of a value simply reissue the same `POST` command that you used to set the value. + + ## Contributing See [CONTRIBUTING](https://github.com/coreos/etcd/blob/master/CONTRIBUTING.md) for details on submitting patches and contacting developers via IRC and mailing lists.