Skip to content

Commit

Permalink
Initial implementation for issue #79
Browse files Browse the repository at this point in the history
- Enable the drop-collection feature by setting drop_collection: true in the
options attribute (see test-simple-mongodb-river-drop-collection.json)
  • Loading branch information
richardwilly98 committed May 17, 2013
1 parent 33c8e8f commit a4e0a07
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 29 deletions.
72 changes: 58 additions & 14 deletions src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
public final static String PORT_FIELD = "port";
public final static String OPTIONS_FIELD = "options";
public final static String SECONDARY_READ_PREFERENCE_FIELD = "secondary_read_preference";
public final static String DROP_COLLECTION_FIELD = "drop_collection";
public final static String FILTER_FIELD = "filter";
public final static String CREDENTIALS_FIELD = "credentials";
public final static String USER_FIELD = "user";
Expand All @@ -127,14 +128,19 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
public final static String MONGODB_LOCAL = "local";
public final static String MONGODB_ADMIN = "admin";
public final static String MONGODB_ID_FIELD = "_id";
public final static String MONGODB_OR_OPERATOR = "$or";
public final static String MONGODB_AND_OPERATOR = "$and";
public final static String MONGODB_NATURAL_OPERATOR = "$natural";
public final static String OPLOG_COLLECTION = "oplog.rs";
public final static String OPLOG_NAMESPACE = "ns";
public final static String OPLOG_NAMESPACE_COMMAND = "$cmd";
public final static String OPLOG_OBJECT = "o";
public final static String OPLOG_UPDATE = "o2";
public final static String OPLOG_OPERATION = "op";
public final static String OPLOG_UPDATE_OPERATION = "u";
public final static String OPLOG_INSERT_OPERATION = "i";
public final static String OPLOG_DELETE_OPERATION = "d";
public final static String OPLOG_COLLECTION_OPERATION = "c";
public final static String OPLOG_TIMESTAMP = "ts";
public final static String GRIDFS_FILES_SUFFIX = ".files";
public final static String GRIDFS_CHUNKS_SUFFIX = ".chunks";
Expand All @@ -152,8 +158,6 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
protected final String mongoAdminPassword;
protected final String mongoLocalUser;
protected final String mongoLocalPassword;
// protected final String mongoDbUser;
// protected final String mongoDbPassword;
protected final String mongoOplogNamespace;
protected final boolean mongoSecondaryReadPreference;

Expand All @@ -162,6 +166,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
protected final int bulkSize;
protected final TimeValue bulkTimeout;
protected final int throttleSize;
protected final boolean dropCollection;

private final ExecutableScript script;

Expand Down Expand Up @@ -238,8 +243,12 @@ public MongoDBRiver(final RiverName riverName,
mongoSecondaryReadPreference = XContentMapValues
.nodeBooleanValue(mongoOptionsSettings
.get(SECONDARY_READ_PREFERENCE_FIELD), false);
dropCollection = XContentMapValues
.nodeBooleanValue(mongoOptionsSettings
.get(DROP_COLLECTION_FIELD), false);
} else {
mongoSecondaryReadPreference = false;
dropCollection = false;
}

// Credentials
Expand Down Expand Up @@ -340,6 +349,7 @@ public MongoDBRiver(final RiverName riverName,
// mongoDbUser = "";
// mongoDbPassword = "";
script = null;
dropCollection = false;
}
mongoOplogNamespace = mongoDb + "." + mongoCollection;

Expand Down Expand Up @@ -470,7 +480,7 @@ private boolean isMongos() {
logger.trace("serverStatus: {}", cr);
logger.trace("process: {}", process);
}
// return (cr.get("process").equals("mongos"));
// return (cr.get("process").equals("mongos"));
// Fix for https://jira.mongodb.org/browse/SERVER-9160
return (process.contains("mongos"));
}
Expand Down Expand Up @@ -623,7 +633,9 @@ public void run() {
@SuppressWarnings({ "unchecked" })
private BSONTimestamp updateBulkRequest(final BulkRequestBuilder bulk,
Map<String, Object> data) {
if (data.get(MONGODB_ID_FIELD) == null) {
if (data.get(MONGODB_ID_FIELD) == null
&& !data.get(OPLOG_OPERATION).equals(
OPLOG_COLLECTION_OPERATION)) {
logger.warn(
"Cannot get object id. Skip the current item: [{}]",
data);
Expand All @@ -632,7 +644,11 @@ private BSONTimestamp updateBulkRequest(final BulkRequestBuilder bulk,
BSONTimestamp lastTimestamp = (BSONTimestamp) data
.get(OPLOG_TIMESTAMP);
String operation = data.get(OPLOG_OPERATION).toString();
String objectId = data.get(MONGODB_ID_FIELD).toString();
// String objectId = data.get(MONGODB_ID_FIELD).toString();
String objectId = "";
if (data.get(MONGODB_ID_FIELD) != null) {
objectId = data.get(MONGODB_ID_FIELD).toString();
}
data.remove(OPLOG_TIMESTAMP);
data.remove(OPLOG_OPERATION);
if (logger.isDebugEnabled()) {
Expand All @@ -651,7 +667,9 @@ private BSONTimestamp updateBulkRequest(final BulkRequestBuilder bulk,
if (ctx != null) {
ctx.put("document", data);
ctx.put("operation", operation);
ctx.put("id", objectId);
if (!objectId.isEmpty()) {
ctx.put("id", objectId);
}
if (logger.isDebugEnabled()) {
logger.debug("Context before script executed: {}", ctx);
}
Expand Down Expand Up @@ -733,6 +751,24 @@ private BSONTimestamp updateBulkRequest(final BulkRequestBuilder bulk,
routing).parent(parent));
deletedDocuments++;
}
if (OPLOG_COLLECTION_OPERATION.equals(operation)) {
if (dropCollection) {
logger.info("Drop collection request [{}], [{}]", index,
type);
bulk.request().requests().clear();
client.admin().indices().prepareDeleteMapping(index)
.setType(type).execute().actionGet();
deletedDocuments = 0;
updatedDocuments = 0;
insertedDocuments = 0;
logger.info(
"Delete request for index / type [{}] [{}] successfully executed.",
index, type);
} else {
logger.info("Ignore drop collection request [{}], [{}]. The option has been disabled.", index,
type);
}
}
} catch (IOException e) {
logger.warn("failed to parse {}", e, data);
}
Expand Down Expand Up @@ -998,13 +1034,19 @@ private DBObject getIndexFilter(final BSONTimestamp timestampOverride) {
: timestampOverride;
BasicDBObject filter = new BasicDBObject();
List<DBObject> values = new ArrayList<DBObject>();
List<DBObject> values2 = new ArrayList<DBObject>();

if (mongoGridFS) {
values.add(new BasicDBObject(OPLOG_NAMESPACE,
mongoOplogNamespace + GRIDFS_FILES_SUFFIX));
} else {
values.add(new BasicDBObject(OPLOG_NAMESPACE,
// values.add(new BasicDBObject(OPLOG_NAMESPACE,
// mongoOplogNamespace));
values2.add(new BasicDBObject(OPLOG_NAMESPACE,
mongoOplogNamespace));
values2.add(new BasicDBObject(OPLOG_NAMESPACE, mongoDb + "."
+ OPLOG_NAMESPACE_COMMAND));
values.add(new BasicDBObject(MONGODB_OR_OPERATOR, values2));
}
if (!mongoFilter.isEmpty()) {
values.add(getMongoFilter());
Expand All @@ -1015,7 +1057,7 @@ private DBObject getIndexFilter(final BSONTimestamp timestampOverride) {
values.add(new BasicDBObject(OPLOG_TIMESTAMP,
new BasicDBObject(QueryOperators.GT, time)));
}
filter = new BasicDBObject("$and", values);
filter = new BasicDBObject(MONGODB_AND_OPERATOR, values);
if (logger.isDebugEnabled()) {
logger.debug("Using filter: {}", filter);
}
Expand All @@ -1037,14 +1079,14 @@ private DBObject getMongoFilter() {
OPLOG_INSERT_OPERATION));

// include or operation statement in filter2
filters2.add(new BasicDBObject("$or", filters3));
filters2.add(new BasicDBObject(MONGODB_OR_OPERATOR, filters3));

// include custom filter in filters2
filters2.add((DBObject) JSON.parse(mongoFilter));

filters.add(new BasicDBObject("$and", filters2));
filters.add(new BasicDBObject(MONGODB_AND_OPERATOR, filters2));

return new BasicDBObject("$or", filters);
return new BasicDBObject(MONGODB_OR_OPERATOR, filters);
}

private DBCursor oplogCursor(final BSONTimestamp timestampOverride) {
Expand All @@ -1053,7 +1095,7 @@ private DBCursor oplogCursor(final BSONTimestamp timestampOverride) {
return null;
}
return oplogCollection.find(indexFilter)
.sort(new BasicDBObject("$natural", 1))
.sort(new BasicDBObject(MONGODB_NATURAL_OPERATOR, 1))
.addOption(Bytes.QUERYOPTION_TAILABLE)
.addOption(Bytes.QUERYOPTION_AWAITDATA);
}
Expand Down Expand Up @@ -1116,9 +1158,11 @@ private BSONTimestamp getLastTimestamp(final String namespace) {
GetResponse lastTimestampResponse = client
.prepareGet(riverIndexName, riverName.getName(), namespace)
.execute().actionGet();
// API changes since 0.90.0 lastTimestampResponse.exists() replaced by lastTimestampResponse.isExists()
// API changes since 0.90.0 lastTimestampResponse.exists() replaced by
// lastTimestampResponse.isExists()
if (lastTimestampResponse.isExists()) {
// API changes since 0.90.0 lastTimestampResponse.sourceAsMap() replaced by lastTimestampResponse.getSourceAsMap()
// API changes since 0.90.0 lastTimestampResponse.sourceAsMap()
// replaced by lastTimestampResponse.getSourceAsMap()
Map<String, Object> mongodbState = (Map<String, Object>) lastTimestampResponse
.getSourceAsMap().get(ROOT_NAME);
if (mongodbState != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public abstract class RiverMongoDBTestAsbtract {

private boolean useDynamicPorts;
private String mongoVersion;

private final String river;
private final String database;
private final String collection;
Expand Down Expand Up @@ -410,4 +411,20 @@ protected static int getMongoPort2() {
protected static int getMongoPort3() {
return mongoPort3;
}

/** Returns the river name this test was constructed with. */
protected String getRiver() {
return river;
}

/** Returns the MongoDB database name this test was constructed with. */
protected String getDatabase() {
return database;
}

/** Returns the MongoDB collection name this test was constructed with. */
protected String getCollection() {
return collection;
}

/** Returns the Elasticsearch index name this test was constructed with. */
protected String getIndex() {
return index;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,22 @@ public class RiverMongoDBTest extends RiverMongoDBTestAsbtract {

private final ESLogger logger = Loggers.getLogger(getClass());

private static final String DATABASE_NAME = "testriver";
private static final String COLLECTION_NAME = "person";
private static final String RIVER_NAME = "testmongodb";
private static final String INDEX_NAME = "personindex";

private DB mongoDB;
private DBCollection mongoCollection;

protected RiverMongoDBTest() {
super(RIVER_NAME, DATABASE_NAME, COLLECTION_NAME, INDEX_NAME);
super("testmongodb-"+ System.currentTimeMillis(), "testriver-"+ System.currentTimeMillis(), "person-"+ System.currentTimeMillis(), "personindex-" + System.currentTimeMillis());
}

@BeforeClass
public void createDatabase() {
logger.debug("createDatabase {}", DATABASE_NAME);
logger.debug("createDatabase {}", getDatabase());
try {
mongoDB = getMongo().getDB(DATABASE_NAME);
mongoDB = getMongo().getDB(getDatabase());
mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
super.createRiver("/test/elasticsearch/plugin/river/mongodb/simple/test-simple-mongodb-river.json");
logger.info("Start createCollection");
mongoCollection = mongoDB.createCollection(COLLECTION_NAME, null);
mongoCollection = mongoDB.createCollection(getCollection(), null);
Assert.assertNotNull(mongoCollection);
} catch (Throwable t) {
logger.error("createDatabase failed.", t);
Expand Down Expand Up @@ -108,17 +103,17 @@ public void simpleBSONObject() throws Throwable {
logger.info("WriteResult: {}", result.toString());
ActionFuture<IndicesExistsResponse> response = getNode().client()
.admin().indices()
.exists(new IndicesExistsRequest(INDEX_NAME));
.exists(new IndicesExistsRequest(getIndex()));
assertThat(response.actionGet().isExists(), equalTo(true));
refreshIndex();
CountResponse countResponse = getNode()
.client()
.count(countRequest(INDEX_NAME).query(
.count(countRequest(getIndex()).query(
fieldQuery("name", "Richard"))).actionGet();
logger.info("Document count: {}", countResponse.getCount());
countResponse = getNode()
.client()
.count(countRequest(INDEX_NAME)
.count(countRequest(getIndex())
.query(fieldQuery("_id", id))).actionGet();
assertThat(countResponse.getCount(), equalTo(1l));

Expand All @@ -128,10 +123,12 @@ public void simpleBSONObject() throws Throwable {
refreshIndex();
countResponse = getNode()
.client()
.count(countRequest(INDEX_NAME)
.count(countRequest(getIndex())
.query(fieldQuery("_id", id))).actionGet();
logger.debug("Count after delete request: {}", countResponse.getCount());
assertThat(countResponse.getCount(), equalTo(0L));
logger.debug("Count after delete request: {}",
countResponse.getCount());
assertThat(countResponse.getCount(), equalTo(0L));

} catch (Throwable t) {
logger.error("simpleBSONObject failed.", t);
t.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package test.elasticsearch.plugin.river.mongodb.simple;

import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Variant of {@link RiverMongoDropCollectionTest} that runs the same scenario
 * with the drop-collection option turned OFF (dropCollectionOption = false,
 * presumably mapped to the river's drop_collection setting — defined in the
 * parent class, not visible here; confirm there).
 *
 * NOTE(review): the first two constructor arguments — river and database —
 * are both "drop-river-2"; confirm the shared name is intentional.
 *
 * The lifecycle methods below only delegate to the parent; they appear to be
 * re-declared so the TestNG annotations apply to this subclass as well.
 */
@Test
public class RiverMongoDropCollectionDisabledTest extends RiverMongoDropCollectionTest {

protected RiverMongoDropCollectionDisabledTest() {
// Parent signature (per its getters): river, database, collection, index.
super("drop-river-2", "drop-river-2", "drop-collection-2", "drop-index-2");
// Disable the drop-collection behavior exercised by testDropCollection().
dropCollectionOption = false;
}

@Override
@BeforeClass
public void createDatabase() {
super.createDatabase();
}

@Override
@AfterClass
public void cleanUp() {
super.cleanUp();
}

// Same test body as the parent, but executed with the option disabled.
@Override
@Test
public void testDropCollection() throws Throwable {
super.testDropCollection();
}
}
Loading

0 comments on commit a4e0a07

Please sign in to comment.