From 21e422bbe578b5834df377928b185c0e13a0ffdc Mon Sep 17 00:00:00 2001 From: Ngoc Dao Date: Thu, 2 Apr 2015 16:36:37 +0900 Subject: [PATCH] Fix #12 Exit if binlog filename/position is too old --- CHANGELOG.md | 5 ++ build.sbt | 2 +- src/main/scala/mydit/MySQLExtractor.scala | 80 ++++++++++++++++++++++- src/main/scala/mydit/Rep.scala | 8 +-- 4 files changed, 85 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65a0a82..541af65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +1.5: + +* [#12](https://github.com/ngocdaothanh/mydit/issues/12) + Exit if binlog filename/position is too old + 1.4: * [#9](https://github.com/ngocdaothanh/mydit/issues/9) diff --git a/build.sbt b/build.sbt index 80a1ef6..2784e85 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ organization := "tv.cntt" name := "mydit" -version := "1.4-SNAPSHOT" +version := "1.5-SNAPSHOT" scalaVersion := "2.11.6" diff --git a/src/main/scala/mydit/MySQLExtractor.scala b/src/main/scala/mydit/MySQLExtractor.scala index 16e9609..e8493fd 100644 --- a/src/main/scala/mydit/MySQLExtractor.scala +++ b/src/main/scala/mydit/MySQLExtractor.scala @@ -1,6 +1,10 @@ package mydit +import scala.util.control.NonFatal + import com.github.shyiko.mysql.binlog.BinaryLogClient +import com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener +import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData import com.github.shyiko.mysql.binlog.event.Event import com.github.shyiko.mysql.binlog.event.EventData @@ -13,11 +17,17 @@ import com.github.shyiko.mysql.binlog.event.WriteRowsEventData // https://github.com/shyiko/rook/blob/master/rook-source-mysql/src/main/java/com/github/shyiko/rook/source/mysql/MySQLReplicationStream.java +object MySQLExtractor { + private final val RECONNECT_DELAY_SECS = 5 +} + /** @param only empty means all DBs should be replicated */ class MySQLExtractor( host: String, port: Int, username: String, password: String, only: Seq[String], binlogFilename_Position: Option[(String, Long)] ) { + import MySQLExtractor._ + private val client = new BinaryLogClient(host, port, username, password) binlogFilename_Position.foreach { case (f, p) => @@ -25,7 +35,7 @@ class MySQLExtractor( client.setBinlogPosition(p) } - client.registerEventListener(new BinaryLogClient.EventListener { + client.registerEventListener(new EventListener { override def onEvent(event: Event) { // client will automatically catch exception (if any) and log it out @@ -66,14 +76,78 @@ class MySQLExtractor( } } - def connect() { - client.connect() + def connectKeepAlive() { + // https://github.com/shyiko/mysql-binlog-connector-java/issues/37 + + val lifecycleListener = new LifecycleListener() { + private var shouldReconnect = true + + override def onCommunicationFailure(client: BinaryLogClient, e: Exception) { + if (e.getMessage == "1236 - Could not find first log file name in binary log index file") { + Log.error( + "Binlog {}/{} is no longer available on the master; need to rebootstrap", + client.getBinlogFilename, client.getBinlogPosition + ) + shouldReconnect = false + disconnectAndExit() + } else { + Log.warn("Communication failure", e) + } + } + + override def onConnect(client: BinaryLogClient) { + // BinaryLogClient already logs like this: + // [INFO] Connected to localhost:3306 at mysql-bin.000003/92866 + } + + override def onEventDeserializationFailure(client: BinaryLogClient, e: Exception) { + Log.warn("Event deserialization failure", e) + } + + override def onDisconnect(client: BinaryLogClient) { + if (shouldReconnect) { + Log.warn("Disconnected; reconnect in {} seconds", RECONNECT_DELAY_SECS) + connectInNewThread(RECONNECT_DELAY_SECS) + } else { + Log.warn("Disconnected; won't reconnect") + } + } + } + + client.registerLifecycleListener(lifecycleListener) + connectInNewThread(0) } def disconnect() { client.disconnect() } + def disconnectAndExit() { + try { + // Need to disconnect before exiting, otherwise we can't exit because + // there are still running threads + client.disconnect() + } catch { + case NonFatal(e) => Log.warn("Could not disconnect", e) + } finally { + Log.info("Program should now exit") + System.exit(-1) + } + } + + private def connectInNewThread(delaySecs: Long) { + new Thread { + override def run() { + try { + if (delaySecs > 0) Thread.sleep(delaySecs * 1000) + client.connect() + } catch { + case NonFatal(e) => Log.error("Error", e) + } + } + }.start() + } + //-------------------------------------------------------------------------- private def onFormatDescription(data: FormatDescriptionEventData) { diff --git a/src/main/scala/mydit/Rep.scala b/src/main/scala/mydit/Rep.scala index 508b865..3cc63c4 100644 --- a/src/main/scala/mydit/Rep.scala +++ b/src/main/scala/mydit/Rep.scala @@ -20,7 +20,7 @@ class Rep(config: Config) extends RepEvent.Listener { mo.binlogGetPosition ) my.addListener(this) - my.connect() + my.connectKeepAlive() //-------------------------------------------------------------------------- @@ -59,11 +59,7 @@ class Rep(config: Config) extends RepEvent.Listener { "Replicator program now exits because the failed replication event queue size exceeds {} (see config/application.conf)", config.maxFailedEventQueueSize ) - - // Need to disconnect before exiting, otherwise we can't exit because - // there are still running threads - my.disconnect() - System.exit(-1) + my.disconnectAndExit() } } else if (lastQSize > 0) { // newQSize is now 0, congratulations!