Skip to content

Commit

Permalink
add apply actions endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Feb 14, 2020
1 parent 13fae8f commit 2e7db56
Show file tree
Hide file tree
Showing 4 changed files with 397 additions and 20 deletions.
17 changes: 15 additions & 2 deletions models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,23 @@ const (
NilContactID = ContactID(0)
)

// LoadContact loads a contact from the passed in id
func LoadContact(ctx context.Context, db Queryer, org *OrgAssets, id ContactID) (*Contact, error) {
contacts, err := LoadContacts(ctx, db, org, []ContactID{id})
if err != nil {
return nil, err
}
if len(contacts) == 0 {
return nil, nil
}
return contacts[0], nil
}

// LoadContacts loads a set of contacts for the passed in ids
func LoadContacts(ctx context.Context, db Queryer, org *OrgAssets, ids []ContactID) ([]*Contact, error) {
start := time.Now()

rows, err := db.QueryxContext(ctx, selectContactSQL, pq.Array(ids))
rows, err := db.QueryxContext(ctx, selectContactSQL, pq.Array(ids), org.OrgID())
if err != nil {
return nil, errors.Wrap(err, "error selecting contacts")
}
Expand Down Expand Up @@ -523,7 +535,8 @@ LEFT JOIN (
) u ON c.id = u.contact_id
WHERE
c.id = ANY($1) AND
is_active = TRUE
is_active = TRUE AND
c.org_id = $2
) r;
`

Expand Down
110 changes: 92 additions & 18 deletions web/contact/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package contact
import (
"context"
"encoding/json"
"math"
"net/http"

"github.com/apex/log"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/actions"
"github.com/nyaruka/goflow/flows/definition"
"github.com/nyaruka/goflow/flows/triggers"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/goflow/utils/uuids"
"github.com/nyaruka/mailroom/goflow"
Expand Down Expand Up @@ -201,9 +202,9 @@ func handleParseQuery(ctx context.Context, s *web.Server, r *http.Request) (inte
// }
//
type applyActionsRequest struct {
OrgID models.OrgID `json:"org_id" validate:"required"`
ContactUUID flows.ContactUUID `json:"contact_uuid" validate:"required"`
Actions []json.RawMessage `json:"actions" validate:"required"`
OrgID models.OrgID `json:"org_id" validate:"required"`
ContactID models.ContactID `json:"contact_id" validate:"required"`
Actions []json.RawMessage `json:"actions" validate:"required"`
}

// Response for a contact update. Will return the full contact state and any errors
Expand All @@ -222,7 +223,8 @@ type applyActionsRequest struct {
// }
// }
type applyActionsResponse struct {
Contact json.RawMessage `json:"contact"`
Contact *flows.Contact `json:"contact"`
Events []flows.Event `json:"events"`
}

// the types of actions our apply_actions endpoind supports
Expand All @@ -246,14 +248,24 @@ func handleApplyActions(ctx context.Context, s *web.Server, r *http.Request) (in
}

// grab our org
org, err := models.NewOrgAssets(s.CTX, s.DB, request.OrgID, nil)
org, err := models.GetOrgAssets(s.CTX, s.DB, request.OrgID)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load org assets")
}

sa, err := models.NewSessionAssets(org)
// clone it as we will modify flows
org, err = org.Clone(s.CTX, s.DB)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load session assets")
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to clone orgs")
}

// load our contact
contact, err := models.LoadContact(ctx, s.DB, org, request.ContactID)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "unable to load contact")
}
if contact == nil {
return errors.Errorf("unable to find contact widh id: %d", request.ContactID), http.StatusBadRequest, nil
}

// build up our actions
Expand All @@ -264,23 +276,23 @@ func handleApplyActions(ctx context.Context, s *web.Server, r *http.Request) (in
return errors.Wrapf(err, "error in action: %s", string(a)), http.StatusBadRequest, nil
}
if !supportedTypes[action.Type()] {
return errors.Errorf("unsupported action type: %s", action.Type), http.StatusBadRequest, nil
return errors.Errorf("unsupported action type: %s", action.Type()), http.StatusBadRequest, nil
}

as[i] = action
}

// create a minimal flow with these actions
// create a minimal node with these actions
entry := definition.NewNode(
flows.NodeUUID(uuids.New()),
as,
nil,
nil,
[]flows.Exit{definition.NewExit(flows.ExitUUID(uuids.New()), "")},
)

// we have our nodes, lets create our flow
flowUUID := assets.FlowUUID(uuids.New())
flow, err := definition.NewFlow(
flowDef, err := definition.NewFlow(
flowUUID,
"Contact Update Flow",
envs.Language("eng"),
Expand All @@ -292,17 +304,79 @@ func handleApplyActions(ctx context.Context, s *web.Server, r *http.Request) (in
nil,
)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error building flow")
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error building contact flow")
}

session, sprint, err := goflow.Engine().NewSession(sa, trigger)
flowJSON, err := json.Marshal(flowDef)
if err != nil {
log.WithError(err).Errorf("error starting flow")
continue
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error marshalling contact flow")
}

// build our response
response := &applyActionsResponse{}
flow := org.SetFlow(math.MaxInt32, flowUUID, flowDef.Name(), flowJSON)

flowContact, err := contact.FlowContact(org)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error converting to flow contact")
}

// build our trigger
trigger := triggers.NewManual(org.Env(), flow.FlowReference(), flowContact, nil)
flowSession, flowSprint, err := goflow.Engine().NewSession(org.SessionAssets(), trigger)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error running contact flow")
}

tx, err := s.DB.BeginTxx(ctx, nil)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error starting transaction")
}

session, err := models.NewSession(ctx, tx, org, flowSession, flowSprint)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error creating session object")
}

// apply our events
for _, e := range flowSprint.Events() {
err := models.ApplyEvent(ctx, tx, s.RP, org, session, e)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error applying event: %v", e)
}
}

// gather all our pre commit events, group them by hook and apply them
err = models.ApplyPreEventHooks(ctx, tx, s.RP, org, []*models.Session{session})
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error applying pre commit hooks")
}

// commit our transaction
err = tx.Commit()
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error committing pre commit hooks")
}

tx, err = s.DB.BeginTxx(ctx, nil)
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error starting transaction for post commit")
}

// then apply our post commit hooks
err = models.ApplyPostEventHooks(ctx, tx, s.RP, org, []*models.Session{session})
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error applying pre commit hooks")
}

err = tx.Commit()
if err != nil {
return nil, http.StatusInternalServerError, errors.Wrapf(err, "error committing pre commit hooks")
}

// all done! build our response, including our updated contact and events
response := &applyActionsResponse{
Contact: flowSession.Contact(),
Events: flowSprint.Events(),
}

return response, http.StatusOK, nil
}
83 changes: 83 additions & 0 deletions web/contact/contact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package contact
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
Expand All @@ -12,7 +13,10 @@ import (
"time"

"github.com/nyaruka/goflow/test"
"github.com/nyaruka/goflow/utils/dates"
"github.com/nyaruka/goflow/utils/uuids"
"github.com/nyaruka/mailroom/config"
_ "github.com/nyaruka/mailroom/hooks"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/search"
"github.com/nyaruka/mailroom/testsuite"
Expand Down Expand Up @@ -319,3 +323,82 @@ func TestParse(t *testing.T) {
test.AssertEqualJSON(t, tc.Response, json.RawMessage(response), "%d: unexpected response", i)
}
}

var update = flag.Bool("update", false, "update testdata files")

func TestApplyActions(t *testing.T) {
testsuite.Reset()
ctx := testsuite.CTX()
db := testsuite.DB()
rp := testsuite.RP()
wg := &sync.WaitGroup{}

server := web.NewServer(ctx, config.Mailroom, db, rp, nil, nil, wg)
server.Start()
time.Sleep(time.Second)

defer server.Stop()

uuids.SetGenerator(uuids.NewSeededGenerator(0))
defer uuids.SetGenerator(uuids.DefaultGenerator)

dates.SetNowSource(dates.NewSequentialNowSource(time.Date(2018, 7, 6, 12, 30, 0, 123456789, time.UTC)))
defer dates.SetNowSource(dates.DefaultNowSource)

// for simpler tests we clear out cathy's fields and groups to start
db.MustExec(`UPDATE contacts_contact SET fields = NULL WHERE id = $1`, models.CathyID)
db.MustExec(`DELETE FROM contacts_contactgroup_contacts WHERE contact_id = $1`, models.CathyID)
db.MustExec(`UPDATE contacts_contacturn SET contact_id = NULL WHERE contact_id = $1`, models.CathyID)

type TestCase struct {
Label string `json:"label"`
Method string `json:"method"`
Path string `json:"path"`
Body json.RawMessage `json:"body"`
Status int `json:"status"`
Response json.RawMessage `json:"response"`
DBAssertions []struct {
Query string `json:"query"`
Count int `json:"count"`
} `json:"db_assertions"`
}
tcs := make([]*TestCase, 0, 20)
tcJSON, err := ioutil.ReadFile("testdata/apply_actions.json")
assert.NoError(t, err)

err = json.Unmarshal(tcJSON, &tcs)
assert.NoError(t, err)

for _, tc := range tcs {
req, err := http.NewRequest(tc.Method, "http://localhost:8090"+tc.Path, bytes.NewReader([]byte(tc.Body)))
assert.NoError(t, err, "%s: error creating request", tc.Label)

resp, err := http.DefaultClient.Do(req)
assert.NoError(t, err, "%s: error making request", tc.Label)

assert.Equal(t, tc.Status, resp.StatusCode, "%s: unexpected status", tc.Label)

response, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err, "%s: error reading body", tc.Label)

if !*update {
test.AssertEqualJSON(t, json.RawMessage(tc.Response), json.RawMessage(response), "%s: unexpected response\nExpected: %s\nGot: %s", tc.Label, tc.Response, string(response))
}

for _, dba := range tc.DBAssertions {
testsuite.AssertQueryCount(t, db, dba.Query, nil, dba.Count, "%s: '%s' returned wrong count", tc.Label, dba.Query)
}

tc.Response = json.RawMessage(response)
}

// update if we are meant to
if *update {
truth, err := json.MarshalIndent(tcs, "", " ")
assert.NoError(t, err)

if err := ioutil.WriteFile("testdata/apply_actions.json", truth, 0644); err != nil {
t.Fatalf("failed to update truth file: %s", err)
}
}
}
Loading

0 comments on commit 2e7db56

Please sign in to comment.