Skip to content

Commit

Permalink
inner-764: support server-side cursor (#2388)
Browse files Browse the repository at this point in the history
Signed-off-by: dcy <dcy10000@gmail.com>
  • Loading branch information
dcy10000 authored Jan 11, 2021
1 parent 84c3f87 commit 0b33a0d
Show file tree
Hide file tree
Showing 27 changed files with 967 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,37 @@
*/
package com.actiontech.dble.backend.mysql;

import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.backend.mysql.store.CursorCache;
import com.actiontech.dble.backend.mysql.store.CursorCacheForGeneral;
import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.server.NonBlockingSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
* @author mycat, CrazyPig
*/
public class PreparedStatement {
public class PreparedStatement implements Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger(PreparedStatement.class);
private long id;
private String statement;
private int columnsNumber;
private int parametersNumber;
private int[] parametersType;
private CursorCache cursorCache;
private List<FieldPacket> fieldPackets;
private Consumer<Integer> prepareCallback = null;

/**
* store the byte data from COM_STMT_SEND_LONG_DATA
* <pre>
Expand All @@ -29,10 +45,9 @@ public class PreparedStatement {
*/
private Map<Long, ByteArrayOutputStream> longDataMap;

public PreparedStatement(long id, String statement, int columnsNumber, int parametersNumber) {
public PreparedStatement(long id, String statement, int parametersNumber) {
this.id = id;
this.statement = statement;
this.columnsNumber = columnsNumber;
this.parametersNumber = parametersNumber;
this.parametersType = new int[parametersNumber];
this.longDataMap = new HashMap<>();
Expand Down Expand Up @@ -62,6 +77,45 @@ public ByteArrayOutputStream getLongData(long paramId) {
return longDataMap.get(paramId);
}

public void setColumnsNumber(int columnsNumber) {
this.columnsNumber = columnsNumber;
}

public void initCursor(NonBlockingSession session, ResponseHandler responseHandler, int fieldCount, List<FieldPacket> tmpFieldPackets) {
if (cursorCache != null) {
cursorCache.close();
LOGGER.warn("cursor in one prepareStatement init twice. Maybe something wrong");
}
if (session.getShardingService().getRequestScope().isUsingJoin()) {
/*
todo:could use optimized implementation here
*/
cursorCache = new CursorCacheForGeneral(fieldCount);
} else {
cursorCache = new CursorCacheForGeneral(fieldCount);
}

this.fieldPackets = tmpFieldPackets;
}

public List<FieldPacket> getFieldPackets() {
return fieldPackets;
}

public CursorCache getCursorCache() {
return cursorCache;
}


public void setPrepareCallback(Consumer<Integer> prepareCallback) {
this.prepareCallback = prepareCallback;
}


public void onPrepareOk(int columnCount) {
prepareCallback.accept(columnCount);
}

/**
* reset value which is used by COM_STMT_RESET
*/
Expand Down Expand Up @@ -93,4 +147,10 @@ public Map<Long, ByteArrayOutputStream> getLongDataMap() {
return longDataMap;
}

@Override
public void close() {
if (cursorCache != null) {
cursorCache.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.server.RequestScope;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.server.variables.OutputStateEnum;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.singleton.TraceManager;
Expand All @@ -39,6 +41,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import static com.actiontech.dble.net.mysql.StatusFlags.SERVER_STATUS_CURSOR_EXISTS;

/**
* @author mycat
*/
Expand All @@ -61,6 +65,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
private final boolean modifiedSQL;
protected Set<RouteResultsetNode> connRrns = new ConcurrentSkipListSet<>();
private Map<String, Integer> shardingNodePauseInfo; // only for debug
private RequestScope requestScope;

public MultiNodeQueryHandler(RouteResultset rrs, NonBlockingSession session) {
super(session);
Expand All @@ -77,6 +82,7 @@ public MultiNodeQueryHandler(RouteResultset rrs, NonBlockingSession session) {
this.sessionAutocommit = session.getShardingService().isAutocommit();
this.modifiedSQL = rrs.getNodes()[0].isModifySQL();
initDebugInfo();
requestScope = session.getShardingService().getRequestScope();
}

@Override
Expand Down Expand Up @@ -281,6 +287,9 @@ public void okResponse(byte[] data, AbstractService service) {
TraceManager.TraceObject traceObject = TraceManager.serviceTrace(service, "get-ok-response");
TraceManager.finishSpan(service, traceObject);
this.netOutBytes += data.length;
if (OutputStateEnum.PREPARE.equals(requestScope.getOutputState())) {
return;
}
boolean executeResponse = ((MySQLResponseService) service).syncAndExecute();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("received ok response ,executeResponse:" + executeResponse + " from " + service);
Expand Down Expand Up @@ -378,6 +387,7 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, AbstractService ser
}

this.netOutBytes += eof.length;

if (errorResponse.get()) {
return;
}
Expand All @@ -391,12 +401,22 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, AbstractService ser
session.releaseConnectionIfSafe((MySQLResponseService) service, false);
}
}

boolean zeroReached;
lock.lock();
try {
unResponseRrns.remove(rNode);
zeroReached = canResponse();
if (zeroReached) {
if (OutputStateEnum.PREPARE.equals(requestScope.getOutputState())) {
requestScope.getCurrentPreparedStatement().onPrepareOk(fieldCount);
return;
}
if (requestScope.isUsingCursor()) {
requestScope.getCurrentPreparedStatement().getCursorCache().done();
session.getShardingService().writeDirectly(byteBuffer);
return;
}
this.resultSize += eof.length;
if (!rrs.isCallStatement()) {
if (this.sessionAutocommit && !session.getShardingService().isTxStart() && !session.getShardingService().isLocked()) { // clear all connections
Expand Down Expand Up @@ -431,6 +451,9 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, AbstractService ser
@Override
public boolean rowResponse(final byte[] row, RowDataPacket rowPacketNull, boolean isLeft, AbstractService service) {
this.netOutBytes += row.length;
if (OutputStateEnum.PREPARE.equals(requestScope.getOutputState())) {
return false;
}
if (errorResponse.get()) {
// the connection has been closed or set to "txInterrupt" properly
//in tryErrorFinished() method! If we close it here, it can
Expand Down Expand Up @@ -463,13 +486,19 @@ public boolean rowResponse(final byte[] row, RowDataPacket rowPacketNull, boolea
}

RowDataPacket rowDataPk = new RowDataPacket(fieldCount);
row[3] = (byte) session.getShardingService().nextPacketId();
if (!requestScope.isUsingCursor()) {
row[3] = (byte) session.getShardingService().nextPacketId();
}
rowDataPk.read(row);
if (session.isPrepared()) {
BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
binRowDataPk.read(fieldPackets, rowDataPk);
binRowDataPk.setPacketId(rowDataPk.getPacketId());
byteBuffer = binRowDataPk.write(byteBuffer, session.getShardingService(), true);
if (requestScope.isPrepared()) {
if (requestScope.isUsingCursor()) {
requestScope.getCurrentPreparedStatement().getCursorCache().add(rowDataPk);
} else {
BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
binRowDataPk.read(fieldPackets, rowDataPk);
binRowDataPk.setPacketId(rowDataPk.getPacketId());
byteBuffer = binRowDataPk.write(byteBuffer, session.getShardingService(), true);
}
} else {
byteBuffer = rowDataPk.write(byteBuffer, session.getShardingService(), true);
}
Expand Down Expand Up @@ -529,6 +558,9 @@ private void writeEofResult(byte[] eof, ShardingService source) {
if (byteBuffer == null) {
return;
}
if (requestScope.isUsingCursor()) {
return;
}
EOFRowPacket eofRowPacket = new EOFRowPacket();
eofRowPacket.read(eof);
eofRowPacket.setPacketId((byte) session.getShardingService().nextPacketId());
Expand Down Expand Up @@ -562,7 +594,10 @@ private void executeFieldEof(byte[] header, List<byte[]> fields, byte[] eof) {
}
ShardingService service = session.getShardingService();
fieldCount = fields.size();
header[3] = (byte) session.getShardingService().nextPacketId();
if (OutputStateEnum.PREPARE.equals(requestScope.getOutputState())) {
return;
}
header[3] = (byte) service.nextPacketId();
byteBuffer = service.writeToBuffer(header, byteBuffer);

if (!errorResponse.get()) {
Expand All @@ -584,7 +619,17 @@ private void executeFieldEof(byte[] header, List<byte[]> fields, byte[] eof) {
fieldPkg.setPacketId(session.getShardingService().nextPacketId());
byteBuffer = fieldPkg.write(byteBuffer, service, false);
}
if (requestScope.isUsingCursor()) {
requestScope.getCurrentPreparedStatement().initCursor(session, this, fields.size(), fieldPackets);
}

eof[3] = (byte) session.getShardingService().nextPacketId();
if (requestScope.isUsingCursor()) {
byte statusFlag = 0;
statusFlag |= service.getSession2().getShardingService().isAutocommit() ? 2 : 1;
statusFlag |= SERVER_STATUS_CURSOR_EXISTS;
eof[7] = statusFlag;
}
byteBuffer = service.writeToBuffer(eof, byteBuffer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
package com.actiontech.dble.backend.mysql.nio.handler;

import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.nio.handler.builder.BaseHandlerBuilder;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.AutoTxOperation;
import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap;
import com.actiontech.dble.backend.mysql.nio.handler.util.HandlerTool;
Expand All @@ -21,6 +20,7 @@
import com.actiontech.dble.plan.common.item.ItemField;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.services.factorys.FinalHandlerFactory;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.singleton.TraceManager;
import org.slf4j.Logger;
Expand All @@ -40,14 +40,14 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
private final int queueSize;
private Map<MySQLResponseService, BlockingQueue<HeapItem>> queues;
private RowDataComparator rowComparator;
private OutputHandler outputHandler;
private DMLResponseHandler outputHandler;
private volatile boolean noNeedRows = false;

public MultiNodeSelectHandler(RouteResultset rrs, NonBlockingSession session) {
super(rrs, session);
this.queueSize = SystemConfig.getInstance().getMergeQueueSize();
this.queues = new ConcurrentHashMap<>();
outputHandler = new OutputHandler(BaseHandlerBuilder.getSequenceId(), session);
outputHandler = FinalHandlerFactory.createFinalHandler(session);
}

@Override
Expand Down
Loading

0 comments on commit 0b33a0d

Please sign in to comment.