Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

💥 Fail flow starts which can't be started #311

Merged
merged 1 commit into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/mattn/go-sqlite3 v1.10.0 // indirect
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.2.0
github.com/nyaruka/goflow v0.94.0
github.com/nyaruka/goflow v0.94.1
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ github.com/nyaruka/gocommon v1.2.0 h1:gCmVCXYZFwKDMqQj8R1jNlK+7a06khKFq3zX8fBBbz
github.com/nyaruka/gocommon v1.2.0/go.mod h1:9Y21Fd6iZXDLHWTRiZAc6b4LQSCi6HEEQK4SB45Yav4=
github.com/nyaruka/goflow v0.94.0 h1:fmUdADrFsJjClsbxJMd0R0uMyYWtQNr4aiURBI31ZKo=
github.com/nyaruka/goflow v0.94.0/go.mod h1:PDah2hr5WzODnUFK4VWWQkg7SqnYclf7P9Ik5u/VOG0=
github.com/nyaruka/goflow v0.94.1 h1:FzbA4Age1i5GuQ9su/E4z7HRa7f5ghv+GeNGUn5nPfA=
github.com/nyaruka/goflow v0.94.1/go.mod h1:PDah2hr5WzODnUFK4VWWQkg7SqnYclf7P9Ik5u/VOG0=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down
21 changes: 21 additions & 0 deletions models/starts.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,28 @@ type StartID null.Int
// NilStartID is our constant for a nil start id
var NilStartID = StartID(0)

// StartType is the type for the type of a start
type StartType string

// start type constants
const (
StartTypeManual = StartType("M")
StartTypeAPI = StartType("A")
StartTypeFlowAction = StartType("F")
StartTypeTrigger = StartType("T")
)

// StartStatus is the type for the status of a start
type StartStatus string

// start status constants
const (
StartStatusPending = StartStatus("P")
StartStatusStarting = StartStatus("S")
StartStatusComplete = StartStatus("C")
StartStatusFailed = StartStatus("F")
)

// RestartParticipants is our type for the bool of restarting participatants
type RestartParticipants bool

Expand Down Expand Up @@ -55,7 +68,15 @@ func MarkStartStarted(ctx context.Context, db *sqlx.DB, startID StartID, contact
return errors.Wrapf(err, "error setting start as started")
}
return nil
}

// MarkStartFailed sets the status for the passed in flow start to F
func MarkStartFailed(ctx context.Context, db *sqlx.DB, startID StartID) error {
_, err := db.Exec("UPDATE flows_flowstart SET status = 'F', modified_on = NOW() WHERE id = $1", startID)
if err != nil {
return errors.Wrapf(err, "error setting start as failed")
}
return nil
}

// FlowStartBatch represents a single flow batch that needs to be started
Expand Down
14 changes: 13 additions & 1 deletion tasks/starts/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/goflow/contactql"

"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -43,7 +44,18 @@ func handleFlowStart(ctx context.Context, mr *mailroom.Mailroom, task *queue.Tas
return errors.Wrapf(err, "error unmarshalling flow start task: %s", string(task.Task))
}

return CreateFlowBatches(ctx, mr.DB, mr.RP, mr.ElasticClient, startTask)
err = CreateFlowBatches(ctx, mr.DB, mr.RP, mr.ElasticClient, startTask)
if err != nil {
models.MarkStartFailed(ctx, mr.DB, startTask.ID())

// if error is user created query error.. don't escalate error to sentry
isQueryError, _ := contactql.IsQueryError(err)
if !isQueryError {
return err
}
}

return nil
}

// CreateFlowBatches takes our master flow start and creates batches of flow starts for all the unique contacts
Expand Down
53 changes: 45 additions & 8 deletions tasks/starts/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"

"github.com/nyaruka/goflow/utils/uuids"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
_ "github.com/nyaruka/mailroom/hooks"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/queue"
Expand All @@ -14,6 +16,7 @@ import (

"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStarts(t *testing.T) {
Expand All @@ -32,7 +35,9 @@ func TestStarts(t *testing.T) {
elastic.SetHealthcheck(false),
elastic.SetSniff(false),
)
assert.NoError(t, err)
require.NoError(t, err)

mr := &mailroom.Mailroom{Config: config.Mailroom, DB: db, RP: rp, ElasticClient: es}

// insert a flow run for one of our contacts
// TODO: can be replaced with a normal flow start of another flow once we support flows with waits
Expand All @@ -54,6 +59,7 @@ func TestStarts(t *testing.T) {
ContactCount int
BatchCount int
TotalCount int
Status models.StartStatus
}{
{
Label: "empty flow start",
Expand All @@ -62,6 +68,7 @@ func TestStarts(t *testing.T) {
ContactCount: 0,
BatchCount: 0,
TotalCount: 0,
Status: models.StartStatusComplete,
},
{
Label: "Single group",
Expand All @@ -71,6 +78,7 @@ func TestStarts(t *testing.T) {
ContactCount: 121,
BatchCount: 2,
TotalCount: 121,
Status: models.StartStatusComplete,
},
{
Label: "Group and Contact (but all already active)",
Expand All @@ -81,6 +89,7 @@ func TestStarts(t *testing.T) {
ContactCount: 121,
BatchCount: 2,
TotalCount: 0,
Status: models.StartStatusComplete,
},
{
Label: "Contact restart",
Expand All @@ -92,6 +101,7 @@ func TestStarts(t *testing.T) {
ContactCount: 1,
BatchCount: 1,
TotalCount: 1,
Status: models.StartStatusComplete,
},
{
Label: "Previous group and one new contact",
Expand All @@ -102,6 +112,7 @@ func TestStarts(t *testing.T) {
ContactCount: 122,
BatchCount: 2,
TotalCount: 1,
Status: models.StartStatusComplete,
},
{
Label: "Single contact, no restart",
Expand All @@ -111,6 +122,7 @@ func TestStarts(t *testing.T) {
ContactCount: 1,
BatchCount: 1,
TotalCount: 0,
Status: models.StartStatusComplete,
},
{
Label: "Single contact, include active, but no restart",
Expand All @@ -121,6 +133,7 @@ func TestStarts(t *testing.T) {
ContactCount: 1,
BatchCount: 1,
TotalCount: 0,
Status: models.StartStatusComplete,
},
{
Label: "Single contact, include active and restart",
Expand All @@ -132,6 +145,7 @@ func TestStarts(t *testing.T) {
ContactCount: 1,
BatchCount: 1,
TotalCount: 1,
Status: models.StartStatusComplete,
},
{
Label: "Query start",
Expand Down Expand Up @@ -170,6 +184,19 @@ func TestStarts(t *testing.T) {
ContactCount: 1,
BatchCount: 1,
TotalCount: 1,
Status: models.StartStatusComplete,
},
{
Label: "Query start with invalid query",
FlowID: models.SingleMessageFlowID,
Query: "xyz = 45",
RestartParticipants: true,
IncludeActive: true,
Queue: queue.HandlerQueue,
ContactCount: 0,
BatchCount: 0,
TotalCount: 0,
Status: models.StartStatusFailed,
},
{
Label: "New Contact",
Expand All @@ -179,10 +206,11 @@ func TestStarts(t *testing.T) {
ContactCount: 1,
BatchCount: 1,
TotalCount: 1,
Status: models.StartStatusComplete,
},
}

for i, tc := range tcs {
for _, tc := range tcs {
mes.NextResponse = tc.QueryResponse

// handle our start task
Expand All @@ -195,7 +223,10 @@ func TestStarts(t *testing.T) {
err := models.InsertFlowStarts(ctx, db, []*models.FlowStart{start})
assert.NoError(t, err)

err = CreateFlowBatches(ctx, db, rp, es, start)
startJSON, err := json.Marshal(start)
require.NoError(t, err)

err = handleFlowStart(ctx, mr, &queue.Task{Type: queue.StartFlow, Task: startJSON})
assert.NoError(t, err)

// pop all our tasks and execute them
Expand All @@ -219,14 +250,20 @@ func TestStarts(t *testing.T) {
}

// assert our count of batches
assert.Equal(t, tc.BatchCount, count, "%d: unexpected batch count", i)
assert.Equal(t, tc.BatchCount, count, "unexpected batch count in '%s'", tc.Label)

// assert our count of total flow runs created
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowrun where flow_id = $1 AND start_id = $2 AND is_active = FALSE`,
[]interface{}{tc.FlowID, start.ID()}, tc.TotalCount, "%d: unexpected total run count", i)
[]interface{}{tc.FlowID, start.ID()}, tc.TotalCount, "unexpected total run count in '%s'", tc.Label)

// flow start should be complete
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart where status = 'C' AND id = $1 AND contact_count = $2`,
[]interface{}{start.ID(), tc.ContactCount}, 1, "%d: start status not set to complete", i)
// assert final status
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart where status = $2 AND id = $1`,
[]interface{}{start.ID(), tc.Status}, 1, "status mismatch in '%s'", tc.Label)

// assert final contact count
if tc.Status != models.StartStatusFailed {
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM flows_flowstart where contact_count = $2 AND id = $1`,
[]interface{}{start.ID(), tc.ContactCount}, 1, "contact count mismatch in '%s'", tc.Label)
}
}
}
18 changes: 8 additions & 10 deletions web/contact/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ func handleSearch(ctx context.Context, s *web.Server, r *http.Request) (interfac
request.GroupUUID, request.Query, request.Sort, request.Offset, request.PageSize)

if err != nil {
switch cause := errors.Cause(err).(type) {
case *contactql.QueryError:
return cause, http.StatusBadRequest, nil
default:
return nil, http.StatusInternalServerError, err
isQueryError, qerr := contactql.IsQueryError(err)
if isQueryError {
return qerr, http.StatusBadRequest, nil
}
return nil, http.StatusInternalServerError, err
}

// normalize and inspect the query
Expand Down Expand Up @@ -181,12 +180,11 @@ func handleParseQuery(ctx context.Context, s *web.Server, r *http.Request) (inte
parsed, err := contactql.ParseQuery(request.Query, env.RedactionPolicy(), env.DefaultCountry(), org.SessionAssets())

if err != nil {
switch cause := errors.Cause(err).(type) {
case *contactql.QueryError:
return cause, http.StatusBadRequest, nil
default:
return nil, http.StatusInternalServerError, err
isQueryError, qerr := contactql.IsQueryError(err)
if isQueryError {
return qerr, http.StatusBadRequest, nil
}
return nil, http.StatusInternalServerError, err
}

// normalize and inspect the query
Expand Down