diff --git a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
index e8c57ed1..15551a7a 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
@@ -258,8 +258,10 @@ private void updateBulkRequest(final BulkRequestBuilder bulk, DBObject data, Str
         }
         if (MongoDBRiver.OPLOG_COMMAND_OPERATION.equals(operation)) {
             if (definition.isDropCollection()) {
-                if (data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION) != null
-                        && data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION).equals(definition.getMongoCollection())) {
+                if ((data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION) != null
+                        && data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION).equals(definition.getMongoCollection()) || (data
+                        .get(MongoDBRiver.OPLOG_DROP_DATABASE_COMMAND_OPERATION) != null && data.get(
+                        MongoDBRiver.OPLOG_DROP_DATABASE_COMMAND_OPERATION).equals(1)))) {
                     logger.info("Drop collection request [{}], [{}]", index, type);
                     bulk.request().requests().clear();
                     client.admin().indices().prepareRefresh(index).execute().actionGet();
diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
index 5262c722..b3c7184f 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
@@ -98,6 +98,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
     public final static String OPLOG_DELETE_OPERATION = "d";
     public final static String OPLOG_COMMAND_OPERATION = "c";
     public final static String OPLOG_DROP_COMMAND_OPERATION = "drop";
+    public final static String OPLOG_DROP_DATABASE_COMMAND_OPERATION = "dropDatabase";
     public final static String OPLOG_TIMESTAMP = "ts";
     public final static String OPLOG_FROM_MIGRATE = "fromMigrate";
     public final static String GRIDFS_FILES_SUFFIX = ".files";
diff --git a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
index 995765c7..2c255648 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
@@ -1,7 +1,6 @@
 package org.elasticsearch.river.mongodb;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -10,6 +9,7 @@
 import org.bson.types.BSONTimestamp;
 import org.bson.types.ObjectId;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.collect.ImmutableList;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.ESLoggerFactory;
 import org.elasticsearch.river.mongodb.util.MongoDBHelper;
@@ -346,9 +346,8 @@ private DBObject getOplogFilter(final BSONTimestamp time) {
         if (definition.isMongoGridFS()) {
             filter.put(MongoDBRiver.OPLOG_NAMESPACE, definition.getMongoOplogNamespace() + MongoDBRiver.GRIDFS_FILES_SUFFIX);
         } else {
-            List namespaceFilter = new ArrayList();
-            namespaceFilter.add(definition.getMongoOplogNamespace());
-            namespaceFilter.add(definition.getMongoDb() + "." + MongoDBRiver.OPLOG_NAMESPACE_COMMAND);
+            List namespaceFilter = ImmutableList.of(definition.getMongoOplogNamespace(), definition.getMongoDb() + "."
+                    + MongoDBRiver.OPLOG_NAMESPACE_COMMAND);
             filter.put(MongoDBRiver.OPLOG_NAMESPACE, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, namespaceFilter));
         }
         if (definition.getMongoOplogFilter().size() > 0) {
@@ -364,11 +363,8 @@ private DBObject getMongoFilter() {
         List filters = new ArrayList();
         List filters2 = new ArrayList();
 
-        List operationFilter = new ArrayList();
-        operationFilter.add(MongoDBRiver.OPLOG_DELETE_OPERATION);
-        operationFilter.add(MongoDBRiver.OPLOG_UPDATE_OPERATION);
-        operationFilter.add(MongoDBRiver.OPLOG_INSERT_OPERATION);
-        filters.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, operationFilter)));
+        filters.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, ImmutableList.of(
+                MongoDBRiver.OPLOG_DELETE_OPERATION, MongoDBRiver.OPLOG_UPDATE_OPERATION, MongoDBRiver.OPLOG_INSERT_OPERATION))));
 
         // include custom filter in filters2
         filters2.add(definition.getMongoOplogFilter());
diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDropCollectionTest.java b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDropCollectionTest.java
index f5f1be32..a4616c2f 100644
--- a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDropCollectionTest.java
+++ b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDropCollectionTest.java
@@ -29,8 +29,8 @@
 import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.mongodb.DB;
@@ -56,7 +56,8 @@ protected RiverMongoDropCollectionTest(String river, String database, String col
         super(river, database, collection, index);
     }
 
-    @BeforeClass
+//    @BeforeClass
+    @BeforeMethod
     public void createDatabase() {
         logger.debug("createDatabase {}", getDatabase());
         try {
@@ -73,11 +74,12 @@ public void createDatabase() {
         }
     }
 
-    @AfterClass
+//    @AfterClass
+    @AfterMethod
    public void cleanUp() {
         super.deleteRiver();
         logger.info("Drop database " + mongoDB.getName());
-        mongoDB.dropDatabase();
+//        mongoDB.dropDatabase();
     }
 
     @Test
@@ -112,6 +114,8 @@ public void testDropCollection() throws Throwable {
             logger.error("testDropCollection failed.", t);
             t.printStackTrace();
             throw t;
+        } finally {
+            mongoDB.dropDatabase();
         }
     }
 
@@ -158,6 +162,52 @@ public void testDropCollectionIssue79() throws Throwable {
             logger.error("testDropCollectionIssue79 failed.", t);
             t.printStackTrace();
             throw t;
+        } finally {
+            mongoDB.dropDatabase();
         }
     }
+
+    @Test
+    public void testDropDatabaseIssue133() throws Throwable {
+        logger.debug("Start testDropDatabaseIssue133");
+        try {
+            String mongoDocument = copyToStringFromClasspath(TEST_SIMPLE_MONGODB_DOCUMENT_JSON);
+            DBObject dbObject = (DBObject) JSON.parse(mongoDocument);
+            mongoCollection.insert(dbObject);
+            Thread.sleep(wait);
+
+            assertThat(getNode().client().admin().indices().exists(new IndicesExistsRequest(getIndex())).actionGet().isExists(),
+                    equalTo(true));
+            assertThat(getNode().client().admin().indices().prepareTypesExists(getIndex()).setTypes(getDatabase()).execute().actionGet()
+                    .isExists(), equalTo(true));
+            long countRequest = getNode().client().count(countRequest(getIndex())).actionGet().getCount();
+            mongoDB.dropDatabase();
+            Thread.sleep(wait);
+            assertThat(databaseExists(database), equalTo(false));
+            Thread.sleep(wait);
+            refreshIndex();
+
+            if (!dropCollectionOption) {
+                countRequest = getNode().client().count(countRequest(getIndex())).actionGet().getCount();
+                assertThat(countRequest, greaterThan(0L));
+            } else {
+                countRequest = getNode().client().count(countRequest(getIndex())).actionGet().getCount();
+                assertThat(countRequest, equalTo(0L));
+            }
+        } catch (Throwable t) {
+            logger.error("testDropDatabaseIssue133 failed.", t);
+            t.printStackTrace();
+            throw t;
+        } finally {
+        }
+    }
+
+    private boolean databaseExists(String name) {
+        for (String databaseName : mongo.getDatabaseNames()) {
+            if (databaseName.equals(name)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
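Note on the Indexer change above: the condition now fires when the oplog command ("c") entry is either {drop: "<collection>"} for the monitored collection or {dropDatabase: 1}. Below is a minimal, standalone sketch of that check using the MongoDB Java driver's DBObject; the class name DropCommandCheck and the helper isDropRequest are illustrative only (they are not part of the river code), and the constants simply repeat the string literals defined in MongoDBRiver.

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;

public class DropCommandCheck {

    // Same literals as MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION and
    // MongoDBRiver.OPLOG_DROP_DATABASE_COMMAND_OPERATION in the patch.
    static final String OPLOG_DROP_COMMAND_OPERATION = "drop";
    static final String OPLOG_DROP_DATABASE_COMMAND_OPERATION = "dropDatabase";

    // Hypothetical helper: true when the command entry is {drop: <monitored collection>}
    // or {dropDatabase: 1}, mirroring the updated condition in Indexer.updateBulkRequest.
    static boolean isDropRequest(DBObject data, String monitoredCollection) {
        Object droppedCollection = data.get(OPLOG_DROP_COMMAND_OPERATION);
        Object droppedDatabase = data.get(OPLOG_DROP_DATABASE_COMMAND_OPERATION);
        return (droppedCollection != null && droppedCollection.equals(monitoredCollection))
                || (droppedDatabase != null && droppedDatabase.equals(1));
    }

    public static void main(String[] args) {
        DBObject dropCollection = new BasicDBObject("drop", "person");
        DBObject dropDatabase = new BasicDBObject("dropDatabase", 1);
        System.out.println(isDropRequest(dropCollection, "person")); // true
        System.out.println(isDropRequest(dropDatabase, "person"));   // true
        System.out.println(isDropRequest(dropCollection, "other"));  // false
    }
}

The sketch mirrors the patch as written; since data.get(...) returns whatever numeric type the oplog stored, the .equals(1) comparison only matches an integer value.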