Commit b972ddd
Create a QueueEntry class to hold items in the stream. This fixes a bug where fields named ts or op would not be indexed by the river. It also improves readability and removes the need for several class casts.
benmccann committed Sep 25, 2013
1 parent befd012 commit b972ddd
Showing 1 changed file with 125 additions and 114 deletions.
239 changes: 125 additions & 114 deletions src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
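Why the bug happened: as the hunks below show, the old addToStream stored the oplog timestamp and operation inside the document map itself (data.put(OPLOG_TIMESTAMP, ...), data.put(OPLOG_OPERATION, ...)), and processBlockingQueue stripped them back out with data.remove(...). Any document fields sharing those key names were overwritten and then deleted before indexing. A minimal sketch of that failure mode, not repository code, assuming OPLOG_TIMESTAMP = "ts" and OPLOG_OPERATION = "op" (the MongoDB oplog field names); the class name and field values are hypothetical:

// Minimal sketch (not repository code); assumes OPLOG_TIMESTAMP = "ts"
// and OPLOG_OPERATION = "op", the MongoDB oplog field names.
import java.util.HashMap;
import java.util.Map;

public class TsOpShadowingDemo {
    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<String, Object>();
        doc.put("ts", "a user field named ts");
        doc.put("op", "a user field named op");

        // Producer side (old addToStream): oplog metadata was stored in the
        // document map itself, overwriting the user's fields.
        doc.put("ts", "(BSONTimestamp placeholder)");
        doc.put("op", "i");

        // Consumer side (old processBlockingQueue): the metadata was stripped
        // before indexing, taking the user's ts/op fields with it.
        doc.remove("ts");
        doc.remove("op");

        System.out.println(doc); // prints {} -- the user's ts and op were never indexed
    }
}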
@@ -142,7 +142,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
protected volatile boolean active = false;
protected volatile boolean startInvoked = false;

-    private final BlockingQueue<Map<String, Object>> stream;
+    private final BlockingQueue<QueueEntry> stream;

private Mongo mongo;
private DB adminDb;
@@ -177,9 +177,9 @@ public MongoDBRiver(final RiverName riverName,
+ definition.getMongoCollection();

if (definition.getThrottleSize() == -1) {
-            stream = new LinkedTransferQueue<Map<String, Object>>();
+            stream = new LinkedTransferQueue<QueueEntry>();
} else {
-            stream = new ArrayBlockingQueue<Map<String, Object>>(
+            stream = new ArrayBlockingQueue<QueueEntry>(
definition.getThrottleSize());
}

@@ -213,6 +213,8 @@ public void start() {
definition.getMongoFilter(), definition.getMongoDb(),
definition.getMongoCollection(), definition.getScript(),
definition.getIndexName(), definition.getTypeName());

+        // Create the index if it does not exist
try {
client.admin().indices().prepareCreate(definition.getIndexName())
.execute().actionGet();
@@ -232,6 +234,7 @@ public void start() {
}
}

+        // GridFS
if (definition.isMongoGridFS()) {
try {
if (logger.isDebugEnabled()) {
@@ -247,6 +250,7 @@ public void start() {
}
}

+        // Tail the oplog
if (isMongos()) {
DBCursor cursor = getConfigDb().getCollection("shards").find();
try {
@@ -440,11 +444,11 @@ public void run() {

// 1. Attempt to fill as much of the bulk request as
// possible
-                    Map<String, Object> data = stream.take();
-                    lastTimestamp = processBlockingQueue(bulk, data);
-                    while ((data = stream.poll(definition.getBulkTimeout()
+                    QueueEntry entry = stream.take();
+                    lastTimestamp = processBlockingQueue(bulk, entry);
+                    while ((entry = stream.poll(definition.getBulkTimeout()
.millis(), MILLISECONDS)) != null) {
-                        lastTimestamp = processBlockingQueue(bulk, data);
+                        lastTimestamp = processBlockingQueue(bulk, entry);
if (bulk.numberOfActions() >= definition.getBulkSize()) {
break;
}
@@ -486,22 +490,21 @@ public void run() {

@SuppressWarnings({ "unchecked" })
private BSONTimestamp processBlockingQueue(
-                final BulkRequestBuilder bulk, Map<String, Object> data) {
-            if (data.get(MONGODB_ID_FIELD) == null
-                    && !data.get(OPLOG_OPERATION).equals(
+                final BulkRequestBuilder bulk, QueueEntry entry) {
+            if (entry.getData().get(MONGODB_ID_FIELD) == null
+                    && !entry.getOplogOperation().equals(
OPLOG_COMMAND_OPERATION)) {
logger.warn(
"Cannot get object id. Skip the current item: [{}]",
-                        data);
+                        entry.getData());
return null;
}

-            BSONTimestamp lastTimestamp = (BSONTimestamp) data
-                    .get(OPLOG_TIMESTAMP);
-            String operation = data.get(OPLOG_OPERATION).toString();
+            BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
+            String operation = entry.getOplogOperation();
if (OPLOG_COMMAND_OPERATION.equals(operation)) {
try {
-                    updateBulkRequest(bulk, data, null, operation,
+                    updateBulkRequest(bulk, entry.getData(), null, operation,
definition.getIndexName(),
definition.getTypeName(), null, null);
} catch (IOException ioEx) {
@@ -512,16 +515,15 @@ private BSONTimestamp processBlockingQueue(

if (scriptExecutable != null
&& definition.isAdvancedTransformation()) {
-                return applyAdvancedTransformation(bulk, data);
+                return applyAdvancedTransformation(bulk, entry);
}

-            // String objectId = data.get(MONGODB_ID_FIELD).toString();
String objectId = "";
-            if (data.get(MONGODB_ID_FIELD) != null) {
-                objectId = data.get(MONGODB_ID_FIELD).toString();
+            if (entry.getData().get(MONGODB_ID_FIELD) != null) {
+                objectId = entry.getData().get(MONGODB_ID_FIELD).toString();
}
-            data.remove(OPLOG_TIMESTAMP);
-            data.remove(OPLOG_OPERATION);

if (logger.isDebugEnabled()) {
logger.debug("updateBulkRequest for id: [{}], operation: [{}]",
objectId, operation);
@@ -532,7 +534,7 @@ private BSONTimestamp processBlockingQueue(
"About to include collection. set attribute {} / {} ",
definition.getIncludeCollection(),
definition.getMongoCollection());
-                data.put(definition.getIncludeCollection(),
+                entry.getData().put(definition.getIncludeCollection(),
definition.getMongoCollection());
}

@@ -543,9 +545,10 @@ private BSONTimestamp processBlockingQueue(
} catch (IOException e) {
logger.warn("failed to parse {}", e);
}
+            Map<String, Object> data = entry.getData();
if (scriptExecutable != null) {
if (ctx != null) {
ctx.put("document", data);
ctx.put("document", entry.getData());
ctx.put("operation", operation);
if (!objectId.isEmpty()) {
ctx.put("id", objectId);
@@ -694,7 +697,7 @@ private BSONTimestamp processBlockingQueue(
// }
// }
} catch (IOException e) {
logger.warn("failed to parse {}", e, data);
logger.warn("failed to parse {}", e, entry.getData());
}
return lastTimestamp;
}
@@ -831,17 +834,14 @@ private void deleteBulkRequest(BulkRequestBuilder bulk,

@SuppressWarnings("unchecked")
private BSONTimestamp applyAdvancedTransformation(
-                final BulkRequestBuilder bulk, Map<String, Object> data) {
+                final BulkRequestBuilder bulk, QueueEntry entry) {

-            BSONTimestamp lastTimestamp = (BSONTimestamp) data
-                    .get(OPLOG_TIMESTAMP);
-            String operation = data.get(OPLOG_OPERATION).toString();
+            BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
+            String operation = entry.getOplogOperation();
String objectId = "";
-            if (data.get(MONGODB_ID_FIELD) != null) {
-                objectId = data.get(MONGODB_ID_FIELD).toString();
+            if (entry.getData().get(MONGODB_ID_FIELD) != null) {
+                objectId = entry.getData().get(MONGODB_ID_FIELD).toString();
}
-            data.remove(OPLOG_TIMESTAMP);
-            data.remove(OPLOG_OPERATION);
if (logger.isDebugEnabled()) {
logger.debug(
"advancedUpdateBulkRequest for id: [{}], operation: [{}]",
@@ -853,7 +853,7 @@ private BSONTimestamp applyAdvancedTransformation(
"About to include collection. set attribute {} / {} ",
definition.getIncludeCollection(),
definition.getMongoCollection());
-                data.put(definition.getIncludeCollection(),
+                entry.getData().put(definition.getIncludeCollection(),
definition.getMongoCollection());
}
Map<String, Object> ctx = null;
@@ -869,7 +869,7 @@ private BSONTimestamp applyAdvancedTransformation(
if (scriptExecutable != null) {
if (ctx != null && documents != null) {

document.put("data", data);
document.put("data", entry.getData());
if (!objectId.isEmpty()) {
document.put("id", objectId);
}
@@ -1045,10 +1045,58 @@ private class Slurper implements Runnable {
private DBCollection slurpedCollection;
private DB oplogDb;
private DBCollection oplogCollection;
private final List<ServerAddress> mongoServers;

public Slurper(List<ServerAddress> mongoServers) {
this.mongoServers = mongoServers;
+            this.mongo = new MongoClient(mongoServers,
+                    definition.getMongoClientOptions());
}

+        @Override
+        public void run() {
+            while (active) {
+                try {
+                    if (!assignCollections()) {
+                        break; // failed to assign oplogCollection or
+                               // slurpedCollection
+                    }
+
+                    DBCursor oplogCursor = null;
+                    try {
+                        oplogCursor = oplogCursor(null);
+                        if (oplogCursor == null) {
+                            oplogCursor = processFullCollection();
+                        }
+
+                        while (oplogCursor.hasNext()) {
+                            DBObject item = oplogCursor.next();
+                            processOplogEntry(item);
+                        }
+                        Thread.sleep(500);
+                    } finally {
+                        if (oplogCursor != null) {
+                            logger.trace("Closing oplogCursor cursor");
+                            oplogCursor.close();
+                        }
+                    }
+                } catch (MongoInterruptedException mIEx) {
+                    logger.warn("Mongo driver has been interrupted");
+                    if (mongo != null) {
+                        mongo.close();
+                        mongo = null;
+                    }
+                    break;
+                } catch (MongoException mEx) {
+                    logger.error("Mongo gave an exception", mEx);
+                } catch (NoSuchElementException nEx) {
+                    logger.warn("A mongoDB cursor bug ?", nEx);
+                } catch (InterruptedException e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("river-mongodb slurper interrupted");
+                    }
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }

private boolean assignCollections() {
@@ -1121,79 +1169,21 @@ private boolean assignCollections() {

return true;
}

-        @Override
-        public void run() {
-            mongo = new MongoClient(mongoServers,
-                    definition.getMongoClientOptions());
-
-            while (active) {
-                try {
-                    if (!assignCollections()) {
-                        break; // failed to assign oplogCollection or
-                               // slurpedCollection
-                    }
-
-                    DBCursor oplogCursor = null;
-                    try {
-                        oplogCursor = oplogCursor(null);
-                        if (oplogCursor == null) {
-                            oplogCursor = processFullCollection();
-                        }
-
-                        while (oplogCursor.hasNext()) {
-                            DBObject item = oplogCursor.next();
-                            processOplogEntry(item);
-                        }
-                        Thread.sleep(500);
-                    } finally {
-                        if (oplogCursor != null) {
-                            logger.trace("Closing oplogCursor cursor");
-                            oplogCursor.close();
-                        }
-                    }
-                } catch (MongoInterruptedException mIEx) {
-                    logger.warn("Mongo driver has been interrupted");
-                    if (mongo != null) {
-                        mongo.close();
-                        mongo = null;
-                    }
-                    break;
-                } catch (MongoException mEx) {
-                    logger.error("Mongo gave an exception", mEx);
-                } catch (NoSuchElementException nEx) {
-                    logger.warn("A mongoDB cursor bug ?", nEx);
-                } catch (InterruptedException e) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("river-mongodb slurper interrupted");
-                    }
-                    Thread.currentThread().interrupt();
-                    break;
-                }
-            }
-        }


/*
* Remove fscynlock and unlock -
* https://github.com/richardwilly98/elasticsearch
* -river-mongodb/issues/17
*/
private DBCursor processFullCollection() throws InterruptedException {
-            // CommandResult lockResult = mongo.fsyncAndLock();
-            // if (lockResult.ok()) {
-            try {
-                BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection
-                        .find().sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
-                        .limit(1).next().get(OPLOG_TIMESTAMP);
-                addQueryToStream(OPLOG_INSERT_OPERATION, currentTimestamp, null);
-                return oplogCursor(currentTimestamp);
-            } finally {
-                // mongo.unlock();
-            }
-            // } else {
-            // throw new MongoException(
-            // "Could not lock the database for FullCollection sync");
-            // }
+            BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection
+                    .find()
+                    .sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
+                    .limit(1)
+                    .next()
+                    .get(OPLOG_TIMESTAMP);
+            addQueryToStream(OPLOG_INSERT_OPERATION, currentTimestamp, null);
+            return oplogCursor(currentTimestamp);
}

@SuppressWarnings("unchecked")
@@ -1268,8 +1258,8 @@ private void processOplogEntry(final DBObject entry)
logger.debug("Updated item: {}", update);
addQueryToStream(operation, oplogTimestamp, update);
} else {
-                    object = applyFieldFilter(object);
-                    addToStream(operation, oplogTimestamp, object.toMap());
+                    Map<String, Object> map = applyFieldFilter(object).toMap();
+                    addToStream(operation, oplogTimestamp, map);
}
}
}
@@ -1436,16 +1426,8 @@ private void addToStream(final String operation,
"addToStream - operation [{}], currentTimestamp [{}], data [{}]",
operation, currentTimestamp, data);
}
-            data.put(OPLOG_TIMESTAMP, currentTimestamp);
-            data.put(OPLOG_OPERATION, operation);
-
-            // stream.add(data);
-            stream.put(data);
-            // try {
-            // stream.put(data);
-            // } catch (InterruptedException e) {
-            // e.printStackTrace();
-            // }

+            stream.put(new QueueEntry(currentTimestamp, operation, data));
}

}
@@ -1554,4 +1536,33 @@ private void updateLastTimestamp(final String namespace,
}
}

+    protected static class QueueEntry {
+
+        private BSONTimestamp oplogTimestamp;
+        private String oplogOperation;
+        private Map<String, Object> data;
+
+        public QueueEntry(
+                BSONTimestamp oplogTimestamp,
+                String oplogOperation,
+                Map<String, Object> data) {
+            this.oplogTimestamp = oplogTimestamp;
+            this.oplogOperation = oplogOperation;
+            this.data = data;
+        }
+
+        public BSONTimestamp getOplogTimestamp() {
+            return oplogTimestamp;
+        }
+
+        public String getOplogOperation() {
+            return oplogOperation;
+        }
+
+        public Map<String, Object> getData() {
+            return data;
+        }
+
+    }

}
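For reference, a usage sketch of the new flow, not repository code: it assumes QueueEntry (as defined in the diff above) is visible to the demo class, and uses BSONTimestamp from the MongoDB Java driver.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import org.bson.types.BSONTimestamp;

public class QueueEntryDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<QueueEntry> stream = new LinkedTransferQueue<QueueEntry>();

        Map<String, Object> doc = new HashMap<String, Object>();
        doc.put("ts", "a user field named ts"); // no longer clobbered

        // Producer (addToStream): metadata rides alongside the document.
        stream.put(new QueueEntry(new BSONTimestamp(), "i", doc));

        // Consumer (processBlockingQueue): typed getters replace the casts
        // and the remove() calls that used to drop user fields.
        QueueEntry entry = stream.take();
        System.out.println(entry.getOplogOperation()); // i
        System.out.println(entry.getData());           // {ts=a user field named ts}
    }
}

Carrying the metadata in a typed wrapper also removes the (BSONTimestamp) casts and the Object-typed map lookups the old code needed, which is the readability gain the commit message mentions.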
