Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
also verify oplog doc is for the tailing collection (#386)
Browse files Browse the repository at this point in the history
  • Loading branch information
jipperinbham authored Jun 27, 2017
1 parent 41ffa26 commit 8a60a89
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions adaptor/mongodb/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ func (r *Reader) tailCollection(c string, mgoSession *mgo.Session, oplogTime bso
collection = mgoSession.DB("local").C("oplog.rs")
result oplogDoc // hold the document
db = mgoSession.DB("").Name
query = bson.M{"ns": fmt.Sprintf("%s.%s", db, c), "ts": bson.M{"$gte": oplogTime}}
ns = fmt.Sprintf("%s.%s", db, c)
query = bson.M{"ns": ns, "ts": bson.M{"$gte": oplogTime}}
iter = collection.Find(query).LogReplay().Sort("$natural").Tail(r.oplogTimeout)
)
defer iter.Close()
Expand All @@ -249,7 +250,7 @@ func (r *Reader) tailCollection(c string, mgoSession *mgo.Session, oplogTime bso
return
default:
for iter.Next(&result) {
if result.validOp() {
if result.validOp(ns) {
var (
doc bson.M
err error
Expand Down Expand Up @@ -338,9 +339,8 @@ type oplogDoc struct {

// validOp checks to see if we're an insert, delete, or update, otherwise the
// document is skilled.
// TODO: skip system collections
func (o *oplogDoc) validOp() bool {
return o.Op == "i" || o.Op == "d" || o.Op == "u"
func (o *oplogDoc) validOp(ns string) bool {
return ns == o.Ns && (o.Op == "i" || o.Op == "d" || o.Op == "u")
}

func timeAsMongoTimestamp(t time.Time) bson.MongoTimestamp {
Expand Down

0 comments on commit 8a60a89

Please sign in to comment.