From 0da87d2b5cc4b743557e340f5cfb366d934f274a Mon Sep 17 00:00:00 2001
From: Richard Louapre <richard.louapre@gmail.com>
Date: Sun, 3 Nov 2013 05:31:02 -0500
Subject: [PATCH] Fix for #133

- with ```options/drop_collection``` the river will also track
```dropDatabase``` operation
---
 .../elasticsearch/river/mongodb/Indexer.java  |  6 +-
 .../river/mongodb/MongoDBRiver.java           |  1 +
 .../elasticsearch/river/mongodb/Slurper.java  | 14 ++---
 .../simple/RiverMongoDropCollectionTest.java  | 60 +++++++++++++++++--
 4 files changed, 65 insertions(+), 16 deletions(-)

diff --git a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
index e8c57ed1..15551a7a 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java
@@ -258,8 +258,10 @@ private void updateBulkRequest(final BulkRequestBuilder bulk, DBObject data, Str
         }
         if (MongoDBRiver.OPLOG_COMMAND_OPERATION.equals(operation)) {
             if (definition.isDropCollection()) {
-                if (data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION) != null
-                        && data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION).equals(definition.getMongoCollection())) {
+                if ((data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION) != null
+                        && data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION).equals(definition.getMongoCollection()) || (data
+                        .get(MongoDBRiver.OPLOG_DROP_DATABASE_COMMAND_OPERATION) != null && data.get(
+                        MongoDBRiver.OPLOG_DROP_DATABASE_COMMAND_OPERATION).equals(1)))) {
                     logger.info("Drop collection request [{}], [{}]", index, type);
                     bulk.request().requests().clear();
                     client.admin().indices().prepareRefresh(index).execute().actionGet();
diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
index 5262c722..b3c7184f 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
@@ -98,6 +98,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
     public final static String OPLOG_DELETE_OPERATION = "d";
     public final static String OPLOG_COMMAND_OPERATION = "c";
     public final static String OPLOG_DROP_COMMAND_OPERATION = "drop";
+    public final static String OPLOG_DROP_DATABASE_COMMAND_OPERATION = "dropDatabase";
     public final static String OPLOG_TIMESTAMP = "ts";
     public final static String OPLOG_FROM_MIGRATE = "fromMigrate";
     public final static String GRIDFS_FILES_SUFFIX = ".files";
diff --git a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
index 995765c7..2c255648 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java
@@ -1,7 +1,6 @@
 package org.elasticsearch.river.mongodb;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -10,6 +9,7 @@
 import org.bson.types.BSONTimestamp;
 import org.bson.types.ObjectId;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.collect.ImmutableList;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.ESLoggerFactory;
 import org.elasticsearch.river.mongodb.util.MongoDBHelper;
@@ -346,9 +346,8 @@ private DBObject getOplogFilter(final BSONTimestamp time) {
         if (definition.isMongoGridFS()) {
             filter.put(MongoDBRiver.OPLOG_NAMESPACE, definition.getMongoOplogNamespace() + MongoDBRiver.GRIDFS_FILES_SUFFIX);
         } else {
-            List<String> namespaceFilter = new ArrayList<String>();
-            namespaceFilter.add(definition.getMongoOplogNamespace());
-            namespaceFilter.add(definition.getMongoDb() + "." + MongoDBRiver.OPLOG_NAMESPACE_COMMAND);
+            List<String> namespaceFilter = ImmutableList.of(definition.getMongoOplogNamespace(), definition.getMongoDb() + "."
+                    + MongoDBRiver.OPLOG_NAMESPACE_COMMAND);
             filter.put(MongoDBRiver.OPLOG_NAMESPACE, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, namespaceFilter));
         }
         if (definition.getMongoOplogFilter().size() > 0) {
@@ -364,11 +363,8 @@ private DBObject getMongoFilter() {
         List<DBObject> filters = new ArrayList<DBObject>();
         List<DBObject> filters2 = new ArrayList<DBObject>();
 
-        List<String> operationFilter = new ArrayList<String>();
-        operationFilter.add(MongoDBRiver.OPLOG_DELETE_OPERATION);
-        operationFilter.add(MongoDBRiver.OPLOG_UPDATE_OPERATION);
-        operationFilter.add(MongoDBRiver.OPLOG_INSERT_OPERATION);
-        filters.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, operationFilter)));
+        filters.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, ImmutableList.of(
+                MongoDBRiver.OPLOG_DELETE_OPERATION, MongoDBRiver.OPLOG_UPDATE_OPERATION, MongoDBRiver.OPLOG_INSERT_OPERATION))));
 
         // include custom filter in filters2
         filters2.add(definition.getMongoOplogFilter());
diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDropCollectionTest.java b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDropCollectionTest.java
index f5f1be32..a4616c2f 100644
--- a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDropCollectionTest.java
+++ b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoDropCollectionTest.java
@@ -29,8 +29,8 @@
 import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.mongodb.DB;
@@ -56,7 +56,8 @@ protected RiverMongoDropCollectionTest(String river, String database, String col
         super(river, database, collection, index);
     }
 
-    @BeforeClass
+//    @BeforeClass
+    @BeforeMethod
     public void createDatabase() {
         logger.debug("createDatabase {}", getDatabase());
         try {
@@ -73,11 +74,12 @@ public void createDatabase() {
         }
     }
 
-    @AfterClass
+//    @AfterClass
+    @AfterMethod
     public void cleanUp() {
         super.deleteRiver();
         logger.info("Drop database " + mongoDB.getName());
-        mongoDB.dropDatabase();
+//        mongoDB.dropDatabase();
     }
 
     @Test
@@ -112,6 +114,8 @@ public void testDropCollection() throws Throwable {
             logger.error("testDropCollection failed.", t);
             t.printStackTrace();
             throw t;
+        } finally {
+            mongoDB.dropDatabase();
         }
     }
 
@@ -158,6 +162,52 @@ public void testDropCollectionIssue79() throws Throwable {
             logger.error("testDropCollectionIssue79 failed.", t);
             t.printStackTrace();
             throw t;
+        } finally {
+            mongoDB.dropDatabase();
         }
     }
+
+    @Test
+    public void testDropDatabaseIssue133() throws Throwable {
+        logger.debug("Start testDropDatabaseIssue133");
+        try {
+            String mongoDocument = copyToStringFromClasspath(TEST_SIMPLE_MONGODB_DOCUMENT_JSON);
+            DBObject dbObject = (DBObject) JSON.parse(mongoDocument);
+            mongoCollection.insert(dbObject);
+            Thread.sleep(wait);
+
+            assertThat(getNode().client().admin().indices().exists(new IndicesExistsRequest(getIndex())).actionGet().isExists(),
+                    equalTo(true));
+            assertThat(getNode().client().admin().indices().prepareTypesExists(getIndex()).setTypes(getDatabase()).execute().actionGet()
+                    .isExists(), equalTo(true));
+            long countRequest = getNode().client().count(countRequest(getIndex())).actionGet().getCount();
+            mongoDB.dropDatabase();
+            Thread.sleep(wait);
+            assertThat(databaseExists(database), equalTo(false));
+            Thread.sleep(wait);
+            refreshIndex();
+
+            if (!dropCollectionOption) {
+                countRequest = getNode().client().count(countRequest(getIndex())).actionGet().getCount();
+                assertThat(countRequest, greaterThan(0L));
+            } else {
+                countRequest = getNode().client().count(countRequest(getIndex())).actionGet().getCount();
+                assertThat(countRequest, equalTo(0L));
+            }
+        } catch (Throwable t) {
+            logger.error("testDropDatabaseIssue133 failed.", t);
+            t.printStackTrace();
+            throw t;
+        } finally {
+        }
+    }
+    
+    private boolean databaseExists(String name) {
+        for(String databaseName :mongo.getDatabaseNames()) {
+            if (databaseName.equals(name)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }