Skip to content

Commit

Permalink
add tests for schedule cron
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Sep 23, 2019
1 parent 3f3d96d commit 1e8f795
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 44 deletions.
1 change: 1 addition & 0 deletions cmd/mailroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
_ "github.com/nyaruka/mailroom/tasks/campaigns"
_ "github.com/nyaruka/mailroom/tasks/expirations"
_ "github.com/nyaruka/mailroom/tasks/interrupts"
_ "github.com/nyaruka/mailroom/tasks/schedules"
_ "github.com/nyaruka/mailroom/tasks/starts"
_ "github.com/nyaruka/mailroom/tasks/stats"
_ "github.com/nyaruka/mailroom/tasks/timeouts"
Expand Down
6 changes: 3 additions & 3 deletions models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,12 +518,12 @@ func NewBroadcast(
bcast := &Broadcast{}
bcast.b.OrgID = orgID
bcast.b.BroadcastID = id
bcast.b.Translations = translations
bcast.b.TemplateState = state
bcast.b.BaseLanguage = baseLanguage
bcast.b.GroupIDs = groupIDs
bcast.b.ContactIDs = contactIDs
bcast.b.URNs = urns
bcast.b.Translations = translations
bcast.b.ContactIDs = contactIDs
bcast.b.GroupIDs = groupIDs

return bcast
}
Expand Down
15 changes: 8 additions & 7 deletions models/schedules.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (s *Schedule) UpdateFires(ctx context.Context, db *sqlx.DB, last time.Time,

// GetNextFire returns the next fire for this schedule (if any)
func (s *Schedule) GetNextFire(tz *time.Location, now time.Time) (*time.Time, error) {
// Never repeats? no next fire
if s.s.RepeatPeriod == RepeatPeriodNever {
return nil, nil
}

// should have hour and minute on everything else
if s.s.HourOfDay == nil {
return nil, errors.Errorf("schedule %d has no repeat_hour_of_day set", s.s.ID)
Expand All @@ -104,12 +109,6 @@ func (s *Schedule) GetNextFire(tz *time.Location, now time.Time) (*time.Time, er

switch s.s.RepeatPeriod {

case RepeatPeriodNever:
if !next.After(now) {
return nil, nil
}
return &next, nil

case RepeatPeriodDaily:
for !next.After(now) {
next = next.AddDate(0, 0, 1)
Expand Down Expand Up @@ -244,7 +243,9 @@ SELECT ROW_TO_JSON(s) FROM (SELECT
triggers_trigger t JOIN
flows_flow f on t.flow_id = f.id
WHERE
t.schedule_id = s.id
t.schedule_id = s.id AND
t.is_active = TRUE AND
t.is_archived = FALSE
) st) as start
FROM
schedules_schedule s JOIN
Expand Down
9 changes: 7 additions & 2 deletions models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,13 @@ type startGroup struct {
// InsertFlowStarts inserts all the passed in starts
func InsertFlowStarts(ctx context.Context, db Queryer, starts []*FlowStart) error {
is := make([]interface{}, len(starts))
for i := range starts {
is[i] = &starts[i].s
for i, s := range starts {
// populate UUID if needbe
if s.s.UUID == "" {
s.s.UUID = uuids.New()
}

is[i] = &s.s
}

// insert our starts
Expand Down
53 changes: 21 additions & 32 deletions tasks/schedules/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@ const (
)

func init() {
mailroom.AddInitFunction(StartFireSchedules)
mailroom.AddInitFunction(StartCheckSchedules)
}

// StartFireSchedules starts our cron job of firing schedules every minute
func StartFireSchedules(mr *mailroom.Mailroom) error {
// StartCheckSchedules starts our cron job of firing schedules every minute
func StartCheckSchedules(mr *mailroom.Mailroom) error {
cron.StartCron(mr.Quit, mr.RP, scheduleLock, time.Minute*5,
func(lockName string, lockValue string) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
return fireSchedules(ctx, mr.DB, mr.RP, lockName, lockValue)
return checkSchedules(ctx, mr.DB, mr.RP, lockName, lockValue)
},
)
return nil
}

// fireSchedules looks up any expired schedules and fires them, setting the next fire as needed
func fireSchedules(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName string, lockValue string) error {
// checkSchedules looks up any expired schedules and fires them, setting the next fire as needed
func checkSchedules(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName string, lockValue string) error {
log := logrus.WithField("comp", "schedules_cron").WithField("lock", lockValue)
start := time.Now()

Expand All @@ -51,6 +51,7 @@ func fireSchedules(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName st
// for each unfired schedule
broadcasts := 0
triggers := 0
noops := 0

for _, s := range unfired {
log := log.WithField("schedule_id", s.ID())
Expand All @@ -68,22 +69,13 @@ func fireSchedules(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName st
// clone our broadcast, our schedule broadcast is just a template
bcast := models.CloneBroadcast(s.Broadcast())

taskQ := queue.HandlerQueue
priority := queue.DefaultPriority

// if we are starting groups, queue to our batch queue instead, but with high priority
if len(bcast.GroupIDs()) > 0 {
taskQ = queue.BatchQueue
priority = queue.HighPriority
}

// add our task to send this broadcast
err = queue.AddTask(rc, taskQ, queue.SendBroadcast, int(bcast.OrgID()), bcast, priority)
err = queue.AddTask(rc, queue.BatchQueue, queue.SendBroadcast, int(bcast.OrgID()), bcast, queue.HighPriority)
if err != nil {
log.WithError(err).Error("error firing scheduled broadcast")
continue
}
broadcasts += 1
broadcasts++

} else if s.FlowStart() != nil {
start := s.FlowStart()
Expand All @@ -95,25 +87,16 @@ func fireSchedules(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName st
continue
}

// and queue it
taskQ := queue.HandlerQueue
priority := queue.DefaultPriority

// if we are starting groups, queue to our batch queue instead, but with high priority
if len(start.GroupIDs()) > 0 {
taskQ = queue.BatchQueue
priority = queue.HighPriority
}

err = queue.AddTask(rc, taskQ, queue.StartFlow, int(start.OrgID()), start, priority)
// add our flow start task
err = queue.AddTask(rc, queue.BatchQueue, queue.StartFlow, int(start.OrgID()), start, queue.HighPriority)
if err != nil {
log.WithError(err).Error("error firing scheduled trigger")
}

triggers += 1
triggers++
} else {
log.Error("schedule found with no associated broadcast or trigger")
continue
log.Info("schedule found with no associated active broadcast or trigger, ignoring")
noops++
}

// calculate the next fire and update it
Expand All @@ -128,7 +111,13 @@ func fireSchedules(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName st
log.WithError(err).Error("error updating next fire for schedule")
}
}
log.WithField("broadcasts", broadcasts).WithField("triggers", triggers).WithField("elapsed", time.Since(start)).Info("fired schedules")

log.WithFields(logrus.Fields{
"broadcasts": broadcasts,
"triggers": triggers,
"noops": noops,
"elapsed": time.Since(start),
}).Info("fired schedules")

return nil
}
110 changes: 110 additions & 0 deletions tasks/schedules/cron_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package schedules

import (
"testing"

"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/queue"
"github.com/nyaruka/mailroom/testsuite"
"github.com/stretchr/testify/assert"
)

func TestCheckSchedules(t *testing.T) {
testsuite.Reset()
ctx := testsuite.CTX()
rp := testsuite.RP()

rc := rp.Get()
defer rc.Close()

// add a schedule and tie a broadcast to it
db := testsuite.DB()
var s1 models.ScheduleID
err := db.Get(
&s1,
`INSERT INTO schedules_schedule(is_active, repeat_period, created_on, modified_on, next_fire, created_by_id, modified_by_id, org_id)
VALUES(TRUE, 'O', NOW(), NOW(), NOW()- INTERVAL '1 DAY', 1, 1, $1) RETURNING id`,
models.Org1,
)
assert.NoError(t, err)
var b1 models.BroadcastID
err = db.Get(
&b1,
`INSERT INTO msgs_broadcast(status, text, base_language, is_active, created_on, modified_on, send_all, created_by_id, modified_by_id, org_id, schedule_id)
VALUES('P', hstore(ARRAY['eng','Test message', 'fra', 'Un Message']), 'eng', TRUE, NOW(), NOW(), TRUE, 1, 1, $1, $2) RETURNING id`,
models.Org1, s1,
)
assert.NoError(t, err)

// add a few contacts to the broadcast
db.MustExec(`INSERT INTO msgs_broadcast_contacts(broadcast_id, contact_id) VALUES($1, $2),($1, $3)`, b1, models.CathyID, models.GeorgeID)

// and a group
db.MustExec(`INSERT INTO msgs_broadcast_groups(broadcast_id, contactgroup_id) VALUES($1, $2)`, b1, models.DoctorsGroupID)

// add another and tie a trigger to it
var s2 models.ScheduleID
err = db.Get(
&s2,
`INSERT INTO schedules_schedule(is_active, repeat_period, created_on, modified_on, next_fire, created_by_id, modified_by_id, org_id)
VALUES(TRUE, 'O', NOW(), NOW(), NOW()- INTERVAL '2 DAY', 1, 1, $1) RETURNING id`,
models.Org1,
)
assert.NoError(t, err)
var t1 models.TriggerID
err = db.Get(
&t1,
`INSERT INTO triggers_trigger(is_active, created_on, modified_on, is_archived, trigger_type, created_by_id, modified_by_id, org_id, flow_id, schedule_id)
VALUES(TRUE, NOW(), NOW(), FALSE, 'S', 1, 1, $1, $2, $3) RETURNING id`,
models.Org1, models.FavoritesFlowID, s2,
)
assert.NoError(t, err)

// add a few contacts to the trigger
db.MustExec(`INSERT INTO triggers_trigger_contacts(trigger_id, contact_id) VALUES($1, $2),($1, $3)`, t1, models.CathyID, models.GeorgeID)

// and a group
db.MustExec(`INSERT INTO triggers_trigger_groups(trigger_id, contactgroup_id) VALUES($1, $2)`, t1, models.DoctorsGroupID)

var s3 models.ScheduleID
err = db.Get(
&s3,
`INSERT INTO schedules_schedule(is_active, repeat_period, created_on, modified_on, next_fire, created_by_id, modified_by_id, org_id)
VALUES(TRUE, 'O', NOW(), NOW(), NOW()- INTERVAL '3 DAY', 1, 1, $1) RETURNING id`,
models.Org1,
)
assert.NoError(t, err)

// run our task
err = checkSchedules(ctx, db, rp, "lock", "lock")
assert.NoError(t, err)

// should have one flow start added to our DB ready to go
testsuite.AssertQueryCount(t, db,
`SELECT count(*) FROM flows_flowstart WHERE flow_id = $1 AND status = 'P';`,
[]interface{}{models.FavoritesFlowID}, 1)

// we shouldn't have any pending schedules since there were all one time fires, but all should have last fire
testsuite.AssertQueryCount(t, db,
`SELECT count(*) FROM schedules_schedule WHERE next_fire IS NULL and last_fire < NOW();`,
nil, 3)

// check the tasks created
task, err := queue.PopNextTask(rc, queue.BatchQueue)

// first should be the flow start
assert.NoError(t, err)
assert.NotNil(t, task)
assert.Equal(t, queue.StartFlow, task.Type)

// then the broadacast
task, err = queue.PopNextTask(rc, queue.BatchQueue)
assert.NoError(t, err)
assert.NotNil(t, task)
assert.Equal(t, queue.SendBroadcast, task.Type)

// nothing more
task, err = queue.PopNextTask(rc, queue.BatchQueue)
assert.NoError(t, err)
assert.Nil(t, task)
}

0 comments on commit 1e8f795

Please sign in to comment.