Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: Adding Lock helpers for leader election #594

Merged
merged 3 commits into from
Jan 12, 2015
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 280 additions & 0 deletions api/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package api

import (
"fmt"
"sync"
"time"
)

const (
// DefaultLockSessionName is the Session Name we assign if none is provided
DefaultLockSessionName = "Consul API Lock"

// DefaultLockSessionTTL is the default session TTL if no Session is provided
// when creating a new Lock. This is used because we do not have another
// other check to depend upon.
DefaultLockSessionTTL = "15s"

// DefaultLockWaitTime is how long we block for at a time to check if lock
// acquisition is possible. This affects the minimum time it takes to cancel
// a Lock acquisition.
DefaultLockWaitTime = 15 * time.Second

// DefaultLockRetryTime is how long we wait after a failed lock acquisition
// before attempting to do the lock again. This is so that once a lock-delay
// is in affect, we do not hot loop retrying the acquisition.
DefaultLockRetryTime = 5 * time.Second
)

var (
// ErrLockHeld is returned if we attempt to double lock
ErrLockHeld = fmt.Errorf("Lock already held")

// ErrLockNotHeld is returned if we attempt to unlock a lock
// that we do not hold.
ErrLockNotHeld = fmt.Errorf("Lock not held")
)

// Lock is used to implement client-side leader election. It is follows the
// algorithm as described here: https://consul.io/docs/guides/leader-election.html.
type Lock struct {
c *Client
opts *LockOptions

isHeld bool
sessionRenew chan struct{}
lockSession string
l sync.Mutex
}

// LockOptions is used to parameterize the Lock behavior.
type LockOptions struct {
Key string // Must be set and have write permissions
Value []byte // Optional, value to associate with the lock
Session string // Optional, created if not specified
SessionName string // Optional, defaults to DefaultLockSessionName
SessionTTL string // Optional, defaults to DefaultLockSessionTTL
}

// LockKey returns a handle to a lock struct which can be used
// to acquire and release the mutex. The key used must have
// write permissions.
func (c *Client) LockKey(key string) (*Lock, error) {
opts := &LockOptions{
Key: key,
}
return c.LockOpts(opts)
}

// LockOpts returns a handle to a lock struct which can be used
// to acquire and release the mutex. The key used must have
// write permissions.
func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
if opts.SessionName == "" {
opts.SessionName = DefaultLockSessionName
}
if opts.SessionTTL == "" {
opts.SessionTTL = DefaultLockSessionTTL
} else {
if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
return nil, fmt.Errorf("invalid SessionTTL: %v", err)
}
}
l := &Lock{
c: c,
opts: opts,
}
return l, nil
}

// Lock attempts to acquire the lock and blocks while doing so.
// Providing a non-nil stopCh can be used to abort the lock attempt.
// Returns a channel that is closed if our lock is lost or an error.
// This channel could be closed at any time due to session invalidation,
// communication errors, operator intervention, etc. It is NOT safe to
// assume that the lock is held until Unlock() unless the Session is specifically
// created without any associated health checks. By default Consul sessions
// prefer liveness over safety and an application must be able to handle
// the lock being lost.
func (l *Lock) Lock(stopCh chan struct{}) (chan struct{}, error) {
// Hold the lock as we try to acquire
l.l.Lock()
defer l.l.Unlock()

// Check if we already hold the lock
if l.isHeld {
return nil, ErrLockHeld
}

// Check if we need to create a session first
l.lockSession = l.opts.Session
if l.lockSession == "" {
if s, err := l.createSession(); err != nil {
return nil, fmt.Errorf("failed to create session: %v", err)
} else {
l.sessionRenew = make(chan struct{})
l.lockSession = s
go l.renewSession(s, l.sessionRenew)

// If we fail to acquire the lock, cleanup the session
defer func() {
if !l.isHeld {
close(l.sessionRenew)
l.sessionRenew = nil
}
}()
}
}

// Setup the query options
kv := l.c.KV()
qOpts := &QueryOptions{
WaitTime: DefaultLockWaitTime,
}

WAIT:
// Check if we should quit
select {
case <-stopCh:
return nil, nil
default:
}

// Look for an existing lock, blocking until not taken
pair, meta, err := kv.Get(l.opts.Key, qOpts)
if err != nil {
return nil, fmt.Errorf("failed to read lock: %v", err)
}
if pair != nil && pair.Session != "" {
qOpts.WaitIndex = meta.LastIndex
goto WAIT
}

// Try to acquire the lock
lockEnt := l.lockEntry(l.lockSession)
locked, _, err := kv.Acquire(lockEnt, nil)
if err != nil {
return nil, fmt.Errorf("failed to acquire lock: %v", err)
}

// Handle the case of not getting the lock
if !locked {
select {
case <-time.After(DefaultLockRetryTime):
goto WAIT
case <-stopCh:
return nil, nil
}
}

// Watch to ensure we maintain leadership
leaderCh := make(chan struct{})
go l.monitorLock(l.lockSession, leaderCh)

// Set that we own the lock
l.isHeld = true

// Locked! All done
return leaderCh, nil
}

// Unlock released the lock. It is an error to call this
// if the lock is not currently held.
func (l *Lock) Unlock() error {
// Hold the lock as we try to release
l.l.Lock()
defer l.l.Unlock()

// Ensure the lock is actually held
if !l.isHeld {
return ErrLockNotHeld
}

// Set that we no longwer own the lock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo. s/longwer/longer/. :-)

l.isHeld = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to set this just before returning. Otherwise if we fail to release the lock, this gets set to false anyways.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing that can fail is the KV.Release() but we have to assume the lock is no longer held once that call is started.


// Stop the session renew
if l.sessionRenew != nil {
defer func() {
close(l.sessionRenew)
l.sessionRenew = nil
}()
}

// Get the lock entry, and clear the lock session
lockEnt := l.lockEntry(l.lockSession)
l.lockSession = ""

// Release the lock explicitly
kv := l.c.KV()
_, _, err := kv.Release(lockEnt, nil)
if err != nil {
return fmt.Errorf("failed to release lock: %v", err)
}
return nil
}

// createSession is used to create a new managed session
func (l *Lock) createSession() (string, error) {
session := l.c.Session()
se := &SessionEntry{
Name: l.opts.SessionName,
TTL: l.opts.SessionTTL,
}
id, _, err := session.Create(se, nil)
if err != nil {
return "", err
}
return id, nil
}

// lockEntry returns a formatted KVPair for the lock
func (l *Lock) lockEntry(session string) *KVPair {
return &KVPair{
Key: l.opts.Key,
Value: l.opts.Value,
Session: session,
}
}

// renewSession is a long running routine that maintians a session
// by doing a periodic Session renewal.
func (l *Lock) renewSession(id string, doneCh chan struct{}) {
session := l.c.Session()
ttl, _ := time.ParseDuration(l.opts.SessionTTL)
for {
select {
case <-time.After(ttl / 2):
entry, _, err := session.Renew(id, nil)
if err != nil || entry == nil {
return
}

// Handle the server updating the TTL
ttl, _ = time.ParseDuration(entry.TTL)

case <-doneCh:
// Attempt a session destroy
session.Destroy(id, nil)
return
}
}
}

// monitorLock is a long running routine to monitor a lock ownership
// It closes the stopCh if we lose our leadership.
func (l *Lock) monitorLock(session string, stopCh chan struct{}) {
kv := l.c.KV()
opts := &QueryOptions{RequireConsistent: true}
WAIT:
pair, meta, err := kv.Get(l.opts.Key, opts)
if err != nil {
close(stopCh)
return
}
if pair != nil && pair.Session == session {
opts.WaitIndex = meta.LastIndex
goto WAIT
}
close(stopCh)
}
Loading