From 8a60a89125f6d0a9f5843824a0be040a8b8e9034 Mon Sep 17 00:00:00 2001 From: JP Phillips Date: Tue, 27 Jun 2017 14:28:45 -0500 Subject: [PATCH] also verify oplog doc is for the tailing collection (#386) --- adaptor/mongodb/reader.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/adaptor/mongodb/reader.go b/adaptor/mongodb/reader.go index f9756e575..5db27c887 100644 --- a/adaptor/mongodb/reader.go +++ b/adaptor/mongodb/reader.go @@ -236,7 +236,8 @@ func (r *Reader) tailCollection(c string, mgoSession *mgo.Session, oplogTime bso collection = mgoSession.DB("local").C("oplog.rs") result oplogDoc // hold the document db = mgoSession.DB("").Name - query = bson.M{"ns": fmt.Sprintf("%s.%s", db, c), "ts": bson.M{"$gte": oplogTime}} + ns = fmt.Sprintf("%s.%s", db, c) + query = bson.M{"ns": ns, "ts": bson.M{"$gte": oplogTime}} iter = collection.Find(query).LogReplay().Sort("$natural").Tail(r.oplogTimeout) ) defer iter.Close() @@ -249,7 +250,7 @@ func (r *Reader) tailCollection(c string, mgoSession *mgo.Session, oplogTime bso return default: for iter.Next(&result) { - if result.validOp() { + if result.validOp(ns) { var ( doc bson.M err error @@ -338,9 +339,8 @@ type oplogDoc struct { // validOp checks to see if we're an insert, delete, or update, otherwise the // document is skilled. -// TODO: skip system collections -func (o *oplogDoc) validOp() bool { - return o.Op == "i" || o.Op == "d" || o.Op == "u" +func (o *oplogDoc) validOp(ns string) bool { + return ns == o.Ns && (o.Op == "i" || o.Op == "d" || o.Op == "u") } func timeAsMongoTimestamp(t time.Time) bson.MongoTimestamp {