From a041c242d3535f666937d52bebb6f515ed3872cc Mon Sep 17 00:00:00 2001 From: Ngoc Dao Date: Sat, 21 Mar 2015 07:20:21 +0900 Subject: [PATCH] Fix #2 Insert: Stop replication on DuplicateKeyException only when the doc to be inserted is different from the doc in DB --- CHANGELOG.md | 3 +++ src/main/scala/mydit/MongoDBApplier.scala | 20 +++++++++++++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f3fc19d..9a4d9bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ 1.2: +* [#2](https://github.com/ngocdaothanh/mydit/issues/2) + Insert: Stop replication on DuplicateKeyException only when the doc to be + inserted is different from the doc in DB * [#3](https://github.com/ngocdaothanh/mydit/issues/3) Log INFO when failed replication event queue changes from nonempty to empty diff --git a/src/main/scala/mydit/MongoDBApplier.scala b/src/main/scala/mydit/MongoDBApplier.scala index b43b3f1..4e1aaaf 100644 --- a/src/main/scala/mydit/MongoDBApplier.scala +++ b/src/main/scala/mydit/MongoDBApplier.scala @@ -3,7 +3,9 @@ package mydit import java.io.Serializable import java.math.BigDecimal import java.util.BitSet + import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData @@ -11,8 +13,10 @@ import com.github.shyiko.mysql.binlog.event.WriteRowsEventData import com.mongodb.BasicDBObject import com.mongodb.DBObject +import com.mongodb.DuplicateKeyException import com.mongodb.MongoClient import com.mongodb.MongoClientURI +import com.mongodb.ReadPreference import com.mongodb.WriteConcern /** @@ -50,12 +54,22 @@ class MongoDBApplier(uri: String, binlogDb: String, binlogCollName: String, enum def insert(nextPosition: Long, dbName: String, collName: String, cols: Seq[ColInfo], data: WriteRowsEventData) { val db = client.getDB(dbName) val coll = db.getCollection(collName) - var objs = Seq[DBObject]() for (mySQLValues <- data.getRows.asScala) { val obj = mySQLRowToMongoDBObject(cols, data.getIncludedColumns, mySQLValues) - objs = objs :+ obj + try { + coll.insert(obj, WriteConcern.ACKNOWLEDGED) + } catch { + case dup: DuplicateKeyException => + // Stop replication only when the doc to be inserted is different from + // the doc in DB + // https://github.com/ngocdaothanh/mydit/issues/2 + val fields = new BasicDBObject("_id", 1) + if (coll.findOne(obj, fields, ReadPreference.primary) == null) throw dup + + case NonFatal(e) => + throw e + } } - coll.insert(WriteConcern.ACKNOWLEDGED, objs :_*) binlogNextPosition(nextPosition) }