Skip to content

Commit

Permalink
GODRIVER-2497 Always set causalConsistency=false for implicit session…
Browse files Browse the repository at this point in the history
…s. (#1176)

Co-authored-by: Preston Vasquez <prestonvasquez@icloud.com>
  • Loading branch information
matthewdale and prestonvasquez committed Feb 11, 2023
1 parent 4d68f59 commit 9944eff
Showing 13 changed files with 708 additions and 129 deletions.
5 changes: 1 addition & 4 deletions mongo/change_stream.go
Original file line number Diff line number Diff line change
@@ -119,10 +119,7 @@ func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline in

cs.sess = sessionFromContext(ctx)
if cs.sess == nil && cs.client.sessionPool != nil {
cs.sess, cs.err = session.NewClientSession(cs.client.sessionPool, cs.client.id, session.Implicit)
if cs.err != nil {
return nil, cs.Err()
}
cs.sess = session.NewImplicitClientSession(cs.client.sessionPool, cs.client.id)
}
if cs.err = cs.client.validSession(cs.sess); cs.err != nil {
closeImplicitSession(cs.sess)
7 changes: 2 additions & 5 deletions mongo/client.go
Original file line number Diff line number Diff line change
@@ -387,7 +387,7 @@ func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error)
coreOpts.Snapshot = sopts.Snapshot
}

sess, err := session.NewClientSession(c.sessionPool, c.id, session.Explicit, coreOpts)
sess, err := session.NewClientSession(c.sessionPool, c.id, coreOpts)
if err != nil {
return nil, replaceErrors(err)
}
@@ -648,10 +648,7 @@ func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...
return ListDatabasesResult{}, err
}
if sess == nil && c.sessionPool != nil {
sess, err = session.NewClientSession(c.sessionPool, c.id, session.Implicit)
if err != nil {
return ListDatabasesResult{}, err
}
sess = session.NewImplicitClientSession(c.sessionPool, c.id)
defer sess.EndSession()
}

62 changes: 12 additions & 50 deletions mongo/collection.go
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ type aggregateParams struct {
}

func closeImplicitSession(sess *session.Client) {
if sess != nil && sess.SessionType == session.Implicit {
if sess != nil && sess.IsImplicit {
sess.EndSession()
}
}
@@ -187,11 +187,7 @@ func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel,

sess := sessionFromContext(ctx)
if sess == nil && coll.client.sessionPool != nil {
var err error
sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
if err != nil {
return nil, err
}
sess = session.NewImplicitClientSession(coll.client.sessionPool, coll.client.id)
defer sess.EndSession()
}

@@ -255,11 +251,7 @@ func (coll *Collection) insert(ctx context.Context, documents []interface{},

sess := sessionFromContext(ctx)
if sess == nil && coll.client.sessionPool != nil {
var err error
sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
if err != nil {
return nil, err
}
sess = session.NewImplicitClientSession(coll.client.sessionPool, coll.client.id)
defer sess.EndSession()
}

@@ -415,10 +407,7 @@ func (coll *Collection) delete(ctx context.Context, filter interface{}, deleteOn

sess := sessionFromContext(ctx)
if sess == nil && coll.client.sessionPool != nil {
sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
if err != nil {
return nil, err
}
sess = session.NewImplicitClientSession(coll.client.sessionPool, coll.client.id)
defer sess.EndSession()
}

@@ -546,11 +535,7 @@ func (coll *Collection) updateOrReplace(ctx context.Context, filter bsoncore.Doc

sess := sessionFromContext(ctx)
if sess == nil && coll.client.sessionPool != nil {
var err error
sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
if err != nil {
return nil, err
}
sess = session.NewImplicitClientSession(coll.client.sessionPool, coll.client.id)
defer sess.EndSession()
}

@@ -801,10 +786,7 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) {
}
}()
if sess == nil && a.client.sessionPool != nil {
sess, err = session.NewClientSession(a.client.sessionPool, a.client.id, session.Implicit)
if err != nil {
return nil, err
}
sess = session.NewImplicitClientSession(a.client.sessionPool, a.client.id)
}
if err = a.client.validSession(sess); err != nil {
return nil, err
@@ -950,10 +932,7 @@ func (coll *Collection) CountDocuments(ctx context.Context, filter interface{},

sess := sessionFromContext(ctx)
if sess == nil && coll.client.sessionPool != nil {
sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
if err != nil {
return 0, err
}
sess = session.NewImplicitClientSession(coll.client.sessionPool, coll.client.id)
defer sess.EndSession()
}
if err = coll.client.validSession(sess); err != nil {
@@ -1030,10 +1009,7 @@ func (coll *Collection) EstimatedDocumentCount(ctx context.Context,

var err error
if sess == nil && coll.client.sessionPool != nil {
sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
if err != nil {
return 0, err
}
sess = session.NewImplicitClientSession(coll.client.sessionPool, coll.client.id)
defer sess.EndSession()
}

@@ -1099,10 +1075,7 @@ func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter i
sess := sessionFromContext(ctx)

if sess == nil && coll.client.sessionPool != nil {
sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
if err != nil {
return nil, err
}
sess = session.NewImplicitClientSession(coll.client.sessionPool, coll.client.id)
defer sess.EndSession()
}

@@ -1198,11 +1171,7 @@ func (coll *Collection) Find(ctx context.Context, filter interface{},
}
}()
if sess == nil && coll.client.sessionPool != nil {
var err error
sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
if err != nil {
return nil, err
}
sess = session.NewImplicitClientSession(coll.client.sessionPool, coll.client.id)
}

err = coll.client.validSession(sess)
@@ -1404,10 +1373,7 @@ func (coll *Collection) findAndModify(ctx context.Context, op *operation.FindAnd
sess := sessionFromContext(ctx)
var err error
if sess == nil && coll.client.sessionPool != nil {
sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
if err != nil {
return &SingleResult{err: err}
}
sess = session.NewImplicitClientSession(coll.client.sessionPool, coll.client.id)
defer sess.EndSession()
}

@@ -1797,11 +1763,7 @@ func (coll *Collection) drop(ctx context.Context) error {

sess := sessionFromContext(ctx)
if sess == nil && coll.client.sessionPool != nil {
var err error
sess, err = session.NewClientSession(coll.client.sessionPool, coll.client.id, session.Implicit)
if err != nil {
return err
}
sess = session.NewImplicitClientSession(coll.client.sessionPool, coll.client.id)
defer sess.EndSession()
}

2 changes: 1 addition & 1 deletion mongo/cursor.go
Original file line number Diff line number Diff line change
@@ -309,7 +309,7 @@ func (c *Cursor) addFromBatch(sliceVal reflect.Value, elemType reflect.Type, bat
}

func (c *Cursor) closeImplicitSession() {
if c.clientSession != nil && c.clientSession.SessionType == session.Implicit {
if c.clientSession != nil && c.clientSession.IsImplicit {
c.clientSession.EndSession()
}
}
23 changes: 4 additions & 19 deletions mongo/database.go
Original file line number Diff line number Diff line change
@@ -136,11 +136,7 @@ func (db *Database) processRunCommand(ctx context.Context, cmd interface{},
cursorCommand bool, opts ...*options.RunCmdOptions) (*operation.Command, *session.Client, error) {
sess := sessionFromContext(ctx)
if sess == nil && db.client.sessionPool != nil {
var err error
sess, err = session.NewClientSession(db.client.sessionPool, db.client.id, session.Implicit)
if err != nil {
return nil, sess, err
}
sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
}

err := db.client.validSession(sess)
@@ -261,11 +257,7 @@ func (db *Database) Drop(ctx context.Context) error {

sess := sessionFromContext(ctx)
if sess == nil && db.client.sessionPool != nil {
var err error
sess, err = session.NewClientSession(db.client.sessionPool, db.client.id, session.Implicit)
if err != nil {
return err
}
sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
defer sess.EndSession()
}

@@ -362,10 +354,7 @@ func (db *Database) ListCollections(ctx context.Context, filter interface{}, opt

sess := sessionFromContext(ctx)
if sess == nil && db.client.sessionPool != nil {
sess, err = session.NewClientSession(db.client.sessionPool, db.client.id, session.Implicit)
if err != nil {
return nil, err
}
sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
}

err = db.client.validSession(sess)
@@ -775,11 +764,7 @@ func (db *Database) CreateView(ctx context.Context, viewName, viewOn string, pip
func (db *Database) executeCreateOperation(ctx context.Context, op *operation.Create) error {
sess := sessionFromContext(ctx)
if sess == nil && db.client.sessionPool != nil {
var err error
sess, err = session.NewClientSession(db.client.sessionPool, db.client.id, session.Implicit)
if err != nil {
return err
}
sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
defer sess.EndSession()
}

17 changes: 3 additions & 14 deletions mongo/index_view.go
Original file line number Diff line number Diff line change
@@ -72,11 +72,7 @@ func (iv IndexView) List(ctx context.Context, opts ...*options.ListIndexesOption

sess := sessionFromContext(ctx)
if sess == nil && iv.coll.client.sessionPool != nil {
var err error
sess, err = session.NewClientSession(iv.coll.client.sessionPool, iv.coll.client.id, session.Implicit)
if err != nil {
return nil, err
}
sess = session.NewImplicitClientSession(iv.coll.client.sessionPool, iv.coll.client.id)
}

err := iv.coll.client.validSession(sess)
@@ -227,10 +223,7 @@ func (iv IndexView) CreateMany(ctx context.Context, models []IndexModel, opts ..
sess := sessionFromContext(ctx)

if sess == nil && iv.coll.client.sessionPool != nil {
sess, err = session.NewClientSession(iv.coll.client.sessionPool, iv.coll.client.id, session.Implicit)
if err != nil {
return nil, err
}
sess = session.NewImplicitClientSession(iv.coll.client.sessionPool, iv.coll.client.id)
defer sess.EndSession()
}

@@ -367,11 +360,7 @@ func (iv IndexView) drop(ctx context.Context, name string, opts ...*options.Drop

sess := sessionFromContext(ctx)
if sess == nil && iv.coll.client.sessionPool != nil {
var err error
sess, err = session.NewClientSession(iv.coll.client.sessionPool, iv.coll.client.id, session.Implicit)
if err != nil {
return nil, err
}
sess = session.NewImplicitClientSession(iv.coll.client.sessionPool, iv.coll.client.id)
defer sess.EndSession()
}

Loading

0 comments on commit 9944eff

Please sign in to comment.