Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expire runs that have no session, just warn while doing so #317

Merged
merged 1 commit into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,10 +1023,12 @@ func ExpireRunsAndSessions(ctx context.Context, db *sqlx.DB, runIDs []FlowRunID,
return errors.Wrapf(err, "error expiring runs")
}

err = Exec(ctx, "expiring sessions", tx, expireSessionsSQL, pq.Array(sessionIDs))
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error expiring sessions")
if len(sessionIDs) > 0 {
err = Exec(ctx, "expiring sessions", tx, expireSessionsSQL, pq.Array(sessionIDs))
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error expiring sessions")
}
}

err = tx.Commit()
Expand Down
30 changes: 17 additions & 13 deletions tasks/expirations/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/cron"
"github.com/nyaruka/mailroom/tasks/handler"
"github.com/nyaruka/mailroom/marker"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/tasks/handler"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -69,9 +69,14 @@ func expireRuns(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName strin
count++

// no parent id? we can add this to our batch
if expiration.ParentUUID == nil {
if expiration.ParentUUID == nil || expiration.SessionID == nil {
expiredRuns = append(expiredRuns, expiration.RunID)
expiredSessions = append(expiredSessions, expiration.SessionID)

if expiration.SessionID != nil {
expiredSessions = append(expiredSessions, *expiration.SessionID)
} else {
log.WithField("run_id", expiration.RunID).WithField("org_id", expiration.OrgID).Warn("expiring active run with no session")
}

// batch is full? commit it
if len(expiredRuns) == expireBatchSize {
Expand Down Expand Up @@ -99,7 +104,7 @@ func expireRuns(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName strin
}

// ok, queue this task
task := handler.NewExpirationTask(expiration.OrgID, expiration.ContactID, expiration.SessionID, expiration.RunID, expiration.ExpiresOn)
task := handler.NewExpirationTask(expiration.OrgID, expiration.ContactID, *expiration.SessionID, expiration.RunID, expiration.ExpiresOn)
err = handler.AddHandleTask(rc, expiration.ContactID, task)
if err != nil {
return errors.Wrapf(err, "error adding new expiration task")
Expand Down Expand Up @@ -139,19 +144,18 @@ const selectExpiredRunsSQL = `
WHERE
fr.is_active = TRUE AND
fr.expires_on < NOW() AND
fr.connection_id IS NULL AND
fr.session_id IS NOT NULL
fr.connection_id IS NULL
ORDER BY
expires_on ASC
LIMIT 25000
`

type RunExpiration struct {
OrgID models.OrgID `db:"org_id"`
FlowID models.FlowID `db:"flow_id"`
ContactID models.ContactID `db:"contact_id"`
RunID models.FlowRunID `db:"run_id"`
ParentUUID *flows.RunUUID `db:"parent_uuid"`
SessionID models.SessionID `db:"session_id"`
ExpiresOn time.Time `db:"expires_on"`
OrgID models.OrgID `db:"org_id"`
FlowID models.FlowID `db:"flow_id"`
ContactID models.ContactID `db:"contact_id"`
RunID models.FlowRunID `db:"run_id"`
ParentUUID *flows.RunUUID `db:"parent_uuid"`
SessionID *models.SessionID `db:"session_id"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we generally model null ids like NilSessionID = SessionID(0) ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do both in various places. I'm not positive what the right thing is but I've kind of settled that if the TYPE can be null normally, then having a nullable type derived from null.Int makes sense, but if not then selecting to a pointer is more correct. IE, within Mailroom are we ever "setting" null.

In this case SessionID being nil is very much the exception and this is the only places that looks at that possibility so using a pointer made sense to me. We get to keep the "stricter" type elsewhere. (and have things fail if we try to select a NULL into that stricter type)

Happy to change if you disagree.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense 👍

ExpiresOn time.Time `db:"expires_on"`
}
10 changes: 7 additions & 3 deletions tasks/expirations/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestExpirations(t *testing.T) {
err = db.Get(&s2, `INSERT INTO flows_flowsession(uuid, org_id, status, responded, contact_id, created_on) VALUES ($1, 1, 'W', TRUE, $2, NOW()) RETURNING id;`, uuids.New(), models.GeorgeID)
assert.NoError(t, err)

var r1, r2, r3 models.FlowRunID
var r1, r2, r3, r4 models.FlowRunID

// simple run, no parent
err = db.Get(&r1, `INSERT INTO flows_flowrun(session_id, status, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'f240ab19-ed5d-4b51-b934-f2fbb9f8e5ad', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW()) RETURNING id;`, s1, models.RunStatusWaiting, models.CathyID, models.FavoritesFlowID)
Expand All @@ -55,9 +55,13 @@ func TestExpirations(t *testing.T) {
err = db.Get(&r3, `INSERT INTO flows_flowrun(session_id, status, parent_uuid, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'c4126b59-7a61-4ed5-a2da-c7857580355b', 'a87b7079-5a3c-4e5f-8a6a-62084807c522', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW()) RETURNING id;`, s2, models.RunStatusWaiting, models.GeorgeID, models.FavoritesFlowID)
assert.NoError(t, err)

// run with no session
err = db.Get(&r4, `INSERT INTO flows_flowrun(session_id, status, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'd64fac33-933f-44b4-a6e4-53283d07a609', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW()) RETURNING id;`, nil, models.RunStatusWaiting, models.CathyID, models.FavoritesFlowID)
assert.NoError(t, err)

time.Sleep(10 * time.Millisecond)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.CathyID}, 1)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.CathyID}, 2)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'X' AND contact_id = $1;`, []interface{}{models.CathyID}, 0)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.GeorgeID}, 2)
Expand All @@ -71,7 +75,7 @@ func TestExpirations(t *testing.T) {
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.CathyID}, 0)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'X' AND contact_id = $1;`, []interface{}{models.CathyID}, 1)

// should still have two active runs for contact 43 as it needs to continue
// should still have two active runs for George as it needs to continue
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.GeorgeID}, 2)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'X' AND contact_id = $1;`, []interface{}{models.GeorgeID}, 0)

Expand Down