-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlocking.go
360 lines (307 loc) · 12 KB
/
locking.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
package locking
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"net/url"
"regexp"
"strings"
"sync"
"time"
"github.com/Azure/azure-storage-blob-go/2016-05-31/azblob"
"github.com/cenkalti/backoff"
"github.com/satori/go.uuid"
)
// lockBlobNamePrefix is placed in front of the caller-supplied lock name to
// form the name of the blob whose lease implements the lock.
const lockBlobNamePrefix = "azlk-"

// lockContainerName is the blob container in which every lock blob created by
// this library lives.
const lockContainerName = "azlockcontainer"
// Lock carries the state of a single lock together with the functions used
// to acquire, renew and release it. Instances are built by NewLockInstance.
type Lock struct {
	// ctx is the context handed to the storage calls and watched by the
	// behavior goroutines.
	ctx context.Context
	// used becomes true once the lock has been released successfully; a used
	// instance refuses further Lock/Renew/Unlock calls.
	used bool
	// lockAcquired becomes true once Lock has succeeded.
	lockAcquired bool
	// panic is what 'PanicOnLostLock' calls; it is a field so tests can
	// substitute a mock for the real panic.
	panic func(string)
	// unlockContext releases the lock using the supplied context. It lets the
	// 'UnlockWhenContextCancelled' behavior unlock with a temporary context.
	unlockContext func(context.Context) error
	// cancel cancels ctx; it is used internally to stop the behavior goroutines.
	cancel context.CancelFunc
	// blobURL addresses the blob backing this lock.
	blobURL azblob.BlobURL
	// internalMutex guards updates to 'used' and 'lockAcquired'.
	internalMutex sync.Mutex

	// LockTTL is how long the lock is held for.
	// Valid options: 15sec -> 60sec due to Azure Blob https://docs.microsoft.com/en-us/rest/api/storageservices/lease-container
	LockTTL time.Duration
	// LockLost receives a value from the 'AutoRenew' behavior when the lock is lost.
	LockLost chan struct{}
	// LockID identifies the underlying blob lease.
	LockID uuid.UUID
	// Lock acquires the lock.
	Lock func() error
	// Renew extends a held lock, or returns an error when no lock is held.
	Renew func() error
	// Unlock releases a held lock, or returns an error when no lock is held.
	Unlock func() error
}

// BehaviorFunc adapts a plain func so that it can be passed as a behavior.
// It receives the lock being configured and returns the lock to keep using.
type BehaviorFunc func(*Lock) *Lock
var (
	// defaultLockBehaviors are the behaviors applied when the caller supplies none.
	defaultLockBehaviors = []BehaviorFunc{AutoRenewLock, PanicOnLostLock, UnlockWhenContextCancelled, RetryObtainingLock}

	// azBlobRetryOptions are the retry settings used for the azure storage calls.
	azBlobRetryOptions = azblob.RetryOptions{
		Policy:   azblob.RetryPolicyExponential,
		MaxTries: 3,
	}

	// AutoRenewLock configures the lock to renew itself every LockTTL/2 once it
	// has been acquired. If a renewal fails the lock's context is cancelled and
	// a value is sent on LockLost. The goroutine exits when the lock's context
	// is done or when it receives from LockLost.
	AutoRenewLock = BehaviorFunc(func(l *Lock) *Lock {
		go func() {
			// One ticker serves the whole loop. Calling time.Tick inside the
			// select would allocate a fresh, never-stopped ticker on every
			// iteration. A non-positive interval leaves 'ticks' nil, which
			// never fires (the same outcome time.Tick gives for such values).
			var ticks <-chan time.Time
			if interval := l.LockTTL / 2; interval > 0 {
				ticker := time.NewTicker(interval)
				defer ticker.Stop()
				ticks = ticker.C
			}

			for {
				select {
				case <-l.ctx.Done():
					// Context has been cancelled, exit so can be gc'd
					return
				case <-l.LockLost:
					return
				case <-ticks:
					// 'lockAcquired' is written by Lock under internalMutex, so
					// read it under the same mutex. The mutex must be released
					// before Renew, which takes it itself.
					l.internalMutex.Lock()
					acquired := l.lockAcquired
					l.internalMutex.Unlock()

					// If the 'lock' function hasn't been used yet wait for the next tick
					if !acquired {
						continue
					}

					// Do a renew. If we fail, clean up and notify that the lock is lost.
					// NOTE(review): cancel() runs before the LockLost send and only one
					// value is sent, so another goroutine selecting on both l.ctx.Done()
					// and l.LockLost may observe the cancellation instead of the
					// lost-lock signal - confirm this ordering is intended.
					if err := l.Renew(); err != nil {
						l.cancel()
						l.LockLost <- struct{}{}
						return
					}
				}
			}
		}()
		return l
	})

	// RetryObtainingLock configures the lock to retry getting a lock if it is already held.
	// The existing Lock function is wrapped in an exponential backoff retry.
	RetryObtainingLock = BehaviorFunc(func(l *Lock) *Lock {
		// Assuming locks will be initialised with roughly
		// the correct TTL required to perform the operation
		// lets give it 10x time to acquire it
		obtainLockBackoffPolicy := backoff.NewExponentialBackOff()
		obtainLockBackoffPolicy.MaxElapsedTime = l.LockTTL * 10

		existingLockFunc := l.Lock
		// Replace existing lock function with exponential retrying one
		l.Lock = func() error { return backoff.Retry(existingLockFunc, obtainLockBackoffPolicy) }
		return l
	})

	// UnlockWhenContextCancelled will remove a lease when the lock's context is
	// cancelled. The goroutine exits without unlocking if it receives from
	// LockLost first, or if the lock was never acquired.
	UnlockWhenContextCancelled = BehaviorFunc(func(l *Lock) *Lock {
		go func() {
			select {
			case <-l.ctx.Done():
				// Read 'lockAcquired' under the mutex that guards its writes and
				// release the mutex before unlockContext, which takes it itself.
				l.internalMutex.Lock()
				acquired := l.lockAcquired
				l.internalMutex.Unlock()

				// If the 'lock' function wasn't ever called don't worry
				if !acquired {
					return
				}

				// The original context is dead but we don't want to leave the lock in place
				// so lets create a new context and give it 5 seconds to get the job done
				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
				defer cancel()
				l.unlockContext(ctx) //nolint: errcheck
			case <-l.LockLost:
			}
		}()
		return l
	})

	// PanicOnLostLock configures the lock to attempt an unlock and then call the
	// lock's panic function when a value is received on LockLost. The goroutine
	// exits quietly if the lock's context is done first.
	PanicOnLostLock = BehaviorFunc(func(l *Lock) *Lock {
		go func() {
			select {
			case <-l.ctx.Done():
				return
			case <-l.LockLost:
				// NOTE(review): when the signal comes from AutoRenewLock the lock's
				// context has already been cancelled, so this best-effort unlock is
				// presumably rejected by the storage call - confirm.
				l.Unlock() //nolint: errcheck
				l.panic("Lock lost and 'PanicOnLostLock' set")
			}
		}()
		return l
	})
)
// NewLockInstance returns a new instance of a lock
//
// The arguments are validated first. The lock container and an empty lock blob
// are then created in the storage account (already-existing ones are accepted),
// so this call talks to the storage service. The returned Lock is not yet held:
// call its Lock function to acquire it.
//
// Params
// ctxParent: Context used for the setup calls; the lock's own context is derived from it, so cancelling it cancels the lock's context too
// StorageAccountURL: HTTPS endpoint for your storage account eg. `https://mystorageaccount.blob.core.windows.net` if your account is named `mystorageaccount`. Must not contain a path
// StorageAccountKey: The access key for your storage account (base64)
// LockName: A string of 3 to 58 chars, alphanumeric groups separated by single '-', that will represent your lock (see IsValidLockName)
// LockTTL: A duration between 15 and 60 seconds for which the lock will be held. Note, by default the `AutoRenew` behavior will renew locks until `Unlock` is called
//
// Advanced
// Behaviors: Funcs which allow you to mutate the lockInstance's behavior. Leave empty for default behavior
//
// Returns the configured lock, or a nil lock and an error when validation or
// one of the storage calls fails.
func NewLockInstance(ctxParent context.Context, storageAccountURL, storageAccountKey, lockName string, lockTTL time.Duration, behavior ...BehaviorFunc) (*Lock, error) {
	if storageAccountKey == "" {
		return nil, fmt.Errorf("Empty accountKey is invalid")
	}
	if lockTTL.Seconds() < 15 || lockTTL.Seconds() > 60 {
		return nil, fmt.Errorf("LockTTL of %v seconds is outside allowed range of 15-60seconds", lockTTL.Seconds())
	}
	if valid, err := IsValidLockName(lockName); !valid {
		return nil, err
	}

	storageAccountURLParsed, err := url.Parse(storageAccountURL)
	if err != nil {
		return nil, fmt.Errorf("failed to parse storageAccountUrl, err: %+v", err)
	}
	if storageAccountURLParsed.Scheme != "https" {
		return nil, fmt.Errorf("storageAccountURL should be 'https' Expect: 'https://mystorageaccount.blob.core.windows.net' Got: %s", storageAccountURL)
	}
	if storageAccountURLParsed.Path != "" {
		return nil, fmt.Errorf("storageAccountURL should be to the root of the storage account Expect: 'https://mystorageaccount.blob.core.windows.net' Got: %s", storageAccountURL)
	}
	if _, err = base64.StdEncoding.DecodeString(storageAccountKey); err != nil {
		return nil, fmt.Errorf("accountKey isn't valid base64 value - must be valid base64")
	}

	// Extract the accountname from the storage URL
	// for example 'https://mystorageaccount.blob.core.windows.net' -> 'mystorageaccount'
	accountName, err := extractAccountNameFromURL(storageAccountURLParsed)
	if err != nil {
		return nil, err
	}
	creds := azblob.NewSharedKeyCredential(accountName, storageAccountKey)

	// Create a ContainerURL object to a container
	containerEndpoint, err := url.Parse(fmt.Sprintf("%s/%s", storageAccountURL, lockContainerName))
	if err != nil {
		return nil, fmt.Errorf("failed to build lock container URL, err: %+v", err)
	}
	containerURL := azblob.NewContainerURL(*containerEndpoint, azblob.NewPipeline(creds, azblob.PipelineOptions{Retry: azBlobRetryOptions}))

	// Create will return a ServiceCode of "ContainerAlreadyExists" if the container already exists
	// we only error on other conditions as it's expected that a container of this
	// name may already exist
	if _, err = containerURL.Create(ctxParent, nil, azblob.PublicAccessNone); err != nil {
		storageErr, isStorageErr := err.(azblob.StorageError)
		if !isStorageErr || storageErr.ServiceCode() != azblob.ServiceCodeContainerAlreadyExists {
			return nil, err
		}
	}

	// Create a blob, we use leases on the blob to implement the lock
	blobURL := containerURL.NewBlobURL(lockBlobNamePrefix + lockName)

	// Upload an empty blob.
	// It's expected that a lock of this name may already exist
	// and may already have an active lease BUT for any other
	// ServiceCodes or errors we should return an error
	buf := bytes.NewReader([]byte{})
	if _, err = blobURL.ToBlockBlobURL().PutBlob(ctxParent, buf, azblob.BlobHTTPHeaders{}, azblob.Metadata{}, azblob.BlobAccessConditions{}); err != nil {
		storageErr, isStorageErr := err.(azblob.StorageError)
		if !isStorageErr {
			return nil, err
		}
		switch storageErr.ServiceCode() {
		case azblob.ServiceCodeBlobAlreadyExists, azblob.ServiceCodeLeaseIDMissing:
			// Expected: the lock blob is already there (possibly leased)
		default:
			return nil, err
		}
	}

	// Derive our own context from the parent so the lock can cancel its
	// behavior goroutines itself; cancelling the parent still cancels it too
	ctx, cancel := context.WithCancel(ctxParent)

	lockInstance := &Lock{
		ctx:      ctx,
		cancel:   cancel,
		blobURL:  blobURL,
		panic:    func(s string) { panic(s) },
		LockTTL:  lockTTL,
		LockLost: make(chan struct{}, 1),
		LockID:   uuid.NewV4(),
	}

	// This function handles unlocking
	// it accepts a context to allow locks to be unlocked
	// even after a context has been cancelled
	lockInstance.unlockContext = func(ctx context.Context) error {
		lockInstance.internalMutex.Lock()
		defer lockInstance.internalMutex.Unlock()

		if !lockInstance.lockAcquired {
			return fmt.Errorf("Lock not acquired, can't unlock")
		}
		if lockInstance.used {
			return fmt.Errorf("Lock instance already unlocked, cannot call unlock")
		}

		// No matter what happened cancel the context to close off the go routines running in behaviors
		defer lockInstance.cancel()

		if _, err := lockInstance.blobURL.ReleaseLease(ctx, lockInstance.LockID.String(), azblob.HTTPAccessConditions{}); err != nil {
			return err
		}

		// Mark this lock instance as used to prevent reuse
		// as the library doesn't handle multiple uses per lock instance
		lockInstance.used = true
		return nil
	}

	lockInstance.Unlock = func() error {
		return lockInstance.unlockContext(lockInstance.ctx)
	}

	lockInstance.Lock = func() error {
		lockInstance.internalMutex.Lock()
		defer lockInstance.internalMutex.Unlock()

		if lockInstance.used {
			return fmt.Errorf("Lock instance already unlocked, cannot be reused")
		}
		if lockInstance.lockAcquired {
			return fmt.Errorf("Lock already acquire, call 'renew' to extend a lock")
		}

		// Use a closure-local error rather than writing to the constructor's
		// 'err' variable, which would otherwise be shared by every call
		if _, err := lockInstance.blobURL.AcquireLease(lockInstance.ctx, lockInstance.LockID.String(), int32(lockTTL.Seconds()), azblob.HTTPAccessConditions{}); err != nil {
			return err
		}

		lockInstance.lockAcquired = true
		return nil
	}

	lockInstance.Renew = func() error {
		lockInstance.internalMutex.Lock()
		defer lockInstance.internalMutex.Unlock()

		if !lockInstance.lockAcquired {
			return fmt.Errorf("Lock not acquired, can't renew")
		}
		if lockInstance.used {
			return fmt.Errorf("Lock instance already used, cannot be reused")
		}

		_, err := lockInstance.blobURL.RenewLease(lockInstance.ctx, lockInstance.LockID.String(), azblob.HTTPAccessConditions{})
		return err
	}

	// If behaviors haven't been defined use the defaults
	if len(behavior) == 0 {
		behavior = defaultLockBehaviors
	}

	// Configure behaviors
	for _, b := range behavior {
		lockInstance = b(lockInstance)
	}

	return lockInstance, nil
}
// extractAccountNameFromURL returns the first dot-separated label of the URL's
// hostname, for example 'https://mystorageaccount.blob.core.windows.net' ->
// 'mystorageaccount'.
//
// An error is returned when that label is empty, i.e. the URL has no hostname
// or the hostname starts with a '.'.
func extractAccountNameFromURL(u *url.URL) (string, error) {
	// strings.Split always yields at least one element, so checking the number
	// of parts can never detect a missing host; inspect the first label itself.
	accountName := strings.Split(u.Hostname(), ".")[0]
	if accountName == "" {
		return "", fmt.Errorf("couldn't extract accountname from: %s", u.String())
	}
	return accountName, nil
}
// validLockNameRegex accepts one or more groups of lower-case letters and
// digits, joined by single '-' characters (no leading, trailing or doubled '-').
var validLockNameRegex = regexp.MustCompile("^[a-z0-9]+(-[a-z0-9]+)*$")

// IsValidLockName reports whether lockName, once lower-cased, is 3 to 58
// characters long (63 minus the 5 char prefix used) and matches
// "^[a-z0-9]+(-[a-z0-9]+)*$". When the name is rejected the returned error
// explains which rule was broken; otherwise the error is nil.
func IsValidLockName(lockName string) (bool, error) {
	normalised := strings.ToLower(lockName)

	switch size := len(normalised); {
	case size < 3, size > 58:
		return false, fmt.Errorf("lock name: %s must be between 3 and 58 characters long", normalised)
	case !validLockNameRegex.MatchString(normalised):
		return false, fmt.Errorf("lock name: %s must be alphanumberic with no characters other than '-' (regex '^[a-z0-9]+(-[a-z0-9]+)*$')", normalised)
	}

	return true, nil
}