Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
bagyenda committed Sep 14, 2020
2 parents cc9ddd7 + a9a6f44 commit 63cbe25
Show file tree
Hide file tree
Showing 14 changed files with 340 additions and 253 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
v5.7.23
----------
* Make defining new task types easier and drier
* Better locking when handling
* Fix and simplify creation of channel logs in IVR handlers

v5.7.22
----------
* Update to latest goflow v0.104.1

v5.7.21
----------
* Simplify test-smtp cmd using smtpx package
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ require (
github.com/lib/pq v1.4.0
github.com/mattn/go-sqlite3 v1.10.0 // indirect
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.5.0
github.com/nyaruka/goflow v0.104.0
github.com/nyaruka/gocommon v1.5.1
github.com/nyaruka/goflow v0.104.1
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand All @@ -31,7 +31,6 @@ require (
github.com/sirupsen/logrus v1.5.0
github.com/stretchr/testify v1.5.1
gopkg.in/go-playground/validator.v9 v9.31.0
gopkg.in/mail.v2 v2.3.1
)

go 1.14
14 changes: 6 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,18 @@ github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8=
github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
github.com/nyaruka/gocommon v1.5.0 h1:MYTSqfxun9QoZSTkX9vLF9s4vFVW37s+k/pELkknD8g=
github.com/nyaruka/gocommon v1.5.0/go.mod h1:tlsX12mmJMr/2tUKSdSm/6IFdMX2wp/s9GAeEAzZSU4=
github.com/nyaruka/goflow v0.104.0 h1:EUocd460MMst2LidxCrfVt3QjvY2/mG7/4lhtJ0+6lU=
github.com/nyaruka/goflow v0.104.0/go.mod h1:wwTXOxQoQAWpLxYm9hcmcvvlN30w7RR3CiNajHTAsr4=
github.com/nyaruka/gocommon v1.5.1 h1:2R6uo6EVSTHOerupAmVm6h5fyufO189dlv/5gwHj3lM=
github.com/nyaruka/gocommon v1.5.1/go.mod h1:6XoaOsVk6z+294hM6pZxX3fDgT2IyLV8hFU4FoQz9Aw=
github.com/nyaruka/goflow v0.104.1 h1:uFmB4dDJwuVJxgcJFEnbXzFOYrqNiiJLnlxd0t6yXxg=
github.com/nyaruka/goflow v0.104.1/go.mod h1:dZBsFXFQ9EzcDlEupM7rcbMdDpIGUNOCMndSCRo7Ofo=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d/go.mod h1:FGdPJVDTNqbRAD+2RvnK9YoO2HcEW7ogSMPzc90b638=
github.com/nyaruka/null v1.2.0 h1:uEbkyy4Z+zPB2Pr3ryQh/0N2965I9kEsXq/cGpyJ7PA=
github.com/nyaruka/null v1.2.0/go.mod h1:HSAFbLNOaEhHnoU0VCveCPz0GDtJ3GEtFWhvnBNkhPE=
github.com/nyaruka/phonenumbers v1.0.34/go.mod h1:GQ0cTHlrxPrhoLwyQ1blyN1hO794ygt6FTHWrFB5SSc=
github.com/nyaruka/phonenumbers v1.0.55 h1:bj0nTO88Y68KeUQ/n3Lo2KgK7lM1hF7L9NFuwcCl3yg=
github.com/nyaruka/phonenumbers v1.0.55/go.mod h1:sDaTZ/KPX5f8qyV9qN+hIm+4ZBARJrupC6LuhshJq1U=
github.com/nyaruka/phonenumbers v1.0.57 h1:V4FNPs061PSUOEzQaLH0+pfzEdqoiMH/QJWryx/0hfs=
github.com/nyaruka/phonenumbers v1.0.57/go.mod h1:sDaTZ/KPX5f8qyV9qN+hIm+4ZBARJrupC6LuhshJq1U=
github.com/olivere/elastic v6.2.33+incompatible h1:SRPB2w2OhJ7iULftDEHsNPRoL2GLREqPMRalVmbZaEw=
github.com/olivere/elastic v6.2.33+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down Expand Up @@ -198,7 +197,6 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180921000356-2f5d2388922f/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down
15 changes: 14 additions & 1 deletion models/channel_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package models

import (
"context"
"fmt"
"net/http"
"time"

"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -52,9 +54,12 @@ func NewChannelLog(trace *httpx.Trace, isError bool, desc string, channel *Chann
statusCode = trace.Response.StatusCode
}

// if URL was rewritten (by nginx for example), we want to log the original request
url := originalURL(trace.Request)

l.Description = desc
l.IsError = isError
l.URL = trace.Request.URL.String()
l.URL = url
l.Method = trace.Request.Method
l.Request = string(trace.RequestTrace)
l.Response = trace.ResponseTraceUTF8("...")
Expand All @@ -68,6 +73,14 @@ func NewChannelLog(trace *httpx.Trace, isError bool, desc string, channel *Chann
return log
}

func originalURL(r *http.Request) string {
proxyPath := r.Header.Get("X-Forwarded-Path")
if proxyPath != "" {
return fmt.Sprintf("https://%s%s", r.Host, proxyPath)
}
return r.URL.String()
}

// InsertChannelLogs writes the given channel logs to the db
func InsertChannelLogs(ctx context.Context, db *sqlx.DB, logs []*ChannelLog) error {
ls := make([]interface{}, len(logs))
Expand Down
10 changes: 8 additions & 2 deletions models/channel_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestChannelLogs(t *testing.T) {
httpx.SetRequestor(httpx.NewMockRequestor(map[string][]httpx.MockResponse{
"http://rapidpro.io": {httpx.NewMockResponse(200, nil, "OK")},
"http://rapidpro.io/bad": {httpx.NewMockResponse(400, nil, "Oops")},
"http://rapidpro.io/new": {httpx.NewMockResponse(200, nil, "OK")},
}))

oa, err := models.GetOrgAssets(ctx, db, models.Org1)
Expand All @@ -36,10 +37,15 @@ func TestChannelLogs(t *testing.T) {
trace2, err := httpx.DoTrace(http.DefaultClient, req2, nil, nil, -1)
log2 := models.NewChannelLog(trace2, true, "test request", channel, nil)

err = models.InsertChannelLogs(ctx, db, []*models.ChannelLog{log1, log2})
req3, _ := httpx.NewRequest("GET", "http://rapidpro.io/new", nil, map[string]string{"X-Forwarded-Path": "/old"})
trace3, err := httpx.DoTrace(http.DefaultClient, req3, nil, nil, -1)
log3 := models.NewChannelLog(trace3, false, "test request", channel, nil)

err = models.InsertChannelLogs(ctx, db, []*models.ChannelLog{log1, log2, log3})
require.NoError(t, err)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM channels_channellog`, nil, 2)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM channels_channellog`, nil, 3)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM channels_channellog WHERE url = 'http://rapidpro.io' AND is_error = FALSE AND channel_id = $1`, []interface{}{channel.ID()}, 1)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM channels_channellog WHERE url = 'http://rapidpro.io/bad' AND is_error = TRUE AND channel_id = $1`, []interface{}{channel.ID()}, 1)
testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM channels_channellog WHERE url = 'https://rapidpro.io/old' AND is_error = FALSE AND channel_id = $1`, []interface{}{channel.ID()}, 1)
}
6 changes: 0 additions & 6 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@ const (

// StartIVRFlowBatch is our task for starting an ivr batch
StartIVRFlowBatch = "start_ivr_flow_batch"

// InterruptSessions is our task type to interrupt a set of sessions
InterruptSessions = "interrupt_sessions"

// PopulateDynamicGroup is our task to populate the contacts for a dynamic group
PopulateDynamicGroup = "populate_dynamic_group"
)

// Size returns the number of tasks for the passed in queue
Expand Down
50 changes: 50 additions & 0 deletions tasks/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package tasks

import (
"context"
"encoding/json"

"github.com/nyaruka/goflow/utils"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/queue"

"github.com/pkg/errors"
)

var registeredTypes = map[string](func() Task){}

// RegisterType registers a new type of task
func RegisterType(name string, initFunc func() Task) {
registeredTypes[name] = initFunc

mailroom.AddTaskFunction(name, func(ctx context.Context, mr *mailroom.Mailroom, task *queue.Task) error {
// decode our task body
typedTask, err := ReadTask(task.Type, task.Task)
if err != nil {
return errors.Wrapf(err, "error reading task of type %s", task.Type)
}

return typedTask.Perform(ctx, mr)
})
}

// Task is the common interface for all task types
type Task interface {
// Perform performs the task
Perform(ctx context.Context, mr *mailroom.Mailroom) error
}

//------------------------------------------------------------------------------------------
// JSON Encoding / Decoding
//------------------------------------------------------------------------------------------

// ReadTask reads an action from the given JSON
func ReadTask(typeName string, data json.RawMessage) (Task, error) {
f := registeredTypes[typeName]
if f == nil {
return nil, errors.Errorf("unknown task type: '%s'", typeName)
}

task := f()
return task, utils.UnmarshalAndValidate(data, task)
}
26 changes: 26 additions & 0 deletions tasks/base_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package tasks_test

import (
"testing"

"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/tasks"
"github.com/nyaruka/mailroom/tasks/groups"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestReadTask(t *testing.T) {
task, err := tasks.ReadTask("populate_dynamic_group", []byte(`{
"org_id": 2,
"group_id": 23,
"query": "gender = F"
}`))
require.NoError(t, err)

typedTask := task.(*groups.PopulateTask)
assert.Equal(t, models.OrgID(2), typedTask.OrgID)
assert.Equal(t, models.GroupID(23), typedTask.GroupID)
assert.Equal(t, "gender = F", typedTask.Query)
}
29 changes: 11 additions & 18 deletions tasks/groups/worker.go → tasks/groups/populate_dynamic_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,39 @@ package groups

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/queue"
"github.com/nyaruka/mailroom/tasks"
"github.com/nyaruka/mailroom/utils/locker"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// TypePopulateDynamicGroup is the type of the populate group task
const TypePopulateDynamicGroup = "populate_dynamic_group"

const populateLockKey string = "pop_dyn_group_%d"

func init() {
mailroom.AddTaskFunction(queue.PopulateDynamicGroup, handlePopulateDynamicGroup)
tasks.RegisterType(TypePopulateDynamicGroup, func() tasks.Task { return &PopulateTask{} })
}

// PopulateTask is our definition of our group population
// PopulateTask is our task to populate the contacts for a dynamic group
type PopulateTask struct {
OrgID models.OrgID `json:"org_id"`
GroupID models.GroupID `json:"group_id"`
Query string `json:"query"`
}

const populateLockKey string = "pop_dyn_group_%d"

// handlePopulateDynamicGroup figures out the membership for a dynamic group then repopulates it
func handlePopulateDynamicGroup(ctx context.Context, mr *mailroom.Mailroom, task *queue.Task) error {
// Perform figures out the membership for a query based group then repopulates it
func (t *PopulateTask) Perform(ctx context.Context, mr *mailroom.Mailroom) error {
ctx, cancel := context.WithTimeout(ctx, time.Hour)
defer cancel()

// decode our task body
if task.Type != queue.PopulateDynamicGroup {
return errors.Errorf("unknown event type passed to populate dynamic group worker: %s", task.Type)
}
t := &PopulateTask{}
err := json.Unmarshal(task.Task, t)
if err != nil {
return errors.Wrapf(err, "error unmarshalling task: %s", string(task.Task))
}

lockKey := fmt.Sprintf(populateLockKey, t.GroupID)
lock, err := locker.GrabLock(mr.RP, lockKey, time.Hour, time.Minute*5)
if err != nil {
Expand Down
78 changes: 78 additions & 0 deletions tasks/groups/populate_dynamic_group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package groups_test

import (
"fmt"
"testing"

"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/mailroom"
"github.com/nyaruka/mailroom/config"
"github.com/nyaruka/mailroom/models"
"github.com/nyaruka/mailroom/tasks/groups"
"github.com/nyaruka/mailroom/testsuite"

"github.com/olivere/elastic"
"github.com/stretchr/testify/require"
)

func TestPopulateTask(t *testing.T) {
testsuite.Reset()
ctx := testsuite.CTX()
db := testsuite.DB()

mes := testsuite.NewMockElasticServer()
defer mes.Close()

es, err := elastic.NewClient(
elastic.SetURL(mes.URL()),
elastic.SetHealthcheck(false),
elastic.SetSniff(false),
)
require.NoError(t, err)

mr := &mailroom.Mailroom{Config: config.Mailroom, DB: db, RP: testsuite.RP(), ElasticClient: es}

mes.NextResponse = fmt.Sprintf(`{
"_scroll_id": "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAbgc0WS1hqbHlfb01SM2lLTWJRMnVOSVZDdw==",
"took": 2,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 1,
"max_score": null,
"hits": [
{
"_index": "contacts",
"_type": "_doc",
"_id": "%d",
"_score": null,
"_routing": "1",
"sort": [15124352]
}
]
}
}`, models.CathyID)

var groupID models.GroupID
err = db.Get(&groupID,
`INSERT INTO contacts_contactgroup(uuid, org_id, group_type, name, query, status, is_active, created_by_id, created_on, modified_by_id, modified_on)
VALUES($1, $2, 'U', $3, $4, 'R', TRUE, 1, NOW(), 1, NOW()) RETURNING id`,
uuids.New(), models.Org1, "Women", "gender = F",
)
require.NoError(t, err)

task := &groups.PopulateTask{
OrgID: models.Org1,
GroupID: groupID,
Query: "gender = F",
}
err = task.Perform(ctx, mr)
require.NoError(t, err)

testsuite.AssertQueryCount(t, db, `SELECT count(*) FROM contacts_contactgroup_contacts WHERE contactgroup_id = $1`, []interface{}{groupID}, 1)
}
Loading

0 comments on commit 63cbe25

Please sign in to comment.