Skip to content

Commit

Permalink
Merge pull request rapidpro#600 from nyaruka/locker_cleanup
Browse files Browse the repository at this point in the history
Replace last usages of old locker code
  • Loading branch information
rowanseymour authored Mar 18, 2022
2 parents a4478a9 + c00eee5 commit 5456723
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 170 deletions.
8 changes: 5 additions & 3 deletions core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/nyaruka/goflow/excellent/types"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/null"
"github.com/nyaruka/redisx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -1341,9 +1342,10 @@ func (i *ContactID) Scan(value interface{}) error {
return null.ScanInt(value, (*null.Int)(i))
}

// ContactLock returns the lock key for a particular contact, used with locker
func ContactLock(orgID OrgID, contactID ContactID) string {
return fmt.Sprintf("c:%d:%d", orgID, contactID)
// GetContactLocker returns the locker for a particular contact
func GetContactLocker(orgID OrgID, contactID ContactID) *redisx.Locker {
key := fmt.Sprintf("lock:c:%d:%d", orgID, contactID)
return redisx.NewLocker(key, time.Minute*5)
}

// UpdateContactModifiedBy updates modified by the passed user id on the passed in contacts
Expand Down
19 changes: 10 additions & 9 deletions core/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/locker"
"github.com/nyaruka/redisx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -461,7 +461,7 @@ func StartFlow(
start := time.Now()

// map of locks we've released
released := make(map[string]bool)
released := make(map[*redisx.Locker]bool)

for len(remaining) > 0 && time.Since(start) < time.Minute*5 {
locked := make([]models.ContactID, 0, len(remaining))
Expand All @@ -470,8 +470,9 @@ func StartFlow(

// try up to a second to get a lock for a contact
for _, contactID := range remaining {
lockID := models.ContactLock(oa.OrgID(), contactID)
lock, err := locker.GrabLock(rt.RP, lockID, time.Minute*5, time.Second)
locker := models.GetContactLocker(oa.OrgID(), contactID)

lock, err := locker.Grab(rt.RP, time.Second)
if err != nil {
return nil, errors.Wrapf(err, "error attempting to grab lock")
}
Expand All @@ -484,8 +485,8 @@ func StartFlow(

// defer unlocking if we exit due to error
defer func() {
if !released[lockID] {
locker.ReleaseLock(rt.RP, lockID, lock)
if !released[locker] {
locker.Release(rt.RP, lock)
}
}()
}
Expand Down Expand Up @@ -517,9 +518,9 @@ func StartFlow(

// release all our locks
for i := range locked {
lockID := models.ContactLock(oa.OrgID(), locked[i])
locker.ReleaseLock(rt.RP, lockID, locks[i])
released[lockID] = true
locker := models.GetContactLocker(oa.OrgID(), locked[i])
locker.Release(rt.RP, locks[i])
released[locker] = true
}

// skipped are now our remaining
Expand Down
8 changes: 4 additions & 4 deletions core/tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/nyaruka/mailroom/core/queue"
"github.com/nyaruka/mailroom/core/runner"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/locker"
"github.com/nyaruka/null"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -61,8 +60,9 @@ func handleContactEvent(ctx context.Context, rt *runtime.Runtime, task *queue.Ta
}

// acquire the lock for this contact
lockID := models.ContactLock(models.OrgID(task.OrgID), eventTask.ContactID)
lock, err := locker.GrabLock(rt.RP, lockID, time.Minute*5, time.Second*10)
locker := models.GetContactLocker(models.OrgID(task.OrgID), eventTask.ContactID)

lock, err := locker.Grab(rt.RP, time.Second*10)
if err != nil {
return errors.Wrapf(err, "error acquiring lock for contact %d", eventTask.ContactID)
}
Expand All @@ -81,7 +81,7 @@ func handleContactEvent(ctx context.Context, rt *runtime.Runtime, task *queue.Ta
}).Info("failed to get lock for contact, requeued and skipping")
return nil
}
defer locker.ReleaseLock(rt.RP, lockID, lock)
defer locker.Release(rt.RP, lock)

// read all the events for this contact, one by one
contactQ := fmt.Sprintf("c:%d:%d", task.OrgID, eventTask.ContactID)
Expand Down
103 changes: 0 additions & 103 deletions utils/locker/locker.go

This file was deleted.

51 changes: 0 additions & 51 deletions utils/locker/locker_test.go

This file was deleted.

0 comments on commit 5456723

Please sign in to comment.