Bug fix: mongodb fields "ts" and "op" in user collection not picked up by river #136

Merged · 1 commit · Sep 25, 2013
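What the commit changes: previously the Slurper attached oplog metadata to the document map itself before queuing it (data.put(OPLOG_TIMESTAMP, currentTimestamp) and data.put(OPLOG_OPERATION, operation) in addToStream), and the consumer stripped those keys back out (data.remove(OPLOG_TIMESTAMP) / data.remove(OPLOG_OPERATION)). Because those constants name the oplog fields "ts" and "op", a user document that legitimately contained fields named "ts" or "op" had them overwritten and then deleted before indexing. The commit introduces a QueueEntry wrapper that carries the timestamp and operation alongside the document, leaving the document map untouched. A minimal sketch of the old failure mode (the demo class and sample values are illustrative, not part of the patch; BSONTimestamp comes from the mongo-java-driver):

```java
import java.util.HashMap;
import java.util.Map;

import org.bson.types.BSONTimestamp;

public class TsOpClobberDemo {
    public static void main(String[] args) {
        Map<String, Object> data = new HashMap<String, Object>();
        data.put("ts", "user value"); // legitimate user fields that happen to
        data.put("op", "user value"); // collide with the oplog field names

        // Old producer (addToStream): oplog metadata overwrites the user fields.
        data.put("ts", new BSONTimestamp(1380067200, 1));
        data.put("op", "i");

        // Old consumer (processBlockingQueue): both keys stripped before indexing.
        data.remove("ts");
        data.remove("op");

        System.out.println(data); // {} -- the user's "ts" and "op" never reach ES
    }
}
```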
src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java (239 changes: 125 additions & 114 deletions)
@@ -142,7 +142,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
     protected volatile boolean active = false;
     protected volatile boolean startInvoked = false;

-    private final BlockingQueue<Map<String, Object>> stream;
+    private final BlockingQueue<QueueEntry> stream;

     private Mongo mongo;
     private DB adminDb;
@@ -177,9 +177,9 @@ public MongoDBRiver(final RiverName riverName,
                 + definition.getMongoCollection();

         if (definition.getThrottleSize() == -1) {
-            stream = new LinkedTransferQueue<Map<String, Object>>();
+            stream = new LinkedTransferQueue<QueueEntry>();
         } else {
-            stream = new ArrayBlockingQueue<Map<String, Object>>(
+            stream = new ArrayBlockingQueue<QueueEntry>(
                     definition.getThrottleSize());
         }

@@ -213,6 +213,8 @@ public void start() {
                 definition.getMongoFilter(), definition.getMongoDb(),
                 definition.getMongoCollection(), definition.getScript(),
                 definition.getIndexName(), definition.getTypeName());
+
+        // Create the index if it does not exist
         try {
             client.admin().indices().prepareCreate(definition.getIndexName())
                     .execute().actionGet();
@@ -232,6 +234,7 @@ public void start() {
             }
         }

+        // GridFS
         if (definition.isMongoGridFS()) {
             try {
                 if (logger.isDebugEnabled()) {
@@ -247,6 +250,7 @@ public void start() {
             }
         }

+        // Tail the oplog
         if (isMongos()) {
             DBCursor cursor = getConfigDb().getCollection("shards").find();
             try {
@@ -440,11 +444,11 @@ public void run() {

                     // 1. Attempt to fill as much of the bulk request as
                     // possible
-                    Map<String, Object> data = stream.take();
-                    lastTimestamp = processBlockingQueue(bulk, data);
-                    while ((data = stream.poll(definition.getBulkTimeout()
+                    QueueEntry entry = stream.take();
+                    lastTimestamp = processBlockingQueue(bulk, entry);
+                    while ((entry = stream.poll(definition.getBulkTimeout()
                             .millis(), MILLISECONDS)) != null) {
-                        lastTimestamp = processBlockingQueue(bulk, data);
+                        lastTimestamp = processBlockingQueue(bulk, entry);
                         if (bulk.numberOfActions() >= definition.getBulkSize()) {
                             break;
                         }
@@ -486,22 +490,21 @@ public void run() {

         @SuppressWarnings({ "unchecked" })
         private BSONTimestamp processBlockingQueue(
-                final BulkRequestBuilder bulk, Map<String, Object> data) {
-            if (data.get(MONGODB_ID_FIELD) == null
-                    && !data.get(OPLOG_OPERATION).equals(
+                final BulkRequestBuilder bulk, QueueEntry entry) {
+            if (entry.getData().get(MONGODB_ID_FIELD) == null
+                    && !entry.getOplogOperation().equals(
                             OPLOG_COMMAND_OPERATION)) {
                 logger.warn(
                         "Cannot get object id. Skip the current item: [{}]",
-                        data);
+                        entry.getData());
                 return null;
             }

-            BSONTimestamp lastTimestamp = (BSONTimestamp) data
-                    .get(OPLOG_TIMESTAMP);
-            String operation = data.get(OPLOG_OPERATION).toString();
+            BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
+            String operation = entry.getOplogOperation();
             if (OPLOG_COMMAND_OPERATION.equals(operation)) {
                 try {
-                    updateBulkRequest(bulk, data, null, operation,
+                    updateBulkRequest(bulk, entry.getData(), null, operation,
                             definition.getIndexName(),
                             definition.getTypeName(), null, null);
                 } catch (IOException ioEx) {
@@ -512,16 +515,15 @@ private BSONTimestamp processBlockingQueue(

             if (scriptExecutable != null
                     && definition.isAdvancedTransformation()) {
-                return applyAdvancedTransformation(bulk, data);
+                return applyAdvancedTransformation(bulk, entry);
             }

-            // String objectId = data.get(MONGODB_ID_FIELD).toString();
             String objectId = "";
-            if (data.get(MONGODB_ID_FIELD) != null) {
-                objectId = data.get(MONGODB_ID_FIELD).toString();
+            if (entry.getData().get(MONGODB_ID_FIELD) != null) {
+                objectId = entry.getData().get(MONGODB_ID_FIELD).toString();
             }
-            data.remove(OPLOG_TIMESTAMP);
-            data.remove(OPLOG_OPERATION);

             if (logger.isDebugEnabled()) {
                 logger.debug("updateBulkRequest for id: [{}], operation: [{}]",
                         objectId, operation);
@@ -532,7 +534,7 @@ private BSONTimestamp processBlockingQueue(
                         "About to include collection. set attribute {} / {} ",
                         definition.getIncludeCollection(),
                         definition.getMongoCollection());
-                data.put(definition.getIncludeCollection(),
+                entry.getData().put(definition.getIncludeCollection(),
                         definition.getMongoCollection());
             }

@@ -543,9 +545,10 @@ private BSONTimestamp processBlockingQueue(
             } catch (IOException e) {
                 logger.warn("failed to parse {}", e);
             }
+            Map<String, Object> data = entry.getData();
             if (scriptExecutable != null) {
                 if (ctx != null) {
-                    ctx.put("document", data);
+                    ctx.put("document", entry.getData());
                     ctx.put("operation", operation);
                     if (!objectId.isEmpty()) {
                         ctx.put("id", objectId);
@@ -694,7 +697,7 @@ private BSONTimestamp processBlockingQueue(
                 // }
                 // }
             } catch (IOException e) {
-                logger.warn("failed to parse {}", e, data);
+                logger.warn("failed to parse {}", e, entry.getData());
             }
             return lastTimestamp;
         }
@@ -831,17 +834,14 @@ private void deleteBulkRequest(BulkRequestBuilder bulk,

         @SuppressWarnings("unchecked")
         private BSONTimestamp applyAdvancedTransformation(
-                final BulkRequestBuilder bulk, Map<String, Object> data) {
+                final BulkRequestBuilder bulk, QueueEntry entry) {

-            BSONTimestamp lastTimestamp = (BSONTimestamp) data
-                    .get(OPLOG_TIMESTAMP);
-            String operation = data.get(OPLOG_OPERATION).toString();
+            BSONTimestamp lastTimestamp = entry.getOplogTimestamp();
+            String operation = entry.getOplogOperation();
             String objectId = "";
-            if (data.get(MONGODB_ID_FIELD) != null) {
-                objectId = data.get(MONGODB_ID_FIELD).toString();
+            if (entry.getData().get(MONGODB_ID_FIELD) != null) {
+                objectId = entry.getData().get(MONGODB_ID_FIELD).toString();
             }
-            data.remove(OPLOG_TIMESTAMP);
-            data.remove(OPLOG_OPERATION);
             if (logger.isDebugEnabled()) {
                 logger.debug(
                         "advancedUpdateBulkRequest for id: [{}], operation: [{}]",
@@ -853,7 +853,7 @@ private BSONTimestamp applyAdvancedTransformation(
                         "About to include collection. set attribute {} / {} ",
                         definition.getIncludeCollection(),
                         definition.getMongoCollection());
-                data.put(definition.getIncludeCollection(),
+                entry.getData().put(definition.getIncludeCollection(),
                         definition.getMongoCollection());
             }
             Map<String, Object> ctx = null;
@@ -869,7 +869,7 @@ private BSONTimestamp applyAdvancedTransformation(
             if (scriptExecutable != null) {
                 if (ctx != null && documents != null) {

-                    document.put("data", data);
+                    document.put("data", entry.getData());
                     if (!objectId.isEmpty()) {
                         document.put("id", objectId);
                     }
@@ -1045,10 +1045,58 @@ private class Slurper implements Runnable {
         private DBCollection slurpedCollection;
         private DB oplogDb;
         private DBCollection oplogCollection;
+        private final List<ServerAddress> mongoServers;
+
+        public Slurper(List<ServerAddress> mongoServers) {
+            this.mongoServers = mongoServers;
+            this.mongo = new MongoClient(mongoServers,
+                    definition.getMongoClientOptions());
+        }
+
+        @Override
+        public void run() {
+            while (active) {
+                try {
+                    if (!assignCollections()) {
+                        break; // failed to assign oplogCollection or
+                               // slurpedCollection
+                    }
+
+                    DBCursor oplogCursor = null;
+                    try {
+                        oplogCursor = oplogCursor(null);
+                        if (oplogCursor == null) {
+                            oplogCursor = processFullCollection();
+                        }
+
+                        while (oplogCursor.hasNext()) {
+                            DBObject item = oplogCursor.next();
+                            processOplogEntry(item);
+                        }
+                        Thread.sleep(500);
+                    } finally {
+                        if (oplogCursor != null) {
+                            logger.trace("Closing oplogCursor cursor");
+                            oplogCursor.close();
+                        }
+                    }
+                } catch (MongoInterruptedException mIEx) {
+                    logger.warn("Mongo driver has been interrupted");
+                    if (mongo != null) {
+                        mongo.close();
+                        mongo = null;
+                    }
+                    break;
+                } catch (MongoException mEx) {
+                    logger.error("Mongo gave an exception", mEx);
+                } catch (NoSuchElementException nEx) {
+                    logger.warn("A mongoDB cursor bug ?", nEx);
+                } catch (InterruptedException e) {
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("river-mongodb slurper interrupted");
+                    }
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }

         private boolean assignCollections() {
@@ -1121,79 +1169,21 @@ private boolean assignCollections() {

             return true;
         }

-        @Override
-        public void run() {
-            mongo = new MongoClient(mongoServers,
-                    definition.getMongoClientOptions());
-
-            while (active) {
-                try {
-                    if (!assignCollections()) {
-                        break; // failed to assign oplogCollection or
-                               // slurpedCollection
-                    }
-
-                    DBCursor oplogCursor = null;
-                    try {
-                        oplogCursor = oplogCursor(null);
-                        if (oplogCursor == null) {
-                            oplogCursor = processFullCollection();
-                        }
-
-                        while (oplogCursor.hasNext()) {
-                            DBObject item = oplogCursor.next();
-                            processOplogEntry(item);
-                        }
-                        Thread.sleep(500);
-                    } finally {
-                        if (oplogCursor != null) {
-                            logger.trace("Closing oplogCursor cursor");
-                            oplogCursor.close();
-                        }
-                    }
-                } catch (MongoInterruptedException mIEx) {
-                    logger.warn("Mongo driver has been interrupted");
-                    if (mongo != null) {
-                        mongo.close();
-                        mongo = null;
-                    }
-                    break;
-                } catch (MongoException mEx) {
-                    logger.error("Mongo gave an exception", mEx);
-                } catch (NoSuchElementException nEx) {
-                    logger.warn("A mongoDB cursor bug ?", nEx);
-                } catch (InterruptedException e) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("river-mongodb slurper interrupted");
-                    }
-                    Thread.currentThread().interrupt();
-                    break;
-                }
-            }
-        }
-
         /*
          * Remove fscynlock and unlock -
          * https://github.com/richardwilly98/elasticsearch
          * -river-mongodb/issues/17
          */
         private DBCursor processFullCollection() throws InterruptedException {
-            // CommandResult lockResult = mongo.fsyncAndLock();
-            // if (lockResult.ok()) {
-            try {
-                BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection
-                        .find().sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
-                        .limit(1).next().get(OPLOG_TIMESTAMP);
-                addQueryToStream(OPLOG_INSERT_OPERATION, currentTimestamp, null);
-                return oplogCursor(currentTimestamp);
-            } finally {
-                // mongo.unlock();
-            }
-            // } else {
-            // throw new MongoException(
-            // "Could not lock the database for FullCollection sync");
-            // }
+            BSONTimestamp currentTimestamp = (BSONTimestamp) oplogCollection
+                    .find()
+                    .sort(new BasicDBObject(OPLOG_TIMESTAMP, -1))
+                    .limit(1)
+                    .next()
+                    .get(OPLOG_TIMESTAMP);
+            addQueryToStream(OPLOG_INSERT_OPERATION, currentTimestamp, null);
+            return oplogCursor(currentTimestamp);
         }

         @SuppressWarnings("unchecked")
@@ -1268,8 +1258,8 @@ private void processOplogEntry(final DBObject entry)
                     logger.debug("Updated item: {}", update);
                     addQueryToStream(operation, oplogTimestamp, update);
                 } else {
-                    object = applyFieldFilter(object);
-                    addToStream(operation, oplogTimestamp, object.toMap());
+                    Map<String, Object> map = applyFieldFilter(object).toMap();
+                    addToStream(operation, oplogTimestamp, map);
                 }
             }
         }
@@ -1436,16 +1426,8 @@ private void addToStream(final String operation,
                         "addToStream - operation [{}], currentTimestamp [{}], data [{}]",
                         operation, currentTimestamp, data);
             }
-            data.put(OPLOG_TIMESTAMP, currentTimestamp);
-            data.put(OPLOG_OPERATION, operation);
-
-            // stream.add(data);
-            stream.put(data);
-            // try {
-            // stream.put(data);
-            // } catch (InterruptedException e) {
-            // e.printStackTrace();
-            // }
-
+            stream.put(new QueueEntry(currentTimestamp, operation, data));
         }

     }
@@ -1554,4 +1536,33 @@ private void updateLastTimestamp(final String namespace,
         }
     }

+    protected static class QueueEntry {
+
+        private BSONTimestamp oplogTimestamp;
+        private String oplogOperation;
+        private Map<String, Object> data;
+
+        public QueueEntry(
+                BSONTimestamp oplogTimestamp,
+                String oplogOperation,
+                Map<String, Object> data) {
+            this.oplogTimestamp = oplogTimestamp;
+            this.oplogOperation = oplogOperation;
+            this.data = data;
+        }
+
+        public BSONTimestamp getOplogTimestamp() {
+            return oplogTimestamp;
+        }
+
+        public String getOplogOperation() {
+            return oplogOperation;
+        }
+
+        public Map<String, Object> getData() {
+            return data;
+        }
+
+    }
+
 }
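With the wrapper in place, producer and consumer exchange the oplog metadata out of band of the document map. A minimal end-to-end sketch of the new flow (the demo class is illustrative, not part of the patch; QueueEntry mirrors the class added above, and the LinkedTransferQueue stands in for the river's stream field):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;

import org.bson.types.BSONTimestamp;

public class QueueEntryFlowDemo {
    // Mirrors the QueueEntry class added by this commit.
    static class QueueEntry {
        private final BSONTimestamp oplogTimestamp;
        private final String oplogOperation;
        private final Map<String, Object> data;

        QueueEntry(BSONTimestamp oplogTimestamp, String oplogOperation,
                Map<String, Object> data) {
            this.oplogTimestamp = oplogTimestamp;
            this.oplogOperation = oplogOperation;
            this.data = data;
        }

        BSONTimestamp getOplogTimestamp() { return oplogTimestamp; }
        String getOplogOperation() { return oplogOperation; }
        Map<String, Object> getData() { return data; }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<QueueEntry> stream = new LinkedTransferQueue<QueueEntry>();

        // Producer side (addToStream): metadata travels beside the document.
        Map<String, Object> doc = new HashMap<String, Object>();
        doc.put("ts", "user value"); // user fields now survive untouched
        doc.put("op", "user value");
        stream.put(new QueueEntry(new BSONTimestamp(1380067200, 1), "i", doc));

        // Consumer side (processBlockingQueue): metadata read from the wrapper.
        QueueEntry entry = stream.take();
        System.out.println(entry.getOplogOperation()); // i
        System.out.println(entry.getData());           // {ts=user value, op=user value}
    }
}
```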