From 3306ac09e0227c8e89b46bbb61b67a92c9f7ff9f Mon Sep 17 00:00:00 2001 From: Sam Bartrum Date: Sat, 29 Jul 2017 07:56:57 +0100 Subject: [PATCH] Fix typo in test and add new test for InvalidReadPreferrenceError (+2 squashed commits) Squashed commits: [60b614b] Move mongo specific error to mongodb/client. Add more tests to cover all read preferences. Allow for case insensitive read preference to be set. (+5 squashed commits) Squashed commits: [381efce] Fix failing tests [82c48da] Unit tests for mongo read preference setting. Fixed other unit tests in light of the new default read preference. [11e7ed9] Update readme to show read preference in the mongo config. [ae4c94e] Remove mgo specific read preferences. [b636058] Can now set mongo read preference in config, defaults to Primary and raises an error if the read preference is not known. [677afaf] Initial commit. Starting to expose option to set mongo read preference. --- README.md | 3 +- adaptor/mongodb/adaptor_test.go | 1 + adaptor/mongodb/client.go | 40 ++++++++++++++++ adaptor/mongodb/client_test.go | 84 +++++++++++++++++++++++++++++++++ adaptor/mongodb/mongodb.go | 7 ++- 5 files changed, 132 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index a68934eff..9bef87d92 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,8 @@ var source = mongodb({ // "wc": 1, // "fsync": false, // "bulk": false, - // "collection_filters": "{}" + // "collection_filters": "{}", + // "read_preference": "Primary" }) var sink = elasticsearch({ diff --git a/adaptor/mongodb/adaptor_test.go b/adaptor/mongodb/adaptor_test.go index 4097d69d7..e17a66960 100644 --- a/adaptor/mongodb/adaptor_test.go +++ b/adaptor/mongodb/adaptor_test.go @@ -15,6 +15,7 @@ var ( uri: DefaultURI, sessionTimeout: DefaultSessionTimeout, safety: DefaultSafety, + readPreference: DefaultReadPreference, } defaultSession *Session diff --git a/adaptor/mongodb/client.go b/adaptor/mongodb/client.go index ce4bb36f9..21e9b57aa 100644 --- a/adaptor/mongodb/client.go +++ b/adaptor/mongodb/client.go @@ -8,6 +8,7 @@ import ( "net" "os" "time" + "strings" "github.com/compose/transporter/client" "github.com/compose/transporter/log" @@ -23,6 +24,9 @@ const ( // DefaultSessionTimeout is the default timeout after which the // session times out when unable to connect to the provided URI. DefaultSessionTimeout = 10 * time.Second + + // DefaultReadPreference when connecting to a mongo replica set. + DefaultReadPreference = mgo.Primary ) var ( @@ -43,6 +47,15 @@ func (e OplogAccessError) Error() string { return fmt.Sprintf("oplog access failed, %s", e.reason) } +// InvalidReadPreferenceError represents the error when an incorrect mongo read preference has been set. +type InvalidReadPreferenceError struct { + ReadPreference string +} + +func (e InvalidReadPreferenceError) Error() string { + return fmt.Sprintf("Invalid Read Preference, %s", e.ReadPreference) +} + // ClientOptionFunc is a function that configures a Client. // It is used in NewClient. type ClientOptionFunc func(*Client) error @@ -55,6 +68,7 @@ type Client struct { tlsConfig *tls.Config sessionTimeout time.Duration tail bool + readPreference mgo.Mode mgoSession *mgo.Session } @@ -81,6 +95,7 @@ func NewClient(options ...ClientOptionFunc) (*Client, error) { safety: DefaultSafety, tlsConfig: nil, tail: false, + readPreference: DefaultReadPreference, } // Run the options on it @@ -189,6 +204,30 @@ func WithTail(tail bool) ClientOptionFunc { } } +func WithReadPreference(read_preference string) ClientOptionFunc { + return func(c *Client) error { + if read_preference == "" { + c.readPreference = DefaultReadPreference + return nil + } + switch strings.ToLower(read_preference) { + case "primary": + c.readPreference = mgo.Primary + case "primarypreferred": + c.readPreference = mgo.PrimaryPreferred + case "secondary": + c.readPreference = mgo.Secondary + case "secondarypreferred": + c.readPreference = mgo.SecondaryPreferred + case "nearest": + c.readPreference = mgo.Nearest + default: + return InvalidReadPreferenceError{ReadPreference: read_preference} + } + return nil + } +} + // Connect tests the mongodb connection and initializes the mongo session func (c *Client) Connect() (client.Session, error) { if c.mgoSession == nil { @@ -230,6 +269,7 @@ func (c *Client) initConnection() error { mgoSession.SetBatch(1000) mgoSession.SetPrefetch(0.5) mgoSession.SetSocketTimeout(time.Hour) + mgoSession.SetMode(c.readPreference, true) if c.tail { log.With("uri", c.uri).Infoln("testing oplog access") diff --git a/adaptor/mongodb/client_test.go b/adaptor/mongodb/client_test.go index 16198cbd3..19f43e95e 100644 --- a/adaptor/mongodb/client_test.go +++ b/adaptor/mongodb/client_test.go @@ -24,6 +24,11 @@ var errorTests = []struct { "oplog access failed, database missing oplog.rs collection", OplogAccessError{"database missing oplog.rs collection"}, }, + { + "InvalidReadPreferenceError", + "Invalid Read Preference, fakeReadPreference", + InvalidReadPreferenceError{"fakeReadPreference"}, + }, } func TestErrors(t *testing.T) { @@ -65,6 +70,7 @@ var ( uri: DefaultURI, sessionTimeout: DefaultSessionTimeout, safety: DefaultSafety, + readPreference: DefaultReadPreference, } certPool = func() *x509.CertPool { @@ -93,6 +99,7 @@ var clientTests = []struct { uri: "mongodb://fakeurl:27017", sessionTimeout: DefaultSessionTimeout, safety: DefaultSafety, + readPreference: DefaultReadPreference, }, nil, }, @@ -109,6 +116,8 @@ var clientTests = []struct { uri: DefaultURI, sessionTimeout: 30 * time.Second, safety: DefaultSafety, + readPreference: DefaultReadPreference, + }, nil, }, @@ -131,6 +140,7 @@ var clientTests = []struct { uri: DefaultURI, sessionTimeout: DefaultSessionTimeout, safety: mgo.Safe{W: 2}, + readPreference: DefaultReadPreference, }, nil, }, @@ -141,6 +151,7 @@ var clientTests = []struct { uri: DefaultURI, sessionTimeout: DefaultSessionTimeout, safety: mgo.Safe{FSync: true}, + readPreference: DefaultReadPreference, }, nil, }, @@ -152,6 +163,7 @@ var clientTests = []struct { sessionTimeout: DefaultSessionTimeout, safety: DefaultSafety, tail: true, + readPreference: DefaultReadPreference, }, nil, }, @@ -163,6 +175,7 @@ var clientTests = []struct { sessionTimeout: DefaultSessionTimeout, safety: DefaultSafety, tlsConfig: &tls.Config{InsecureSkipVerify: true, RootCAs: x509.NewCertPool()}, + readPreference: DefaultReadPreference, }, nil, }, @@ -174,6 +187,7 @@ var clientTests = []struct { sessionTimeout: DefaultSessionTimeout, safety: DefaultSafety, tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()}, + readPreference: DefaultReadPreference, }, nil, }, @@ -185,6 +199,7 @@ var clientTests = []struct { sessionTimeout: DefaultSessionTimeout, safety: DefaultSafety, tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()}, + readPreference: DefaultReadPreference, }, nil, }, @@ -196,6 +211,7 @@ var clientTests = []struct { sessionTimeout: DefaultSessionTimeout, safety: DefaultSafety, tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()}, + readPreference: DefaultReadPreference, }, &os.PathError{Op: "open", Path: "testdata/ca_no_perms.pem", Err: os.ErrPermission}, }, @@ -207,6 +223,7 @@ var clientTests = []struct { sessionTimeout: DefaultSessionTimeout, safety: DefaultSafety, tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()}, + readPreference: DefaultReadPreference, }, nil, }, @@ -218,9 +235,76 @@ var clientTests = []struct { sessionTimeout: DefaultSessionTimeout, safety: DefaultSafety, tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()}, + readPreference: DefaultReadPreference, }, client.ErrInvalidCert, }, + { + "with_read_preference_invalid", + []ClientOptionFunc{WithReadPreference("blah")}, + &Client{}, + InvalidReadPreferenceError{ReadPreference: "blah"}, + }, + { + "with_primary_read_preference", + []ClientOptionFunc{WithReadPreference("Primary")}, + &Client{ + uri: DefaultURI, + sessionTimeout: DefaultSessionTimeout, + readPreference: 2, + }, + nil, + }, + { + "with_primary_preferred_read_preference_valid", + []ClientOptionFunc{WithReadPreference("PrimaryPreferred")}, + &Client{ + uri: DefaultURI, + sessionTimeout: DefaultSessionTimeout, + readPreference: 3, + }, + nil, + }, + { + "with_secondary_read_preference_valid", + []ClientOptionFunc{WithReadPreference("Secondary")}, + &Client{ + uri: DefaultURI, + sessionTimeout: DefaultSessionTimeout, + readPreference: 4, + }, + nil, + }, + { + "with_secondary_preferred_read_preference_valid", + []ClientOptionFunc{WithReadPreference("SecondaryPreferred")}, + &Client{ + uri: DefaultURI, + sessionTimeout: DefaultSessionTimeout, + readPreference: 5, + }, + nil, + }, + { + "with_nearest_read_preference_valid", + []ClientOptionFunc{WithReadPreference("Nearest")}, + &Client{ + uri: DefaultURI, + sessionTimeout: DefaultSessionTimeout, + readPreference: 6, + }, + nil, + }, + { + "with_default_read_preference", + []ClientOptionFunc{WithReadPreference("")}, + &Client{ + uri: DefaultURI, + sessionTimeout: DefaultSessionTimeout, + readPreference: 2, + }, + nil, + }, } func TestNewClient(t *testing.T) { diff --git a/adaptor/mongodb/mongodb.go b/adaptor/mongodb/mongodb.go index 1f11af0bb..91dd717a3 100644 --- a/adaptor/mongodb/mongodb.go +++ b/adaptor/mongodb/mongodb.go @@ -21,7 +21,8 @@ const ( // "wc": 1, // "fsync": false, // "bulk": false, - // "collection_filters": "{}" + // "collection_filters": "{}", + // "read_preference": "Primary" }` ) @@ -43,6 +44,7 @@ type MongoDB struct { FSync bool `json:"fsync"` Bulk bool `json:"bulk"` CollectionFilters string `json:"collection_filters"` + ReadPreference string `json:"read_preference"` } func init() { @@ -61,7 +63,8 @@ func (m *MongoDB) Client() (client.Client, error) { WithCACerts(m.CACerts), WithFsync(m.FSync), WithTail(m.Tail), - WithWriteConcern(m.Wc)) + WithWriteConcern(m.Wc), + WithReadPreference(m.ReadPreference)) } func (m *MongoDB) Reader() (client.Reader, error) {