Skip to content

Commit

Permalink
Clear session timeout if timeout resume rejected by wait
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Mar 29, 2022
1 parent 25438bb commit 460cb9f
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 11 deletions.
2 changes: 1 addition & 1 deletion core/handlers/msg_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func handlePreMsgCreated(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx,
}

// everybody else gets their timeout cleared, will be set by courier
scene.Session().ClearTimeoutOn()
scene.Session().ClearWaitTimeout(ctx, nil)

return nil
}
Expand Down
14 changes: 13 additions & 1 deletion core/models/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ func (s *Session) WaitStartedOn() *time.Time { return s.s.WaitStartedOn
func (s *Session) WaitTimeoutOn() *time.Time { return s.s.WaitTimeoutOn }
func (s *Session) WaitExpiresOn() *time.Time { return s.s.WaitExpiresOn }
func (s *Session) WaitResumeOnExpire() bool { return s.s.WaitResumeOnExpire }
func (s *Session) ClearTimeoutOn() { s.s.WaitTimeoutOn = nil }
func (s *Session) CurrentFlowID() FlowID { return s.s.CurrentFlowID }
func (s *Session) ConnectionID() *ConnectionID { return s.s.ConnectionID }
func (s *Session) IncomingMsgID() MsgID { return s.incomingMsgID }
Expand Down Expand Up @@ -467,6 +466,19 @@ func (s *Session) Update(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx,
return nil
}

// ClearWaitTimeout clears the timeout on the wait on this session and is used if the engine tells us
// that the flow no longer has a timeout on that wait. It can be called without updating the session
// in the database which is used when handling msg_created events before session is updated anyway.
func (s *Session) ClearWaitTimeout(ctx context.Context, db *sqlx.DB) error {
s.s.WaitTimeoutOn = nil

if db != nil {
_, err := db.ExecContext(ctx, `UPDATE flows_flowsession SET timeout_on = NULL WHERE id = $1`, s.ID())
return errors.Wrap(err, "error clearing wait timeout")
}
return nil
}

// MarshalJSON is our custom marshaller so that our inner struct get output
func (s *Session) MarshalJSON() ([]byte, error) {
return json.Marshal(s.s)
Expand Down
28 changes: 28 additions & 0 deletions core/models/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,34 @@ func TestGetSessionWaitExpiresOn(t *testing.T) {
assert.Nil(t, s2Actual)
}

func TestClearWaitTimeout(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()

defer testsuite.Reset(testsuite.ResetData)

oa := testdata.Org1.Load(rt)

_, cathy := testdata.Cathy.Load(db, oa)

expiresOn := time.Now().Add(time.Hour)
timeoutOn := time.Now().Add(time.Minute)
testdata.InsertWaitingSession(db, testdata.Org1, testdata.Cathy, models.FlowTypeMessaging, testdata.Favorites, models.NilConnectionID, time.Now(), expiresOn, true, &timeoutOn)

session, err := models.FindWaitingSessionForContact(ctx, db, nil, oa, models.FlowTypeMessaging, cathy)
require.NoError(t, err)

// can be called without db connection to clear without updating db
session.ClearWaitTimeout(ctx, nil)
assert.Nil(t, session.WaitTimeoutOn())
assert.NotNil(t, session.WaitExpiresOn()) // unaffected

// and called with one to clear in the database as well
session.ClearWaitTimeout(ctx, db)
assert.Nil(t, session.WaitTimeoutOn())

assertdb.Query(t, db, `SELECT timeout_on FROM flows_flowsession WHERE id = $1`, session.ID()).Returns(nil)
}

func insertSessionAndRun(db *sqlx.DB, contact *testdata.Contact, sessionType models.FlowType, status models.SessionStatus, flow *testdata.Flow, connID models.ConnectionID) (models.SessionID, models.FlowRunID) {
// create session and add a run with same status
sessionID := testdata.InsertFlowSession(db, testdata.Org1, contact, sessionType, status, flow, connID)
Expand Down
18 changes: 13 additions & 5 deletions core/tasks/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,9 @@ func TestTimedEvents(t *testing.T) {

defer testsuite.Reset(testsuite.ResetAll)

// start to start our favorites flow
// create some keyword triggers
testdata.InsertKeywordTrigger(db, testdata.Org1, testdata.Favorites, "start", models.MatchOnly, nil, nil)
testdata.InsertKeywordTrigger(db, testdata.Org1, testdata.PickANumber, "pick", models.MatchOnly, nil, nil)

tcs := []struct {
EventType string
Expand Down Expand Up @@ -578,6 +579,12 @@ func TestTimedEvents(t *testing.T) {

// 10: timeout on the color question
{handler.TimeoutEventType, testdata.Cathy, "", "Sorry you can't participate right now, I'll try again later.", testdata.TwitterChannel.ID, testdata.Org1.ID},

// 11: start the pick a number flow
{handler.MsgEventType, testdata.Cathy, "pick", "Pick a number between 1-10.", testdata.TwitterChannel.ID, testdata.Org1.ID},

// 12: try to resume with timeout even tho flow doesn't have one set
{handler.TimeoutEventType, testdata.Cathy, "", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
}

last := time.Now()
Expand Down Expand Up @@ -619,7 +626,7 @@ func TestTimedEvents(t *testing.T) {
task = handler.NewExpirationTask(tc.OrgID, tc.Contact.ID, sessionID, expiration)

} else if tc.EventType == handler.TimeoutEventType {
timeoutOn := time.Now()
timeoutOn := time.Now().Round(time.Millisecond) // so that there's no difference between this and what we read from the db

// usually courier will set timeout_on after sending the last message
db.MustExec(`UPDATE flows_flowsession SET timeout_on = $2 WHERE id = $1`, sessionID, timeoutOn)
Expand Down Expand Up @@ -650,9 +657,10 @@ func TestTimedEvents(t *testing.T) {
last = time.Now()
}

// should have no waiting sessions or runs
assertdb.Query(t, db, `SELECT count(*) from flows_flowsession WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(0)
assertdb.Query(t, db, `SELECT count(*) from flows_flowrun WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(0)
// should only have a single waiting session/run with no timeout
assertdb.Query(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(1)
assertdb.Query(t, db, `SELECT timeout_on FROM flows_flowsession WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(nil)
assertdb.Query(t, db, `SELECT count(*) FROM flows_flowrun WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(1)

// test the case of a run and session no longer being the most recent but somehow still active, expiration should still work
r, err := db.QueryContext(ctx, `SELECT id, session_id from flows_flowrun WHERE contact_id = $1 and status = 'I' order by created_on asc limit 1`, testdata.Cathy.ID)
Expand Down
11 changes: 10 additions & 1 deletion core/tasks/handler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/excellent/types"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/engine"
"github.com/nyaruka/goflow/flows/events"
"github.com/nyaruka/goflow/flows/resumes"
"github.com/nyaruka/goflow/flows/triggers"
Expand Down Expand Up @@ -273,7 +274,15 @@ func handleTimedEvent(ctx context.Context, rt *runtime.Runtime, eventType string

_, err = runner.ResumeFlow(ctx, rt, oa, session, modelContact, resume, nil)
if err != nil {
return errors.Wrapf(err, "error resuming flow for timeout")
// if we errored, and it's the wait rejecting the timeout event, it's because it no longer exists on the flow, so clear it
// on the session
var eerr *engine.Error
if errors.As(err, &eerr) && eerr.Code() == engine.ErrorResumeRejectedByWait && resume.Type() == resumes.TypeWaitTimeout {
log.WithField("session_id", session.ID()).Info("clearing session timeout which is no longer set in flow")
return errors.Wrap(session.ClearWaitTimeout(ctx, rt.DB), "error clearing session timeout")
}

return errors.Wrap(err, "error resuming flow for timeout")
}

log.WithField("elapsed", time.Since(start)).Info("handled timed event")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/lib/pq v1.10.4
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.17.1
github.com/nyaruka/goflow v0.153.0
github.com/nyaruka/goflow v0.154.0
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDY
github.com/nyaruka/gocommon v1.5.3/go.mod h1:2ZeBZF9yt20IaAJ4aC1ujojAsFhJBk2IuDvSl7KuQDw=
github.com/nyaruka/gocommon v1.17.1 h1:4bbNp+0/BIbne4VDiKOxh3kcbdvEu/WsrsZiG/VyRZ8=
github.com/nyaruka/gocommon v1.17.1/go.mod h1:nmYyb7MZDM0iW4DYJKiBzfKuE9nbnx+xSHZasuIBOT0=
github.com/nyaruka/goflow v0.153.0 h1:ZphPN0WCod77uvBMCLOxjl9fibaHdTkcWVP3lltIgbc=
github.com/nyaruka/goflow v0.153.0/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ=
github.com/nyaruka/goflow v0.154.0 h1:tcUVs+sDFyjWdLvyk1kf2SOkQwSGInMbzuG+trE7ZNc=
github.com/nyaruka/goflow v0.154.0/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down

0 comments on commit 460cb9f

Please sign in to comment.