Skip to content

Commit

Permalink
expire runs that have no session, just warn while doing so
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Jul 14, 2020
1 parent ecd2a10 commit d29e150
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 20 deletions.
10 changes: 6 additions & 4 deletions models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,10 +1023,12 @@ func ExpireRunsAndSessions(ctx context.Context, db *sqlx.DB, runIDs []FlowRunID,
return errors.Wrapf(err, "error expiring runs")
}

err = Exec(ctx, "expiring sessions", tx, expireSessionsSQL, pq.Array(sessionIDs))
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error expiring sessions")
if len(sessionIDs) > 0 {
err = Exec(ctx, "expiring sessions", tx, expireSessionsSQL, pq.Array(sessionIDs))
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error expiring sessions")
}
}

err = tx.Commit()
Expand Down
30 changes: 17 additions & 13 deletions tasks/expirations/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/cron"
"github.com/nyaruka/mailroom/tasks/handler"
"github.com/nyaruka/mailroom/marker"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/tasks/handler"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -69,9 +69,14 @@ func expireRuns(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName strin
count++

// no parent id? we can add this to our batch
if expiration.ParentUUID == nil {
if expiration.ParentUUID == nil || expiration.SessionID == nil {
expiredRuns = append(expiredRuns, expiration.RunID)
expiredSessions = append(expiredSessions, expiration.SessionID)

if expiration.SessionID != nil {
expiredSessions = append(expiredSessions, *expiration.SessionID)
} else {
log.WithField("run_id", expiration.RunID).WithField("org_id", expiration.OrgID).Warn("expiring active run with no session")
}

// batch is full? commit it
if len(expiredRuns) == expireBatchSize {
Expand Down Expand Up @@ -99,7 +104,7 @@ func expireRuns(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName strin
}

// ok, queue this task
task := handler.NewExpirationTask(expiration.OrgID, expiration.ContactID, expiration.SessionID, expiration.RunID, expiration.ExpiresOn)
task := handler.NewExpirationTask(expiration.OrgID, expiration.ContactID, *expiration.SessionID, expiration.RunID, expiration.ExpiresOn)
err = handler.AddHandleTask(rc, expiration.ContactID, task)
if err != nil {
return errors.Wrapf(err, "error adding new expiration task")
Expand Down Expand Up @@ -139,19 +144,18 @@ const selectExpiredRunsSQL = `
WHERE
fr.is_active = TRUE AND
fr.expires_on < NOW() AND
fr.connection_id IS NULL AND
fr.session_id IS NOT NULL
fr.connection_id IS NULL
ORDER BY
expires_on ASC
LIMIT 25000
`

type RunExpiration struct {
OrgID models.OrgID `db:"org_id"`
FlowID models.FlowID `db:"flow_id"`
ContactID models.ContactID `db:"contact_id"`
RunID models.FlowRunID `db:"run_id"`
ParentUUID *flows.RunUUID `db:"parent_uuid"`
SessionID models.SessionID `db:"session_id"`
ExpiresOn time.Time `db:"expires_on"`
OrgID models.OrgID `db:"org_id"`
FlowID models.FlowID `db:"flow_id"`
ContactID models.ContactID `db:"contact_id"`
RunID models.FlowRunID `db:"run_id"`
ParentUUID *flows.RunUUID `db:"parent_uuid"`
SessionID *models.SessionID `db:"session_id"`
ExpiresOn time.Time `db:"expires_on"`
}
10 changes: 7 additions & 3 deletions tasks/expirations/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestExpirations(t *testing.T) {
err = db.Get(&s2, `INSERT INTO flows_flowsession(uuid, org_id, status, responded, contact_id, created_on) VALUES ($1, 1, 'W', TRUE, $2, NOW()) RETURNING id;`, uuids.New(), models.GeorgeID)
assert.NoError(t, err)

var r1, r2, r3 models.FlowRunID
var r1, r2, r3, r4 models.FlowRunID

// simple run, no parent
err = db.Get(&r1, `INSERT INTO flows_flowrun(session_id, status, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'f240ab19-ed5d-4b51-b934-f2fbb9f8e5ad', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW()) RETURNING id;`, s1, models.RunStatusWaiting, models.CathyID, models.FavoritesFlowID)
Expand All @@ -55,9 +55,13 @@ func TestExpirations(t *testing.T) {
err = db.Get(&r3, `INSERT INTO flows_flowrun(session_id, status, parent_uuid, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'c4126b59-7a61-4ed5-a2da-c7857580355b', 'a87b7079-5a3c-4e5f-8a6a-62084807c522', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW()) RETURNING id;`, s2, models.RunStatusWaiting, models.GeorgeID, models.FavoritesFlowID)
assert.NoError(t, err)

// run with no session
err = db.Get(&r4, `INSERT INTO flows_flowrun(session_id, status, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'd64fac33-933f-44b4-a6e4-53283d07a609', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW()) RETURNING id;`, nil, models.RunStatusWaiting, models.CathyID, models.FavoritesFlowID)
assert.NoError(t, err)

time.Sleep(10 * time.Millisecond)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.CathyID}, 1)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.CathyID}, 2)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'X' AND contact_id = $1;`, []interface{}{models.CathyID}, 0)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.GeorgeID}, 2)
Expand All @@ -71,7 +75,7 @@ func TestExpirations(t *testing.T) {
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.CathyID}, 0)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'X' AND contact_id = $1;`, []interface{}{models.CathyID}, 1)

// should still have two active runs for contact 43 as it needs to continue
// should still have two active runs for George as it needs to continue
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.GeorgeID}, 2)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'X' AND contact_id = $1;`, []interface{}{models.GeorgeID}, 0)

Expand Down

0 comments on commit d29e150

Please sign in to comment.