Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: mod/lock #347

Merged
merged 11 commits into from
Dec 5, 2013
128 changes: 128 additions & 0 deletions mod/lock/v2/acquire_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package v2

import (
"net/http"
"path"
"strconv"
"time"

"github.com/coreos/go-etcd/etcd"
"github.com/gorilla/mux"
)

// acquireHandler attempts to acquire a lock on the given key.
// The "key" parameter specifies the resource to lock.
// The "ttl" parameter specifies how long the lock will persist for.
// The "timeout" parameter specifies how long the request should wait for the lock.
func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
h.client.SyncCluster()

// Setup connection watcher.
closeNotifier, _ := w.(http.CloseNotifier)
closeChan := closeNotifier.CloseNotify()

// Parse "key" and "ttl" query parameters.
vars := mux.Vars(req)
keypath := path.Join(prefix, vars["key"])
ttl, err := strconv.Atoi(req.FormValue("ttl"))
if err != nil {
http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
return
}

// Parse "timeout" parameter.
var timeout int
if len(req.FormValue("timeout")) == 0 {
timeout = -1
} else if timeout, err = strconv.Atoi(req.FormValue("timeout")); err != nil {
http.Error(w, "invalid timeout: " + err.Error(), http.StatusInternalServerError)
return
}
timeout = timeout + 1

// Create an incrementing id for the lock.
resp, err := h.client.AddChild(keypath, "-", uint64(ttl))
if err != nil {
http.Error(w, "add lock index error: " + err.Error(), http.StatusInternalServerError)
return
}
indexpath := resp.Node.Key

// Keep updating TTL to make sure lock request is not expired before acquisition.
stop := make(chan bool)
go h.ttlKeepAlive(indexpath, ttl, stop)

// Monitor for broken connection.
stopWatchChan := make(chan bool)
go func() {
select {
case <-closeChan:
stopWatchChan <- true
case <-stop:
// Stop watching for connection disconnect.
}
}()

// Extract the lock index.
index, _ := strconv.Atoi(path.Base(resp.Node.Key))

// Wait until we successfully get a lock or we get a failure.
var success bool
for {
// Read all indices.
resp, err = h.client.Get(keypath, true, true)
if err != nil {
http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
break
}
indices := extractResponseIndices(resp)
waitIndex := resp.Node.ModifiedIndex
prevIndex := findPrevIndex(indices, index)

// If there is no previous index then we have the lock.
if prevIndex == 0 {
success = true
break
}

// Otherwise watch previous index until it's gone.
_, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, false, nil, stopWatchChan)
if err == etcd.ErrWatchStoppedByUser {
break
} else if err != nil {
http.Error(w, "lock watch error: " + err.Error(), http.StatusInternalServerError)
break
}
}

// Check for connection disconnect before we write the lock index.
select {
case <-stopWatchChan:
success = false
default:
}

// Stop the ttl keep-alive.
close(stop)

if success {
// Write lock index to response body if we acquire the lock.
h.client.Update(indexpath, "-", uint64(ttl))
w.Write([]byte(strconv.Itoa(index)))
} else {
// Make sure key is deleted if we couldn't acquire.
h.client.Delete(indexpath, false)
}
}

// ttlKeepAlive continues to update a key's TTL until the stop channel is closed.
func (h *handler) ttlKeepAlive(k string, ttl int, stop chan bool) {
for {
select {
case <-time.After(time.Duration(ttl / 2) * time.Second):
h.client.Update(k, "-", uint64(ttl))
case <-stop:
return
}
}
}
30 changes: 30 additions & 0 deletions mod/lock/v2/get_index_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package v2

import (
"net/http"
"path"
"strconv"

"github.com/gorilla/mux"
)

// getIndexHandler retrieves the current lock index.
func (h *handler) getIndexHandler(w http.ResponseWriter, req *http.Request) {
h.client.SyncCluster()

vars := mux.Vars(req)
keypath := path.Join(prefix, vars["key"])

// Read all indices.
resp, err := h.client.Get(keypath, true, true)
if err != nil {
http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
return
}

// Write out the index of the last one to the response body.
indices := extractResponseIndices(resp)
if len(indices) > 0 {
w.Write([]byte(strconv.Itoa(indices[0])))
}
}
58 changes: 58 additions & 0 deletions mod/lock/v2/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package v2

import (
"net/http"
"path"
"strconv"
"sort"

"github.com/gorilla/mux"
"github.com/coreos/go-etcd/etcd"
)

const prefix = "/_etcd/mod/lock"

// handler manages the lock HTTP request.
type handler struct {
*mux.Router
client *etcd.Client
}

// NewHandler creates an HTTP handler that can be registered on a router.
func NewHandler(addr string) (http.Handler) {
h := &handler{
Router: mux.NewRouter(),
client: etcd.NewClient([]string{addr}),
}
h.StrictSlash(false)
h.HandleFunc("/{key:.*}", h.getIndexHandler).Methods("GET")
h.HandleFunc("/{key:.*}", h.acquireHandler).Methods("POST")
h.HandleFunc("/{key_with_index:.*}", h.renewLockHandler).Methods("PUT")
h.HandleFunc("/{key_with_index:.*}", h.releaseLockHandler).Methods("DELETE")
return h
}


// extractResponseIndices extracts a sorted list of indicies from a response.
func extractResponseIndices(resp *etcd.Response) []int {
var indices []int
for _, node := range resp.Node.Nodes {
if index, _ := strconv.Atoi(path.Base(node.Key)); index > 0 {
indices = append(indices, index)
}
}
sort.Ints(indices)
return indices
}

// findPrevIndex retrieves the previous index before the given index.
func findPrevIndex(indices []int, idx int) int {
var prevIndex int
for _, index := range indices {
if index == idx {
break
}
prevIndex = index
}
return prevIndex
}
24 changes: 24 additions & 0 deletions mod/lock/v2/release_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package v2

import (
"path"
"net/http"

"github.com/gorilla/mux"
)

// releaseLockHandler deletes the lock.
func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) {
h.client.SyncCluster()

vars := mux.Vars(req)
keypath := path.Join(prefix, vars["key_with_index"])

// Delete the lock.
_, err := h.client.Delete(keypath, false)
if err != nil {
http.Error(w, "delete lock index error: " + err.Error(), http.StatusInternalServerError)
return
}
}

30 changes: 30 additions & 0 deletions mod/lock/v2/renew_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package v2

import (
"path"
"net/http"
"strconv"

"github.com/gorilla/mux"
)

// renewLockHandler attempts to update the TTL on an existing lock.
// Returns a 200 OK if successful. Returns non-200 on error.
func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
h.client.SyncCluster()

vars := mux.Vars(req)
keypath := path.Join(prefix, vars["key_with_index"])
ttl, err := strconv.Atoi(req.FormValue("ttl"))
if err != nil {
http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
return
}

// Renew the lock, if it exists.
_, err = h.client.Update(keypath, "-", uint64(ttl))
if err != nil {
http.Error(w, "renew lock index error: " + err.Error(), http.StatusInternalServerError)
return
}
}
Loading