Skip to content

Commit

Permalink
Update to goflow v0.39.0
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed May 14, 2019
1 parent ce772d1 commit 234e0a6
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 24 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/mattn/go-sqlite3 v1.10.0 // indirect
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.0.0
github.com/nyaruka/goflow v0.38.3
github.com/nyaruka/goflow v0.39.0
github.com/nyaruka/librato v0.0.0-20180827155909-cacc769357b8
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/nyaruka/gocommon v0.2.0 h1:1Le4Ok0Zp2RYULue0n4/02zL+1MrykN/C79HhirGeR
github.com/nyaruka/gocommon v0.2.0/go.mod h1:ZrhaOKNc+kK1qWNuCuZivskT+ygLyIwu4KZVgcaC1mw=
github.com/nyaruka/gocommon v1.0.0 h1:4gdAMOR4BTMHHZjOy5WhfKYGUZVmJ+3LPh1sj011Qzk=
github.com/nyaruka/gocommon v1.0.0/go.mod h1:QbdU2J9WBsqBmeZRuwndf2f6O7rD7mkC0bGn5UNnwjI=
github.com/nyaruka/goflow v0.38.3 h1:qgTn6mpOwbFSprAvHKhuUmrFTjFgPEkmQ55/CW7AH4c=
github.com/nyaruka/goflow v0.38.3/go.mod h1:QitrAujTi7Mc75aVIoCpAmFsfO794+ljo9QNGt7qSHY=
github.com/nyaruka/goflow v0.39.0 h1:2a9fhaZ9y9T8FuemCAfdQ0ertin9XYtvDxlTFB+4xoY=
github.com/nyaruka/goflow v0.39.0/go.mod h1:QitrAujTi7Mc75aVIoCpAmFsfO794+ljo9QNGt7qSHY=
github.com/nyaruka/librato v0.0.0-20180827155909-cacc769357b8 h1:TOvxy0u6LNTWP3gwbdNVCiByXJupr9ATFdzBnBJ2TY8=
github.com/nyaruka/librato v0.0.0-20180827155909-cacc769357b8/go.mod h1:huVocfMEHkttMHD4hSr/wjWNyTx/YMzwwajVzV2bq+0=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down
4 changes: 3 additions & 1 deletion hooks/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/nyaruka/mailroom/runner"
"github.com/nyaruka/mailroom/testsuite"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type ContactActionMap map[models.ContactID][]flows.Action
Expand Down Expand Up @@ -124,7 +125,7 @@ func CreateTestFlow(t *testing.T, uuid assets.FlowUUID, tc HookTestCase) flows.F
nodes = append(nodes, exitNodes...)

// we have our nodes, lets create our flow
flow := definition.NewFlow(
flow, err := definition.NewFlow(
uuid,
"Test Flow",
utils.Language("eng"),
Expand All @@ -135,6 +136,7 @@ func CreateTestFlow(t *testing.T, uuid assets.FlowUUID, tc HookTestCase) flows.F
nodes,
nil,
)
require.NoError(t, err)

return flow
}
Expand Down
41 changes: 26 additions & 15 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,11 @@ func ResumeFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, org *models.Or
}

// validate our flow
missing, err := sa.Validate(flow.UUID())
err = validateFlow(sa, flow.UUID())
if err != nil {
return nil, errors.Wrapf(err, "invalid flow: %s, cannot resume", flow.UUID())
}

// log any missing references, this shouldn't happen and we should fix it
if len(missing) > 0 {
logrus.WithField("flow_uuid", flow.UUID()).WithField("missing", missing).Error("flow being resumed with missing dependencies")
}

// build our flow session
fs, err := session.FlowSession(sa, org.Env())
if err != nil {
Expand Down Expand Up @@ -423,22 +418,17 @@ func StartFlow(
}

// build our session assets
assets, err := models.GetSessionAssets(org)
sa, err := models.GetSessionAssets(org)
if err != nil {
return nil, errors.Wrapf(err, "error starting flow, unable to load assets")
}

// validate our flow
missing, err := assets.Validate(flow.UUID())
err = validateFlow(sa, flow.UUID())
if err != nil {
return nil, errors.Wrapf(err, "invalid flow: %s, cannot start", flow.UUID())
}

// log any missing references, this shouldn't happen and we should fix it
if len(missing) > 0 {
logrus.WithField("flow_uuid", flow.UUID()).WithField("missing", missing).Error("flow being started with missing dependencies")
}

// we now need to grab locks for our contacts so that they are never in two starts or handles at the
// same time we try to grab locks for up to five minutes, but do it in batches where we wait for one
// second per contact to prevent deadlocks
Expand Down Expand Up @@ -485,14 +475,14 @@ func StartFlow(
// ok, we've filtered our contacts, build our triggers
triggers := make([]flows.Trigger, 0, len(locked))
for _, c := range contacts {
contact, err := c.FlowContact(org, assets)
contact, err := c.FlowContact(org, sa)
if err != nil {
return nil, errors.Wrapf(err, "error creating flow contact")
}
triggers = append(triggers, options.TriggerBuilder(contact))
}

ss, err := StartFlowForContacts(ctx, db, rp, org, assets, flow, triggers, options.CommitHook, options.Interrupt)
ss, err := StartFlowForContacts(ctx, db, rp, org, sa, flow, triggers, options.CommitHook, options.Interrupt)
if err != nil {
return nil, errors.Wrapf(err, "error starting flow for contacts")
}
Expand Down Expand Up @@ -735,3 +725,24 @@ func TriggerIVRFlow(ctx context.Context, db *sqlx.DB, rp *redis.Pool, orgID mode

return nil
}

func validateFlow(sa flows.SessionAssets, uuid assets.FlowUUID) error {
flow, err := sa.Flows().Get(uuid)
if err != nil {
return errors.Wrapf(err, "invalid flow: %s, cannot start", flow.UUID())
}

// check for missing assets and log
missingDeps := make([]string, 0)
err = flow.InspectRecursively(sa, func(r assets.Reference) {
missingDeps = append(missingDeps, r.String())
})

// one day we might error if we encounter missing dependencies but for now it's too common so log them
// to help us find whatever problem is creating them
if len(missingDeps) > 0 {
logrus.WithField("flow_uuid", flow.UUID()).WithField("missing", missingDeps).Error("flow being started with missing dependencies")
}

return nil
}
11 changes: 6 additions & 5 deletions web/flow/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,10 @@ func handleValidate(ctx context.Context, s *web.Server, r *http.Request) (interf
}
}

// try to read the flow definition which will fail if it's invalid
flow, err := definition.ReadFlow(flowDef)
if err != nil {
return nil, http.StatusBadRequest, err
return nil, http.StatusUnprocessableEntity, err
}

// if we have an org ID, build a session assets for it
Expand All @@ -125,11 +126,11 @@ func handleValidate(ctx context.Context, s *web.Server, r *http.Request) (interf
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable get session assets")
}
}

// validate the flow against these assets
if err := flow.Validate(sa); err != nil {
return nil, http.StatusUnprocessableEntity, err
// inspect the flow against these assets
if err := flow.Inspect(sa); err != nil {
return nil, http.StatusUnprocessableEntity, err
}
}

return flow, http.StatusOK, nil
Expand Down

0 comments on commit 234e0a6

Please sign in to comment.