Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't write output when writing to s3 #471

Merged
merged 2 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions core/models/orgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ const (

configSessionStorageMode = "session_storage_mode"

DBSessions = SessionStorageMode("db")
S3Sessions = SessionStorageMode("s3")
S3WriteSessions = SessionStorageMode("s3_write")
DBSessions = SessionStorageMode("db")
S3Sessions = SessionStorageMode("s3")
)

// Org is mailroom's type for RapidPro orgs. It also implements the envs.Environment interface for GoFlow
Expand Down
52 changes: 47 additions & 5 deletions core/models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,20 @@ INSERT INTO
RETURNING id
`

const insertCompleteSessionSQLNoOutput = `
INSERT INTO
flows_flowsession( uuid, session_type, status, responded, output_url, contact_id, org_id, created_on, ended_on, wait_started_on, connection_id)
VALUES(:uuid,:session_type,:status,:responded, :output_url,:contact_id,:org_id, NOW(), NOW(), NULL, :connection_id)
RETURNING id
`

const insertIncompleteSessionSQLNoOutput = `
INSERT INTO
flows_flowsession( uuid, session_type, status, responded, output_url, contact_id, org_id, created_on, current_flow_id, timeout_on, wait_started_on, connection_id)
VALUES(:uuid,:session_type,:status,:responded, :output_url,:contact_id,:org_id, NOW(), :current_flow_id,:timeout_on,:wait_started_on,:connection_id)
RETURNING id
`

// FlowSession creates a flow session for the passed in session object. It also populates the runs we know about
func (s *Session) FlowSession(sa flows.SessionAssets, env envs.Environment) (flows.Session, error) {
session, err := goflow.Engine(config.Mailroom).ReadSession(sa, json.RawMessage(s.s.Output), assets.IgnoreMissing)
Expand Down Expand Up @@ -608,17 +622,23 @@ func (s *Session) WriteUpdatedSession(ctx context.Context, tx *sqlx.Tx, rp *redi
}
}

// the SQL statement we'll use to update this session
updateSQL := updateSessionSQL

// if writing to S3, do so
sessionMode := org.Org().SessionStorageMode()
if sessionMode == S3Sessions || sessionMode == S3WriteSessions {
if sessionMode == S3Sessions {
err := WriteSessionOutputsToStorage(ctx, st, []*Session{s})
if err != nil {
logrus.WithError(err).Error("error writing session to s3")
}

// don't write output in our SQL
updateSQL = updateSessionSQLNoOutput
}

// write our new session state to the db
_, err = tx.NamedExecContext(ctx, updateSessionSQL, s.s)
_, err = tx.NamedExecContext(ctx, updateSQL, s.s)
if err != nil {
return errors.Wrapf(err, "error updating session")
}
Expand Down Expand Up @@ -703,6 +723,21 @@ WHERE
id = :id
`

const updateSessionSQLNoOutput = `
UPDATE
flows_flowsession
SET
output_url = :output_url,
status = :status,
ended_on = CASE WHEN :status = 'W' THEN NULL ELSE NOW() END,
responded = :responded,
current_flow_id = :current_flow_id,
timeout_on = :timeout_on,
wait_started_on = :wait_started_on
WHERE
id = :id
`

const updateRunSQL = `
UPDATE
flows_flowrun fr
Expand Down Expand Up @@ -773,18 +808,25 @@ func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, st storage.
}
}

// the SQL we'll use to do our insert of complete sessions
insertCompleteSQL := insertCompleteSessionSQL
insertIncompleteSQL := insertIncompleteSessionSQL

// if writing our sessions to S3, do so
sessionMode := org.Org().SessionStorageMode()
if sessionMode == S3Sessions || sessionMode == S3WriteSessions {
if sessionMode == S3Sessions {
err := WriteSessionOutputsToStorage(ctx, st, sessions)
if err != nil {
// for now, continue on for errors, we are still reading from the DB
logrus.WithError(err).Error("error writing sessions to s3")
}

insertCompleteSQL = insertCompleteSessionSQLNoOutput
insertIncompleteSQL = insertIncompleteSessionSQLNoOutput
}

// insert our complete sessions first
err := BulkQuery(ctx, "insert completed sessions", tx, insertCompleteSessionSQL, completeSessionsI)
err := BulkQuery(ctx, "insert completed sessions", tx, insertCompleteSQL, completeSessionsI)
if err != nil {
return nil, errors.Wrapf(err, "error inserting completed sessions")
}
Expand All @@ -796,7 +838,7 @@ func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, st storage.
}

// insert incomplete sessions
err = BulkQuery(ctx, "insert incomplete sessions", tx, insertIncompleteSessionSQL, incompleteSessionsI)
err = BulkQuery(ctx, "insert incomplete sessions", tx, insertIncompleteSQL, incompleteSessionsI)
if err != nil {
return nil, errors.Wrapf(err, "error inserting incomplete sessions")
}
Expand Down
6 changes: 3 additions & 3 deletions core/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestResume(t *testing.T) {

defer testsuite.ResetStorage()

// write sessions to storage as well
// write sessions to s3 storage
db.MustExec(`UPDATE orgs_org set config = '{"session_storage_mode": "s3"}' WHERE id = 1`)
defer testsuite.ResetDB()

Expand All @@ -186,7 +186,7 @@ func TestResume(t *testing.T) {

testsuite.AssertQuery(t, db,
`SELECT count(*) FROM flows_flowsession WHERE contact_id = $1 AND current_flow_id = $2
AND status = 'W' AND responded = FALSE AND org_id = 1 AND connection_id IS NULL AND output IS NOT NULL`, contact.ID(), flow.ID()).Returns(1)
AND status = 'W' AND responded = FALSE AND org_id = 1 AND connection_id IS NULL AND output IS NULL`, contact.ID(), flow.ID()).Returns(1)

testsuite.AssertQuery(t, db,
`SELECT count(*) FROM flows_flowrun WHERE contact_id = $1 AND flow_id = $2
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestResume(t *testing.T) {

testsuite.AssertQuery(t, db,
`SELECT count(*) FROM flows_flowsession WHERE contact_id = $1 AND current_flow_id = $2
AND status = $3 AND responded = TRUE AND org_id = 1 AND connection_id IS NULL AND output IS NOT NULL AND output_url IS NOT NULL`, contact.ID(), flow.ID(), tc.SessionStatus).
AND status = $3 AND responded = TRUE AND org_id = 1 AND connection_id IS NULL AND output IS NULL AND output_url IS NOT NULL`, contact.ID(), flow.ID(), tc.SessionStatus).
Returns(1, "%d: didn't find expected session", i)

runIsActive := tc.RunStatus == models.RunStatusActive || tc.RunStatus == models.RunStatusWaiting
Expand Down