diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java index 7abc4a40..5262c722 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java +++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java @@ -83,6 +83,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River { public final static String MONGODB_ADMIN_DATABASE = "admin"; public final static String MONGODB_CONFIG_DATABASE = "config"; public final static String MONGODB_ID_FIELD = "_id"; + public final static String MONGODB_IN_OPERATOR = "$in"; public final static String MONGODB_OR_OPERATOR = "$or"; public final static String MONGODB_AND_OPERATOR = "$and"; public final static String MONGODB_NATURAL_OPERATOR = "$natural"; diff --git a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java index 024bdd79..995765c7 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java +++ b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java @@ -1,6 +1,7 @@ package org.elasticsearch.river.mongodb; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Set; @@ -335,7 +336,6 @@ private String getObjectIdFromOplogEntry(DBObject entry) { private DBObject getOplogFilter(final BSONTimestamp time) { BasicDBObject filter = new BasicDBObject(); - List values2 = new ArrayList(); if (time == null) { logger.info("No known previous slurping time for this collection"); @@ -346,10 +346,10 @@ private DBObject getOplogFilter(final BSONTimestamp time) { if (definition.isMongoGridFS()) { filter.put(MongoDBRiver.OPLOG_NAMESPACE, definition.getMongoOplogNamespace() + MongoDBRiver.GRIDFS_FILES_SUFFIX); } else { - values2.add(new BasicDBObject(MongoDBRiver.OPLOG_NAMESPACE, definition.getMongoOplogNamespace())); - values2.add(new BasicDBObject(MongoDBRiver.OPLOG_NAMESPACE, definition.getMongoDb() + "." - + MongoDBRiver.OPLOG_NAMESPACE_COMMAND)); - filter.put(MongoDBRiver.MONGODB_OR_OPERATOR, values2); + List namespaceFilter = new ArrayList(); + namespaceFilter.add(definition.getMongoOplogNamespace()); + namespaceFilter.add(definition.getMongoDb() + "." + MongoDBRiver.OPLOG_NAMESPACE_COMMAND); + filter.put(MongoDBRiver.OPLOG_NAMESPACE, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, namespaceFilter)); } if (definition.getMongoOplogFilter().size() > 0) { filter.putAll(getMongoFilter()); @@ -363,20 +363,15 @@ private DBObject getOplogFilter(final BSONTimestamp time) { private DBObject getMongoFilter() { List filters = new ArrayList(); List filters2 = new ArrayList(); - List filters3 = new ArrayList(); - // include delete operation - filters.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, MongoDBRiver.OPLOG_DELETE_OPERATION)); - // include update, insert in filters3 - filters3.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, MongoDBRiver.OPLOG_UPDATE_OPERATION)); - filters3.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, MongoDBRiver.OPLOG_INSERT_OPERATION)); - - // include or operation statement in filter2 - filters2.add(new BasicDBObject(MongoDBRiver.MONGODB_OR_OPERATOR, filters3)); + List operationFilter = new ArrayList(); + operationFilter.add(MongoDBRiver.OPLOG_DELETE_OPERATION); + operationFilter.add(MongoDBRiver.OPLOG_UPDATE_OPERATION); + operationFilter.add(MongoDBRiver.OPLOG_INSERT_OPERATION); + filters.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, operationFilter))); // include custom filter in filters2 filters2.add(definition.getMongoOplogFilter()); - filters.add(new BasicDBObject(MongoDBRiver.MONGODB_AND_OPERATOR, filters2)); return new BasicDBObject(MongoDBRiver.MONGODB_OR_OPERATOR, filters); @@ -396,7 +391,7 @@ private DBCursor oplogCursor(final BSONTimestamp timestampOverride) { if (indexFilter.containsField(MongoDBRiver.OPLOG_TIMESTAMP)) { options = options | Bytes.QUERYOPTION_OPLOGREPLAY; } - return oplogCollection.find(indexFilter).sort(new BasicDBObject(MongoDBRiver.MONGODB_NATURAL_OPERATOR, 1)).setOptions(options); + return oplogCollection.find(indexFilter).setOptions(options); } private void addQueryToStream(final String operation, final BSONTimestamp currentTimestamp, final DBObject update)