Skip to content

Commit

Permalink
inner-836:big packet support
Browse files Browse the repository at this point in the history
Signed-off-by: dcy <dcy10000@gmail.com>
  • Loading branch information
dcy10000 committed Feb 25, 2021
1 parent dfc1ac2 commit 2bd011b
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.nio.ByteBuffer;

import static com.actiontech.dble.backend.mysql.proto.handler.ProtoHandlerResultCode.*;
Expand All @@ -23,17 +24,19 @@ public MySQLProtoHandlerImpl() {
}

@Override
@Nonnull
public ProtoHandlerResult handle(ByteBuffer dataBuffer, int offset, boolean isSupportCompress) {
int position = dataBuffer.position();
int length = getPacketLength(dataBuffer, offset, isSupportCompress);
final ProtoHandlerResult.ProtoHandlerResultBuilder builder = ProtoHandlerResult.builder();
if (length == -1) {
if (offset != 0) {
return new ProtoHandlerResult(BUFFER_PACKET_UNCOMPLETE, offset);
return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).build();
} else if (!dataBuffer.hasRemaining()) {
throw new RuntimeException("invalid dataBuffer capacity ,too little buffer size " +
dataBuffer.capacity());
}
return new ProtoHandlerResult(REACH_END_BUFFER, offset);
return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).build();
}
if (position >= offset + length) {
// handle this package
Expand All @@ -42,7 +45,9 @@ public ProtoHandlerResult handle(ByteBuffer dataBuffer, int offset, boolean isSu
dataBuffer.get(data, 0, length);
data = checkData(data, length);
if (data == null) {
return new ProtoHandlerResult(REACH_END_BUFFER, offset);
builder.setCode(PART_OF_BIG_PACKET);
} else {
builder.setCode(COMPLETE_PACKET);
}

// offset to next position
Expand All @@ -51,17 +56,21 @@ public ProtoHandlerResult handle(ByteBuffer dataBuffer, int offset, boolean isSu
if (position != offset) {
// try next package parse
//dataBufferOffset = offset;
//should reset position after read.
dataBuffer.position(position);
return new ProtoHandlerResult(STLL_DATA_REMING, offset, data);
builder.setHasMorePacket(true);
} else {
builder.setHasMorePacket(false);
}
return new ProtoHandlerResult(REACH_END_BUFFER, offset, data);
builder.setOffset(offset).setPacketData(data);
return builder.build();
} else {
// not read whole message package ,so check if buffer enough and
// compact dataBuffer
if (!dataBuffer.hasRemaining()) {
return new ProtoHandlerResult(BUFFER_NOT_BIG_ENOUGH, offset, length);
return builder.setCode(BUFFER_NOT_BIG_ENOUGH).setHasMorePacket(false).setOffset(offset).setPacketLength(length).build();
} else {
return new ProtoHandlerResult(BUFFER_PACKET_UNCOMPLETE, offset, length);
return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).setPacketLength(length).build();
}
}
}
Expand All @@ -86,10 +95,12 @@ private int getPacketLength(ByteBuffer buffer, int offset, boolean isSupportComp

private byte[] checkData(byte[] data, int length) {
//session packet should be set to the latest one
//todo: this method doesn't apply for compress
if (length >= com.actiontech.dble.net.mysql.MySQLPacket.MAX_PACKET_SIZE + com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE) {
if (incompleteData == null) {
incompleteData = data;
} else {
//skip header in package
byte[] nextData = new byte[data.length - com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE];
System.arraycopy(data, com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE, nextData, 0, data.length - com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE);
incompleteData = dataMerge(nextData);
Expand All @@ -108,6 +119,7 @@ private byte[] checkData(byte[] data, int length) {
}

private byte[] dataMerge(byte[] data) {
//todo: could optimize here. for example ,use linked-buffer
byte[] newData = new byte[incompleteData.length + data.length];
System.arraycopy(incompleteData, 0, newData, 0, incompleteData.length);
System.arraycopy(data, 0, newData, incompleteData.length, data.length);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,20 @@
package com.actiontech.dble.backend.mysql.proto.handler;

public class ProtoHandlerResult {
import org.jetbrains.annotations.Nullable;

public final class ProtoHandlerResult {
final ProtoHandlerResultCode code;
final int offset;
final int packetLength;
final byte[] packetData;
final boolean hasMorePacket;

public ProtoHandlerResult(ProtoHandlerResultCode code, int offset, byte[] packetData) {
private ProtoHandlerResult(ProtoHandlerResultCode code, int offset, int packetLength, byte[] packetData, boolean hasMorePacket) {
this.code = code;
this.offset = offset;
this.packetData = packetData;
this.packetLength = 0;
}

public ProtoHandlerResult(ProtoHandlerResultCode code, int offset) {
this.code = code;
this.offset = offset;
this.packetData = null;
this.packetLength = 0;
}

public ProtoHandlerResult(ProtoHandlerResultCode code, int offset, int packetLength) {
this.code = code;
this.offset = offset;
this.packetData = null;
this.packetLength = packetLength;
this.packetData = packetData;
this.hasMorePacket = hasMorePacket;
}

public ProtoHandlerResultCode getCode() {
Expand All @@ -35,13 +25,64 @@ public int getOffset() {
return offset;
}

@Nullable
public byte[] getPacketData() {
return packetData;
}

public int getPacketLength() {
return packetLength;
}

public boolean isHasMorePacket() {
return hasMorePacket;
}

public static ProtoHandlerResultBuilder builder() {
return new ProtoHandlerResultBuilder();
}


public static final class ProtoHandlerResultBuilder {
ProtoHandlerResultCode code;
int offset;
int packetLength;
byte[] packetData;
boolean hasMorePacket;

private ProtoHandlerResultBuilder() {
}


public ProtoHandlerResultBuilder setCode(ProtoHandlerResultCode val) {
this.code = val;
return this;
}

public ProtoHandlerResultBuilder setOffset(int val) {
this.offset = val;
return this;
}

public ProtoHandlerResultBuilder setPacketLength(int val) {
this.packetLength = val;
return this;
}

public ProtoHandlerResultBuilder setPacketData(byte[] val) {
this.packetData = val;
return this;
}

public ProtoHandlerResultBuilder setHasMorePacket(boolean val) {
this.hasMorePacket = val;
return this;
}

public ProtoHandlerResult build() {
return new ProtoHandlerResult(code, offset, packetLength, packetData, hasMorePacket);
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,25 @@
*/

public enum ProtoHandlerResultCode {
REACH_END_BUFFER,
/**
* receive a complete packet and has no more data exists in buffer.
*/

COMPLETE_PACKET,

/**
* receive a part of big packet.
*/
PART_OF_BIG_PACKET,
BUFFER_PACKET_UNCOMPLETE,
BUFFER_NOT_BIG_ENOUGH,


@Deprecated
REACH_END_BUFFER,
/**
* receive a complete packet and has rest data exists in buffer.
*/
@Deprecated
STLL_DATA_REMING
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.actiontech.dble.cluster;

import com.actiontech.dble.cluster.general.bean.KvBean;
import org.jetbrains.annotations.Nullable;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public abstract class AbstractConnection implements Connection {
protected long netInBytes;
protected long netOutBytes;
protected long lastLargeMessageTime;
private int sequenceId = 0;

protected final ConcurrentLinkedQueue<WriteOutTask> writeQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<byte[]> decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -96,51 +97,57 @@ private void handle(ByteBuffer dataBuffer) {
while (hasRemaining) {
ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress);
switch (result.getCode()) {
case REACH_END_BUFFER:
readReachEnd();
byte[] packetData = result.getPacketData();
if (packetData != null) {
if (!isSupportCompress) {
service.handle(new ServiceTask(packetData, service));
} else {
List<byte[]> packs = CompressUtil.decompressMysqlPacket(packetData, decompressUnfinishedDataQueue);
for (byte[] pack : packs) {
if (pack.length != 0) {
service.handle(new ServiceTask(pack, service));
}
}
}
case PART_OF_BIG_PACKET:

sequenceId++;
if (!result.isHasMorePacket()) {
readReachEnd();
dataBuffer.clear();
}

break;
case COMPLETE_PACKET:
processPacketData(result);
if (!result.isHasMorePacket()) {
readReachEnd();
dataBuffer.clear();
}
dataBuffer.clear();
hasRemaining = false;
break;
case BUFFER_PACKET_UNCOMPLETE:
compactReadBuffer(dataBuffer, result.getOffset());
hasRemaining = false;
break;
case BUFFER_NOT_BIG_ENOUGH:
ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength());
hasRemaining = false;
break;
case STLL_DATA_REMING:
byte[] partData = result.getPacketData();
if (partData != null) {
if (!isSupportCompress) {
service.handle(new ServiceTask(partData, service));
} else {
List<byte[]> packs = CompressUtil.decompressMysqlPacket(partData, decompressUnfinishedDataQueue);
for (byte[] pack : packs) {
if (pack.length != 0) {
service.handle(new ServiceTask(pack, service));
}
}
}
}
offset = result.getOffset();
continue;
default:
throw new RuntimeException("unknown error when read data");
}

hasRemaining = result.isHasMorePacket();
if (hasRemaining) {
offset = result.getOffset();
}
}
}

private void processPacketData(ProtoHandlerResult result) {
byte[] packetData = result.getPacketData();
if (packetData != null) {
int tmpSequenceId = sequenceId;
if (!isSupportCompress) {
sequenceId = 0;
service.handle(new ServiceTask(packetData, service, tmpSequenceId));
} else {
List<byte[]> packs = CompressUtil.decompressMysqlPacket(packetData, decompressUnfinishedDataQueue);
if (decompressUnfinishedDataQueue.isEmpty()) {
sequenceId = 0;
}
for (byte[] pack : packs) {
if (pack.length != 0) {
service.handle(new ServiceTask(pack, service, tmpSequenceId++));
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void writeBigPackage(MySQLResponseService service, int size) {
BufferUtil.writeUB3(buffer, MySQLPacket.MAX_PACKET_SIZE);
buffer.put(packetId++);
remain = writeBody(buffer, isFirst, remain);
service.getSession().getShardingService().nextPacketId();
//service.getSession().getShardingService().nextPacketId();
service.writeDirectly(buffer);
isFirst = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public abstract class MySQLPacket {

//HEADER_SIZE
public static final int PACKET_HEADER_SIZE = 4;

//2^24-1
public static final int MAX_PACKET_SIZE = 16777215;

public static final int MAX_EOF_SIZE = 9;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.actiontech.dble.buffer.BufferPool;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.service.AbstractService;

import com.actiontech.dble.singleton.BufferPoolManager;
import com.actiontech.dble.statistic.sql.StatisticListener;

Expand Down Expand Up @@ -97,7 +96,7 @@ public ByteBuffer write(ByteBuffer bb, AbstractService service,
service.writeDirectly(bb);
ByteBuffer tmpBuffer = service.allocate(totalSize);
BufferUtil.writeUB3(tmpBuffer, calcPacketSize());
tmpBuffer.put(packetId--);
tmpBuffer.put(packetId);
writeBody(tmpBuffer);
byte[] array = tmpBuffer.array();
service.recycleBuffer(tmpBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.mysql.MySQLPacket;
import com.actiontech.dble.services.VariablesService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.singleton.TraceManager;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -86,6 +87,9 @@ public ByteBuffer writeBigPackageToBuffer(byte[] data, ByteBuffer buffer) {
singlePacket = new byte[MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE];
ByteUtil.writeUB3(singlePacket, MySQLPacket.MAX_PACKET_SIZE);
singlePacket[3] = ++packetId;
if (this instanceof ShardingService) {
singlePacket[3] = (byte) ((ShardingService) this).nextPacketId();
}
System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, MySQLPacket.MAX_PACKET_SIZE);
srcPos += MySQLPacket.MAX_PACKET_SIZE;
length -= MySQLPacket.MAX_PACKET_SIZE;
Expand All @@ -94,6 +98,9 @@ public ByteBuffer writeBigPackageToBuffer(byte[] data, ByteBuffer buffer) {
singlePacket = new byte[length + MySQLPacket.PACKET_HEADER_SIZE];
ByteUtil.writeUB3(singlePacket, length);
singlePacket[3] = ++packetId;
if (this instanceof ShardingService) {
singlePacket[3] = (byte) ((ShardingService) this).nextPacketId();
}
System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, length);
buffer = writeToBuffer(singlePacket, buffer);
return buffer;
Expand Down
Loading

0 comments on commit 2bd011b

Please sign in to comment.