diff --git a/core/handlers/msg_created.go b/core/handlers/msg_created.go index 9317e8fe8..0e79d9d37 100644 --- a/core/handlers/msg_created.go +++ b/core/handlers/msg_created.go @@ -50,7 +50,7 @@ func handlePreMsgCreated(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, } // everybody else gets their timeout cleared, will be set by courier - scene.Session().ClearTimeoutOn() + scene.Session().ClearWaitTimeout(ctx, nil) return nil } diff --git a/core/models/sessions.go b/core/models/sessions.go index 647b2c512..6a7dce403 100644 --- a/core/models/sessions.go +++ b/core/models/sessions.go @@ -108,7 +108,6 @@ func (s *Session) WaitStartedOn() *time.Time { return s.s.WaitStartedOn func (s *Session) WaitTimeoutOn() *time.Time { return s.s.WaitTimeoutOn } func (s *Session) WaitExpiresOn() *time.Time { return s.s.WaitExpiresOn } func (s *Session) WaitResumeOnExpire() bool { return s.s.WaitResumeOnExpire } -func (s *Session) ClearTimeoutOn() { s.s.WaitTimeoutOn = nil } func (s *Session) CurrentFlowID() FlowID { return s.s.CurrentFlowID } func (s *Session) ConnectionID() *ConnectionID { return s.s.ConnectionID } func (s *Session) IncomingMsgID() MsgID { return s.incomingMsgID } @@ -467,6 +466,19 @@ func (s *Session) Update(ctx context.Context, rt *runtime.Runtime, tx *sqlx.Tx, return nil } +// ClearWaitTimeout clears the timeout on the wait on this session and is used if the engine tells us +// that the flow no longer has a timeout on that wait. It can be called without updating the session +// in the database which is used when handling msg_created events before session is updated anyway. +func (s *Session) ClearWaitTimeout(ctx context.Context, db *sqlx.DB) error { + s.s.WaitTimeoutOn = nil + + if db != nil { + _, err := db.ExecContext(ctx, `UPDATE flows_flowsession SET timeout_on = NULL WHERE id = $1`, s.ID()) + return errors.Wrap(err, "error clearing wait timeout") + } + return nil +} + // MarshalJSON is our custom marshaller so that our inner struct get output func (s *Session) MarshalJSON() ([]byte, error) { return json.Marshal(s.s) diff --git a/core/models/sessions_test.go b/core/models/sessions_test.go index 9cdc4ef06..e06b18aff 100644 --- a/core/models/sessions_test.go +++ b/core/models/sessions_test.go @@ -435,6 +435,34 @@ func TestGetSessionWaitExpiresOn(t *testing.T) { assert.Nil(t, s2Actual) } +func TestClearWaitTimeout(t *testing.T) { + ctx, rt, db, _ := testsuite.Get() + + defer testsuite.Reset(testsuite.ResetData) + + oa := testdata.Org1.Load(rt) + + _, cathy := testdata.Cathy.Load(db, oa) + + expiresOn := time.Now().Add(time.Hour) + timeoutOn := time.Now().Add(time.Minute) + testdata.InsertWaitingSession(db, testdata.Org1, testdata.Cathy, models.FlowTypeMessaging, testdata.Favorites, models.NilConnectionID, time.Now(), expiresOn, true, &timeoutOn) + + session, err := models.FindWaitingSessionForContact(ctx, db, nil, oa, models.FlowTypeMessaging, cathy) + require.NoError(t, err) + + // can be called without db connection to clear without updating db + session.ClearWaitTimeout(ctx, nil) + assert.Nil(t, session.WaitTimeoutOn()) + assert.NotNil(t, session.WaitExpiresOn()) // unaffected + + // and called with one to clear in the database as well + session.ClearWaitTimeout(ctx, db) + assert.Nil(t, session.WaitTimeoutOn()) + + assertdb.Query(t, db, `SELECT timeout_on FROM flows_flowsession WHERE id = $1`, session.ID()).Returns(nil) +} + func insertSessionAndRun(db *sqlx.DB, contact *testdata.Contact, sessionType models.FlowType, status models.SessionStatus, flow *testdata.Flow, connID models.ConnectionID) (models.SessionID, models.FlowRunID) { // create session and add a run with same status sessionID := testdata.InsertFlowSession(db, testdata.Org1, contact, sessionType, status, flow, connID) diff --git a/core/tasks/handler/handler_test.go b/core/tasks/handler/handler_test.go index 3ac05f109..2361016c6 100644 --- a/core/tasks/handler/handler_test.go +++ b/core/tasks/handler/handler_test.go @@ -535,8 +535,9 @@ func TestTimedEvents(t *testing.T) { defer testsuite.Reset(testsuite.ResetAll) - // start to start our favorites flow + // create some keyword triggers testdata.InsertKeywordTrigger(db, testdata.Org1, testdata.Favorites, "start", models.MatchOnly, nil, nil) + testdata.InsertKeywordTrigger(db, testdata.Org1, testdata.PickANumber, "pick", models.MatchOnly, nil, nil) tcs := []struct { EventType string @@ -578,6 +579,12 @@ func TestTimedEvents(t *testing.T) { // 10: timeout on the color question {handler.TimeoutEventType, testdata.Cathy, "", "Sorry you can't participate right now, I'll try again later.", testdata.TwitterChannel.ID, testdata.Org1.ID}, + + // 11: start the pick a number flow + {handler.MsgEventType, testdata.Cathy, "pick", "Pick a number between 1-10.", testdata.TwitterChannel.ID, testdata.Org1.ID}, + + // 12: try to resume with timeout even tho flow doesn't have one set + {handler.TimeoutEventType, testdata.Cathy, "", "", testdata.TwitterChannel.ID, testdata.Org1.ID}, } last := time.Now() @@ -619,7 +626,7 @@ func TestTimedEvents(t *testing.T) { task = handler.NewExpirationTask(tc.OrgID, tc.Contact.ID, sessionID, expiration) } else if tc.EventType == handler.TimeoutEventType { - timeoutOn := time.Now() + timeoutOn := time.Now().Round(time.Millisecond) // so that there's no difference between this and what we read from the db // usually courier will set timeout_on after sending the last message db.MustExec(`UPDATE flows_flowsession SET timeout_on = $2 WHERE id = $1`, sessionID, timeoutOn) @@ -650,9 +657,10 @@ func TestTimedEvents(t *testing.T) { last = time.Now() } - // should have no waiting sessions or runs - assertdb.Query(t, db, `SELECT count(*) from flows_flowsession WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(0) - assertdb.Query(t, db, `SELECT count(*) from flows_flowrun WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(0) + // should only have a single waiting session/run with no timeout + assertdb.Query(t, db, `SELECT count(*) FROM flows_flowsession WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(1) + assertdb.Query(t, db, `SELECT timeout_on FROM flows_flowsession WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(nil) + assertdb.Query(t, db, `SELECT count(*) FROM flows_flowrun WHERE status = 'W' AND contact_id = $1`, testdata.Cathy.ID).Returns(1) // test the case of a run and session no longer being the most recent but somehow still active, expiration should still work r, err := db.QueryContext(ctx, `SELECT id, session_id from flows_flowrun WHERE contact_id = $1 and status = 'I' order by created_on asc limit 1`, testdata.Cathy.ID) diff --git a/core/tasks/handler/worker.go b/core/tasks/handler/worker.go index d98e16827..121af8a70 100644 --- a/core/tasks/handler/worker.go +++ b/core/tasks/handler/worker.go @@ -12,6 +12,7 @@ import ( "github.com/nyaruka/gocommon/urns" "github.com/nyaruka/goflow/excellent/types" "github.com/nyaruka/goflow/flows" + "github.com/nyaruka/goflow/flows/engine" "github.com/nyaruka/goflow/flows/events" "github.com/nyaruka/goflow/flows/resumes" "github.com/nyaruka/goflow/flows/triggers" @@ -273,7 +274,15 @@ func handleTimedEvent(ctx context.Context, rt *runtime.Runtime, eventType string _, err = runner.ResumeFlow(ctx, rt, oa, session, modelContact, resume, nil) if err != nil { - return errors.Wrapf(err, "error resuming flow for timeout") + // if we errored, and it's the wait rejecting the timeout event, it's because it no longer exists on the flow, so clear it + // on the session + var eerr *engine.Error + if errors.As(err, &eerr) && eerr.Code() == engine.ErrorResumeRejectedByWait && resume.Type() == resumes.TypeWaitTimeout { + log.WithField("session_id", session.ID()).Info("clearing session timeout which is no longer set in flow") + return errors.Wrap(session.ClearWaitTimeout(ctx, rt.DB), "error clearing session timeout") + } + + return errors.Wrap(err, "error resuming flow for timeout") } log.WithField("elapsed", time.Since(start)).Info("handled timed event") diff --git a/go.mod b/go.mod index f305e0f67..861aa3f4a 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/lib/pq v1.10.4 github.com/nyaruka/ezconf v0.2.1 github.com/nyaruka/gocommon v1.17.1 - github.com/nyaruka/goflow v0.153.0 + github.com/nyaruka/goflow v0.154.0 github.com/nyaruka/librato v1.0.0 github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d github.com/nyaruka/null v1.2.0 diff --git a/go.sum b/go.sum index e541a07ec..eb52258e1 100644 --- a/go.sum +++ b/go.sum @@ -135,8 +135,8 @@ github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDY github.com/nyaruka/gocommon v1.5.3/go.mod h1:2ZeBZF9yt20IaAJ4aC1ujojAsFhJBk2IuDvSl7KuQDw= github.com/nyaruka/gocommon v1.17.1 h1:4bbNp+0/BIbne4VDiKOxh3kcbdvEu/WsrsZiG/VyRZ8= github.com/nyaruka/gocommon v1.17.1/go.mod h1:nmYyb7MZDM0iW4DYJKiBzfKuE9nbnx+xSHZasuIBOT0= -github.com/nyaruka/goflow v0.153.0 h1:ZphPN0WCod77uvBMCLOxjl9fibaHdTkcWVP3lltIgbc= -github.com/nyaruka/goflow v0.153.0/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ= +github.com/nyaruka/goflow v0.154.0 h1:tcUVs+sDFyjWdLvyk1kf2SOkQwSGInMbzuG+trE7ZNc= +github.com/nyaruka/goflow v0.154.0/go.mod h1:HhK+wn4aRji8qJgJR8l48hPiZxnwVDdWa0Ogy5ifnSQ= github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0= github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg= github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=