From d29e1509141abe755f0feb6bd551bfb09767a575 Mon Sep 17 00:00:00 2001 From: Nic Pottier Date: Tue, 14 Jul 2020 07:38:47 -0700 Subject: [PATCH] expire runs that have no session, just warn while doing so --- models/runs.go | 10 ++++++---- tasks/expirations/cron.go | 30 +++++++++++++++++------------- tasks/expirations/cron_test.go | 10 +++++++--- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/models/runs.go b/models/runs.go index c119878d7..4681396a4 100644 --- a/models/runs.go +++ b/models/runs.go @@ -1023,10 +1023,12 @@ func ExpireRunsAndSessions(ctx context.Context, db *sqlx.DB, runIDs []FlowRunID, return errors.Wrapf(err, "error expiring runs") } - err = Exec(ctx, "expiring sessions", tx, expireSessionsSQL, pq.Array(sessionIDs)) - if err != nil { - tx.Rollback() - return errors.Wrapf(err, "error expiring sessions") + if len(sessionIDs) > 0 { + err = Exec(ctx, "expiring sessions", tx, expireSessionsSQL, pq.Array(sessionIDs)) + if err != nil { + tx.Rollback() + return errors.Wrapf(err, "error expiring sessions") + } } err = tx.Commit() diff --git a/tasks/expirations/cron.go b/tasks/expirations/cron.go index cadec96df..ee72bb94a 100644 --- a/tasks/expirations/cron.go +++ b/tasks/expirations/cron.go @@ -10,9 +10,9 @@ import ( "github.com/nyaruka/goflow/flows" "github.com/nyaruka/mailroom" "github.com/nyaruka/mailroom/cron" - "github.com/nyaruka/mailroom/tasks/handler" "github.com/nyaruka/mailroom/marker" "github.com/nyaruka/mailroom/models" + "github.com/nyaruka/mailroom/tasks/handler" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -69,9 +69,14 @@ func expireRuns(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName strin count++ // no parent id? we can add this to our batch - if expiration.ParentUUID == nil { + if expiration.ParentUUID == nil || expiration.SessionID == nil { expiredRuns = append(expiredRuns, expiration.RunID) - expiredSessions = append(expiredSessions, expiration.SessionID) + + if expiration.SessionID != nil { + expiredSessions = append(expiredSessions, *expiration.SessionID) + } else { + log.WithField("run_id", expiration.RunID).WithField("org_id", expiration.OrgID).Warn("expiring active run with no session") + } // batch is full? commit it if len(expiredRuns) == expireBatchSize { @@ -99,7 +104,7 @@ func expireRuns(ctx context.Context, db *sqlx.DB, rp *redis.Pool, lockName strin } // ok, queue this task - task := handler.NewExpirationTask(expiration.OrgID, expiration.ContactID, expiration.SessionID, expiration.RunID, expiration.ExpiresOn) + task := handler.NewExpirationTask(expiration.OrgID, expiration.ContactID, *expiration.SessionID, expiration.RunID, expiration.ExpiresOn) err = handler.AddHandleTask(rc, expiration.ContactID, task) if err != nil { return errors.Wrapf(err, "error adding new expiration task") @@ -139,19 +144,18 @@ const selectExpiredRunsSQL = ` WHERE fr.is_active = TRUE AND fr.expires_on < NOW() AND - fr.connection_id IS NULL AND - fr.session_id IS NOT NULL + fr.connection_id IS NULL ORDER BY expires_on ASC LIMIT 25000 ` type RunExpiration struct { - OrgID models.OrgID `db:"org_id"` - FlowID models.FlowID `db:"flow_id"` - ContactID models.ContactID `db:"contact_id"` - RunID models.FlowRunID `db:"run_id"` - ParentUUID *flows.RunUUID `db:"parent_uuid"` - SessionID models.SessionID `db:"session_id"` - ExpiresOn time.Time `db:"expires_on"` + OrgID models.OrgID `db:"org_id"` + FlowID models.FlowID `db:"flow_id"` + ContactID models.ContactID `db:"contact_id"` + RunID models.FlowRunID `db:"run_id"` + ParentUUID *flows.RunUUID `db:"parent_uuid"` + SessionID *models.SessionID `db:"session_id"` + ExpiresOn time.Time `db:"expires_on"` } diff --git a/tasks/expirations/cron_test.go b/tasks/expirations/cron_test.go index bcf33d9eb..91567576c 100644 --- a/tasks/expirations/cron_test.go +++ b/tasks/expirations/cron_test.go @@ -41,7 +41,7 @@ func TestExpirations(t *testing.T) { err = db.Get(&s2, `INSERT INTO flows_flowsession(uuid, org_id, status, responded, contact_id, created_on) VALUES ($1, 1, 'W', TRUE, $2, NOW()) RETURNING id;`, uuids.New(), models.GeorgeID) assert.NoError(t, err) - var r1, r2, r3 models.FlowRunID + var r1, r2, r3, r4 models.FlowRunID // simple run, no parent err = db.Get(&r1, `INSERT INTO flows_flowrun(session_id, status, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'f240ab19-ed5d-4b51-b934-f2fbb9f8e5ad', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW()) RETURNING id;`, s1, models.RunStatusWaiting, models.CathyID, models.FavoritesFlowID) @@ -55,9 +55,13 @@ func TestExpirations(t *testing.T) { err = db.Get(&r3, `INSERT INTO flows_flowrun(session_id, status, parent_uuid, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'c4126b59-7a61-4ed5-a2da-c7857580355b', 'a87b7079-5a3c-4e5f-8a6a-62084807c522', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW()) RETURNING id;`, s2, models.RunStatusWaiting, models.GeorgeID, models.FavoritesFlowID) assert.NoError(t, err) + // run with no session + err = db.Get(&r4, `INSERT INTO flows_flowrun(session_id, status, uuid, is_active, created_on, modified_on, responded, contact_id, flow_id, org_id, expires_on) VALUES($1, $2, 'd64fac33-933f-44b4-a6e4-53283d07a609', TRUE, NOW(), NOW(), TRUE, $3, $4, 1, NOW()) RETURNING id;`, nil, models.RunStatusWaiting, models.CathyID, models.FavoritesFlowID) + assert.NoError(t, err) + time.Sleep(10 * time.Millisecond) - testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.CathyID}, 1) + testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.CathyID}, 2) testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'X' AND contact_id = $1;`, []interface{}{models.CathyID}, 0) testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.GeorgeID}, 2) @@ -71,7 +75,7 @@ func TestExpirations(t *testing.T) { testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.CathyID}, 0) testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'X' AND contact_id = $1;`, []interface{}{models.CathyID}, 1) - // should still have two active runs for contact 43 as it needs to continue + // should still have two active runs for George as it needs to continue testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun WHERE is_active = TRUE AND contact_id = $1;`, []interface{}{models.GeorgeID}, 2) testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'X' AND contact_id = $1;`, []interface{}{models.GeorgeID}, 0)