Skip to content

Commit

Permalink
Implement feature #102
Browse files Browse the repository at this point in the history
Example in javascript (river will process the document with last
timestamp of now + 5 seconds):
"options": {
"initial_timestamp": {
"script_type": "js",
"script": "var date = new Date(); date.setSeconds(date.getSeconds() +
5); new java.lang.Long(date.getTime());"
}
},
  • Loading branch information
richardwilly98 committed Aug 2, 2013
1 parent 3d698ca commit 57fc1c7
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 0 deletions.
51 changes: 51 additions & 0 deletions src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -57,6 +58,7 @@
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
Expand Down Expand Up @@ -120,6 +122,9 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
public final static String DROP_COLLECTION_FIELD = "drop_collection";
public final static String EXCLUDE_FIELDS_FIELD = "exclude_fields";
public final static String INCLUDE_COLLECTION_FIELD = "include_collection";
public final static String INITIAL_TIMESTAMP_FIELD = "initial_timestamp";
public final static String INITIAL_TIMESTAMP_SCRIPT_TYPE_FIELD = "script_type";
public final static String INITIAL_TIMESTAMP_SCRIPT_FIELD = "script";
public final static String FILTER_FIELD = "filter";
public final static String CREDENTIALS_FIELD = "credentials";
public final static String USER_FIELD = "user";
Expand Down Expand Up @@ -189,6 +194,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
protected final boolean dropCollection;
protected final Set<String> excludeFields;
protected final String includeCollection;
protected final BSONTimestamp initialTimestamp;

private final BasicDBObject findKeys = new BasicDBObject();
private final ExecutableScript script;
Expand Down Expand Up @@ -264,6 +270,7 @@ public MongoDBRiver(final RiverName riverName,
if (mongoSettings.containsKey(OPTIONS_FIELD)) {
Map<String, Object> mongoOptionsSettings = (Map<String, Object>) mongoSettings
.get(OPTIONS_FIELD);
logger.trace("mongoOptionsSettings: " + mongoOptionsSettings);
mongoSecondaryReadPreference = XContentMapValues
.nodeBooleanValue(mongoOptionsSettings
.get(SECONDARY_READ_PREFERENCE_FIELD), false);
Expand Down Expand Up @@ -301,13 +308,52 @@ public MongoDBRiver(final RiverName riverName,
} else {
excludeFields = null;
}
if (mongoOptionsSettings.containsKey(INITIAL_TIMESTAMP_FIELD)) {
BSONTimestamp timeStamp = null;
try {
Map<String, Object> initalTimestampSettings = (Map<String, Object>) mongoOptionsSettings
.get(INITIAL_TIMESTAMP_FIELD);
String scriptType = "js";
if (initalTimestampSettings
.containsKey(INITIAL_TIMESTAMP_SCRIPT_TYPE_FIELD)) {
scriptType = initalTimestampSettings.get(
INITIAL_TIMESTAMP_SCRIPT_TYPE_FIELD)
.toString();
}
if (initalTimestampSettings
.containsKey(INITIAL_TIMESTAMP_SCRIPT_FIELD)) {

ExecutableScript script = scriptService.executable(
scriptType,
initalTimestampSettings.get(
INITIAL_TIMESTAMP_SCRIPT_FIELD)
.toString(), Maps.newHashMap());
Object ctx = script.run();
logger.trace(
"initialTimestamp script returned: {}", ctx);
if (ctx != null) {
long timestamp = Long.parseLong(ctx.toString());
timeStamp = new BSONTimestamp((int) (new Date(
timestamp).getTime() / 1000), 1);
}
}
} catch (Throwable t) {
logger.warn("Could set initial timestamp", t,
new Object());
} finally {
initialTimestamp = timeStamp;
}
} else {
initialTimestamp = null;
}
} else {
mongoSecondaryReadPreference = false;
dropCollection = false;
includeCollection = "";
excludeFields = null;
mongoUseSSL = false;
mongoSSLVerifyCertificate = false;
initialTimestamp = null;
}

// Credentials
Expand Down Expand Up @@ -414,6 +460,7 @@ public MongoDBRiver(final RiverName riverName,
excludeFields = null;
mongoUseSSL = false;
mongoSSLVerifyCertificate = false;
initialTimestamp = null;
}
mongoOplogNamespace = mongoDb + "." + mongoCollection;

Expand Down Expand Up @@ -1407,6 +1454,10 @@ private BSONTimestamp getLastTimestamp(final String namespace) {

}
}
} else {
if (initialTimestamp != null) {
return initialTimestamp;
}
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package test.elasticsearch.plugin.river.mongodb.simple;

import static org.elasticsearch.client.Requests.countRequest;
import static org.elasticsearch.common.io.Streams.copyToStringFromClasspath;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.count.CountResponse;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import test.elasticsearch.plugin.river.mongodb.RiverMongoDBTestAsbtract;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.util.JSON;

@Test
public class RiverMongoInitialTimestampTest extends RiverMongoDBTestAsbtract {

private static final String TEST_SIMPLE_MONGODB_RIVER_INITIAL_TIMESTAMP_JSON = "/test/elasticsearch/plugin/river/mongodb/simple/test-simple-mongodb-river-initial-timestamp.json";
private static final String GROOVY_SCRIPT_TYPE = "groovy";
private static final String JAVASCRIPT_SCRIPT_TYPE = "js";
private DB mongoDB;
private DBCollection mongoCollection;

protected RiverMongoInitialTimestampTest() {
super("initial-timestamp-river-" + System.currentTimeMillis(),
"initial-timestamp-river-" + System.currentTimeMillis(),
"initial-timestamp-collection-" + System.currentTimeMillis(),
"initial-timestamp-index-" + System.currentTimeMillis());
}

protected RiverMongoInitialTimestampTest(String river, String database,
String collection, String index) {
super(river, database, collection, index);
}

@BeforeClass
public void createDatabase() {
logger.debug("createDatabase {}", getDatabase());
try {
mongoDB = getMongo().getDB(getDatabase());
mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
logger.info("Start createCollection");
mongoCollection = mongoDB.createCollection(getCollection(), null);
Assert.assertNotNull(mongoCollection);
} catch (Throwable t) {
logger.error("createDatabase failed.", t);
}
}

@AfterClass
public void cleanUp() {
// super.deleteRiver();
logger.info("Drop database " + mongoDB.getName());
mongoDB.dropDatabase();
}

//String script = "def now = new Date(); println 'Now: ${now}'; ctx.document.modified = now.clearTime();";
@Test
public void testInitialTimestampInGroovy() throws Throwable {
logger.debug("Start testInitialTimestampInGroovy");
try {
String script = "import groovy.time.TimeCategory; use(TimeCategory){def date = new Date() + 5.second; date.time;}";
super.createRiver(TEST_SIMPLE_MONGODB_RIVER_INITIAL_TIMESTAMP_JSON,
getRiver(), (Object) String.valueOf(getMongoPort1()),
(Object) String.valueOf(getMongoPort2()),
(Object) String.valueOf(getMongoPort3()),
(Object) GROOVY_SCRIPT_TYPE, (Object) script,
(Object) getDatabase(), (Object) getCollection(),
(Object) getIndex(), (Object) getDatabase());

String mongoDocument = copyToStringFromClasspath(TEST_SIMPLE_MONGODB_DOCUMENT_JSON);
DBObject dbObject = (DBObject) JSON.parse(mongoDocument);
mongoCollection.insert(dbObject);
Thread.sleep(wait);

assertThat(
getNode().client().admin().indices()
.exists(new IndicesExistsRequest(getIndex()))
.actionGet().isExists(), equalTo(true));

refreshIndex();

CountResponse countResponse = getNode().client()
.count(countRequest(getIndex())).actionGet();
assertThat(countResponse.getCount(), equalTo(0L));

mongoCollection.remove(dbObject);

// Wait 5 seconds and store a new document
Thread.sleep(5000);

dbObject = (DBObject) JSON.parse(mongoDocument);
mongoCollection.insert(dbObject);
Thread.sleep(wait);

assertThat(
getNode().client().admin().indices()
.exists(new IndicesExistsRequest(getIndex()))
.actionGet().isExists(), equalTo(true));
assertThat(
getNode().client().admin().indices()
.prepareTypesExists(getIndex())
.setTypes(getDatabase()).execute().actionGet()
.isExists(), equalTo(true));

refreshIndex();

countResponse = getNode().client().count(countRequest(getIndex()))
.actionGet();
assertThat(countResponse.getCount(), equalTo(1L));

mongoCollection.remove(dbObject);
} catch (Throwable t) {
logger.error("testInitialTimestampInGroovy failed.", t);
t.printStackTrace();
throw t;
} finally {
super.deleteRiver();
super.deleteIndex();
}
}

// Convert JavaScript types to Java types: http://stackoverflow.com/questions/6730062/passing-common-types-between-java-and-rhino-javascript
@Test
public void testInitialTimestampInJavascript() throws Throwable {
logger.debug("Start testInitialTimestampInJavascript");
try {
String script = "var date = new Date(); date.setSeconds(date.getSeconds() + 5); new java.lang.Long(date.getTime());";
super.createRiver(TEST_SIMPLE_MONGODB_RIVER_INITIAL_TIMESTAMP_JSON,
getRiver(), (Object) String.valueOf(getMongoPort1()),
(Object) String.valueOf(getMongoPort2()),
(Object) String.valueOf(getMongoPort3()),
(Object) JAVASCRIPT_SCRIPT_TYPE, (Object) script,
(Object) getDatabase(), (Object) getCollection(),
(Object) getIndex(), (Object) getDatabase());

String mongoDocument = copyToStringFromClasspath(TEST_SIMPLE_MONGODB_DOCUMENT_JSON);
DBObject dbObject = (DBObject) JSON.parse(mongoDocument);
mongoCollection.insert(dbObject);
Thread.sleep(wait);

assertThat(
getNode().client().admin().indices()
.exists(new IndicesExistsRequest(getIndex()))
.actionGet().isExists(), equalTo(true));

refreshIndex();

CountResponse countResponse = getNode().client()
.count(countRequest(getIndex())).actionGet();
assertThat(countResponse.getCount(), equalTo(0L));

mongoCollection.remove(dbObject);

// Wait 5 seconds and store a new document
Thread.sleep(5000);

dbObject = (DBObject) JSON.parse(mongoDocument);
mongoCollection.insert(dbObject);
Thread.sleep(wait);

assertThat(
getNode().client().admin().indices()
.exists(new IndicesExistsRequest(getIndex()))
.actionGet().isExists(), equalTo(true));
assertThat(
getNode().client().admin().indices()
.prepareTypesExists(getIndex())
.setTypes(getDatabase()).execute().actionGet()
.isExists(), equalTo(true));

refreshIndex();

countResponse = getNode().client().count(countRequest(getIndex()))
.actionGet();
assertThat(countResponse.getCount(), equalTo(1L));

mongoCollection.remove(dbObject);
} catch (Throwable t) {
logger.error("testInitialTimestampInJavascript failed.", t);
t.printStackTrace();
throw t;
} finally {
super.deleteRiver();
super.deleteIndex();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"type": "mongodb",
"mongodb": {
"servers": [{
"host": "localhost",
"port": %s
},
{
"host": "localhost",
"port": %s
},
{
"host": "localhost",
"port": %s
}],
"options": {
"secondary_read_preference": true,
"initial_timestamp": {
"script_type": "%s",
"script": "%s"
}
},
"db": "%s",
"collection": "%s",
"gridfs": false
},
"index": {
"name": "%s",
"type": "%s",
"throttle_size": 2000
}
}

0 comments on commit 57fc1c7

Please sign in to comment.