Skip to content

Commit

Permalink
Merge pull request rapidpro#416 from nyaruka/fail-old-expirations
Browse files Browse the repository at this point in the history
Fail expirations that are no longer the active session
  • Loading branch information
rowanseymour authored Mar 17, 2021
2 parents e5886ce + 3e1ec03 commit 1026e7a
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 8 deletions.
5 changes: 4 additions & 1 deletion core/models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ var exitToSessionStatusMap = map[ExitType]SessionStatus{
ExitInterrupted: SessionStatusInterrupted,
ExitCompleted: SessionStatusCompleted,
ExitExpired: SessionStatusExpired,
ExitFailed: SessionStatusFailed,
}

var exitToRunStatusMap = map[ExitType]RunStatus{
ExitInterrupted: RunStatusInterrupted,
ExitCompleted: RunStatusCompleted,
ExitExpired: RunStatusExpired,
ExitFailed: RunStatusFailed,
}

var keptEvents = map[string]bool{
Expand Down Expand Up @@ -953,7 +955,8 @@ SET
ended_on = $2,
status = $3
WHERE
id = ANY ($1)
id = ANY ($1) AND
status = 'W'
`

// InterruptContactRuns interrupts all runs and sesions that exist for the passed in list of contacts
Expand Down
2 changes: 1 addition & 1 deletion core/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func ResumeFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, oa *models.Org
// if this flow just isn't available anymore, log this error
if err == models.ErrNotFound {
logrus.WithField("contact_uuid", session.Contact().UUID()).WithField("session_id", session.ID()).WithField("flow_id", session.CurrentFlowID()).Error("unable to find flow in resume")
return nil, models.ExitSessions(ctx, db, []models.SessionID{session.ID()}, models.ExitInterrupted, time.Now())
return nil, models.ExitSessions(ctx, db, []models.SessionID{session.ID()}, models.ExitFailed, time.Now())
}
return nil, errors.Wrapf(err, "error loading session flow: %d", session.CurrentFlowID())
}
Expand Down
46 changes: 44 additions & 2 deletions core/tasks/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ func TestMsgEvents(t *testing.T) {
assert.Equal(t, "Hey, how are you?", text)
previous := time.Now()

// and should have interrupted previous session
// and should have failed previous session
testsuite.AssertQueryCount(t, db,
`SELECT count(*) from flows_flowsession where contact_id = $1 and status ='I' and current_flow_id = $2`,
`SELECT count(*) from flows_flowsession where contact_id = $1 and status = 'F' and current_flow_id = $2`,
[]interface{}{models.Org2FredID, models.Org2FavoritesFlowID}, 2,
)

Expand Down Expand Up @@ -406,6 +406,9 @@ func TestTimedEvents(t *testing.T) {

// respond one last time, should be done
{MsgEventType, models.CathyID, models.CathyURN, models.CathyURNID, "done", "Ended", models.TwitterChannelID, models.Org1},

// start our favorite flow again
{MsgEventType, models.CathyID, models.CathyURN, models.CathyURNID, "start", "What is your favorite color?", models.TwitterChannelID, models.Org1},
}

last := time.Now()
Expand Down Expand Up @@ -485,4 +488,43 @@ func TestTimedEvents(t *testing.T) {

last = time.Now()
}

// test the case of a run and session no longer being the most recent but somehow still active, expiration should still work
r, err := db.QueryContext(ctx, `SELECT id, session_id from flows_flowrun WHERE contact_id = $1 and is_active = FALSE order by created_on asc limit 1`, models.CathyID)
assert.NoError(t, err)
defer r.Close()
r.Next()
r.Scan(&runID, &sessionID)

expiration := time.Now()

// set both to be active (this requires us to disable the path change trigger for a bit which asserts flows can't cross back into active status)
db.MustExec(`ALTER TABLE flows_flowrun DISABLE TRIGGER temba_flowrun_path_change`)
db.MustExec(`UPDATE flows_flowrun SET is_active = TRUE, status = 'W', expires_on = $2 WHERE id = $1`, runID, expiration)
db.MustExec(`UPDATE flows_flowsession SET status = 'W' WHERE id = $1`, sessionID)
db.MustExec(`ALTER TABLE flows_flowrun ENABLE TRIGGER temba_flowrun_path_change`)

// try to expire the run
task := newTimedTask(
ExpirationEventType,
models.Org1,
models.CathyID,
sessionID,
runID,
expiration,
)

err = AddHandleTask(rc, models.CathyID, task)
assert.NoError(t, err)

task, err = queue.PopNextTask(rc, queue.HandlerQueue)
assert.NoError(t, err)

err = handleContactEvent(ctx, db, rp, task)
assert.NoError(t, err)

testsuite.AssertQueryCount(t, db, `SELECT count(*) from flows_flowrun WHERE is_active = FALSE AND status = 'F' AND id = $1`, []interface{}{runID}, 1)
testsuite.AssertQueryCount(t, db, `SELECT count(*) from flows_flowsession WHERE status = 'F' AND id = $1`, []interface{}{sessionID}, 1)

testsuite.ResetDB()
}
17 changes: 13 additions & 4 deletions core/tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,12 @@ func handleContactEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, task *
// handleTimedEvent is called for timeout events
func handleTimedEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, eventType string, event *TimedEvent) error {
start := time.Now()
log := logrus.WithField("event_type", eventType).WithField("contact_id", event.OrgID).WithField("session_id", event.SessionID)
log := logrus.WithFields(logrus.Fields{
"event_type": eventType,
"contact_id": event.ContactID,
"run_id": event.RunID,
"session_id": event.SessionID,
})
oa, err := models.GetOrgAssets(ctx, db, event.OrgID)
if err != nil {
return errors.Wrapf(err, "error loading org")
Expand Down Expand Up @@ -256,9 +261,13 @@ func handleTimedEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, eventTyp
return errors.Wrapf(err, "error loading active session for contact")
}

// if we didn't find a session or it is another session, ignore
// if we didn't find a session or it is another session then this flow got interrupted and this is a race, fail it
if session == nil || session.ID() != event.SessionID {
log.Info("ignoring event, couldn't find active session")
log.Error("expiring run with mismatched session, session for run no longer active, failing runs and session")
err = models.ExitSessions(ctx, db, []models.SessionID{event.SessionID}, models.ExitFailed, time.Now())
if err != nil {
return errors.Wrapf(err, "error failing expired runs for session that is no longer active")
}
return nil
}

Expand Down Expand Up @@ -597,7 +606,7 @@ func handleMsgEvent(ctx context.Context, db *sqlx.DB, rp *redis.Pool, event *Msg

// flow this session is in is gone, interrupt our session and reset it
if err == models.ErrNotFound {
err = models.ExitSessions(ctx, db, []models.SessionID{session.ID()}, models.ExitInterrupted, time.Now())
err = models.ExitSessions(ctx, db, []models.SessionID{session.ID()}, models.ExitFailed, time.Now())
session = nil
}

Expand Down

0 comments on commit 1026e7a

Please sign in to comment.