Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Mongo read preference #393

Merged
merged 2 commits into from
Aug 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ var source = mongodb({
// "wc": 1,
// "fsync": false,
// "bulk": false,
// "collection_filters": "{}"
// "collection_filters": "{}",
// "read_preference": "Primary"
})

var sink = elasticsearch({
Expand Down
1 change: 1 addition & 0 deletions adaptor/mongodb/adaptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
readPreference: DefaultReadPreference,
}
defaultSession *Session

Expand Down
40 changes: 40 additions & 0 deletions adaptor/mongodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"os"
"time"
"strings"

"github.com/compose/transporter/client"
"github.com/compose/transporter/log"
Expand All @@ -23,6 +24,9 @@ const (
// DefaultSessionTimeout is the default timeout after which the
// session times out when unable to connect to the provided URI.
DefaultSessionTimeout = 10 * time.Second

// DefaultReadPreference when connecting to a mongo replica set.
DefaultReadPreference = mgo.Primary
)

var (
Expand All @@ -43,6 +47,15 @@ func (e OplogAccessError) Error() string {
return fmt.Sprintf("oplog access failed, %s", e.reason)
}

// InvalidReadPreferenceError represents the error when an incorrect mongo read preference has been set.
type InvalidReadPreferenceError struct {
ReadPreference string
}

func (e InvalidReadPreferenceError) Error() string {
return fmt.Sprintf("Invalid Read Preference, %s", e.ReadPreference)
}

// ClientOptionFunc is a function that configures a Client.
// It is used in NewClient.
type ClientOptionFunc func(*Client) error
Expand All @@ -55,6 +68,7 @@ type Client struct {
tlsConfig *tls.Config
sessionTimeout time.Duration
tail bool
readPreference mgo.Mode

mgoSession *mgo.Session
}
Expand All @@ -81,6 +95,7 @@ func NewClient(options ...ClientOptionFunc) (*Client, error) {
safety: DefaultSafety,
tlsConfig: nil,
tail: false,
readPreference: DefaultReadPreference,
}

// Run the options on it
Expand Down Expand Up @@ -189,6 +204,30 @@ func WithTail(tail bool) ClientOptionFunc {
}
}

func WithReadPreference(read_preference string) ClientOptionFunc {
return func(c *Client) error {
if read_preference == "" {
c.readPreference = DefaultReadPreference
return nil
}
switch strings.ToLower(read_preference) {
case "primary":
c.readPreference = mgo.Primary
case "primarypreferred":
c.readPreference = mgo.PrimaryPreferred
case "secondary":
c.readPreference = mgo.Secondary
case "secondarypreferred":
c.readPreference = mgo.SecondaryPreferred
case "nearest":
c.readPreference = mgo.Nearest
default:
return InvalidReadPreferenceError{ReadPreference: read_preference}
}
return nil
}
}

// Connect tests the mongodb connection and initializes the mongo session
func (c *Client) Connect() (client.Session, error) {
if c.mgoSession == nil {
Expand Down Expand Up @@ -230,6 +269,7 @@ func (c *Client) initConnection() error {
mgoSession.SetBatch(1000)
mgoSession.SetPrefetch(0.5)
mgoSession.SetSocketTimeout(time.Hour)
mgoSession.SetMode(c.readPreference, true)

if c.tail {
log.With("uri", c.uri).Infoln("testing oplog access")
Expand Down
84 changes: 84 additions & 0 deletions adaptor/mongodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ var errorTests = []struct {
"oplog access failed, database missing oplog.rs collection",
OplogAccessError{"database missing oplog.rs collection"},
},
{
"InvalidReadPreferenceError",
"Invalid Read Preference, fakeReadPreference",
InvalidReadPreferenceError{"fakeReadPreference"},
},
}

func TestErrors(t *testing.T) {
Expand Down Expand Up @@ -65,6 +70,7 @@ var (
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
readPreference: DefaultReadPreference,
}

certPool = func() *x509.CertPool {
Expand Down Expand Up @@ -93,6 +99,7 @@ var clientTests = []struct {
uri: "mongodb://fakeurl:27017",
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -109,6 +116,8 @@ var clientTests = []struct {
uri: DefaultURI,
sessionTimeout: 30 * time.Second,
safety: DefaultSafety,
readPreference: DefaultReadPreference,

},
nil,
},
Expand All @@ -131,6 +140,7 @@ var clientTests = []struct {
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: mgo.Safe{W: 2},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -141,6 +151,7 @@ var clientTests = []struct {
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: mgo.Safe{FSync: true},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -152,6 +163,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tail: true,
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -163,6 +175,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: true, RootCAs: x509.NewCertPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -174,6 +187,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -185,6 +199,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -196,6 +211,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
&os.PathError{Op: "open", Path: "testdata/ca_no_perms.pem", Err: os.ErrPermission},
},
Expand All @@ -207,6 +223,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -218,9 +235,76 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
client.ErrInvalidCert,
},
{
"with_read_preference_invalid",
[]ClientOptionFunc{WithReadPreference("blah")},
&Client{},
InvalidReadPreferenceError{ReadPreference: "blah"},
},
{
"with_primary_read_preference",
[]ClientOptionFunc{WithReadPreference("Primary")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 2,
},
nil,
},
{
"with_primary_preferred_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("PrimaryPreferred")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 3,
},
nil,
},
{
"with_secondary_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("Secondary")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 4,
},
nil,
},
{
"with_secondary_preferred_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("SecondaryPreferred")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 5,
},
nil,
},
{
"with_nearest_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("Nearest")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 6,
},
nil,
},
{
"with_default_read_preference",
[]ClientOptionFunc{WithReadPreference("")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 2,
},
nil,
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add test cases to cover all possible preferences

}

func TestNewClient(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions adaptor/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ const (
// "wc": 1,
// "fsync": false,
// "bulk": false,
// "collection_filters": "{}"
// "collection_filters": "{}",
// "read_preference": "Primary"
}`
)

Expand All @@ -43,6 +44,7 @@ type MongoDB struct {
FSync bool `json:"fsync"`
Bulk bool `json:"bulk"`
CollectionFilters string `json:"collection_filters"`
ReadPreference string `json:"read_preference"`
}

func init() {
Expand All @@ -61,7 +63,8 @@ func (m *MongoDB) Client() (client.Client, error) {
WithCACerts(m.CACerts),
WithFsync(m.FSync),
WithTail(m.Tail),
WithWriteConcern(m.Wc))
WithWriteConcern(m.Wc),
WithReadPreference(m.ReadPreference))
}

func (m *MongoDB) Reader() (client.Reader, error) {
Expand Down