Skip to content

Commit

Permalink
Feature #76
Browse files Browse the repository at this point in the history
- Implement exclude fields feature using exclude_fields in options
attribute.
- Example:
"options": {
"exclude_fields": ["exclude-field-1", "exclude-field-2"]
},
  • Loading branch information
richardwilly98 committed May 18, 2013
1 parent a4e0a07 commit 1070857
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 3 deletions.
34 changes: 31 additions & 3 deletions src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -103,6 +104,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
public final static String OPTIONS_FIELD = "options";
public final static String SECONDARY_READ_PREFERENCE_FIELD = "secondary_read_preference";
public final static String DROP_COLLECTION_FIELD = "drop_collection";
public final static String EXCLUDE_FIELDS_FIELD = "exclude_fields";
public final static String FILTER_FIELD = "filter";
public final static String CREDENTIALS_FIELD = "credentials";
public final static String USER_FIELD = "user";
Expand Down Expand Up @@ -140,7 +142,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
public final static String OPLOG_UPDATE_OPERATION = "u";
public final static String OPLOG_INSERT_OPERATION = "i";
public final static String OPLOG_DELETE_OPERATION = "d";
public final static String OPLOG_COLLECTION_OPERATION = "c";
public final static String OPLOG_COMMAND_OPERATION = "c";
public final static String OPLOG_TIMESTAMP = "ts";
public final static String GRIDFS_FILES_SUFFIX = ".files";
public final static String GRIDFS_CHUNKS_SUFFIX = ".chunks";
Expand All @@ -167,6 +169,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
protected final TimeValue bulkTimeout;
protected final int throttleSize;
protected final boolean dropCollection;
protected final Set<String> excludeFields;

private final ExecutableScript script;

Expand Down Expand Up @@ -246,9 +249,27 @@ public MongoDBRiver(final RiverName riverName,
dropCollection = XContentMapValues
.nodeBooleanValue(mongoOptionsSettings
.get(DROP_COLLECTION_FIELD), false);

if (mongoOptionsSettings.containsKey(EXCLUDE_FIELDS_FIELD)) {
excludeFields = new HashSet<String>();
Object excludeFieldsSettings = mongoOptionsSettings.get(EXCLUDE_FIELDS_FIELD);
logger.info("excludeFieldsSettings: " + excludeFieldsSettings);
boolean array = XContentMapValues.isArray(excludeFieldsSettings);

if (array) {
ArrayList<String> fields = (ArrayList<String>) excludeFieldsSettings;
for (String field : fields) {
logger.info("Field: " + field);
excludeFields.add(field);
}
}
} else {
excludeFields = null;
}
} else {
mongoSecondaryReadPreference = false;
dropCollection = false;
excludeFields = null;
}

// Credentials
Expand Down Expand Up @@ -350,6 +371,7 @@ public MongoDBRiver(final RiverName riverName,
// mongoDbPassword = "";
script = null;
dropCollection = false;
excludeFields = null;
}
mongoOplogNamespace = mongoDb + "." + mongoCollection;

Expand Down Expand Up @@ -635,7 +657,7 @@ private BSONTimestamp updateBulkRequest(final BulkRequestBuilder bulk,
Map<String, Object> data) {
if (data.get(MONGODB_ID_FIELD) == null
&& !data.get(OPLOG_OPERATION).equals(
OPLOG_COLLECTION_OPERATION)) {
OPLOG_COMMAND_OPERATION)) {
logger.warn(
"Cannot get object id. Skip the current item: [{}]",
data);
Expand Down Expand Up @@ -751,7 +773,7 @@ private BSONTimestamp updateBulkRequest(final BulkRequestBuilder bulk,
routing).parent(parent));
deletedDocuments++;
}
if (OPLOG_COLLECTION_OPERATION.equals(operation)) {
if (OPLOG_COMMAND_OPERATION.equals(operation)) {
if (dropCollection) {
logger.info("Drop collection request [{}], [{}]", index,
type);
Expand Down Expand Up @@ -974,6 +996,12 @@ private void processOplogEntry(final DBObject entry)
.get(OPLOG_TIMESTAMP);
DBObject object = (DBObject) entry.get(OPLOG_OBJECT);

if (excludeFields != null) {
for(String excludeField : excludeFields) {
object.removeField(excludeField);
}
}

// Initial support for sharded collection -
// https://jira.mongodb.org/browse/SERVER-4333
// Not interested in operation from migration or sharding
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package test.elasticsearch.plugin.river.mongodb.simple;

import static org.elasticsearch.index.query.QueryBuilders.fieldQuery;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.util.Map;

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import test.elasticsearch.plugin.river.mongodb.RiverMongoDBTestAsbtract;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;

@Test
public class RiverMongoExcludeFieldsTest extends RiverMongoDBTestAsbtract {

	private final ESLogger logger = Loggers.getLogger(getClass());

	private DB mongoDB;
	private DBCollection mongoCollection;
	// Exposed for subclasses that reuse this fixture with a different option.
	protected boolean dropCollectionOption = true;

	protected RiverMongoExcludeFieldsTest() {
		super("exclude-fields-river-1", "exclude-fields-db-1", "exclude-fields-collection-1", "exclude-fields-index-1");
	}

	protected RiverMongoExcludeFieldsTest(String river, String database,
			String collection, String index) {
		super(river, database, collection, index);
	}

	/**
	 * Creates the MongoDB database/collection and starts a river configured
	 * with {@code exclude_fields: ["exclude-field-1", "exclude-field-2"]} via
	 * the JSON template's format placeholders.
	 */
	@BeforeClass
	public void createDatabase() {
		logger.debug("createDatabase {}", getDatabase());
		try {
			mongoDB = getMongo().getDB(getDatabase());
			mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE);
			super.createRiver(
					"/test/elasticsearch/plugin/river/mongodb/simple/test-simple-mongodb-river-exclude-fields.json",
					getRiver(),
					(Object) String.valueOf(getMongoPort1()),
					(Object) String.valueOf(getMongoPort2()),
					(Object) String.valueOf(getMongoPort3()),
					(Object) "[\"exclude-field-1\", \"exclude-field-2\"]",
					(Object) getDatabase(), (Object) getCollection(),
					(Object) getIndex(), (Object) getDatabase());
			logger.info("Start createCollection");
			mongoCollection = mongoDB.createCollection(getCollection(), null);
			Assert.assertNotNull(mongoCollection);
		} catch (Throwable t) {
			// NOTE(review): failures here are only logged, so a broken setup
			// surfaces later as a confusing NPE in the test — consider
			// rethrowing. Kept as-is to preserve existing behavior.
			logger.error("createDatabase failed.", t);
		}
	}

	@AfterClass
	public void cleanUp() {
		super.deleteRiver();
		logger.info("Drop database " + mongoDB.getName());
		mongoDB.dropDatabase();
	}

	/**
	 * Inserts a document containing two excluded fields and one included
	 * field, then verifies that the indexed document contains only the
	 * included field.
	 */
	@Test
	public void testExcludeFields() throws Throwable {
		logger.debug("Start testExcludeFields");
		try {
			DBObject dbObject = new BasicDBObject();
			dbObject.put("exclude-field-1", System.currentTimeMillis());
			dbObject.put("exclude-field-2", System.currentTimeMillis());
			dbObject.put("include-field-1", System.currentTimeMillis());
			mongoCollection.insert(dbObject);
			// Give the river time to pick the insert up from the oplog.
			Thread.sleep(1000);
			String id = dbObject.get("_id").toString();
			assertThat(getNode().client()
					.admin().indices()
					.exists(new IndicesExistsRequest(getIndex())).actionGet().isExists(), equalTo(true));
			refreshIndex();

			SearchResponse sr = getNode().client().prepareSearch(getIndex())
					.setQuery(fieldQuery("_id", id)).execute().actionGet();
			logger.debug("SearchResponse {}", sr.toString());
			long totalHits = sr.getHits().getTotalHits();
			logger.debug("TotalHits: {}", totalHits);
			assertThat(totalHits, equalTo(1L));

			Map<String, Object> object = sr.getHits().getHits()[0].sourceAsMap();
			assertThat(object.containsKey("exclude-field-1"), equalTo(false));
			// Fixed: original asserted "exclude-field-1" twice and never
			// verified that "exclude-field-2" was actually excluded.
			assertThat(object.containsKey("exclude-field-2"), equalTo(false));
			assertThat(object.containsKey("include-field-1"), equalTo(true));
		} catch (Throwable t) {
			logger.error("testExcludeFields failed.", t);
			throw t;
		}
	}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"type": "mongodb",
"mongodb": {
"servers": [{
"host": "localhost",
"port": %s
},
{
"host": "localhost",
"port": %s
},
{
"host": "localhost",
"port": %s
}],
"options": {
"secondary_read_preference": true,
"exclude_fields": %s
},
"db": "%s",
"collection": "%s",
"gridfs": false
},
"index": {
"name": "%s",
"type": "%s",
"throttle_size": 2000
}
}

0 comments on commit 1070857

Please sign in to comment.