Skip to content

Commit

Permalink
Move mongo specific error to mongodb/client. Add more tests to cover …
Browse files Browse the repository at this point in the history
…all read preferences. Allow for case insensitive read preference to be set. (+5 squashed commits)

Squashed commits:
[381efce] Fix failing tests
[82c48da] Unit tests for mongo read preference setting. Fixed other unit tests in light of the new default read preference.
[11e7ed9] Update readme to show read preference in the mongo config.
[ae4c94e] Remove mgo specific read preferences.
[b636058] Can now set mongo read preference in config, defaults to Primary and raises an error if the read preference is not known.
  • Loading branch information
SamBartrum committed Aug 14, 2017
1 parent 677afaf commit 60b614b
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 4 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ var source = mongodb({
// "wc": 1,
// "fsync": false,
// "bulk": false,
// "collection_filters": "{}"
// "collection_filters": "{}",
// "read_preference": "Primary"
})
var sink = elasticsearch({
Expand Down
1 change: 1 addition & 0 deletions adaptor/mongodb/adaptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
readPreference: DefaultReadPreference,
}
defaultSession *Session

Expand Down
34 changes: 31 additions & 3 deletions adaptor/mongodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"os"
"time"
"strings"

"github.com/compose/transporter/client"
"github.com/compose/transporter/log"
Expand All @@ -25,7 +26,7 @@ const (
DefaultSessionTimeout = 10 * time.Second

// DefaultReadPreference when connecting to a mongo replica set.
DefaultReadPreference = "Primary"
DefaultReadPreference = mgo.Primary
)

var (
Expand All @@ -46,6 +47,15 @@ func (e OplogAccessError) Error() string {
return fmt.Sprintf("oplog access failed, %s", e.reason)
}

// InvalidReadPreferenceError represents the error when an incorrect mongo read preference has been set.
type InvalidReadPreferenceError struct {
ReadPreference string
}

func (e InvalidReadPreferenceError) Error() string {
return fmt.Sprintf("Invalid Read Preference, %s", e.ReadPreference)
}

// ClientOptionFunc is a function that configures a Client.
// It is used in NewClient.
type ClientOptionFunc func(*Client) error
Expand All @@ -58,7 +68,7 @@ type Client struct {
tlsConfig *tls.Config
sessionTimeout time.Duration
tail bool
readPreference string
readPreference mgo.Mode

mgoSession *mgo.Session
}
Expand Down Expand Up @@ -196,7 +206,24 @@ func WithTail(tail bool) ClientOptionFunc {

func WithReadPreference(read_preference string) ClientOptionFunc {
return func(c *Client) error {
c.readPreference = read_preference
if read_preference == "" {
c.readPreference = DefaultReadPreference
return nil
}
switch strings.ToLower(read_preference) {
case "primary":
c.readPreference = mgo.Primary
case "primarypreferred":
c.readPreference = mgo.PrimaryPreferred
case "secondary":
c.readPreference = mgo.Secondary
case "secondarypreferred":
c.readPreference = mgo.SecondaryPreferred
case "nearest":
c.readPreference = mgo.Nearest
default:
return InvalidReadPreferenceError{ReadPreference: read_preference}
}
return nil
}
}
Expand Down Expand Up @@ -242,6 +269,7 @@ func (c *Client) initConnection() error {
mgoSession.SetBatch(1000)
mgoSession.SetPrefetch(0.5)
mgoSession.SetSocketTimeout(time.Hour)
mgoSession.SetMode(c.readPreference, true)

if c.tail {
log.With("uri", c.uri).Infoln("testing oplog access")
Expand Down
79 changes: 79 additions & 0 deletions adaptor/mongodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ var (
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
readPreference: DefaultReadPreference,
}

certPool = func() *x509.CertPool {
Expand Down Expand Up @@ -93,6 +94,7 @@ var clientTests = []struct {
uri: "mongodb://fakeurl:27017",
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -109,6 +111,8 @@ var clientTests = []struct {
uri: DefaultURI,
sessionTimeout: 30 * time.Second,
safety: DefaultSafety,
readPreference: DefaultReadPreference,

},
nil,
},
Expand All @@ -131,6 +135,7 @@ var clientTests = []struct {
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: mgo.Safe{W: 2},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -141,6 +146,7 @@ var clientTests = []struct {
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: mgo.Safe{FSync: true},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -152,6 +158,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tail: true,
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -163,6 +170,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: true, RootCAs: x509.NewCertPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -174,6 +182,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -185,6 +194,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -196,6 +206,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
&os.PathError{Op: "open", Path: "testdata/ca_no_perms.pem", Err: os.ErrPermission},
},
Expand All @@ -207,6 +218,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -218,9 +230,76 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
client.ErrInvalidCert,
},
{
"with_read_preference_invalid",
[]ClientOptionFunc{WithReadPreference("blah")},
&Client{},
InvalidReadPreferenceError{ReadPreference: "blah"},
},
{
"with_primary_read_preference",
[]ClientOptionFunc{WithReadPreference("Primary")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 2,
},
nil,
},
{
"with_primary_preferred_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("PrimaryPreferred")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 3,
},
nil,
},
{
"with_secondary_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("Secondary")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 4,
},
nil,
},
{
"with_secondary_preferred_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("SecondaryPrefered")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 5,
},
nil,
},
{
"with_nearest_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("Nearest")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 6,
},
nil,
},
{
"with_default_read_preference",
[]ClientOptionFunc{WithReadPreference("")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 2,
},
nil,
},
}

func TestNewClient(t *testing.T) {
Expand Down

0 comments on commit 60b614b

Please sign in to comment.