diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 3c2a25a5..c7e816f9 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -20,11 +20,13 @@ import com.github.shyiko.mysql.binlog.event.EventHeader; import com.github.shyiko.mysql.binlog.event.EventHeaderV4; import com.github.shyiko.mysql.binlog.event.EventType; +import com.github.shyiko.mysql.binlog.event.GtidEventData; import com.github.shyiko.mysql.binlog.event.RotateEventData; import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType; import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException; import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; +import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer; import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer; import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream; import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean; @@ -35,7 +37,9 @@ import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel; import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket; import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.Command; import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand; +import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand; import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand; import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand; @@ -80,6 +84,9 @@ public class BinaryLogClient implements BinaryLogClientMXBean { private volatile String binlogFilename; private volatile long binlogPosition = 4; + private GtidSet gtidSet; + private final Object gtidSetAccessLock = new Object(); + private EventDeserializer eventDeserializer = new EventDeserializer(); private final List eventListeners = new LinkedList(); @@ -200,6 +207,34 @@ public void setBinlogPosition(long binlogPosition) { this.binlogPosition = binlogPosition; } + /** + * @return GTID set. Note that this value changes with each received GTID event (provided client is in GTID mode). + * @see #setGtidSet(String) + */ + public String getGtidSet() { + synchronized (gtidSetAccessLock) { + return gtidSet != null ? gtidSet.toString() : null; + } + } + + /** + * @param gtidSet GTID set (can be an empty string). + *

NOTE #1: Any value but null will switch BinaryLogClient into a GTID mode (in which case GTID set will be + * updated with each incoming GTID event) as well as set binlogFilename to "" (empty string) (meaning + * BinaryLogClient will request events "outside of the set" starting from the oldest known binlog). + *

NOTE #2: {@link #setBinlogFilename(String)} and {@link #setBinlogPosition(long)} can be used to specify the + * exact position from which MySQL server should start streaming events (taking into account GTID set). + * @see #getGtidSet() + */ + public void setGtidSet(String gtidSet) { + if (gtidSet != null && this.binlogFilename == null) { + this.binlogFilename = ""; + } + synchronized (gtidSetAccessLock) { + this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null; + } + } + /** * @return true if "keep alive" thread should be automatically started (default), false otherwise. * @see #setKeepAlive(boolean) @@ -309,7 +344,7 @@ public void connect() throws IOException { if (checksumType != ChecksumType.NONE) { confirmSupportOfChecksum(checksumType); } - channel.write(new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition)); + requestBinaryLogStream(); } catch (IOException e) { if (channel != null && channel.isOpen()) { channel.close(); @@ -328,14 +363,42 @@ public void connect() throws IOException { if (keepAlive && !isKeepAliveThreadRunning()) { spawnKeepAliveThread(); } - EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(EventType.ROTATE); - if (eventDataDeserializer.getClass() != RotateEventDataDeserializer.class && + ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class); + synchronized (gtidSetAccessLock) { + if (gtidSet != null) { + ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class); + } + } + listenForEventPackets(); + } + + private void requestBinaryLogStream() throws IOException { + Command dumpBinaryLogCommand; + synchronized (gtidSetAccessLock) { + if (gtidSet != null) { + dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId, binlogFilename, binlogPosition, gtidSet); + } else { + dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition); + } + } + channel.write(dumpBinaryLogCommand); + } + + private void ensureEventDataDeserializer(EventType eventType, + Class eventDataDeserializerClass) { + EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType); + if (eventDataDeserializer.getClass() != eventDataDeserializerClass && eventDataDeserializer.getClass() != EventDeserializer.EventDataWrapper.Deserializer.class) { - eventDeserializer.setEventDataDeserializer(EventType.ROTATE, - new EventDeserializer.EventDataWrapper.Deserializer(new RotateEventDataDeserializer(), + EventDataDeserializer internalEventDataDeserializer; + try { + internalEventDataDeserializer = eventDataDeserializerClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + eventDeserializer.setEventDataDeserializer(eventType, + new EventDeserializer.EventDataWrapper.Deserializer(internalEventDataDeserializer, eventDataDeserializer)); } - listenForEventPackets(); } private void authenticate(String salt, int collation) throws IOException { @@ -526,6 +589,7 @@ private void listenForEventPackets() throws IOException { if (isConnected()) { notifyEventListeners(event); updateClientBinlogFilenameAndPosition(event); + updateGtidSet(event); } } } catch (Exception e) { @@ -565,6 +629,24 @@ private void updateClientBinlogFilenameAndPosition(Event event) { } } + private void updateGtidSet(Event event) { + EventHeader eventHeader = event.getHeader(); + if (eventHeader.getEventType() == EventType.GTID) { + synchronized (gtidSetAccessLock) { + if (gtidSet != null) { + EventData eventData = event.getData(); + GtidEventData gtidEventData; + if (eventData instanceof EventDeserializer.EventDataWrapper) { + gtidEventData = (GtidEventData) ((EventDeserializer.EventDataWrapper) eventData).getInternal(); + } else { + gtidEventData = (GtidEventData) eventData; + } + gtidSet.add(gtidEventData.getGtid()); + } + } + } + } + private ResultSetRowPacket[] readResultSet() throws IOException { List resultSet = new LinkedList(); while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ } diff --git a/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java b/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java new file mode 100644 index 00000000..96ac1cf6 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java @@ -0,0 +1,215 @@ +/* + * Copyright 2015 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog; + +import java.util.*; + +/** + * GTID set as described in GTID Concepts + * of MySQL 5.6 Reference Manual. + * + *

+ * gtid_set: uuid_set[,uuid_set]...
+ * uuid_set: uuid:interval[:interval]...
+ * uuid: hhhhhhhh-hhhh-hhhh-hhhh-hhhhhhhhhhhh, h: [0-9|A-F]
+ * interval: n[-n], (n >= 1)
+ * 
+ * + * @author Stanley Shyiko + */ +public class GtidSet { + + private final Map map = new LinkedHashMap(); + + public GtidSet(String gtidSet) { + String[] uuidSets = gtidSet.isEmpty() ? new String[0] : gtidSet.split(","); + for (String uuidSet : uuidSets) { + int uuidSeparatorIndex = uuidSet.indexOf(":"); + String sourceId = uuidSet.substring(0, uuidSeparatorIndex); + List intervals = new ArrayList(); + String[] rawIntervals = uuidSet.substring(uuidSeparatorIndex + 1).split(":"); + for (String interval : rawIntervals) { + String[] is = interval.split("-"); + long[] split = new long[is.length]; + for (int i = 0, e = is.length; i < e; i++) { + split[i] = Long.parseLong(is[i]); + } + if (split.length == 1) { + split = new long[] {split[0], split[0] + 1}; + } + intervals.add(new Interval(split[0], split[1])); + } + map.put(sourceId, new UUIDSet(sourceId, intervals)); + } + } + + public Collection getUUIDSets() { + return map.values(); + } + + /** + * @param gtid GTID ("source_id:transaction_id") + * @return whether or not gtid was added to the set (false if it was already there) + */ + public boolean add(String gtid) { + String[] split = gtid.split(":"); + String sourceId = split[0]; + long transactionId = Long.parseLong(split[1]); + UUIDSet uuidSet = map.get(sourceId); + if (uuidSet == null) { + map.put(sourceId, uuidSet = new UUIDSet(sourceId, new ArrayList())); + } + List intervals = (List) uuidSet.intervals; + int index = findInterval(intervals, transactionId); + boolean addedToExisting = false; + if (index < intervals.size()) { + Interval interval = intervals.get(index); + if (interval.getStart() == transactionId + 1) { + interval.start = transactionId; + addedToExisting = true; + } else + if (interval.getEnd() == transactionId) { + interval.end = transactionId + 1; + addedToExisting = true; + } else + if (interval.getStart() <= transactionId && transactionId < interval.getEnd()) { + return false; + } + } + if (!addedToExisting) { + intervals.add(index, new Interval(transactionId, transactionId + 1)); + } + if (intervals.size() > 1) { + joinAdjacentIntervals(intervals, index); + } + return true; + } + + /** + * Collapses intervals like a-b:b-c into a-c (only in index+-1 range). + */ + private void joinAdjacentIntervals(List intervals, int index) { + for (int i = Math.min(index + 1, intervals.size() - 1), e = Math.max(index - 1, 0); i > e; i--) { + Interval a = intervals.get(i - 1), b = intervals.get(i); + if (a.getEnd() == b.getStart()) { + a.end = b.end; + intervals.remove(i); + } + } + } + + @Override + public String toString() { + List gtids = new ArrayList(); + for (UUIDSet uuidSet : map.values()) { + gtids.add(uuidSet.getUUID() + ":" + join(uuidSet.intervals, ":")); + } + return join(gtids, ","); + } + + /** + * @return index which is either a pointer to the interval containing v or a position at which v can be added + */ + private static int findInterval(List ii, long v) { + int l = 0, p = 0, r = ii.size(); + while (l < r) { + p = (l + r) / 2; + Interval i = ii.get(p); + if (i.getEnd() < v) { + l = p + 1; + } else + if (v < i.getStart()) { + r = p; + } else { + return p; + } + } + if (!ii.isEmpty() && ii.get(p).getEnd() < v) { + p++; + } + return p; + } + + private String join(Collection o, String delimiter) { + if (o.isEmpty()) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (Object o1 : o) { + sb.append(o1).append(delimiter); + } + return sb.substring(0, sb.length() - delimiter.length()); + } + + public static class UUIDSet { + + private String uuid; + private Collection intervals; + + public UUIDSet(String uuid, Collection intervals) { + this.uuid = uuid; + this.intervals = intervals; + } + + public String getUUID() { + return uuid; + } + + public Collection getIntervals() { + return intervals; + } + } + + public static class Interval implements Comparable { + + private long start; + private long end; + + public Interval(long start, long end) { + this.start = start; + this.end = end; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + @Override + public String toString() { + return start + "-" + end; + } + + @Override + public int compareTo(Interval o) { + return saturatedCast(this.start - o.start); + } + + private static int saturatedCast(long value) { + if (value > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + if (value < Integer.MIN_VALUE) { + return Integer.MIN_VALUE; + } + return (int) value; + } + } + +} diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogGtidCommand.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogGtidCommand.java new file mode 100644 index 00000000..6af07dd1 --- /dev/null +++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/command/DumpBinaryLogGtidCommand.java @@ -0,0 +1,78 @@ +/* + * Copyright 2015 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog.network.protocol.command; + +import com.github.shyiko.mysql.binlog.GtidSet; +import com.github.shyiko.mysql.binlog.io.ByteArrayOutputStream; + +import java.io.IOException; +import java.util.Collection; + +/** + * @author Stanley Shyiko + */ +public class DumpBinaryLogGtidCommand implements Command { + + private long serverId; + private String binlogFilename; + private long binlogPosition; + private GtidSet gtidSet; + + public DumpBinaryLogGtidCommand(long serverId, String binlogFilename, long binlogPosition, GtidSet gtidSet) { + this.serverId = serverId; + this.binlogFilename = binlogFilename; + this.binlogPosition = binlogPosition; + this.gtidSet = gtidSet; + } + + @Override + public byte[] toByteArray() throws IOException { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + buffer.writeInteger(CommandType.BINLOG_DUMP_GTID.ordinal(), 1); + buffer.writeInteger(0, 2); // flag + buffer.writeLong(this.serverId, 4); + buffer.writeInteger(this.binlogFilename.length(), 4); + buffer.writeString(this.binlogFilename); + buffer.writeLong(this.binlogPosition, 8); + Collection uuidSets = gtidSet.getUUIDSets(); + int dataSize = 8 /* number of uuidSets */; + for (GtidSet.UUIDSet uuidSet : uuidSets) { + dataSize += 16 /* uuid */ + 8 /* number of intervals */ + + uuidSet.getIntervals().size() /* number of intervals */ * 16 /* start-end */; + } + buffer.writeInteger(dataSize, 4); + buffer.writeLong(uuidSets.size(), 8); + for (GtidSet.UUIDSet uuidSet : uuidSets) { + buffer.write(hexToByteArray(uuidSet.getUUID().replace("-", ""))); + Collection intervals = uuidSet.getIntervals(); + buffer.writeLong(intervals.size(), 8); + for (GtidSet.Interval interval : intervals) { + buffer.writeLong(interval.getStart(), 8); + buffer.writeLong(interval.getEnd(), 8); + } + } + return buffer.toByteArray(); + } + + private static byte[] hexToByteArray(String uuid) { + byte[] b = new byte[uuid.length() / 2]; + for (int i = 0, j = 0; j < uuid.length(); j += 2) { + b[i++] = (byte) Integer.parseInt(uuid.charAt(j) + "" + uuid.charAt(j + 1), 16); + } + return b; + } + +} diff --git a/src/test/java/com/github/shyiko/mysql/binlog/GtidSetTest.java b/src/test/java/com/github/shyiko/mysql/binlog/GtidSetTest.java new file mode 100644 index 00000000..dc67eef9 --- /dev/null +++ b/src/test/java/com/github/shyiko/mysql/binlog/GtidSetTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2015 Stanley Shyiko + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.shyiko.mysql.binlog; + +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +/** + * @author Stanley Shyiko + */ +public class GtidSetTest { + + @Test + public void testAdd() throws Exception { + GtidSet gtidSet = new GtidSet("00000000-0000-0000-0000-000000000000:3-5"); + gtidSet.add("00000000-0000-0000-0000-000000000000:2"); + gtidSet.add("00000000-0000-0000-0000-000000000000:4"); + gtidSet.add("00000000-0000-0000-0000-000000000000:5"); + gtidSet.add("00000000-0000-0000-0000-000000000000:7"); + gtidSet.add("00000000-0000-0000-0000-000000000001:9"); + gtidSet.add("00000000-0000-0000-0000-000000000000:0"); + assertEquals(gtidSet.toString(), + "00000000-0000-0000-0000-000000000000:0-1:2-6:7-8,00000000-0000-0000-0000-000000000001:9-10"); + } + + + @Test + public void testJoin() throws Exception { + GtidSet gtidSet = new GtidSet("00000000-0000-0000-0000-000000000000:3-5:6-7"); + gtidSet.add("00000000-0000-0000-0000-000000000000:5"); + assertEquals(gtidSet.toString(), "00000000-0000-0000-0000-000000000000:3-7"); + } + + @Test + public void testEmptySet() throws Exception { + assertEquals(new GtidSet("").toString(), ""); + } + +} \ No newline at end of file