Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
expose MongoDB Read Preference, defaults to Primary (#393)
Browse files Browse the repository at this point in the history
  • Loading branch information
SamBartrum authored and jipperinbham committed Aug 14, 2017
1 parent a25a8e1 commit 9fa8ec7
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 3 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ var source = mongodb({
// "wc": 1,
// "fsync": false,
// "bulk": false,
// "collection_filters": "{}"
// "collection_filters": "{}",
// "read_preference": "Primary"
})
var sink = elasticsearch({
Expand Down
1 change: 1 addition & 0 deletions adaptor/mongodb/adaptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
readPreference: DefaultReadPreference,
}
defaultSession *Session

Expand Down
40 changes: 40 additions & 0 deletions adaptor/mongodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"os"
"time"
"strings"

"github.com/compose/transporter/client"
"github.com/compose/transporter/log"
Expand All @@ -23,6 +24,9 @@ const (
// DefaultSessionTimeout is the default timeout after which the
// session times out when unable to connect to the provided URI.
DefaultSessionTimeout = 10 * time.Second

// DefaultReadPreference when connecting to a mongo replica set.
DefaultReadPreference = mgo.Primary
)

var (
Expand All @@ -43,6 +47,15 @@ func (e OplogAccessError) Error() string {
return fmt.Sprintf("oplog access failed, %s", e.reason)
}

// InvalidReadPreferenceError represents the error when an incorrect mongo read preference has been set.
type InvalidReadPreferenceError struct {
ReadPreference string
}

func (e InvalidReadPreferenceError) Error() string {
return fmt.Sprintf("Invalid Read Preference, %s", e.ReadPreference)
}

// ClientOptionFunc is a function that configures a Client.
// It is used in NewClient.
type ClientOptionFunc func(*Client) error
Expand All @@ -55,6 +68,7 @@ type Client struct {
tlsConfig *tls.Config
sessionTimeout time.Duration
tail bool
readPreference mgo.Mode

mgoSession *mgo.Session
}
Expand All @@ -81,6 +95,7 @@ func NewClient(options ...ClientOptionFunc) (*Client, error) {
safety: DefaultSafety,
tlsConfig: nil,
tail: false,
readPreference: DefaultReadPreference,
}

// Run the options on it
Expand Down Expand Up @@ -189,6 +204,30 @@ func WithTail(tail bool) ClientOptionFunc {
}
}

func WithReadPreference(read_preference string) ClientOptionFunc {
return func(c *Client) error {
if read_preference == "" {
c.readPreference = DefaultReadPreference
return nil
}
switch strings.ToLower(read_preference) {
case "primary":
c.readPreference = mgo.Primary
case "primarypreferred":
c.readPreference = mgo.PrimaryPreferred
case "secondary":
c.readPreference = mgo.Secondary
case "secondarypreferred":
c.readPreference = mgo.SecondaryPreferred
case "nearest":
c.readPreference = mgo.Nearest
default:
return InvalidReadPreferenceError{ReadPreference: read_preference}
}
return nil
}
}

// Connect tests the mongodb connection and initializes the mongo session
func (c *Client) Connect() (client.Session, error) {
if c.mgoSession == nil {
Expand Down Expand Up @@ -230,6 +269,7 @@ func (c *Client) initConnection() error {
mgoSession.SetBatch(1000)
mgoSession.SetPrefetch(0.5)
mgoSession.SetSocketTimeout(time.Hour)
mgoSession.SetMode(c.readPreference, true)

if c.tail {
log.With("uri", c.uri).Infoln("testing oplog access")
Expand Down
84 changes: 84 additions & 0 deletions adaptor/mongodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ var errorTests = []struct {
"oplog access failed, database missing oplog.rs collection",
OplogAccessError{"database missing oplog.rs collection"},
},
{
"InvalidReadPreferenceError",
"Invalid Read Preference, fakeReadPreference",
InvalidReadPreferenceError{"fakeReadPreference"},
},
}

func TestErrors(t *testing.T) {
Expand Down Expand Up @@ -65,6 +70,7 @@ var (
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
readPreference: DefaultReadPreference,
}

certPool = func() *x509.CertPool {
Expand Down Expand Up @@ -93,6 +99,7 @@ var clientTests = []struct {
uri: "mongodb://fakeurl:27017",
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -109,6 +116,8 @@ var clientTests = []struct {
uri: DefaultURI,
sessionTimeout: 30 * time.Second,
safety: DefaultSafety,
readPreference: DefaultReadPreference,

},
nil,
},
Expand All @@ -131,6 +140,7 @@ var clientTests = []struct {
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: mgo.Safe{W: 2},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -141,6 +151,7 @@ var clientTests = []struct {
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: mgo.Safe{FSync: true},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -152,6 +163,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tail: true,
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -163,6 +175,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: true, RootCAs: x509.NewCertPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -174,6 +187,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -185,6 +199,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -196,6 +211,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
&os.PathError{Op: "open", Path: "testdata/ca_no_perms.pem", Err: os.ErrPermission},
},
Expand All @@ -207,6 +223,7 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
nil,
},
Expand All @@ -218,9 +235,76 @@ var clientTests = []struct {
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: &tls.Config{InsecureSkipVerify: false, RootCAs: certPool()},
readPreference: DefaultReadPreference,
},
client.ErrInvalidCert,
},
{
"with_read_preference_invalid",
[]ClientOptionFunc{WithReadPreference("blah")},
&Client{},
InvalidReadPreferenceError{ReadPreference: "blah"},
},
{
"with_primary_read_preference",
[]ClientOptionFunc{WithReadPreference("Primary")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 2,
},
nil,
},
{
"with_primary_preferred_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("PrimaryPreferred")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 3,
},
nil,
},
{
"with_secondary_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("Secondary")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 4,
},
nil,
},
{
"with_secondary_preferred_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("SecondaryPreferred")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 5,
},
nil,
},
{
"with_nearest_read_preference_valid",
[]ClientOptionFunc{WithReadPreference("Nearest")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 6,
},
nil,
},
{
"with_default_read_preference",
[]ClientOptionFunc{WithReadPreference("")},
&Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
readPreference: 2,
},
nil,
},
}

func TestNewClient(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions adaptor/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ const (
// "wc": 1,
// "fsync": false,
// "bulk": false,
// "collection_filters": "{}"
// "collection_filters": "{}",
// "read_preference": "Primary"
}`
)

Expand All @@ -43,6 +44,7 @@ type MongoDB struct {
FSync bool `json:"fsync"`
Bulk bool `json:"bulk"`
CollectionFilters string `json:"collection_filters"`
ReadPreference string `json:"read_preference"`
}

func init() {
Expand All @@ -61,7 +63,8 @@ func (m *MongoDB) Client() (client.Client, error) {
WithCACerts(m.CACerts),
WithFsync(m.FSync),
WithTail(m.Tail),
WithWriteConcern(m.Wc))
WithWriteConcern(m.Wc),
WithReadPreference(m.ReadPreference))
}

func (m *MongoDB) Reader() (client.Reader, error) {
Expand Down

0 comments on commit 9fa8ec7

Please sign in to comment.