Skip to content

Commit

Permalink
Merge pull request rapidpro#166 from nyaruka/run_status_1
Browse files Browse the repository at this point in the history
✏️ Start writing flows_flowrun.status alongside exit_type
  • Loading branch information
rowanseymour authored Aug 15, 2019
2 parents 0c9f2bb + b8a0413 commit 3df21c1
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 19 deletions.
Binary file modified mailroom_test.dump
Binary file not shown.
65 changes: 48 additions & 17 deletions models/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,36 @@ type FlowRunID int64
const NilFlowRunID = FlowRunID(0)

const (
SessionStatusActive = "A"
SessionStatusCompleted = "C"
SessionStatusFailed = "F"
SessionStatusWaiting = "W"
SessionStatusCompleted = "C"
SessionStatusExpired = "X"
SessionStatusInterrupted = "I"
SessionStatusFailed = "F"
)

var sessionStatusMap = map[flows.SessionStatus]SessionStatus{
flows.SessionStatusActive: SessionStatusActive,
flows.SessionStatusWaiting: SessionStatusWaiting,
flows.SessionStatusCompleted: SessionStatusCompleted,
flows.SessionStatusFailed: SessionStatusFailed,
flows.SessionStatusWaiting: SessionStatusWaiting,
}

type RunStatus string

const (
RunStatusActive = "A"
RunStatusWaiting = "W"
RunStatusCompleted = "C"
RunStatusExpired = "X"
RunStatusInterrupted = "I"
RunStatusFailed = "F"
)

var runStatusMap = map[flows.RunStatus]RunStatus{
flows.RunStatusActive: RunStatusActive,
flows.RunStatusWaiting: RunStatusWaiting,
flows.RunStatusCompleted: RunStatusCompleted,
flows.RunStatusExpired: RunStatusExpired,
flows.RunStatusFailed: RunStatusFailed,
}

type ExitType = null.String
Expand All @@ -57,12 +74,18 @@ var (
ExitFailed = ExitType("F")
)

var exitStatusMap = map[ExitType]SessionStatus{
var exitToSessionStatusMap = map[ExitType]SessionStatus{
ExitInterrupted: SessionStatusInterrupted,
ExitCompleted: SessionStatusCompleted,
ExitExpired: SessionStatusExpired,
}

var exitToRunStatusMap = map[ExitType]RunStatus{
ExitInterrupted: RunStatusInterrupted,
ExitCompleted: RunStatusCompleted,
ExitExpired: RunStatusExpired,
}

var keptEvents = map[string]bool{
events.TypeMsgCreated: true,
events.TypeMsgReceived: true,
Expand Down Expand Up @@ -210,6 +233,7 @@ type FlowRun struct {
r struct {
ID FlowRunID `db:"id"`
UUID flows.RunUUID `db:"uuid"`
Status RunStatus `db:"status"`
IsActive bool `db:"is_active"`
CreatedOn time.Time `db:"created_on"`
ModifiedOn time.Time `db:"modified_on"`
Expand Down Expand Up @@ -603,6 +627,7 @@ UPDATE
SET
is_active = r.is_active::bool,
exit_type = r.exit_type,
status = r.status,
exited_on = r.exited_on::timestamp with time zone,
expires_on = r.expires_on::timestamp with time zone,
responded = r.responded::bool,
Expand All @@ -612,9 +637,9 @@ SET
current_node_uuid = r.current_node_uuid::uuid,
modified_on = NOW()
FROM (
VALUES(:uuid, :is_active, :exit_type, :exited_on, :expires_on, :responded, :results, :path, :events, :current_node_uuid)
VALUES(:uuid, :is_active, :exit_type, :status, :exited_on, :expires_on, :responded, :results, :path, :events, :current_node_uuid)
) AS
r(uuid, is_active, exit_type, exited_on, expires_on, responded, results, path, events, current_node_uuid)
r(uuid, is_active, exit_type, status, exited_on, expires_on, responded, results, path, events, current_node_uuid)
WHERE
fr.uuid = r.uuid::uuid
`
Expand Down Expand Up @@ -723,9 +748,9 @@ func WriteSessions(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, org *OrgAss

const insertRunSQL = `
INSERT INTO
flows_flowrun(uuid, is_active, created_on, modified_on, exited_on, exit_type, expires_on, responded, results, path,
flows_flowrun(uuid, is_active, created_on, modified_on, exited_on, exit_type, status, expires_on, responded, results, path,
events, current_node_uuid, contact_id, flow_id, org_id, session_id, start_id, parent_uuid, connection_id)
VALUES(:uuid, :is_active, :created_on, NOW(), :exited_on, :exit_type, :expires_on, :responded, :results, :path,
VALUES(:uuid, :is_active, :created_on, NOW(), :exited_on, :exit_type, :status, :expires_on, :responded, :results, :path,
:events, :current_node_uuid, :contact_id, :flow_id, :org_id, :session_id, :start_id, :parent_uuid, :connection_id)
RETURNING id
`
Expand Down Expand Up @@ -755,6 +780,7 @@ func newRun(ctx context.Context, tx *sqlx.Tx, org *OrgAssets, session *Session,
run := &FlowRun{}
r := &run.r
r.UUID = fr.UUID()
r.Status = runStatusMap[fr.Status()]
r.CreatedOn = fr.CreatedOn()
r.ExitedOn = fr.ExitedOn()
r.ExpiresOn = fr.ExpiresOn()
Expand Down Expand Up @@ -877,9 +903,16 @@ func ExitSessions(ctx context.Context, tx Queryer, sessionIDs []SessionID, exitT
return nil
}

// map exit type to statuses for sessions and runs
sessionStatus := exitToSessionStatusMap[exitType]
runStatus, found := exitToRunStatusMap[exitType]
if !found {
return errors.Errorf("unknown exit type: %s", exitType)
}

// first interrupt our runs
start := time.Now()
res, err := tx.ExecContext(ctx, exitSessionRunsSQL, pq.Array(sessionIDs), exitType, now)
res, err := tx.ExecContext(ctx, exitSessionRunsSQL, pq.Array(sessionIDs), exitType, now, runStatus)
if err != nil {
return errors.Wrapf(err, "error exiting session runs")
}
Expand All @@ -889,12 +922,7 @@ func ExitSessions(ctx context.Context, tx Queryer, sessionIDs []SessionID, exitT
// then our sessions
start = time.Now()

status, found := exitStatusMap[exitType]
if !found {
return errors.Wrapf(err, "unknown exit type: %s", exitType)
}

res, err = tx.ExecContext(ctx, exitSessionsSQL, pq.Array(sessionIDs), now, status)
res, err = tx.ExecContext(ctx, exitSessionsSQL, pq.Array(sessionIDs), now, sessionStatus)
if err != nil {
return errors.Wrapf(err, "error exiting sessions")
}
Expand All @@ -911,6 +939,7 @@ SET
is_active = FALSE,
exit_type = $2,
exited_on = $3,
status = $4,
timeout_on = NULL,
modified_on = NOW(),
child_context = NULL,
Expand Down Expand Up @@ -956,6 +985,7 @@ SET
is_active = FALSE,
exited_on = $3,
exit_type = 'I',
status = 'I',
modified_on = NOW(),
child_context = NULL,
parent_context = NULL
Expand Down Expand Up @@ -1020,6 +1050,7 @@ const expireRunsSQL = `
is_active = FALSE,
exited_on = NOW(),
exit_type = 'E',
status = 'E',
modified_on = NOW(),
child_context = NULL,
parent_context = NULL
Expand Down
4 changes: 2 additions & 2 deletions runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestCampaignStarts(t *testing.T) {

testsuite.AssertQueryCount(t, db,
`SELECT count(*) FROM flows_flowrun WHERE contact_id = ANY($1) and flow_id = $2
AND is_active = FALSE AND responded = FALSE AND org_id = 1 AND parent_id IS NULL AND exit_type = 'C'
AND is_active = FALSE AND responded = FALSE AND org_id = 1 AND parent_id IS NULL AND exit_type = 'C' AND status = 'C'
AND results IS NOT NULL AND path IS NOT NULL AND events IS NOT NULL
AND session_id IS NOT NULL`,
[]interface{}{pq.Array(contacts), models.CampaignFlowID}, 2,
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestBatchStart(t *testing.T) {

testsuite.AssertQueryCount(t, db,
`SELECT count(*) FROM flows_flowrun WHERE contact_id = ANY($1) and flow_id = $2
AND is_active = FALSE AND responded = FALSE AND org_id = 1 AND parent_id IS NULL AND exit_type = 'C'
AND is_active = FALSE AND responded = FALSE AND org_id = 1 AND parent_id IS NULL AND exit_type = 'C' AND status = 'C'
AND results IS NOT NULL AND path IS NOT NULL AND events IS NOT NULL
AND session_id IS NOT NULL`,
[]interface{}{pq.Array(contactIDs), tc.Flow}, tc.TotalCount, "%d: unexpected number of runs", i,
Expand Down

0 comments on commit 3df21c1

Please sign in to comment.