Fix for #133
- with ```options/drop_collection``` enabled, the river will also track the
```dropDatabase``` operation
richardwilly98 committed Nov 3, 2013
1 parent 80d1c6e commit 0da87d2
Showing 4 changed files with 65 additions and 16 deletions.
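The change in Indexer.java below widens the existing drop-collection check so that a database-level `dropDatabase` command also clears the index. A minimal standalone sketch of that matching logic (illustrative only, not the river's code; the class and method names here are invented), assuming the standard oplog command bodies `{drop: "<collection>"}` and `{dropDatabase: 1}`:

```java
// Illustrative sketch only, not river code: mirrors the condition added in
// Indexer.updateBulkRequest(). Command bodies follow the standard MongoDB
// oplog format: {drop: "<collection>"} and {dropDatabase: 1}.
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;

public class DropCommandSketch {

    // Returns true when the command entry should wipe the tracked index.
    static boolean triggersIndexDrop(DBObject command, String trackedCollection) {
        Object drop = command.get("drop");
        Object dropDatabase = command.get("dropDatabase");
        return (drop != null && drop.equals(trackedCollection))
                || (dropDatabase != null && dropDatabase.equals(1));
    }

    public static void main(String[] args) {
        DBObject dropCollection = new BasicDBObject("drop", "mycollection");
        DBObject dropDatabase = new BasicDBObject("dropDatabase", 1);
        System.out.println(triggersIndexDrop(dropCollection, "mycollection")); // true
        System.out.println(triggersIndexDrop(dropDatabase, "mycollection"));   // true
    }
}
```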
6 changes: 4 additions & 2 deletions src/main/java/org/elasticsearch/river/mongodb/Indexer.java
@@ -258,8 +258,10 @@ private void updateBulkRequest(final BulkRequestBuilder bulk, DBObject data, Str
}
if (MongoDBRiver.OPLOG_COMMAND_OPERATION.equals(operation)) {
if (definition.isDropCollection()) {
if (data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION) != null
&& data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION).equals(definition.getMongoCollection())) {
if ((data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION) != null
&& data.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION).equals(definition.getMongoCollection()) || (data
.get(MongoDBRiver.OPLOG_DROP_DATABASE_COMMAND_OPERATION) != null && data.get(
MongoDBRiver.OPLOG_DROP_DATABASE_COMMAND_OPERATION).equals(1)))) {
logger.info("Drop collection request [{}], [{}]", index, type);
bulk.request().requests().clear();
client.admin().indices().prepareRefresh(index).execute().actionGet();
src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
@@ -98,6 +98,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
public final static String OPLOG_DELETE_OPERATION = "d";
public final static String OPLOG_COMMAND_OPERATION = "c";
public final static String OPLOG_DROP_COMMAND_OPERATION = "drop";
public final static String OPLOG_DROP_DATABASE_COMMAND_OPERATION = "dropDatabase";
public final static String OPLOG_TIMESTAMP = "ts";
public final static String OPLOG_FROM_MIGRATE = "fromMigrate";
public final static String GRIDFS_FILES_SUFFIX = ".files";
14 changes: 5 additions & 9 deletions src/main/java/org/elasticsearch/river/mongodb/Slurper.java
@@ -1,7 +1,6 @@
package org.elasticsearch.river.mongodb;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
@@ -10,6 +9,7 @@
import org.bson.types.BSONTimestamp;
import org.bson.types.ObjectId;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.river.mongodb.util.MongoDBHelper;
@@ -346,9 +346,8 @@ private DBObject getOplogFilter(final BSONTimestamp time) {
if (definition.isMongoGridFS()) {
filter.put(MongoDBRiver.OPLOG_NAMESPACE, definition.getMongoOplogNamespace() + MongoDBRiver.GRIDFS_FILES_SUFFIX);
} else {
List<String> namespaceFilter = new ArrayList<String>();
namespaceFilter.add(definition.getMongoOplogNamespace());
namespaceFilter.add(definition.getMongoDb() + "." + MongoDBRiver.OPLOG_NAMESPACE_COMMAND);
List<String> namespaceFilter = ImmutableList.of(definition.getMongoOplogNamespace(), definition.getMongoDb() + "."
+ MongoDBRiver.OPLOG_NAMESPACE_COMMAND);
filter.put(MongoDBRiver.OPLOG_NAMESPACE, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, namespaceFilter));
}
if (definition.getMongoOplogFilter().size() > 0) {
@@ -364,11 +363,8 @@ private DBObject getMongoFilter() {
List<DBObject> filters = new ArrayList<DBObject>();
List<DBObject> filters2 = new ArrayList<DBObject>();

List<String> operationFilter = new ArrayList<String>();
operationFilter.add(MongoDBRiver.OPLOG_DELETE_OPERATION);
operationFilter.add(MongoDBRiver.OPLOG_UPDATE_OPERATION);
operationFilter.add(MongoDBRiver.OPLOG_INSERT_OPERATION);
filters.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, operationFilter)));
filters.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, ImmutableList.of(
MongoDBRiver.OPLOG_DELETE_OPERATION, MongoDBRiver.OPLOG_UPDATE_OPERATION, MongoDBRiver.OPLOG_INSERT_OPERATION))));

// include custom filter in filters2
filters2.add(definition.getMongoOplogFilter());
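With the Slurper change above, the oplog query matches both the tracked collection namespace and the database command namespace, so `drop`/`dropDatabase` commands reach the Indexer at all. A rough sketch of the namespace part of that filter (illustrative only; the helper class below is invented, and it assumes the standard oplog `ns` field and the `<db>.$cmd` command namespace):

```java
// Rough illustration, not river code: the shape of the namespace filter that
// getOplogFilter() builds when GridFS is disabled. "ns" is the standard oplog
// namespace field; commands such as drop/dropDatabase are logged under "<db>.$cmd".
import java.util.Arrays;

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;

public class OplogNamespaceFilterSketch {

    static DBObject namespaceFilter(String db, String collection) {
        return new BasicDBObject("ns", new BasicDBObject("$in",
                Arrays.asList(db + "." + collection, db + ".$cmd")));
    }

    public static void main(String[] args) {
        // Prints roughly: {"ns": {"$in": ["mydb.mycollection", "mydb.$cmd"]}}
        System.out.println(namespaceFilter("mydb", "mycollection"));
    }
}
```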
RiverMongoDropCollectionTest.java
@@ -29,8 +29,8 @@
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.mongodb.DB;
@@ -56,7 +56,8 @@ protected RiverMongoDropCollectionTest(String river, String database, String col
super(river, database, collection, index);
}

@BeforeClass
// @BeforeClass
@BeforeMethod
public void createDatabase() {
logger.debug("createDatabase {}", getDatabase());
try {
@@ -73,11 +74,12 @@ public void createDatabase() {
}
}

@AfterClass
// @AfterClass
@AfterMethod
public void cleanUp() {
super.deleteRiver();
logger.info("Drop database " + mongoDB.getName());
mongoDB.dropDatabase();
// mongoDB.dropDatabase();
}

@Test
@@ -112,6 +114,8 @@ public void testDropCollection() throws Throwable {
logger.error("testDropCollection failed.", t);
t.printStackTrace();
throw t;
} finally {
mongoDB.dropDatabase();
}
}

@@ -158,6 +162,52 @@ public void testDropCollectionIssue79() throws Throwable {
logger.error("testDropCollectionIssue79 failed.", t);
t.printStackTrace();
throw t;
} finally {
mongoDB.dropDatabase();
}
}

@Test
public void testDropDatabaseIssue133() throws Throwable {
logger.debug("Start testDropDatabaseIssue133");
try {
String mongoDocument = copyToStringFromClasspath(TEST_SIMPLE_MONGODB_DOCUMENT_JSON);
DBObject dbObject = (DBObject) JSON.parse(mongoDocument);
mongoCollection.insert(dbObject);
Thread.sleep(wait);

assertThat(getNode().client().admin().indices().exists(new IndicesExistsRequest(getIndex())).actionGet().isExists(),
equalTo(true));
assertThat(getNode().client().admin().indices().prepareTypesExists(getIndex()).setTypes(getDatabase()).execute().actionGet()
.isExists(), equalTo(true));
long countRequest = getNode().client().count(countRequest(getIndex())).actionGet().getCount();
mongoDB.dropDatabase();
Thread.sleep(wait);
assertThat(databaseExists(database), equalTo(false));
Thread.sleep(wait);
refreshIndex();

if (!dropCollectionOption) {
countRequest = getNode().client().count(countRequest(getIndex())).actionGet().getCount();
assertThat(countRequest, greaterThan(0L));
} else {
countRequest = getNode().client().count(countRequest(getIndex())).actionGet().getCount();
assertThat(countRequest, equalTo(0L));
}
} catch (Throwable t) {
logger.error("testDropDatabaseIssue133 failed.", t);
t.printStackTrace();
throw t;
} finally {
}
}

private boolean databaseExists(String name) {
for (String databaseName : mongo.getDatabaseNames()) {
if (databaseName.equals(name)) {
return true;
}
}
return false;
}
}
