Skip to content

Commit

Permalink
Merge pull request #347 from benbjohnson/mod-lock
Browse files Browse the repository at this point in the history
WIP: mod/lock
  • Loading branch information
philips committed Dec 5, 2013
2 parents 70e31c7 + b784ced commit af20be8
Show file tree
Hide file tree
Showing 14 changed files with 534 additions and 74 deletions.
128 changes: 128 additions & 0 deletions mod/lock/v2/acquire_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package v2

import (
"net/http"
"path"
"strconv"
"time"

"github.com/coreos/go-etcd/etcd"
"github.com/gorilla/mux"
)

// acquireHandler attempts to acquire a lock on the given key.
// The "key" parameter specifies the resource to lock.
// The "ttl" parameter specifies how long the lock will persist for.
// The "timeout" parameter specifies how long the request should wait for the lock.
func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
h.client.SyncCluster()

// Setup connection watcher.
closeNotifier, _ := w.(http.CloseNotifier)
closeChan := closeNotifier.CloseNotify()

// Parse "key" and "ttl" query parameters.
vars := mux.Vars(req)
keypath := path.Join(prefix, vars["key"])
ttl, err := strconv.Atoi(req.FormValue("ttl"))
if err != nil {
http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
return
}

// Parse "timeout" parameter.
var timeout int
if len(req.FormValue("timeout")) == 0 {
timeout = -1
} else if timeout, err = strconv.Atoi(req.FormValue("timeout")); err != nil {
http.Error(w, "invalid timeout: " + err.Error(), http.StatusInternalServerError)
return
}
timeout = timeout + 1

// Create an incrementing id for the lock.
resp, err := h.client.AddChild(keypath, "-", uint64(ttl))
if err != nil {
http.Error(w, "add lock index error: " + err.Error(), http.StatusInternalServerError)
return
}
indexpath := resp.Node.Key

// Keep updating TTL to make sure lock request is not expired before acquisition.
stop := make(chan bool)
go h.ttlKeepAlive(indexpath, ttl, stop)

// Monitor for broken connection.
stopWatchChan := make(chan bool)
go func() {
select {
case <-closeChan:
stopWatchChan <- true
case <-stop:
// Stop watching for connection disconnect.
}
}()

// Extract the lock index.
index, _ := strconv.Atoi(path.Base(resp.Node.Key))

// Wait until we successfully get a lock or we get a failure.
var success bool
for {
// Read all indices.
resp, err = h.client.Get(keypath, true, true)
if err != nil {
http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
break
}
indices := extractResponseIndices(resp)
waitIndex := resp.Node.ModifiedIndex
prevIndex := findPrevIndex(indices, index)

// If there is no previous index then we have the lock.
if prevIndex == 0 {
success = true
break
}

// Otherwise watch previous index until it's gone.
_, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, false, nil, stopWatchChan)
if err == etcd.ErrWatchStoppedByUser {
break
} else if err != nil {
http.Error(w, "lock watch error: " + err.Error(), http.StatusInternalServerError)
break
}
}

// Check for connection disconnect before we write the lock index.
select {
case <-stopWatchChan:
success = false
default:
}

// Stop the ttl keep-alive.
close(stop)

if success {
// Write lock index to response body if we acquire the lock.
h.client.Update(indexpath, "-", uint64(ttl))
w.Write([]byte(strconv.Itoa(index)))
} else {
// Make sure key is deleted if we couldn't acquire.
h.client.Delete(indexpath, false)
}
}

// ttlKeepAlive continues to update a key's TTL until the stop channel is closed.
func (h *handler) ttlKeepAlive(k string, ttl int, stop chan bool) {
for {
select {
case <-time.After(time.Duration(ttl / 2) * time.Second):
h.client.Update(k, "-", uint64(ttl))
case <-stop:
return
}
}
}
30 changes: 30 additions & 0 deletions mod/lock/v2/get_index_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package v2

import (
"net/http"
"path"
"strconv"

"github.com/gorilla/mux"
)

// getIndexHandler retrieves the current lock index.
func (h *handler) getIndexHandler(w http.ResponseWriter, req *http.Request) {
h.client.SyncCluster()

vars := mux.Vars(req)
keypath := path.Join(prefix, vars["key"])

// Read all indices.
resp, err := h.client.Get(keypath, true, true)
if err != nil {
http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
return
}

// Write out the index of the last one to the response body.
indices := extractResponseIndices(resp)
if len(indices) > 0 {
w.Write([]byte(strconv.Itoa(indices[0])))
}
}
58 changes: 58 additions & 0 deletions mod/lock/v2/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package v2

import (
"net/http"
"path"
"strconv"
"sort"

"github.com/gorilla/mux"
"github.com/coreos/go-etcd/etcd"
)

const prefix = "/_etcd/mod/lock"

// handler manages the lock HTTP request.
type handler struct {
*mux.Router
client *etcd.Client
}

// NewHandler creates an HTTP handler that can be registered on a router.
func NewHandler(addr string) (http.Handler) {
h := &handler{
Router: mux.NewRouter(),
client: etcd.NewClient([]string{addr}),
}
h.StrictSlash(false)
h.HandleFunc("/{key:.*}", h.getIndexHandler).Methods("GET")
h.HandleFunc("/{key:.*}", h.acquireHandler).Methods("POST")
h.HandleFunc("/{key_with_index:.*}", h.renewLockHandler).Methods("PUT")
h.HandleFunc("/{key_with_index:.*}", h.releaseLockHandler).Methods("DELETE")
return h
}


// extractResponseIndices extracts a sorted list of indicies from a response.
func extractResponseIndices(resp *etcd.Response) []int {
var indices []int
for _, node := range resp.Node.Nodes {
if index, _ := strconv.Atoi(path.Base(node.Key)); index > 0 {
indices = append(indices, index)
}
}
sort.Ints(indices)
return indices
}

// findPrevIndex retrieves the previous index before the given index.
func findPrevIndex(indices []int, idx int) int {
var prevIndex int
for _, index := range indices {
if index == idx {
break
}
prevIndex = index
}
return prevIndex
}
24 changes: 24 additions & 0 deletions mod/lock/v2/release_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package v2

import (
"path"
"net/http"

"github.com/gorilla/mux"
)

// releaseLockHandler deletes the lock.
func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) {
h.client.SyncCluster()

vars := mux.Vars(req)
keypath := path.Join(prefix, vars["key_with_index"])

// Delete the lock.
_, err := h.client.Delete(keypath, false)
if err != nil {
http.Error(w, "delete lock index error: " + err.Error(), http.StatusInternalServerError)
return
}
}

30 changes: 30 additions & 0 deletions mod/lock/v2/renew_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package v2

import (
"path"
"net/http"
"strconv"

"github.com/gorilla/mux"
)

// renewLockHandler attempts to update the TTL on an existing lock.
// Returns a 200 OK if successful. Returns non-200 on error.
func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
h.client.SyncCluster()

vars := mux.Vars(req)
keypath := path.Join(prefix, vars["key_with_index"])
ttl, err := strconv.Atoi(req.FormValue("ttl"))
if err != nil {
http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
return
}

// Renew the lock, if it exists.
_, err = h.client.Update(keypath, "-", uint64(ttl))
if err != nil {
http.Error(w, "renew lock index error: " + err.Error(), http.StatusInternalServerError)
return
}
}
Loading

0 comments on commit af20be8

Please sign in to comment.