Skip to content

Commit

Permalink
Fix #12 Exit if binlog filename/position is too old
Browse files Browse the repository at this point in the history
  • Loading branch information
ngocdaothanh committed Apr 2, 2015
1 parent 9515dd6 commit 21e422b
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 10 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
1.5:

* [#12](https://github.com/ngocdaothanh/mydit/issues/12)
Exit if binlog filename/position is too old

1.4:

* [#9](https://github.com/ngocdaothanh/mydit/issues/9)
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
organization := "tv.cntt"
name := "mydit"
version := "1.4-SNAPSHOT"
version := "1.5-SNAPSHOT"

scalaVersion := "2.11.6"

Expand Down
80 changes: 77 additions & 3 deletions src/main/scala/mydit/MySQLExtractor.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package mydit

import scala.util.control.NonFatal

import com.github.shyiko.mysql.binlog.BinaryLogClient
import com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData
import com.github.shyiko.mysql.binlog.event.Event
import com.github.shyiko.mysql.binlog.event.EventData
Expand All @@ -13,19 +17,25 @@ import com.github.shyiko.mysql.binlog.event.WriteRowsEventData

// https://github.com/shyiko/rook/blob/master/rook-source-mysql/src/main/java/com/github/shyiko/rook/source/mysql/MySQLReplicationStream.java

object MySQLExtractor {
private final val RECONNECT_DELAY_SECS = 5
}

/** @param only empty means all DBs should be replicated */
class MySQLExtractor(
host: String, port: Int, username: String, password: String, only: Seq[String],
binlogFilename_Position: Option[(String, Long)]
) {
import MySQLExtractor._

private val client = new BinaryLogClient(host, port, username, password)

binlogFilename_Position.foreach { case (f, p) =>
client.setBinlogFilename(f)
client.setBinlogPosition(p)
}

client.registerEventListener(new BinaryLogClient.EventListener {
client.registerEventListener(new EventListener {
override def onEvent(event: Event) {
// client will automatically catch exception (if any) and log it out

Expand Down Expand Up @@ -66,14 +76,78 @@ class MySQLExtractor(
}
}

def connect() {
client.connect()
def connectKeepAlive() {
// https://github.com/shyiko/mysql-binlog-connector-java/issues/37

val lifecycleListener = new LifecycleListener() {
private var shouldReconnect = true

override def onCommunicationFailure(client: BinaryLogClient, e: Exception) {
if (e.getMessage == "1236 - Could not find first log file name in binary log index file") {
Log.error(
"Binlog {}/{} is no longer available on the master; need to rebootstrap",
client.getBinlogFilename, client.getBinlogPosition
)
shouldReconnect = false
disconnectAndExit()
} else {
Log.warn("Communication failure", e)
}
}

override def onConnect(client: BinaryLogClient) {
// BinaryLogClient already logs like this:
// [INFO] Connected to localhost:3306 at mysql-bin.000003/92866
}

override def onEventDeserializationFailure(client: BinaryLogClient, e: Exception) {
Log.warn("Event deserialization failure", e)
}

override def onDisconnect(client: BinaryLogClient) {
if (shouldReconnect) {
Log.warn("Disconnected; reconnect in {} seconds", RECONNECT_DELAY_SECS)
connectInNewThread(RECONNECT_DELAY_SECS)
} else {
Log.warn("Disconnected; won't reconnect")
}
}
}

client.registerLifecycleListener(lifecycleListener)
connectInNewThread(0)
}

def disconnect() {
client.disconnect()
}

def disconnectAndExit() {
try {
// Need to disconnect before exiting, otherwise we can't exit because
// there are still running threads
client.disconnect()
} catch {
case NonFatal(e) => Log.warn("Could not disconnect", e)
} finally {
Log.info("Program should now exit")
System.exit(-1)
}
}

private def connectInNewThread(delaySecs: Long) {
new Thread {
override def run() {
try {
if (delaySecs > 0) Thread.sleep(delaySecs * 1000)
client.connect()
} catch {
case NonFatal(e) => Log.error("Error", e)
}
}
}.start()
}

//--------------------------------------------------------------------------

private def onFormatDescription(data: FormatDescriptionEventData) {
Expand Down
8 changes: 2 additions & 6 deletions src/main/scala/mydit/Rep.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Rep(config: Config) extends RepEvent.Listener {
mo.binlogGetPosition
)
my.addListener(this)
my.connect()
my.connectKeepAlive()

//--------------------------------------------------------------------------

Expand Down Expand Up @@ -59,11 +59,7 @@ class Rep(config: Config) extends RepEvent.Listener {
"Replicator program now exits because the failed replication event queue size exceeds {} (see config/application.conf)",
config.maxFailedEventQueueSize
)

// Need to disconnect before exiting, otherwise we can't exit because
// there are still running threads
my.disconnect()
System.exit(-1)
my.disconnectAndExit()
}
} else if (lastQSize > 0) {
// newQSize is now 0, congratulations!
Expand Down

0 comments on commit 21e422b

Please sign in to comment.