Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inner-836:big packet support #2501

Merged
merged 1 commit into from
Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import java.nio.ByteBuffer;

import static com.actiontech.dble.backend.mysql.proto.handler.ProtoHandlerResultCode.*;
Expand All @@ -23,17 +24,19 @@ public MySQLProtoHandlerImpl() {
}

@Override
@Nonnull
public ProtoHandlerResult handle(ByteBuffer dataBuffer, int offset, boolean isSupportCompress) {
int position = dataBuffer.position();
int length = getPacketLength(dataBuffer, offset, isSupportCompress);
final ProtoHandlerResult.ProtoHandlerResultBuilder builder = ProtoHandlerResult.builder();
if (length == -1) {
if (offset != 0) {
return new ProtoHandlerResult(BUFFER_PACKET_UNCOMPLETE, offset);
return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).build();
} else if (!dataBuffer.hasRemaining()) {
throw new RuntimeException("invalid dataBuffer capacity ,too little buffer size " +
dataBuffer.capacity());
}
return new ProtoHandlerResult(REACH_END_BUFFER, offset);
return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).build();
}
if (position >= offset + length) {
// handle this package
Expand All @@ -42,7 +45,9 @@ public ProtoHandlerResult handle(ByteBuffer dataBuffer, int offset, boolean isSu
dataBuffer.get(data, 0, length);
data = checkData(data, length);
if (data == null) {
return new ProtoHandlerResult(REACH_END_BUFFER, offset);
builder.setCode(PART_OF_BIG_PACKET);
} else {
builder.setCode(COMPLETE_PACKET);
}

// offset to next position
Expand All @@ -51,17 +56,21 @@ public ProtoHandlerResult handle(ByteBuffer dataBuffer, int offset, boolean isSu
if (position != offset) {
// try next package parse
//dataBufferOffset = offset;
//should reset position after read.
dataBuffer.position(position);
return new ProtoHandlerResult(STLL_DATA_REMING, offset, data);
builder.setHasMorePacket(true);
} else {
builder.setHasMorePacket(false);
}
return new ProtoHandlerResult(REACH_END_BUFFER, offset, data);
builder.setOffset(offset).setPacketData(data);
return builder.build();
} else {
// not read whole message package ,so check if buffer enough and
// compact dataBuffer
if (!dataBuffer.hasRemaining()) {
return new ProtoHandlerResult(BUFFER_NOT_BIG_ENOUGH, offset, length);
return builder.setCode(BUFFER_NOT_BIG_ENOUGH).setHasMorePacket(false).setOffset(offset).setPacketLength(length).build();
} else {
return new ProtoHandlerResult(BUFFER_PACKET_UNCOMPLETE, offset, length);
return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).setPacketLength(length).build();
}
}
}
Expand All @@ -86,10 +95,12 @@ private int getPacketLength(ByteBuffer buffer, int offset, boolean isSupportComp

private byte[] checkData(byte[] data, int length) {
//session packet should be set to the latest one
//todo: this method doesn't apply for compress
if (length >= com.actiontech.dble.net.mysql.MySQLPacket.MAX_PACKET_SIZE + com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE) {
if (incompleteData == null) {
incompleteData = data;
} else {
//skip header in package
byte[] nextData = new byte[data.length - com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE];
System.arraycopy(data, com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE, nextData, 0, data.length - com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE);
incompleteData = dataMerge(nextData);
Expand All @@ -108,6 +119,7 @@ private byte[] checkData(byte[] data, int length) {
}

private byte[] dataMerge(byte[] data) {
//todo: could optimize here. for example ,use linked-buffer
byte[] newData = new byte[incompleteData.length + data.length];
System.arraycopy(incompleteData, 0, newData, 0, incompleteData.length);
System.arraycopy(data, 0, newData, incompleteData.length, data.length);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,20 @@
package com.actiontech.dble.backend.mysql.proto.handler;

public class ProtoHandlerResult {
import org.jetbrains.annotations.Nullable;

public final class ProtoHandlerResult {
final ProtoHandlerResultCode code;
final int offset;
final int packetLength;
final byte[] packetData;
final boolean hasMorePacket;

public ProtoHandlerResult(ProtoHandlerResultCode code, int offset, byte[] packetData) {
private ProtoHandlerResult(ProtoHandlerResultCode code, int offset, int packetLength, byte[] packetData, boolean hasMorePacket) {
this.code = code;
this.offset = offset;
this.packetData = packetData;
this.packetLength = 0;
}

public ProtoHandlerResult(ProtoHandlerResultCode code, int offset) {
this.code = code;
this.offset = offset;
this.packetData = null;
this.packetLength = 0;
}

public ProtoHandlerResult(ProtoHandlerResultCode code, int offset, int packetLength) {
this.code = code;
this.offset = offset;
this.packetData = null;
this.packetLength = packetLength;
this.packetData = packetData;
this.hasMorePacket = hasMorePacket;
}

public ProtoHandlerResultCode getCode() {
Expand All @@ -35,13 +25,64 @@ public int getOffset() {
return offset;
}

@Nullable
public byte[] getPacketData() {
return packetData;
}

public int getPacketLength() {
return packetLength;
}

public boolean isHasMorePacket() {
return hasMorePacket;
}

public static ProtoHandlerResultBuilder builder() {
return new ProtoHandlerResultBuilder();
}


public static final class ProtoHandlerResultBuilder {
ProtoHandlerResultCode code;
int offset;
int packetLength;
byte[] packetData;
boolean hasMorePacket;

private ProtoHandlerResultBuilder() {
}


public ProtoHandlerResultBuilder setCode(ProtoHandlerResultCode val) {
this.code = val;
return this;
}

public ProtoHandlerResultBuilder setOffset(int val) {
this.offset = val;
return this;
}

public ProtoHandlerResultBuilder setPacketLength(int val) {
this.packetLength = val;
return this;
}

public ProtoHandlerResultBuilder setPacketData(byte[] val) {
this.packetData = val;
return this;
}

public ProtoHandlerResultBuilder setHasMorePacket(boolean val) {
this.hasMorePacket = val;
return this;
}

public ProtoHandlerResult build() {
return new ProtoHandlerResult(code, offset, packetLength, packetData, hasMorePacket);
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,25 @@
*/

public enum ProtoHandlerResultCode {
REACH_END_BUFFER,
/**
* receive a complete packet and has no more data exists in buffer.
*/

COMPLETE_PACKET,

/**
* receive a part of big packet.
*/
PART_OF_BIG_PACKET,
BUFFER_PACKET_UNCOMPLETE,
BUFFER_NOT_BIG_ENOUGH,


@Deprecated
REACH_END_BUFFER,
/**
* receive a complete packet and has rest data exists in buffer.
*/
@Deprecated
STLL_DATA_REMING
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.actiontech.dble.cluster;

import com.actiontech.dble.cluster.general.bean.KvBean;
import org.jetbrains.annotations.Nullable;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public abstract class AbstractConnection implements Connection {
protected long netInBytes;
protected long netOutBytes;
protected long lastLargeMessageTime;
private int sequenceId = 0;

protected final ConcurrentLinkedQueue<WriteOutTask> writeQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<byte[]> decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -96,51 +97,57 @@ private void handle(ByteBuffer dataBuffer) {
while (hasRemaining) {
ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress);
switch (result.getCode()) {
case REACH_END_BUFFER:
readReachEnd();
byte[] packetData = result.getPacketData();
if (packetData != null) {
if (!isSupportCompress) {
service.handle(new ServiceTask(packetData, service));
} else {
List<byte[]> packs = CompressUtil.decompressMysqlPacket(packetData, decompressUnfinishedDataQueue);
for (byte[] pack : packs) {
if (pack.length != 0) {
service.handle(new ServiceTask(pack, service));
}
}
}
case PART_OF_BIG_PACKET:

sequenceId++;
if (!result.isHasMorePacket()) {
readReachEnd();
dataBuffer.clear();
}

break;
case COMPLETE_PACKET:
processPacketData(result);
if (!result.isHasMorePacket()) {
readReachEnd();
dataBuffer.clear();
}
dataBuffer.clear();
hasRemaining = false;
break;
case BUFFER_PACKET_UNCOMPLETE:
compactReadBuffer(dataBuffer, result.getOffset());
hasRemaining = false;
break;
case BUFFER_NOT_BIG_ENOUGH:
ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength());
hasRemaining = false;
break;
case STLL_DATA_REMING:
byte[] partData = result.getPacketData();
if (partData != null) {
if (!isSupportCompress) {
service.handle(new ServiceTask(partData, service));
} else {
List<byte[]> packs = CompressUtil.decompressMysqlPacket(partData, decompressUnfinishedDataQueue);
for (byte[] pack : packs) {
if (pack.length != 0) {
service.handle(new ServiceTask(pack, service));
}
}
}
}
offset = result.getOffset();
continue;
default:
throw new RuntimeException("unknown error when read data");
}

hasRemaining = result.isHasMorePacket();
if (hasRemaining) {
offset = result.getOffset();
}
}
}

private void processPacketData(ProtoHandlerResult result) {
byte[] packetData = result.getPacketData();
if (packetData != null) {
int tmpSequenceId = sequenceId;
if (!isSupportCompress) {
sequenceId = 0;
service.handle(new ServiceTask(packetData, service, tmpSequenceId));
} else {
List<byte[]> packs = CompressUtil.decompressMysqlPacket(packetData, decompressUnfinishedDataQueue);
if (decompressUnfinishedDataQueue.isEmpty()) {
sequenceId = 0;
}
for (byte[] pack : packs) {
if (pack.length != 0) {
service.handle(new ServiceTask(pack, service, tmpSequenceId++));
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ public void writeBigPackage(MySQLResponseService service, int size) {
BufferUtil.writeUB3(buffer, MySQLPacket.MAX_PACKET_SIZE);
buffer.put(packetId++);
remain = writeBody(buffer, isFirst, remain);
service.getSession().getShardingService().nextPacketId();
service.writeDirectly(buffer);
isFirst = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public abstract class MySQLPacket {

//HEADER_SIZE
public static final int PACKET_HEADER_SIZE = 4;

//2^24-1
public static final int MAX_PACKET_SIZE = 16777215;

public static final int MAX_EOF_SIZE = 9;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import com.actiontech.dble.buffer.BufferPool;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.service.AbstractService;

import com.actiontech.dble.singleton.BufferPoolManager;
import com.actiontech.dble.statistic.sql.StatisticListener;

Expand Down Expand Up @@ -97,7 +96,7 @@ public ByteBuffer write(ByteBuffer bb, AbstractService service,
service.writeDirectly(bb);
ByteBuffer tmpBuffer = service.allocate(totalSize);
BufferUtil.writeUB3(tmpBuffer, calcPacketSize());
tmpBuffer.put(packetId--);
tmpBuffer.put(packetId);
writeBody(tmpBuffer);
byte[] array = tmpBuffer.array();
service.recycleBuffer(tmpBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.mysql.MySQLPacket;
import com.actiontech.dble.services.VariablesService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.singleton.TraceManager;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -86,6 +87,9 @@ public ByteBuffer writeBigPackageToBuffer(byte[] data, ByteBuffer buffer) {
singlePacket = new byte[MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE];
ByteUtil.writeUB3(singlePacket, MySQLPacket.MAX_PACKET_SIZE);
singlePacket[3] = ++packetId;
if (this instanceof ShardingService) {
singlePacket[3] = (byte) ((ShardingService) this).nextPacketId();
}
System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, MySQLPacket.MAX_PACKET_SIZE);
srcPos += MySQLPacket.MAX_PACKET_SIZE;
length -= MySQLPacket.MAX_PACKET_SIZE;
Expand All @@ -94,6 +98,9 @@ public ByteBuffer writeBigPackageToBuffer(byte[] data, ByteBuffer buffer) {
singlePacket = new byte[length + MySQLPacket.PACKET_HEADER_SIZE];
ByteUtil.writeUB3(singlePacket, length);
singlePacket[3] = ++packetId;
if (this instanceof ShardingService) {
singlePacket[3] = (byte) ((ShardingService) this).nextPacketId();
}
System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, length);
buffer = writeToBuffer(singlePacket, buffer);
return buffer;
Expand Down
Loading