Skip to content

Commit

Permalink
Merge pull request rapidpro#247 from nyaruka/latest_goflow
Browse files Browse the repository at this point in the history
Update to latest goflow v0.77.0
  • Loading branch information
rowanseymour authored Mar 9, 2020
2 parents 98dd365 + f27e138 commit 8c81421
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 17 deletions.
28 changes: 28 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package config

import (
"encoding/csv"
"io"
"net"
"strings"

"github.com/pkg/errors"
)

// Mailroom is the global configuration
var Mailroom *Config

Expand Down Expand Up @@ -28,6 +37,7 @@ type Config struct {
WebhooksInitialBackoff int `help:"the initial backoff in milliseconds when retrying a failed webhook call"`
WebhooksBackoffJitter float64 `help:"the amount of jitter to apply to backoff times"`
SMTPServer string `help:"the smtp configuration for sending emails ex: smtp://user%40password@server:port/?from=foo%40gmail.com"`
DisallowedIPs string `help:"comma separated list of IP addresses which engine can't make HTTP calls to"`
MaxStepsPerSprint int `help:"the maximum number of steps allowed per engine sprint"`
MaxValueLength int `help:"the maximum size in characters for contact field values and run result values"`

Expand Down Expand Up @@ -71,6 +81,7 @@ func NewMailroomConfig() *Config {
WebhooksInitialBackoff: 5000,
WebhooksBackoffJitter: 0.5,
SMTPServer: "",
DisallowedIPs: `127.0.0.1,::1`,
MaxStepsPerSprint: 100,
MaxValueLength: 640,

Expand All @@ -89,3 +100,20 @@ func NewMailroomConfig() *Config {
Port: 8090,
}
}

func (c *Config) ParseDisallowedIPs() ([]net.IP, error) {
addrs, err := csv.NewReader(strings.NewReader(c.DisallowedIPs)).Read()
if err != nil && err != io.EOF {
return nil, err
}
ips := make([]net.IP, 0, len(addrs))
for _, addr := range addrs {
ip := net.ParseIP(addr)
if ip == nil {
return nil, errors.Errorf("couldn't parse '%s' as an IP address", addr)
}
ips = append(ips, ip)
}

return ips, nil
}
35 changes: 35 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package config_test

import (
"net"
"testing"

"github.com/nyaruka/mailroom/config"

"github.com/stretchr/testify/assert"
)

func TestParseDisallowedIPs(t *testing.T) {
cfg := config.NewMailroomConfig()

// test with config defaults
ips, err := cfg.ParseDisallowedIPs()
assert.NoError(t, err)
assert.Equal(t, []net.IP{net.IPv4(127, 0, 0, 1), net.ParseIP(`::1`)}, ips)

// test with empty
cfg.DisallowedIPs = ``
ips, err = cfg.ParseDisallowedIPs()
assert.NoError(t, err)
assert.Equal(t, []net.IP{}, ips)

// test with invalid CSV
cfg.DisallowedIPs = `"127.0.0.1`
_, err = cfg.ParseDisallowedIPs()
assert.EqualError(t, err, `record on line 1; parse error on line 2, column 0: extraneous or missing " in quoted-field`)

// test with invalid IP
cfg.DisallowedIPs = `127.0.1`
_, err = cfg.ParseDisallowedIPs()
assert.EqualError(t, err, `couldn't parse '127.0.1' as an IP address`)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/mattn/go-sqlite3 v1.10.0 // indirect
github.com/nyaruka/ezconf v0.2.1
github.com/nyaruka/gocommon v1.2.0
github.com/nyaruka/goflow v0.76.3
github.com/nyaruka/goflow v0.77.0
github.com/nyaruka/librato v1.0.0
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
github.com/nyaruka/null v1.2.0
Expand Down
11 changes: 3 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,8 @@ github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
github.com/nyaruka/gocommon v1.2.0 h1:gCmVCXYZFwKDMqQj8R1jNlK+7a06khKFq3zX8fBBbzw=
github.com/nyaruka/gocommon v1.2.0/go.mod h1:9Y21Fd6iZXDLHWTRiZAc6b4LQSCi6HEEQK4SB45Yav4=
github.com/nyaruka/goflow v0.76.2 h1:8tpJHc4FyVXrKgtRVe8dwd6wOX+zZvf8Hh2Q47NGb84=
github.com/nyaruka/goflow v0.76.2/go.mod h1:0hFgPf2WMomtbf6d7VO67I52RlXcNdLmV4Ik0DijtRg=
github.com/nyaruka/goflow v0.76.3 h1:8VxUcyiIkucbp9AUecWMAe7u8Nm+GBwCDrxxkrnVskQ=
github.com/nyaruka/goflow v0.76.3/go.mod h1:LoRoyfHkJNVEY34AH3qDSDf7Vdq7Ztj47kNx6ceWp4I=
github.com/nyaruka/goflow v0.77.0 h1:XbaFHrd+SGTimSp3BpRZj6neRolOr2jZKlj7u6aZs0Q=
github.com/nyaruka/goflow v0.77.0/go.mod h1:LoRoyfHkJNVEY34AH3qDSDf7Vdq7Ztj47kNx6ceWp4I=
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=
Expand Down Expand Up @@ -118,13 +116,10 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180921000356-2f5d2388922f/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181217023233-e147a9138326 h1:iCzOf0xz39Tstp+Tu/WwyGjUXCk34QhQORRxBeXXTA4=
golang.org/x/net v0.0.0-20181217023233-e147a9138326/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
Expand Down
16 changes: 10 additions & 6 deletions goflow/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var engInit, simulatorInit, webhooksHTTPInit sync.Once

var webhooksHTTPClient *http.Client
var webhooksHTTPRetries *httpx.RetryConfig
var webhookHTTPAccess *httpx.AccessConfig

var emailFactory engine.EmailServiceFactory
var classificationFactory engine.ClassificationServiceFactory
Expand Down Expand Up @@ -52,10 +53,10 @@ func Engine() flows.Engine {
"X-Mailroom-Mode": "normal",
}

httpClient, httpRetries := webhooksHTTP()
httpClient, httpRetries, httpAccess := WebhooksHTTP()

eng = engine.NewBuilder().
WithWebhookServiceFactory(webhooks.NewServiceFactory(httpClient, httpRetries, webhookHeaders, config.Mailroom.WebhooksMaxBodyBytes)).
WithWebhookServiceFactory(webhooks.NewServiceFactory(httpClient, httpRetries, httpAccess, webhookHeaders, config.Mailroom.WebhooksMaxBodyBytes)).
WithEmailServiceFactory(emailFactory).
WithClassificationServiceFactory(classificationFactory).
WithAirtimeServiceFactory(airtimeFactory).
Expand All @@ -74,10 +75,10 @@ func Simulator() flows.Engine {
"X-Mailroom-Mode": "simulation",
}

httpClient, _ := webhooksHTTP() // don't do retries in simulator
httpClient, _, httpAccess := WebhooksHTTP() // don't do retries in simulator

simulator = engine.NewBuilder().
WithWebhookServiceFactory(webhooks.NewServiceFactory(httpClient, nil, webhookHeaders, config.Mailroom.WebhooksMaxBodyBytes)).
WithWebhookServiceFactory(webhooks.NewServiceFactory(httpClient, nil, httpAccess, webhookHeaders, config.Mailroom.WebhooksMaxBodyBytes)).
WithClassificationServiceFactory(classificationFactory). // simulated sessions do real classification
WithEmailServiceFactory(simulatorEmailServiceFactory). // but faked emails
WithAirtimeServiceFactory(simulatorAirtimeServiceFactory). // and faked airtime transfers
Expand All @@ -88,7 +89,7 @@ func Simulator() flows.Engine {
return simulator
}

func webhooksHTTP() (*http.Client, *httpx.RetryConfig) {
func WebhooksHTTP() (*http.Client, *httpx.RetryConfig, *httpx.AccessConfig) {
webhooksHTTPInit.Do(func() {
// customize the default golang transport
t := http.DefaultTransport.(*http.Transport).Clone()
Expand All @@ -109,8 +110,11 @@ func webhooksHTTP() (*http.Client, *httpx.RetryConfig) {
config.Mailroom.WebhooksMaxRetries,
config.Mailroom.WebhooksBackoffJitter,
)

disallowedIPs, _ := config.Mailroom.ParseDisallowedIPs()
webhookHTTPAccess = httpx.NewAccessConfig(10*time.Second, disallowedIPs)
})
return webhooksHTTPClient, webhooksHTTPRetries
return webhooksHTTPClient, webhooksHTTPRetries, webhookHTTPAccess
}

func simulatorEmailServiceFactory(session flows.Session) (flows.EmailService, error) {
Expand Down
3 changes: 2 additions & 1 deletion models/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/jmoiron/sqlx"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/envs"
"github.com/nyaruka/null"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (c *Channel) TPS() int { return c.c.TPS }
func (c *Channel) Address() string { return c.c.Address }

// Country returns the contry code for this channel
func (c *Channel) Country() string { return string(c.c.Country) }
func (c *Channel) Country() envs.Country { return envs.Country(string(c.c.Country)) }

// Schemes returns the schemes this channel supports
func (c *Channel) Schemes() []string { return c.c.Schemes }
Expand Down
4 changes: 3 additions & 1 deletion models/classifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func (c *Classifier) Type() string { return c.c.Type }

// AsService builds the corresponding ClassificationService for the passed in Classifier
func (c *Classifier) AsService(httpClient *http.Client, httpRetries *httpx.RetryConfig, classifier *flows.Classifier) (flows.ClassificationService, error) {
_, _, httpAccess := goflow.WebhooksHTTP()

switch c.Type() {
case ClassifierTypeWit:
accessToken := c.c.Config[WitConfigAccessToken]
Expand All @@ -106,7 +108,7 @@ func (c *Classifier) AsService(httpClient *http.Client, httpRetries *httpx.Retry
return nil, errors.Errorf("missing %s, %s or %s on LUIS classifier: %s",
LuisConfigEndpointURL, LuisConfigAppID, LuisConfigPrimaryKey, c.UUID())
}
return luis.NewService(httpClient, httpRetries, classifier, endpoint, appID, key), nil
return luis.NewService(httpClient, httpRetries, httpAccess, classifier, endpoint, appID, key), nil

case ClassifierTypeBothub:
accessToken := c.c.Config[BothubConfigAccessToken]
Expand Down

0 comments on commit 8c81421

Please sign in to comment.