Skip to content

Commit

Permalink
etcd coordinator should retry until it's no longer safe
Browse files Browse the repository at this point in the history
  • Loading branch information
schmichael committed Dec 8, 2014
1 parent 5b730d5 commit 4862f92
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 16 deletions.
4 changes: 1 addition & 3 deletions m_etcd/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ const (
MetadataKey = "_metafora" // _{KEYs} are hidden files, so this will not trigger our watches
OwnerMarker = "owner"

ClaimTTL = 120 // seconds
NodeIDTTL = 86400 // 24 hours
ForeverTTL = 0 //Ref: https://github.com/coreos/go-etcd/blob/e10c58ee110f54c2f385ac99764e8a7ca4cb13df/etcd/requests.go#L356
ForeverTTL = 0 //Ref: https://github.com/coreos/go-etcd/blob/e10c58ee110f54c2f385ac99764e8a7ca4cb13df/etcd/requests.go#L356

//Etcd Error codes are passed directly through go-etcd from the http response,
//So to find the error codes use this ref:
Expand Down
60 changes: 47 additions & 13 deletions m_etcd/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ const (
)

var (
DefaultNodePathTTL uint64 = 20 // seconds
ClaimTTL uint64 = 120 // seconds
DefaultNodePathTTL uint64 = 20 // seconds

// etcd actions signifying a claim key was released
releaseActions = map[string]bool{
Expand Down Expand Up @@ -164,27 +165,57 @@ func (ec *EtcdCoordinator) upsertDir(path string, ttl uint64) {
}
}

// nodeRefresher is in charge of keeping the node entry in etcd alive. If it's
// unable to communicate with etcd it must shut down the coordinator.
//
// watch retries on errors and taskmgr calls Lost(task) on tasks it can't
// refresh, so it's up to nodeRefresher to cause the coordinator to close if
// it's unable to communicate with etcd.
func (ec *EtcdCoordinator) nodeRefresher() {
ttl := ec.nodePathTTL - 5 // try to have some leeway before ttl expires
ttl := ec.nodePathTTL >> 1 // have some leeway before ttl expires
if ttl < 1 {
ttl = 1
}
for {
// Deadline for refreshes to finish by or the coordinator closes.
deadline := time.Now().Add(time.Duration(ec.nodePathTTL) * time.Second)
select {
case <-ec.stop:
return
case <-time.After(time.Duration(ttl) * time.Second):
if _, err := ec.Client.UpdateDir(ec.nodePath, ec.nodePathTTL); err != nil {
ec.cordCtx.Log(metafora.LogLevelError, "Unexpected error updating node key, shutting down. Error: %v", err)

if err := ec.refreshBy(deadline); err != nil {
// We're in a bad state; shut everything down
ec.cordCtx.Log(metafora.LogLevelError,
"Unable to refresh node key before deadline %s. Last error: %v", deadline, err)
ec.Close()
return
}
}
}
}

// refreshBy retries refreshing the node key in etcd until a refresh succeeds,
// the coordinator is stopped, or the deadline leaves no room for another
// rate-limited attempt.
//
// Unless the coordinator has already been stopped, at least one refresh is
// always attempted, even if the deadline has already passed on entry.
// Otherwise a late call would return a nil error without the node key ever
// having been refreshed, and the caller would mistake that for success.
//
// It returns nil after a successful refresh. If ec.stop is closed it returns
// the error from the most recent attempt, which is nil when no attempt has
// failed yet. If the deadline is reached it returns the last error seen.
func (ec *EtcdCoordinator) refreshBy(deadline time.Time) (err error) {
	// Pause between failed attempts so a broken etcd isn't hammered.
	const retryDelay = 500 * time.Millisecond

	for {
		// Make sure we shouldn't exit
		select {
		case <-ec.stop:
			return err
		default:
		}

		_, err = ec.Client.UpdateDir(ec.nodePath, ec.nodePathTTL)
		if err == nil {
			// It worked!
			return nil
		}
		ec.cordCtx.Log(metafora.LogLevelWarn, "Unexpected error updating node key: %v", err)
		transport.CloseIdleConnections() // paranoia; let's get fresh connections on errors.

		// If waiting out the retry delay would take us to or past the deadline
		// there is no time for another attempt, so give up now instead of
		// sleeping beyond the deadline.
		if deadline.Sub(time.Now()) <= retryDelay {
			return err
		}

		// Rate limit retries a bit, but wake up immediately if stopped.
		select {
		case <-ec.stop:
			return err
		case <-time.After(retryDelay):
		}
	}
}

// Watch will do a blocking etcd watch on taskPath until a claimable task is
// found or Close() is called.
//
Expand Down Expand Up @@ -381,10 +412,10 @@ func (ec *EtcdCoordinator) Close() {
default:
}

ec.taskManager.stop()

close(ec.stop)

ec.taskManager.stop()

// Finally remove the node entry
const recursive = true
_, err := ec.Client.Delete(ec.nodePath, recursive)
Expand Down Expand Up @@ -421,11 +452,14 @@ func (ec *EtcdCoordinator) watch(path string, index uint64) (*etcd.Response, err
// This isn't actually an error, the stop chan was closed. Time to stop!
return nil, err
}
// RawWatch errors should be treated as fatal since it internally
// retries on network problems, and recoverable Etcd errors aren't
// parsed until rawResp.Unmarshal is called later.
ec.cordCtx.Log(metafora.LogLevelError, "%s Unrecoverable watch error: %v", path, err)
return nil, err

// Other RawWatch errors should be retried forever. If the node refresher
// also fails to communicate with etcd it will close the coordinator,
// closing ec.stop in the process, which will cause this function to return
// ErrWatchStoppedByUser.
ec.cordCtx.Log(metafora.LogLevelError, "%s Retrying after unexpected watch error: %v", path, err)
transport.CloseIdleConnections() // paranoia; let's get fresh connections on errors.
continue
}

if len(rawResp.Body) == 0 {
Expand Down

0 comments on commit 4862f92

Please sign in to comment.