diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
index af1f3940..e0590a09 100644
--- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
+++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
@@ -142,7 +142,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
     protected volatile boolean active = false;
     protected volatile boolean startInvoked = false;
 
-    private final BlockingQueue<Map<String, Object>> stream;
+    private final BlockingQueue<QueueEntry> stream;
 
     private Mongo mongo;
     private DB adminDb;
@@ -177,9 +177,9 @@ public MongoDBRiver(final RiverName riverName,
                 + definition.getMongoCollection();
 
         if (definition.getThrottleSize() == -1) {
-            stream = new LinkedTransferQueue<Map<String, Object>>();
+            stream = new LinkedTransferQueue<QueueEntry>();
         } else {
-            stream = new ArrayBlockingQueue<Map<String, Object>>(
+            stream = new ArrayBlockingQueue<QueueEntry>(
                     definition.getThrottleSize());
         }
 
@@ -213,6 +213,8 @@ public void start() {
                 definition.getMongoFilter(), definition.getMongoDb(),
                 definition.getMongoCollection(), definition.getScript(),
                 definition.getIndexName(), definition.getTypeName());
+
+        // Create the index if it does not exist
         try {
             client.admin().indices().prepareCreate(definition.getIndexName())
                     .execute().actionGet();
@@ -232,6 +234,7 @@ public void start() {
             }
         }
 
+        // GridFS
         if (definition.isMongoGridFS()) {
             try {
                 if (logger.isDebugEnabled()) {
@@ -247,6 +250,7 @@ public void start() {
             }
         }
 
+        // Tail the oplog
         if (isMongos()) {
            DBCursor cursor = getConfigDb().getCollection("shards").find();
            try {
@@ -440,11 +444,11 @@ public void run() {
                     // 1. Attempt to fill as much of the bulk request as
                     // possible
-                    Map<String, Object> data = stream.take();
-                    lastTimestamp = processBlockingQueue(bulk, data);
-                    while ((data = stream.poll(definition.getBulkTimeout()
+                    QueueEntry entry = stream.take();
+                    lastTimestamp = processBlockingQueue(bulk, entry);
+                    while ((entry = stream.poll(definition.getBulkTimeout()
                             .millis(), MILLISECONDS)) != null) {
-                        lastTimestamp = processBlockingQueue(bulk, data);
+                        lastTimestamp = processBlockingQueue(bulk, entry);
                         if (bulk.numberOfActions() >= definition.getBulkSize()) {
                             break;
                         }
                     }
@@ -486,22 +490,21 @@ public void run() {
 
         @SuppressWarnings({ "unchecked" })
         private BSONTimestamp processBlockingQueue(
-                final BulkRequestBuilder bulk, Map<String, Object> data) {
-            if (data.get(MONGODB_ID_FIELD) == null
-                    && !data.get(OPLOG_OPERATION).equals(
+                final BulkRequestBuilder bulk, QueueEntry entry) {
+            if (entry.getData().get(MONGODB_ID_FIELD) == null
+                    && !entry.getOplogOperation().equals(
                             OPLOG_COMMAND_OPERATION)) {
                 logger.warn(
                         "Cannot get object id. Skip the current item: [{}]",
-                        data);
+                        entry.getData());
                 return null;
             }
 
-            BSONTimestamp lastTimestamp = (BSONTimestamp) data
-                    .get(OPLOG_TIMESTAMP);
-            String operation = data.get(OPLOG_OPERATION).toString();
+            BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
+            String operation = entry.getOplogOperation();
             if (OPLOG_COMMAND_OPERATION.equals(operation)) {
                 try {
-                    updateBulkRequest(bulk, data, null, operation,
+                    updateBulkRequest(bulk, entry.getData(), null, operation,
                             definition.getIndexName(),
                             definition.getTypeName(), null, null);
                 } catch (IOException ioEx) {
@@ -512,16 +515,15 @@ private BSONTimestamp processBlockingQueue(
             if (scriptExecutable != null
                     && definition.isAdvancedTransformation()) {
-                return applyAdvancedTransformation(bulk, data);
+                return applyAdvancedTransformation(bulk, entry);
             }
 
             // String objectId = data.get(MONGODB_ID_FIELD).toString();
             String objectId = "";
-            if (data.get(MONGODB_ID_FIELD) != null) {
-                objectId = data.get(MONGODB_ID_FIELD).toString();
+            if (entry.getData().get(MONGODB_ID_FIELD) != null) {
+                objectId = entry.getData().get(MONGODB_ID_FIELD).toString();
             }
-            data.remove(OPLOG_TIMESTAMP);
-            data.remove(OPLOG_OPERATION);
+
             if (logger.isDebugEnabled()) {
                 logger.debug("updateBulkRequest for id: [{}], operation: [{}]",
                         objectId, operation);
             }
@@ -532,7 +534,7 @@ private BSONTimestamp processBlockingQueue(
                         "About to include collection. set attribute {} / {} ",
                         definition.getIncludeCollection(),
                         definition.getMongoCollection());
-                data.put(definition.getIncludeCollection(),
+                entry.getData().put(definition.getIncludeCollection(),
                         definition.getMongoCollection());
             }
 
@@ -543,9 +545,10 @@ private BSONTimestamp processBlockingQueue(
             } catch (IOException e) {
                 logger.warn("failed to parse {}", e);
             }
+            Map<String, Object> data = entry.getData();
             if (scriptExecutable != null) {
                 if (ctx != null) {
-                    ctx.put("document", data);
+                    ctx.put("document", entry.getData());
                     ctx.put("operation", operation);
                     if (!objectId.isEmpty()) {
                         ctx.put("id", objectId);
@@ -694,7 +697,7 @@ private BSONTimestamp processBlockingQueue(
                 // }
                 // }
             } catch (IOException e) {
-                logger.warn("failed to parse {}", e, data);
+                logger.warn("failed to parse {}", e, entry.getData());
             }
             return lastTimestamp;
         }
@@ -831,17 +834,14 @@ private void deleteBulkRequest(BulkRequestBuilder bulk,
         @SuppressWarnings("unchecked")
         private BSONTimestamp applyAdvancedTransformation(
-                final BulkRequestBuilder bulk, Map<String, Object> data) {
+                final BulkRequestBuilder bulk, QueueEntry entry) {
 
-            BSONTimestamp lastTimestamp = (BSONTimestamp) data
-                    .get(OPLOG_TIMESTAMP);
-            String operation = data.get(OPLOG_OPERATION).toString();
+            BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
+            String operation = entry.getOplogOperation();
             String objectId = "";
-            if (data.get(MONGODB_ID_FIELD) != null) {
-                objectId = data.get(MONGODB_ID_FIELD).toString();
+            if (entry.getData().get(MONGODB_ID_FIELD) != null) {
+                objectId = entry.getData().get(MONGODB_ID_FIELD).toString();
             }
-            data.remove(OPLOG_TIMESTAMP);
-            data.remove(OPLOG_OPERATION);
             if (logger.isDebugEnabled()) {
                 logger.debug(
                         "advancedUpdateBulkRequest for id: [{}], operation: [{}]",
                         objectId, operation);
@@ -853,7 +853,7 @@ private BSONTimestamp applyAdvancedTransformation(
                         "About to include collection. set attribute {} / {} ",
                         definition.getIncludeCollection(),
                         definition.getMongoCollection());
-                data.put(definition.getIncludeCollection(),
+                entry.getData().put(definition.getIncludeCollection(),
                         definition.getMongoCollection());
             }
             Map<String, Object> ctx = null;
@@ -869,7 +869,7 @@ private BSONTimestamp applyAdvancedTransformation(
 
             if (scriptExecutable != null) {
                 if (ctx != null && documents != null) {
-                    document.put("data", data);
+                    document.put("data", entry.getData());
                     if (!objectId.isEmpty()) {
                         document.put("id", objectId);
                     }
@@ -1045,10 +1045,58 @@ private class Slurper implements Runnable {
         private DBCollection slurpedCollection;
         private DB oplogDb;
         private DBCollection oplogCollection;
-        private final List<ServerAddress> mongoServers;
 
         public Slurper(List<ServerAddress> mongoServers) {
-            this.mongoServers = mongoServers;
+            this.mongo = new MongoClient(mongoServers,
+                    definition.getMongoClientOptions());
+        }
+
+        @Override
+        public void run() {
+            while (active) {
+                try {
+                    if (!assignCollections()) {
+                        break; // failed to assign oplogCollection or
+                               // slurpedCollection
+                    }
+
+                    DBCursor oplogCursor = null;
+                    try {
+                        oplogCursor = oplogCursor(null);
+                        if (oplogCursor == null) {
+                            oplogCursor = processFullCollection();
+                        }
+
+                        while (oplogCursor.hasNext()) {
+                            DBObject item = oplogCursor.next();
+                            processOplogEntry(item);
+                        }
+                        Thread.sleep(500);
+                    } finally {
+                        if (oplogCursor != null) {
+                            logger.trace("Closing oplogCursor cursor");
+                            oplogCursor.close();
+                        }
+                    }
+                } catch (MongoInterruptedException mIEx) {
+                    logger.warn("Mongo driver has been interrupted");
+                    if (mongo != null) {
+                        mongo.close();
+                        mongo = null;
+                    }
+                    break;
+                } catch (MongoException mEx) {
+                    logger.error("Mongo gave an exception", mEx);
+                } catch (NoSuchElementException nEx) {
+                    logger.warn("A mongoDB cursor bug ?", nEx);
+                } catch (InterruptedException e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("river-mongodb slurper interrupted");
+                    }
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
         }
 
         private boolean assignCollections() {
@@ -1121,79 +1169,21 @@ private boolean assignCollections() {
             return true;
         }
 
-        @Override
-        public void run() {
-            mongo = new MongoClient(mongoServers,
-                    definition.getMongoClientOptions());
-
-            while (active) {
-                try {
-                    if (!assignCollections()) {
-                        break; // failed to assign oplogCollection or
-                               // slurpedCollection
-                    }
-
-                    DBCursor oplogCursor = null;
-                    try {
-                        oplogCursor = oplogCursor(null);
-                        if (oplogCursor == null) {
-                            oplogCursor = processFullCollection();
-                        }
-
-                        while (oplogCursor.hasNext()) {
-                            DBObject item = oplogCursor.next();
-                            processOplogEntry(item);
-                        }
-                        Thread.sleep(500);
-                    } finally {
-                        if (oplogCursor != null) {
-                            logger.trace("Closing oplogCursor cursor");
-                            oplogCursor.close();
-                        }
-                    }
-                } catch (MongoInterruptedException mIEx) {
-                    logger.warn("Mongo driver has been interrupted");
-                    if (mongo != null) {
-                        mongo.close();
-                        mongo = null;
-                    }
-                    break;
-                } catch (MongoException mEx) {
-                    logger.error("Mongo gave an exception", mEx);
-                } catch (NoSuchElementException nEx) {
-                    logger.warn("A mongoDB cursor bug ?", nEx);
-                } catch (InterruptedException e) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("river-mongodb slurper interrupted");
-                    }
-                    Thread.currentThread().interrupt();
-                    break;
-                }
-            }
-        }
-
+
         /*
         * Remove fscynlock and unlock -
        * https://github.com/richardwilly98/elasticsearch
         * -river-mongodb/issues/17
         */
         private DBCursor processFullCollection() throws InterruptedException {
-            // CommandResult lockResult = mongo.fsyncAndLock();
-            // if (lockResult.ok()) {
-            try {
-                BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection
-                        .find().sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
-                        .limit(1).next().get(OPLOG_TIMESTAMP);
-                addQueryToStream(OPLOG_INSERT_OPERATION, currentTimestamp, null);
-                return oplogCursor(currentTimestamp);
-            } finally {
-                // mongo.unlock();
-            }
-            // } else {
-            // throw new MongoException(
-            // "Could not lock the database for FullCollection sync");
-            // }
+            BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection
+                    .find()
+                    .sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
+                    .limit(1)
+                    .next()
+                    .get(OPLOG_TIMESTAMP);
+            addQueryToStream(OPLOG_INSERT_OPERATION, currentTimestamp, null);
+            return oplogCursor(currentTimestamp);
         }
 
         @SuppressWarnings("unchecked")
@@ -1268,8 +1258,8 @@ private void processOplogEntry(final DBObject entry)
                     logger.debug("Updated item: {}", update);
                     addQueryToStream(operation, oplogTimestamp, update);
                 } else {
-                    object = applyFieldFilter(object);
-                    addToStream(operation, oplogTimestamp, object.toMap());
+                    Map<String, Object> map = applyFieldFilter(object).toMap();
+                    addToStream(operation, oplogTimestamp, map);
                 }
             }
         }
@@ -1436,16 +1426,8 @@ private void addToStream(final String operation,
                         "addToStream - operation [{}], currentTimestamp [{}], data [{}]",
                         operation, currentTimestamp, data);
             }
-            data.put(OPLOG_TIMESTAMP, currentTimestamp);
-            data.put(OPLOG_OPERATION, operation);
-
-            // stream.add(data);
-            stream.put(data);
-            // try {
-            // stream.put(data);
-            // } catch (InterruptedException e) {
-            // e.printStackTrace();
-            // }
+
+            stream.put(new QueueEntry(currentTimestamp, operation, data));
         }
     }
 
@@ -1554,4 +1536,33 @@ private void updateLastTimestamp(final String namespace,
         }
     }
 
+    protected static class QueueEntry {
+
+        private BSONTimestamp oplogTimestamp;
+        private String oplogOperation;
+        private Map<String, Object> data;
+
+        public QueueEntry(
+                BSONTimestamp oplogTimestamp,
+                String oplogOperation,
+                Map<String, Object> data) {
+            this.oplogTimestamp = oplogTimestamp;
+            this.oplogOperation = oplogOperation;
+            this.data = data;
+        }
+
+        public BSONTimestamp getOplogTimestamp() {
+            return oplogTimestamp;
+        }
+
+        public String getOplogOperation() {
+            return oplogOperation;
+        }
+
+        public Map<String, Object> getData() {
+            return data;
+        }
+
+    }
+
 }