Skip to content

Commit

Permalink
wenichats send history based on session run events
Browse files Browse the repository at this point in the history
  • Loading branch information
rasoro committed Jan 10, 2024
1 parent 67d9ba2 commit 91da703
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 22 deletions.
50 changes: 50 additions & 0 deletions core/models/msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,56 @@ func SelectContactMessages(ctx context.Context, db Queryer, contactID int, after
return msgs, nil
}

const selectMsgByFlowrunUUID = `
SELECT
id,
broadcast_id,
uuid,
text,
created_on,
direction,
status,
visibility,
msg_count,
error_count,
next_attempt,
external_id,
attachments,
metadata,
channel_id,
contact_id,
contact_urn_id,
org_id,
topup_id
FROM public.msgs_msg
JOIN (
SELECT (jsonb_array_elements(events)->'msg'->>'uuid')::uuid AS msg_uuid
FROM public.flows_flowrun
WHERE "uuid" = $1
) AS flow_events
ON flow_events.msg_uuid = public.msgs_msg."uuid";
`

// SelectMessagesByFlowRun loads the given messages for the passed in contact, created after the passed in time
func SelectMessagesByFlowRun(ctx context.Context, db Queryer, flowrunUUID string) ([]*Msg, error) {
rows, err := db.QueryxContext(ctx, selectMsgByFlowrunUUID, flowrunUUID)
if err != nil {
return nil, errors.Wrapf(err, "error querying msgs for flowrun UUID %v", flowrunUUID)
}
defer rows.Close()

msgs := make([]*Msg, 0)
for rows.Next() {
msg := &Msg{}
err = rows.StructScan(&msg.m)
if err != nil {
return nil, errors.Wrapf(err, "error scanning msg row")
}
msgs = append(msgs, msg)
}
return msgs, nil
}

// NormalizeAttachment will turn any relative URL in the passed in attachment and normalize it to
// include the full host for attachment domains
func NormalizeAttachment(cfg *runtime.Config, attachment utils.Attachment) utils.Attachment {
Expand Down
32 changes: 32 additions & 0 deletions core/models/msgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/test"
"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/runner"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/testsuite/testdata"
Expand Down Expand Up @@ -623,3 +625,33 @@ func TestSelectContactMessages(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 3, len(msgs))
}

func TestSelectMsgByFlowrunUUID(t *testing.T) {
ctx, rt, db, _ := testsuite.Get()
defer testsuite.Reset(testsuite.ResetAll)
defer uuids.SetGenerator(uuids.DefaultGenerator)
uuids.SetGenerator(uuids.NewSeededGenerator(12345))

oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshFlows)
assert.NoError(t, err)

rt.Config.DB = "postgres://mailroom_test:temba@localhost/mailroom_test?sslmode=disable&Timezone=UTC"

contactIDs := []models.ContactID{testdata.Cathy.ID}
start := models.NewFlowStart(models.OrgID(1), models.StartTypeManual, models.FlowTypeMessaging, testdata.SingleMessage.ID, true, true).
WithContactIDs(contactIDs)
batch := start.CreateBatch(contactIDs, true, len(contactIDs))

sessions, err := runner.StartFlowBatch(ctx, rt, batch)
assert.NoError(t, err)

csession := sessions[0]

fcs, err := csession.FlowSession(rt.Config, oa.SessionAssets(), oa.Env())
assert.NoError(t, err)

fruuid := string(fcs.Runs()[0].UUID())
msgs, err := models.SelectMessagesByFlowRun(ctx, db, fruuid)
assert.NoError(t, err)
assert.Equal(t, 1, len(msgs))
}
13 changes: 8 additions & 5 deletions services/tickets/wenichats/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,16 @@ func (s *service) Open(session flows.Session, topic *flows.Topic, body string, a
return nil, errors.Wrap(err, "failed to create wenichats room webhook")
}

// get messages for history
after := session.Runs()[0].CreatedOn()
cx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
msgs, err := models.SelectContactMessages(cx, db, int(contact.ID()), after)
if err != nil {
return nil, errors.Wrap(err, "failed to get history messages")
msgs := make([]*models.Msg, 0)
for _, fr := range session.Runs() {
fruuid := string(fr.UUID())
frmsgs, err := models.SelectMessagesByFlowRun(cx, db, fruuid)
if err != nil {
return nil, errors.Wrap(err, "failed to get history messages")
}
msgs = append(msgs, frmsgs...)
}

//send history
Expand Down
19 changes: 2 additions & 17 deletions services/tickets/wenichats/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/uuids"
Expand All @@ -26,7 +24,7 @@ import (
)

func TestOpenAndForward(t *testing.T) {
ctx, rt, _, _ := testsuite.Get()
ctx, rt, db, _ := testsuite.Get()
testsuite.Reset(testsuite.ResetData | testsuite.ResetStorage)

defer dates.SetNowSource(dates.DefaultNowSource)
Expand Down Expand Up @@ -260,20 +258,7 @@ func TestOpenAndForward(t *testing.T) {
)
assert.EqualError(t, err, "missing project_auth or sector_uuid")

mockDB, mock, _ := sqlmock.New()
defer mockDB.Close()
sqlxDB := sqlx.NewDb(mockDB, "sqlmock")

rows := sqlmock.NewRows([]string{"id", "uuid", "text", "high_priority", "created_on", "modified_on", "sent_on", "queued_on", "direction", "status", "visibility", "msg_type", "msg_count", "error_count", "next_attempt", "external_id", "attachments", "metadata", "broadcast_id", "channel_id", "contact_id", "contact_urn_id", "org_id", "topup_id"})

after, err := time.Parse("2006-01-02T15:04:05", "2019-10-07T15:21:30")
assert.NoError(t, err)

mock.ExpectQuery("SELECT").
WithArgs(1234567, after).
WillReturnRows(rows)

wenichats.SetDB(sqlxDB)
wenichats.SetDB(db)

svc, err := wenichats.NewService(
rt.Config,
Expand Down

0 comments on commit 91da703

Please sign in to comment.