diff --git a/core/models/orgs.go b/core/models/orgs.go index 29f4389ca..ab3cf1a5c 100644 --- a/core/models/orgs.go +++ b/core/models/orgs.go @@ -68,9 +68,8 @@ const ( configSessionStorageMode = "session_storage_mode" - DBSessions = SessionStorageMode("db") - S3Sessions = SessionStorageMode("s3") - S3WriteSessions = SessionStorageMode("s3_write") + DBSessions = SessionStorageMode("db") + S3Sessions = SessionStorageMode("s3") ) // Org is mailroom's type for RapidPro orgs. It also implements the envs.Environment interface for GoFlow diff --git a/core/models/runs.go b/core/models/runs.go index f4b327387..96cc7118e 100644 --- a/core/models/runs.go +++ b/core/models/runs.go @@ -504,6 +504,20 @@ INSERT INTO RETURNING id ` +const insertCompleteSessionSQLNoOutput = ` +INSERT INTO + flows_flowsession( uuid, session_type, status, responded, output_url, contact_id, org_id, created_on, ended_on, wait_started_on, connection_id) + VALUES(:uuid,:session_type,:status,:responded, :output_url,:contact_id,:org_id, NOW(), NOW(), NULL, :connection_id) +RETURNING id +` + +const insertIncompleteSessionSQLNoOutput = ` +INSERT INTO + flows_flowsession( uuid, session_type, status, responded, output_url, contact_id, org_id, created_on, current_flow_id, timeout_on, wait_started_on, connection_id) + VALUES(:uuid,:session_type,:status,:responded, :output_url,:contact_id,:org_id, NOW(), :current_flow_id,:timeout_on,:wait_started_on,:connection_id) +RETURNING id +` + // FlowSession creates a flow session for the passed in session object. It also populates the runs we know about func (s *Session) FlowSession(sa flows.SessionAssets, env envs.Environment) (flows.Session, error) { session, err := goflow.Engine(config.Mailroom).ReadSession(sa, json.RawMessage(s.s.Output), assets.IgnoreMissing) @@ -608,17 +622,23 @@ func (s *Session) WriteUpdatedSession(ctx context.Context, tx *sqlx.Tx, rp *redi } } + // the SQL statement we'll use to update this session + updateSQL := updateSessionSQL + // if writing to S3, do so sessionMode := org.Org().SessionStorageMode() - if sessionMode == S3Sessions || sessionMode == S3WriteSessions { + if sessionMode == S3Sessions { err := WriteSessionOutputsToStorage(ctx, st, []*Session{s}) if err != nil { logrus.WithError(err).Error("error writing session to s3") } + + // don't write output in our SQL + updateSQL = updateSessionSQLNoOutput } // write our new session state to the db - _, err = tx.NamedExecContext(ctx, updateSessionSQL, s.s) + _, err = tx.NamedExecContext(ctx, updateSQL, s.s) if err != nil { return errors.Wrapf(err, "error updating session") } @@ -703,6 +723,21 @@ WHERE id = :id ` +const updateSessionSQLNoOutput = ` +UPDATE + flows_flowsession +SET + output_url = :output_url, + status = :status, + ended_on = CASE WHEN :status = 'W' THEN NULL ELSE NOW() END, + responded = :responded, + current_flow_id = :current_flow_id, + timeout_on = :timeout_on, + wait_started_on = :wait_started_on +WHERE + id = :id +` + const updateRunSQL = ` UPDATE flows_flowrun fr @@ -773,18 +808,25 @@ func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, st storage. } } + // the SQL we'll use to do our insert of complete sessions + insertCompleteSQL := insertCompleteSessionSQL + insertIncompleteSQL := insertIncompleteSessionSQL + // if writing our sessions to S3, do so sessionMode := org.Org().SessionStorageMode() - if sessionMode == S3Sessions || sessionMode == S3WriteSessions { + if sessionMode == S3Sessions { err := WriteSessionOutputsToStorage(ctx, st, sessions) if err != nil { // for now, continue on for errors, we are still reading from the DB logrus.WithError(err).Error("error writing sessions to s3") } + + insertCompleteSQL = insertCompleteSessionSQLNoOutput + insertIncompleteSQL = insertIncompleteSessionSQLNoOutput } // insert our complete sessions first - err := BulkQuery(ctx, "insert completed sessions", tx, insertCompleteSessionSQL, completeSessionsI) + err := BulkQuery(ctx, "insert completed sessions", tx, insertCompleteSQL, completeSessionsI) if err != nil { return nil, errors.Wrapf(err, "error inserting completed sessions") } @@ -796,7 +838,7 @@ func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, st storage. } // insert incomplete sessions - err = BulkQuery(ctx, "insert incomplete sessions", tx, insertIncompleteSessionSQL, incompleteSessionsI) + err = BulkQuery(ctx, "insert incomplete sessions", tx, insertIncompleteSQL, incompleteSessionsI) if err != nil { return nil, errors.Wrapf(err, "error inserting incomplete sessions") } diff --git a/core/runner/runner_test.go b/core/runner/runner_test.go index 10df36d02..84ef9a494 100644 --- a/core/runner/runner_test.go +++ b/core/runner/runner_test.go @@ -167,7 +167,7 @@ func TestResume(t *testing.T) { defer testsuite.ResetStorage() - // write sessions to storage as well + // write sessions to s3 storage db.MustExec(`UPDATE orgs_org set config = '{"session_storage_mode": "s3"}' WHERE id = 1`) defer testsuite.ResetDB() @@ -186,7 +186,7 @@ func TestResume(t *testing.T) { testsuite.AssertQuery(t, db, `SELECT count(*) FROM flows_flowsession WHERE contact_id = $1 AND current_flow_id = $2 - AND status = 'W' AND responded = FALSE AND org_id = 1 AND connection_id IS NULL AND output IS NOT NULL`, contact.ID(), flow.ID()).Returns(1) + AND status = 'W' AND responded = FALSE AND org_id = 1 AND connection_id IS NULL AND output IS NULL`, contact.ID(), flow.ID()).Returns(1) testsuite.AssertQuery(t, db, `SELECT count(*) FROM flows_flowrun WHERE contact_id = $1 AND flow_id = $2 @@ -220,7 +220,7 @@ func TestResume(t *testing.T) { testsuite.AssertQuery(t, db, `SELECT count(*) FROM flows_flowsession WHERE contact_id = $1 AND current_flow_id = $2 - AND status = $3 AND responded = TRUE AND org_id = 1 AND connection_id IS NULL AND output IS NOT NULL AND output_url IS NOT NULL`, contact.ID(), flow.ID(), tc.SessionStatus). + AND status = $3 AND responded = TRUE AND org_id = 1 AND connection_id IS NULL AND output IS NULL AND output_url IS NOT NULL`, contact.ID(), flow.ID(), tc.SessionStatus). Returns(1, "%d: didn't find expected session", i) runIsActive := tc.RunStatus == models.RunStatusActive || tc.RunStatus == models.RunStatusWaiting