diff --git a/core/models/contacts.go b/core/models/contacts.go index ade4e38e9..a1c4ba650 100644 --- a/core/models/contacts.go +++ b/core/models/contacts.go @@ -19,6 +19,7 @@ import ( "github.com/nyaruka/goflow/excellent/types" "github.com/nyaruka/goflow/flows" "github.com/nyaruka/null" + "github.com/nyaruka/redisx" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -1341,9 +1342,10 @@ func (i *ContactID) Scan(value interface{}) error { return null.ScanInt(value, (*null.Int)(i)) } -// ContactLock returns the lock key for a particular contact, used with locker -func ContactLock(orgID OrgID, contactID ContactID) string { - return fmt.Sprintf("c:%d:%d", orgID, contactID) +// GetContactLocker returns the locker for a particular contact +func GetContactLocker(orgID OrgID, contactID ContactID) *redisx.Locker { + key := fmt.Sprintf("lock:c:%d:%d", orgID, contactID) + return redisx.NewLocker(key, time.Minute*5) } // UpdateContactModifiedBy updates modified by the passed user id on the passed in contacts diff --git a/core/runner/runner.go b/core/runner/runner.go index 23e99fb57..29705605e 100644 --- a/core/runner/runner.go +++ b/core/runner/runner.go @@ -15,7 +15,7 @@ import ( "github.com/nyaruka/mailroom/core/models" "github.com/nyaruka/mailroom/core/queue" "github.com/nyaruka/mailroom/runtime" - "github.com/nyaruka/mailroom/utils/locker" + "github.com/nyaruka/redisx" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -461,7 +461,7 @@ func StartFlow( start := time.Now() // map of locks we've released - released := make(map[string]bool) + released := make(map[*redisx.Locker]bool) for len(remaining) > 0 && time.Since(start) < time.Minute*5 { locked := make([]models.ContactID, 0, len(remaining)) @@ -470,8 +470,9 @@ func StartFlow( // try up to a second to get a lock for a contact for _, contactID := range remaining { - lockID := models.ContactLock(oa.OrgID(), contactID) - lock, err := locker.GrabLock(rt.RP, lockID, time.Minute*5, time.Second) + locker := models.GetContactLocker(oa.OrgID(), contactID) + + lock, err := locker.Grab(rt.RP, time.Second) if err != nil { return nil, errors.Wrapf(err, "error attempting to grab lock") } @@ -484,8 +485,8 @@ func StartFlow( // defer unlocking if we exit due to error defer func() { - if !released[lockID] { - locker.ReleaseLock(rt.RP, lockID, lock) + if !released[locker] { + locker.Release(rt.RP, lock) } }() } @@ -517,9 +518,9 @@ func StartFlow( // release all our locks for i := range locked { - lockID := models.ContactLock(oa.OrgID(), locked[i]) - locker.ReleaseLock(rt.RP, lockID, locks[i]) - released[lockID] = true + locker := models.GetContactLocker(oa.OrgID(), locked[i]) + locker.Release(rt.RP, locks[i]) + released[locker] = true } // skipped are now our remaining diff --git a/core/tasks/handler/worker.go b/core/tasks/handler/worker.go index b03f5935d..057c335cd 100644 --- a/core/tasks/handler/worker.go +++ b/core/tasks/handler/worker.go @@ -22,7 +22,6 @@ import ( "github.com/nyaruka/mailroom/core/queue" "github.com/nyaruka/mailroom/core/runner" "github.com/nyaruka/mailroom/runtime" - "github.com/nyaruka/mailroom/utils/locker" "github.com/nyaruka/null" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -61,8 +60,9 @@ func handleContactEvent(ctx context.Context, rt *runtime.Runtime, task *queue.Ta } // acquire the lock for this contact - lockID := models.ContactLock(models.OrgID(task.OrgID), eventTask.ContactID) - lock, err := locker.GrabLock(rt.RP, lockID, time.Minute*5, time.Second*10) + locker := models.GetContactLocker(models.OrgID(task.OrgID), eventTask.ContactID) + + lock, err := locker.Grab(rt.RP, time.Second*10) if err != nil { return errors.Wrapf(err, "error acquiring lock for contact %d", eventTask.ContactID) } @@ -81,7 +81,7 @@ func handleContactEvent(ctx context.Context, rt *runtime.Runtime, task *queue.Ta }).Info("failed to get lock for contact, requeued and skipping") return nil } - defer locker.ReleaseLock(rt.RP, lockID, lock) + defer locker.Release(rt.RP, lock) // read all the events for this contact, one by one contactQ := fmt.Sprintf("c:%d:%d", task.OrgID, eventTask.ContactID) diff --git a/utils/locker/locker.go b/utils/locker/locker.go deleted file mode 100644 index 36340f9d8..000000000 --- a/utils/locker/locker.go +++ /dev/null @@ -1,103 +0,0 @@ -package locker - -import ( - "fmt" - "math/rand" - "time" - - "github.com/gomodule/redigo/redis" - "github.com/pkg/errors" -) - -// GrabLock grabs the passed in lock from redis in an atomic operation. It returns the lock value -// if successful. It will retry until the retry period, returning empty string if not acquired -// in that time. -func GrabLock(rp *redis.Pool, key string, expiration time.Duration, retry time.Duration) (string, error) { - // generate our lock value - value := makeRandom(10) - - // convert our expiration to seconds - seconds := int(expiration / time.Second) - if seconds < 1 { - return "", errors.Errorf("can't grab lock with expiration less than a second") - } - - start := time.Now() - for { - rc := rp.Get() - success, err := rc.Do("SET", fmt.Sprintf("lock:%s", key), value, "EX", seconds, "NX") - rc.Close() - - if err != nil { - return "", errors.Wrapf(err, "error trying to get lock") - } - - if success == "OK" { - break - } - - if time.Since(start) > retry { - return "", nil - } - - time.Sleep(time.Second) - } - - return value, nil -} - -var releaseScript = redis.NewScript(2, ` - -- KEYS: [Key, Value] - if redis.call("get", KEYS[1]) == KEYS[2] then - return redis.call("del", KEYS[1]) - else - return 0 - end -`) - -// ReleaseLock releases the passed in lock, returning any error encountered while doing -// so. It is not considered an error to release a lock that is no longer present -func ReleaseLock(rp *redis.Pool, key string, value string) error { - rc := rp.Get() - defer rc.Close() - - // we use lua here because we only want to release the lock if we own it - _, err := releaseScript.Do(rc, fmt.Sprintf("lock:%s", key), value) - return err -} - -var expireScript = redis.NewScript(3, ` - -- KEYS: [Key, Value, Expiration] - if redis.call("get", KEYS[1]) == KEYS[2] then - return redis.call("expire", KEYS[1], KEYS[3]) - else - return 0 - end -`) - -// ExtendLock extends our lock expiration by the passed in number of seconds -func ExtendLock(rp *redis.Pool, key string, value string, expiration time.Duration) error { - rc := rp.Get() - defer rc.Close() - - // convert our expiration to seconds - seconds := int(expiration / time.Second) - if seconds < 1 { - return errors.Errorf("can't grab lock with expiration less than a second") - } - - // we use lua here because we only want to set the expiration time if we own it - _, err := expireScript.Do(rc, fmt.Sprintf("lock:%s", key), value, seconds) - return err -} - -const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" - -// makeRandom creates a random key of the length passed in -func makeRandom(n int) string { - b := make([]byte, n) - for i := range b { - b[i] = letterBytes[rand.Intn(len(letterBytes))] - } - return string(b) -} diff --git a/utils/locker/locker_test.go b/utils/locker/locker_test.go deleted file mode 100644 index 18cd93464..000000000 --- a/utils/locker/locker_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package locker_test - -import ( - "testing" - "time" - - "github.com/nyaruka/mailroom/testsuite" - "github.com/nyaruka/mailroom/utils/locker" - - "github.com/stretchr/testify/assert" -) - -func TestLocker(t *testing.T) { - _, _, _, rp := testsuite.Get() - - defer testsuite.Reset(testsuite.ResetRedis) - - // acquire a lock, but have it expire in 5 seconds - v1, err := locker.GrabLock(rp, "test", time.Second*5, time.Second) - assert.NoError(t, err) - assert.NotZero(t, v1) - - // try to acquire the same lock, should fail - v2, err := locker.GrabLock(rp, "test", time.Second*5, time.Second) - assert.NoError(t, err) - assert.Zero(t, v2) - - // should succeed if we wait longer - v3, err := locker.GrabLock(rp, "test", time.Second*5, time.Second*5) - assert.NoError(t, err) - assert.NotZero(t, v3) - assert.NotEqual(t, v1, v3) - - // extend the lock - err = locker.ExtendLock(rp, "test", v3, time.Second*10) - assert.NoError(t, err) - - // trying to grab it should fail with a 5 second timeout - v4, err := locker.GrabLock(rp, "test", time.Second*5, time.Second*5) - assert.NoError(t, err) - assert.Zero(t, v4) - - // return the lock - err = locker.ReleaseLock(rp, "test", v3) - assert.NoError(t, err) - - // new grab should work - v5, err := locker.GrabLock(rp, "test", time.Second*5, time.Second*5) - assert.NoError(t, err) - assert.NotZero(t, v5) -}