Merge pull request #136 from benmccann/queue-entry-cleanup
Bug fix: MongoDB fields "ts" and "op" in a user collection were not picked up by the river
richardwilly98 committed Sep 25, 2013
2 parents befd012 + b972ddd commit 2e4ac12
Showing 1 changed file with 125 additions and 114 deletions.
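The substance of the change: the river used to queue each document as a bare Map<String, Object> and smuggle the oplog metadata into that same map under MongoDB's oplog keys ("ts" for the timestamp, "op" for the operation), then strip those keys back out on the consumer side. Any user document that legitimately contained fields named "ts" or "op" was therefore corrupted before indexing. Below is a minimal sketch of the collision, assuming the river's OPLOG_TIMESTAMP and OPLOG_OPERATION constants resolve to the literal keys "ts" and "op" as the commit title implies; the demo class and values are hypothetical.

    import java.util.HashMap;
    import java.util.Map;

    public class TsOpCollisionDemo {
        // Hypothetical stand-ins for the river's oplog key constants.
        static final String OPLOG_TIMESTAMP = "ts";
        static final String OPLOG_OPERATION = "op";

        public static void main(String[] args) {
            // A user document that happens to use "ts" and "op" itself.
            Map<String, Object> data = new HashMap<String, Object>();
            data.put("_id", "42");
            data.put("ts", "2013-09-25T12:00:00Z"); // user data, not oplog metadata
            data.put("op", "purchase");             // user data, not oplog metadata

            // Old producer: oplog metadata overwrites the user fields...
            data.put(OPLOG_TIMESTAMP, "BSONTimestamp(...)");
            data.put(OPLOG_OPERATION, "i");

            // ...and the old consumer removed both keys before indexing,
            // so the user's "ts" and "op" values were lost either way.
            data.remove(OPLOG_TIMESTAMP);
            data.remove(OPLOG_OPERATION);

            System.out.println(data); // {_id=42} -- user fields gone
        }
    }

The fix introduces a QueueEntry wrapper (defined at the bottom of this diff) that carries the timestamp and operation in fields of their own, so the document map is never touched.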
src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java (239 changes: 125 additions & 114 deletions)
@@ -142,7 +142,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
protected volatile boolean active = false;
protected volatile boolean startInvoked = false;

private final BlockingQueue<Map<String, Object>> stream;
private final BlockingQueue<QueueEntry> stream;

private Mongo mongo;
private DB adminDb;
@@ -177,9 +177,9 @@ public MongoDBRiver(final RiverName riverName,
+ definition.getMongoCollection();

if (definition.getThrottleSize() == -1) {
stream = new LinkedTransferQueue<Map<String, Object>>();
stream = new LinkedTransferQueue<QueueEntry>();
} else {
stream = new ArrayBlockingQueue<Map<String, Object>>(
stream = new ArrayBlockingQueue<QueueEntry>(
definition.getThrottleSize());
}
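The constructor keeps the existing throttling behaviour, only retyped for QueueEntry: an unbounded LinkedTransferQueue when throttling is disabled (throttle size of -1), otherwise a bounded ArrayBlockingQueue whose blocking put() applies back-pressure to the slurper when the indexer falls behind. A minimal standalone sketch of the same selection, with the river's configuration object reduced to an int:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedTransferQueue;

    public class QueueSelectionSketch {

        // Stand-in for the river's queue element type.
        static class Entry {
            final String operation;
            Entry(String operation) { this.operation = operation; }
        }

        static BlockingQueue<Entry> newStream(int throttleSize) {
            if (throttleSize == -1) {
                // Unbounded: producers never block; memory is the only limit.
                return new LinkedTransferQueue<Entry>();
            }
            // Bounded: put() blocks when full, throttling the producer.
            return new ArrayBlockingQueue<Entry>(throttleSize);
        }

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Entry> stream = newStream(2);
            stream.put(new Entry("i"));
            stream.put(new Entry("u"));
            System.out.println("queued: " + stream.size()); // queued: 2
        }
    }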

@@ -213,6 +213,8 @@ public void start() {
definition.getMongoFilter(), definition.getMongoDb(),
definition.getMongoCollection(), definition.getScript(),
definition.getIndexName(), definition.getTypeName());

// Create the index if it does not exist
try {
client.admin().indices().prepareCreate(definition.getIndexName())
.execute().actionGet();
@@ -232,6 +234,7 @@ public void start() {
}
}

// GridFS
if (definition.isMongoGridFS()) {
try {
if (logger.isDebugEnabled()) {
@@ -247,6 +250,7 @@ public void start() {
}
}

// Tail the oplog
if (isMongos()) {
DBCursor cursor = getConfigDb().getCollection("shards").find();
try {
@@ -440,11 +444,11 @@ public void run() {

// 1. Attempt to fill as much of the bulk request as
// possible
Map<String, Object> data = stream.take();
lastTimestamp = processBlockingQueue(bulk, data);
while ((data = stream.poll(definition.getBulkTimeout()
QueueEntry entry = stream.take();
lastTimestamp = processBlockingQueue(bulk, entry);
while ((entry = stream.poll(definition.getBulkTimeout()
.millis(), MILLISECONDS)) != null) {
lastTimestamp = processBlockingQueue(bulk, data);
lastTimestamp = processBlockingQueue(bulk, entry);
if (bulk.numberOfActions() >= definition.getBulkSize()) {
break;
}
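The bulk-assembly loop is untouched apart from the element type; its shape is worth spelling out because it mixes both blocking primitives: take() parks the indexer until at least one entry exists, then poll(timeout) keeps topping up the bulk until either the queue stays empty for a full bulk timeout or the bulk reaches its size limit. A condensed, runnable sketch with the Elasticsearch bulk request replaced by a counter:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedTransferQueue;
    import static java.util.concurrent.TimeUnit.MILLISECONDS;

    public class BulkDrainSketch {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> stream = new LinkedTransferQueue<String>();
            stream.put("a");
            stream.put("b");
            stream.put("c");

            long bulkTimeoutMillis = 100; // stand-in for definition.getBulkTimeout()
            int bulkSize = 2;             // stand-in for definition.getBulkSize()
            int actions = 0;

            // 1. Block until at least one entry is available.
            String entry = stream.take();
            actions++;

            // 2. Top up the bulk, waiting at most bulkTimeoutMillis per entry.
            while ((entry = stream.poll(bulkTimeoutMillis, MILLISECONDS)) != null) {
                actions++;
                if (actions >= bulkSize) {
                    break; // bulk full: flush and start the next round
                }
            }

            System.out.println("bulk actions: " + actions); // bulk actions: 2
        }
    }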
@@ -486,22 +490,21 @@ public void run() {

@SuppressWarnings({ "unchecked" })
private BSONTimestamp processBlockingQueue(
final BulkRequestBuilder bulk, Map<String, Object> data) {
if (data.get(MONGODB_ID_FIELD) == null
&& !data.get(OPLOG_OPERATION).equals(
final BulkRequestBuilder bulk, QueueEntry entry) {
if (entry.getData().get(MONGODB_ID_FIELD) == null
&& !entry.getOplogOperation().equals(
OPLOG_COMMAND_OPERATION)) {
logger.warn(
"Cannot get object id. Skip the current item: [{}]",
data);
entry.getData());
return null;
}

BSONTimestamp lastTimestamp = (BSONTimestamp) data
.get(OPLOG_TIMESTAMP);
String operation = data.get(OPLOG_OPERATION).toString();
BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
String operation = entry.getOplogOperation();
if (OPLOG_COMMAND_OPERATION.equals(operation)) {
try {
updateBulkRequest(bulk, data, null, operation,
updateBulkRequest(bulk, entry.getData(), null, operation,
definition.getIndexName(),
definition.getTypeName(), null, null);
} catch (IOException ioEx) {
@@ -512,16 +515,15 @@ private BSONTimestamp processBlockingQueue(

if (scriptExecutable != null
&& definition.isAdvancedTransformation()) {
return applyAdvancedTransformation(bulk, data);
return applyAdvancedTransformation(bulk, entry);
}

// String objectId = data.get(MONGODB_ID_FIELD).toString();
String objectId = "";
if (data.get(MONGODB_ID_FIELD) != null) {
objectId = data.get(MONGODB_ID_FIELD).toString();
if (entry.getData().get(MONGODB_ID_FIELD) != null) {
objectId = entry.getData().get(MONGODB_ID_FIELD).toString();
}
data.remove(OPLOG_TIMESTAMP);
data.remove(OPLOG_OPERATION);

if (logger.isDebugEnabled()) {
logger.debug("updateBulkRequest for id: [{}], operation: [{}]",
objectId, operation);
@@ -532,7 +534,7 @@ private BSONTimestamp processBlockingQueue(
"About to include collection. set attribute {} / {} ",
definition.getIncludeCollection(),
definition.getMongoCollection());
data.put(definition.getIncludeCollection(),
entry.getData().put(definition.getIncludeCollection(),
definition.getMongoCollection());
}

@@ -543,9 +545,10 @@ private BSONTimestamp processBlockingQueue(
} catch (IOException e) {
logger.warn("failed to parse {}", e);
}
Map<String, Object> data = entry.getData();
if (scriptExecutable != null) {
if (ctx != null) {
ctx.put("document", data);
ctx.put("document", entry.getData());
ctx.put("operation", operation);
if (!objectId.isEmpty()) {
ctx.put("id", objectId);
@@ -694,7 +697,7 @@ private BSONTimestamp processBlockingQueue(
// }
// }
} catch (IOException e) {
logger.warn("failed to parse {}", e, data);
logger.warn("failed to parse {}", e, entry.getData());
}
return lastTimestamp;
}
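This hunk is where the fix named in the commit description actually lands: the old consumer recovered the timestamp and operation from the document map and then deleted those keys (data.remove(OPLOG_TIMESTAMP); data.remove(OPLOG_OPERATION);), silently destroying user fields named "ts" and "op". The new consumer reads both values from the QueueEntry getters and never mutates the document.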
@@ -831,17 +834,14 @@ private void deleteBulkRequest(BulkRequestBuilder bulk,

@SuppressWarnings("unchecked")
private BSONTimestamp applyAdvancedTransformation(
final BulkRequestBuilder bulk, Map<String, Object> data) {
final BulkRequestBuilder bulk, QueueEntry entry) {

BSONTimestamp lastTimestamp = (BSONTimestamp) data
.get(OPLOG_TIMESTAMP);
String operation = data.get(OPLOG_OPERATION).toString();
BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
String operation = entry.getOplogOperation();
String objectId = "";
if (data.get(MONGODB_ID_FIELD) != null) {
objectId = data.get(MONGODB_ID_FIELD).toString();
if (entry.getData().get(MONGODB_ID_FIELD) != null) {
objectId = entry.getData().get(MONGODB_ID_FIELD).toString();
}
data.remove(OPLOG_TIMESTAMP);
data.remove(OPLOG_OPERATION);
if (logger.isDebugEnabled()) {
logger.debug(
"advancedUpdateBulkRequest for id: [{}], operation: [{}]",
Expand All @@ -853,7 +853,7 @@ private BSONTimestamp applyAdvancedTransformation(
"About to include collection. set attribute {} / {} ",
definition.getIncludeCollection(),
definition.getMongoCollection());
data.put(definition.getIncludeCollection(),
entry.getData().put(definition.getIncludeCollection(),
definition.getMongoCollection());
}
Map<String, Object> ctx = null;
Expand All @@ -869,7 +869,7 @@ private BSONTimestamp applyAdvancedTransformation(
if (scriptExecutable != null) {
if (ctx != null && documents != null) {

document.put("data", data);
document.put("data", entry.getData());
if (!objectId.isEmpty()) {
document.put("id", objectId);
}
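Two script-transformation paths are visible in this diff: the simple one (in the processBlockingQueue hunks above) hands the script a ctx map with "document", "operation", and optionally "id", while this advanced one builds per-document maps keyed "data" and "id". A small sketch of the simple context's shape, reconstructed from what this diff shows rather than from the project's documented script contract:

    import java.util.HashMap;
    import java.util.Map;

    public class ScriptContextSketch {
        public static void main(String[] args) {
            Map<String, Object> document = new HashMap<String, Object>();
            document.put("name", "Ada");

            // Keys visible in this diff: "document", "operation", "id".
            Map<String, Object> ctx = new HashMap<String, Object>();
            ctx.put("document", document);
            ctx.put("operation", "i");
            ctx.put("id", "5242a0b1...");  // set only when an object id exists

            // A user script would mutate this map (e.g. rewrite the document);
            // which result keys the river honours is outside this hunk.
            System.out.println(ctx);
        }
    }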
@@ -1045,10 +1045,58 @@ private class Slurper implements Runnable {
private DBCollection slurpedCollection;
private DB oplogDb;
private DBCollection oplogCollection;
private final List<ServerAddress> mongoServers;

public Slurper(List<ServerAddress> mongoServers) {
this.mongoServers = mongoServers;
this.mongo = new MongoClient(mongoServers,
definition.getMongoClientOptions());
}

@Override
public void run() {
while (active) {
try {
if (!assignCollections()) {
break; // failed to assign oplogCollection or
// slurpedCollection
}

DBCursor oplogCursor = null;
try {
oplogCursor = oplogCursor(null);
if (oplogCursor == null) {
oplogCursor = processFullCollection();
}

while (oplogCursor.hasNext()) {
DBObject item = oplogCursor.next();
processOplogEntry(item);
}
Thread.sleep(500);
} finally {
if (oplogCursor != null) {
logger.trace("Closing oplogCursor cursor");
oplogCursor.close();
}
}
} catch (MongoInterruptedException mIEx) {
logger.warn("Mongo driver has been interrupted");
if (mongo != null) {
mongo.close();
mongo = null;
}
break;
} catch (MongoException mEx) {
logger.error("Mongo gave an exception", mEx);
} catch (NoSuchElementException nEx) {
logger.warn("A mongoDB cursor bug ?", nEx);
} catch (InterruptedException e) {
if (logger.isDebugEnabled()) {
logger.debug("river-mongodb slurper interrupted");
}
Thread.currentThread().interrupt();
break;
}
}
}

private boolean assignCollections() {
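A structural change rides along in this hunk: run() moves above assignCollections(), and the MongoClient is now built in the Slurper constructor (with the server list kept in a final field) rather than at the top of run(), so an invalid server list or options fails when the river wires up the slurper instead of inside the polling loop. A minimal sketch of that eager-initialization shape, with the Mongo driver types replaced by hypothetical placeholders:

    import java.util.Arrays;
    import java.util.List;

    public class EagerInitSketch {

        // Hypothetical stand-in for com.mongodb.MongoClient.
        static final class Client {
            Client(List<String> servers) {
                if (servers.isEmpty()) {
                    throw new IllegalArgumentException("no servers configured");
                }
            }
        }

        static final class Slurper implements Runnable {
            private final List<String> servers;
            private final Client client;

            Slurper(List<String> servers) {
                this.servers = servers;
                // Eager: fails here, before the worker thread ever runs.
                this.client = new Client(servers);
            }

            @Override
            public void run() {
                // The polling loop can assume a fully constructed client.
            }
        }

        public static void main(String[] args) {
            new Thread(new Slurper(Arrays.asList("localhost:27017"))).start();
        }
    }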
@@ -1121,79 +1169,21 @@ private boolean assignCollections() {

return true;
}

@Override
public void run() {
mongo = new MongoClient(mongoServers,
definition.getMongoClientOptions());

while (active) {
try {
if (!assignCollections()) {
break; // failed to assign oplogCollection or
// slurpedCollection
}

DBCursor oplogCursor = null;
try {
oplogCursor = oplogCursor(null);
if (oplogCursor == null) {
oplogCursor = processFullCollection();
}

while (oplogCursor.hasNext()) {
DBObject item = oplogCursor.next();
processOplogEntry(item);
}
Thread.sleep(500);
} finally {
if (oplogCursor != null) {
logger.trace("Closing oplogCursor cursor");
oplogCursor.close();
}
}
} catch (MongoInterruptedException mIEx) {
logger.warn("Mongo driver has been interrupted");
if (mongo != null) {
mongo.close();
mongo = null;
}
break;
} catch (MongoException mEx) {
logger.error("Mongo gave an exception", mEx);
} catch (NoSuchElementException nEx) {
logger.warn("A mongoDB cursor bug ?", nEx);
} catch (InterruptedException e) {
if (logger.isDebugEnabled()) {
logger.debug("river-mongodb slurper interrupted");
}
Thread.currentThread().interrupt();
break;
}
}
}


/*
* Remove fsync lock and unlock -
* https://github.com/richardwilly98/elasticsearch-river-mongodb/issues/17
*/
private DBCursor processFullCollection() throws InterruptedException {
// CommandResult lockResult = mongo.fsyncAndLock();
// if (lockResult.ok()) {
try {
BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection
.find().sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
.limit(1).next().get(OPLOG_TIMESTAMP);
addQueryToStream(OPLOG_INSERT_OPERATION, currentTimestamp, null);
return oplogCursor(currentTimestamp);
} finally {
// mongo.unlock();
}
// } else {
// throw new MongoException(
// "Could not lock the database for FullCollection sync");
// }
BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection
.find()
.sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
.limit(1)
.next()
.get(OPLOG_TIMESTAMP);
addQueryToStream(OPLOG_INSERT_OPERATION, currentTimestamp, null);
return oplogCursor(currentTimestamp);
}

@SuppressWarnings("unchecked")
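With the fsyncAndLock() scaffolding removed (the block comment above points at issue #17), the surviving logic in processFullCollection is easy to audit: sort the oplog by timestamp descending, limit(1), and next() yields the newest entry, which is used both to queue the full-collection import and as the point to resume tailing from. One caveat worth knowing: next() throws NoSuchElementException when the cursor has no documents, and that is one of the exceptions the slurper's run() loop explicitly catches and logs.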
@@ -1268,8 +1258,8 @@ private void processOplogEntry(final DBObject entry)
logger.debug("Updated item: {}", update);
addQueryToStream(operation, oplogTimestamp, update);
} else {
object = applyFieldFilter(object);
addToStream(operation, oplogTimestamp, object.toMap());
Map<String, Object> map = applyFieldFilter(object).toMap();
addToStream(operation, oplogTimestamp, map);
}
}
}
@@ -1436,16 +1426,8 @@ private void addToStream(final String operation,
"addToStream - operation [{}], currentTimestamp [{}], data [{}]",
operation, currentTimestamp, data);
}
data.put(OPLOG_TIMESTAMP, currentTimestamp);
data.put(OPLOG_OPERATION, operation);

// stream.add(data);
stream.put(data);
// try {
// stream.put(data);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }

stream.put(new QueueEntry(currentTimestamp, operation, data));
}

}
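The producer side is the mirror image of the consumer change: addToStream wraps the untouched document in a QueueEntry instead of stuffing "ts" and "op" into it. The dead experiments around stream.add(data) also disappear; the surviving call is the blocking stream.put(...), which is what lets a bounded throttle-size queue push back on the slurper rather than throwing IllegalStateException the way add() would on a full queue.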
@@ -1554,4 +1536,33 @@ private void updateLastTimestamp(final String namespace,
}
}

protected static class QueueEntry {

private BSONTimestamp oplogTimestamp;
private String oplogOperation;
private Map<String, Object> data;

public QueueEntry(
BSONTimestamp oplogTimestamp,
String oplogOperation,
Map<String, Object> data) {
this.oplogTimestamp = oplogTimestamp;
this.oplogOperation = oplogOperation;
this.data = data;
}

public BSONTimestamp getOplogTimestamp() {
return oplogTimestamp;
}

public String getOplogOperation() {
return oplogOperation;
}

public Map<String, Object> getData() {
return data;
}

}

}
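The new QueueEntry itself is a plain value carrier. A usage sketch of the handoff it enables, assuming the demo lives in the same package as MongoDBRiver (QueueEntry is protected static, so same-package code can reach it) and using the driver's org.bson.types.BSONTimestamp:

    package org.elasticsearch.river.mongodb;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedTransferQueue;

    import org.bson.types.BSONTimestamp;
    import org.elasticsearch.river.mongodb.MongoDBRiver.QueueEntry;

    public class QueueEntryDemo {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<QueueEntry> stream =
                    new LinkedTransferQueue<QueueEntry>();

            // Producer (slurper): metadata rides beside the document.
            Map<String, Object> doc = new HashMap<String, Object>();
            doc.put("_id", "42");
            doc.put("ts", "a user field that now survives");
            stream.put(new QueueEntry(new BSONTimestamp(1380110000, 1), "i", doc));

            // Consumer (indexer): getters instead of magic map keys.
            QueueEntry entry = stream.take();
            System.out.println(entry.getOplogOperation());   // i
            System.out.println(entry.getData().get("ts"));   // intact
        }
    }

Since all three fields are assigned only in the constructor, they could also be declared final, which would make the handoff between the slurper and indexer threads immutable by construction.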
