Skip to content

Commit

Permalink
Merge pull request rapidpro#471 from nyaruka/s3-writing-only
Browse files Browse the repository at this point in the history
Don't write output when writing to s3
  • Loading branch information
rowanseymour authored Sep 2, 2021
2 parents e7d246f + 1cc43d1 commit 24049ba
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
5 changes: 2 additions & 3 deletions core/models/orgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ const (

configSessionStorageMode = "session_storage_mode"

DBSessions = SessionStorageMode("db")
S3Sessions = SessionStorageMode("s3")
S3WriteSessions = SessionStorageMode("s3_write")
DBSessions = SessionStorageMode("db")
S3Sessions = SessionStorageMode("s3")
)

// Org is mailroom's type for RapidPro orgs. It also implements the envs.Environment interface for GoFlow
Expand Down
52 changes: 47 additions & 5 deletions core/models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,20 @@ INSERT INTO
RETURNING id
`

const insertCompleteSessionSQLNoOutput = `
INSERT INTO
flows_flowsession( uuid, session_type, status, responded, output_url, contact_id, org_id, created_on, ended_on, wait_started_on, connection_id)
VALUES(:uuid,:session_type,:status,:responded, :output_url,:contact_id,:org_id, NOW(), NOW(), NULL, :connection_id)
RETURNING id
`

const insertIncompleteSessionSQLNoOutput = `
INSERT INTO
flows_flowsession( uuid, session_type, status, responded, output_url, contact_id, org_id, created_on, current_flow_id, timeout_on, wait_started_on, connection_id)
VALUES(:uuid,:session_type,:status,:responded, :output_url,:contact_id,:org_id, NOW(), :current_flow_id,:timeout_on,:wait_started_on,:connection_id)
RETURNING id
`

// FlowSession creates a flow session for the passed in session object. It also populates the runs we know about
func (s *Session) FlowSession(sa flows.SessionAssets, env envs.Environment) (flows.Session, error) {
session, err := goflow.Engine(config.Mailroom).ReadSession(sa, json.RawMessage(s.s.Output), assets.IgnoreMissing)
Expand Down Expand Up @@ -608,17 +622,23 @@ func (s *Session) WriteUpdatedSession(ctx context.Context, tx *sqlx.Tx, rp *redi
}
}

// the SQL statement we'll use to update this session
updateSQL := updateSessionSQL

// if writing to S3, do so
sessionMode := org.Org().SessionStorageMode()
if sessionMode == S3Sessions || sessionMode == S3WriteSessions {
if sessionMode == S3Sessions {
err := WriteSessionOutputsToStorage(ctx, st, []*Session{s})
if err != nil {
logrus.WithError(err).Error("error writing session to s3")
}

// don't write output in our SQL
updateSQL = updateSessionSQLNoOutput
}

// write our new session state to the db
_, err = tx.NamedExecContext(ctx, updateSessionSQL, s.s)
_, err = tx.NamedExecContext(ctx, updateSQL, s.s)
if err != nil {
return errors.Wrapf(err, "error updating session")
}
Expand Down Expand Up @@ -703,6 +723,21 @@ WHERE
id = :id
`

const updateSessionSQLNoOutput = `
UPDATE
flows_flowsession
SET
output_url = :output_url,
status = :status,
ended_on = CASE WHEN :status = 'W' THEN NULL ELSE NOW() END,
responded = :responded,
current_flow_id = :current_flow_id,
timeout_on = :timeout_on,
wait_started_on = :wait_started_on
WHERE
id = :id
`

const updateRunSQL = `
UPDATE
flows_flowrun fr
Expand Down Expand Up @@ -773,18 +808,25 @@ func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, st storage.
}
}

// the SQL we'll use to do our insert of complete sessions
insertCompleteSQL := insertCompleteSessionSQL
insertIncompleteSQL := insertIncompleteSessionSQL

// if writing our sessions to S3, do so
sessionMode := org.Org().SessionStorageMode()
if sessionMode == S3Sessions || sessionMode == S3WriteSessions {
if sessionMode == S3Sessions {
err := WriteSessionOutputsToStorage(ctx, st, sessions)
if err != nil {
// for now, continue on for errors, we are still reading from the DB
logrus.WithError(err).Error("error writing sessions to s3")
}

insertCompleteSQL = insertCompleteSessionSQLNoOutput
insertIncompleteSQL = insertIncompleteSessionSQLNoOutput
}

// insert our complete sessions first
err := BulkQuery(ctx, "insert completed sessions", tx, insertCompleteSessionSQL, completeSessionsI)
err := BulkQuery(ctx, "insert completed sessions", tx, insertCompleteSQL, completeSessionsI)
if err != nil {
return nil, errors.Wrapf(err, "error inserting completed sessions")
}
Expand All @@ -796,7 +838,7 @@ func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, st storage.
}

// insert incomplete sessions
err = BulkQuery(ctx, "insert incomplete sessions", tx, insertIncompleteSessionSQL, incompleteSessionsI)
err = BulkQuery(ctx, "insert incomplete sessions", tx, insertIncompleteSQL, incompleteSessionsI)
if err != nil {
return nil, errors.Wrapf(err, "error inserting incomplete sessions")
}
Expand Down
6 changes: 3 additions & 3 deletions core/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestResume(t *testing.T) {

defer testsuite.ResetStorage()

// write sessions to storage as well
// write sessions to s3 storage
db.MustExec(`UPDATE orgs_org set config = '{"session_storage_mode": "s3"}' WHERE id = 1`)
defer testsuite.ResetDB()

Expand All @@ -186,7 +186,7 @@ func TestResume(t *testing.T) {

testsuite.AssertQuery(t, db,
`SELECT count(*) FROM flows_flowsession WHERE contact_id = $1 AND current_flow_id = $2
AND status = 'W' AND responded = FALSE AND org_id = 1 AND connection_id IS NULL AND output IS NOT NULL`, contact.ID(), flow.ID()).Returns(1)
AND status = 'W' AND responded = FALSE AND org_id = 1 AND connection_id IS NULL AND output IS NULL`, contact.ID(), flow.ID()).Returns(1)

testsuite.AssertQuery(t, db,
`SELECT count(*) FROM flows_flowrun WHERE contact_id = $1 AND flow_id = $2
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestResume(t *testing.T) {

testsuite.AssertQuery(t, db,
`SELECT count(*) FROM flows_flowsession WHERE contact_id = $1 AND current_flow_id = $2
AND status = $3 AND responded = TRUE AND org_id = 1 AND connection_id IS NULL AND output IS NOT NULL AND output_url IS NOT NULL`, contact.ID(), flow.ID(), tc.SessionStatus).
AND status = $3 AND responded = TRUE AND org_id = 1 AND connection_id IS NULL AND output IS NULL AND output_url IS NOT NULL`, contact.ID(), flow.ID(), tc.SessionStatus).
Returns(1, "%d: didn't find expected session", i)

runIsActive := tc.RunStatus == models.RunStatusActive || tc.RunStatus == models.RunStatusWaiting
Expand Down

0 comments on commit 24049ba

Please sign in to comment.