diff --git a/src/main/java/com/actiontech/dble/backend/mysql/PreparedStatement.java b/src/main/java/com/actiontech/dble/backend/mysql/PreparedStatement.java index 0df47663be..a2e71b00cb 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/PreparedStatement.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/PreparedStatement.java @@ -5,21 +5,37 @@ */ package com.actiontech.dble.backend.mysql; +import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; +import com.actiontech.dble.backend.mysql.store.CursorCache; +import com.actiontech.dble.backend.mysql.store.CursorCacheForGeneral; +import com.actiontech.dble.net.mysql.FieldPacket; +import com.actiontech.dble.server.NonBlockingSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.ByteArrayOutputStream; +import java.io.Closeable; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Consumer; /** * @author mycat, CrazyPig */ -public class PreparedStatement { +public class PreparedStatement implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(PreparedStatement.class); private long id; private String statement; private int columnsNumber; private int parametersNumber; private int[] parametersType; + private CursorCache cursorCache; + private List fieldPackets; + private Consumer prepareCallback = null; + /** * store the byte data from COM_STMT_SEND_LONG_DATA *
@@ -29,10 +45,9 @@ public class PreparedStatement {
      */
     private Map longDataMap;
 
-    public PreparedStatement(long id, String statement, int columnsNumber, int parametersNumber) {
+    public PreparedStatement(long id, String statement, int parametersNumber) {
         this.id = id;
         this.statement = statement;
-        this.columnsNumber = columnsNumber;
         this.parametersNumber = parametersNumber;
         this.parametersType = new int[parametersNumber];
         this.longDataMap = new HashMap<>();
@@ -62,6 +77,45 @@ public ByteArrayOutputStream getLongData(long paramId) {
         return longDataMap.get(paramId);
     }
 
+    public void setColumnsNumber(int columnsNumber) {
+        this.columnsNumber = columnsNumber;
+    }
+
+    public void initCursor(NonBlockingSession session, ResponseHandler responseHandler, int fieldCount, List tmpFieldPackets) {
+        if (cursorCache != null) {
+            cursorCache.close();
+            LOGGER.warn("cursor in one prepareStatement init twice. Maybe something wrong");
+        }
+        if (session.getShardingService().getRequestScope().isUsingJoin()) {
+            /*
+            todo:could use optimized implementation here
+             */
+            cursorCache = new CursorCacheForGeneral(fieldCount);
+        } else {
+            cursorCache = new CursorCacheForGeneral(fieldCount);
+        }
+
+        this.fieldPackets = tmpFieldPackets;
+    }
+
+    public List getFieldPackets() {
+        return fieldPackets;
+    }
+
+    public CursorCache getCursorCache() {
+        return cursorCache;
+    }
+
+
+    public void setPrepareCallback(Consumer prepareCallback) {
+        this.prepareCallback = prepareCallback;
+    }
+
+
+    public void onPrepareOk(int columnCount) {
+        prepareCallback.accept(columnCount);
+    }
+
     /**
      * reset value which is used by COM_STMT_RESET
      */
@@ -93,4 +147,10 @@ public Map getLongDataMap() {
         return longDataMap;
     }
 
+    @Override
+    public void close() {
+        if (cursorCache != null) {
+            cursorCache.close();
+        }
+    }
 }
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java
index 81e2e36ccd..0bac759f98 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java
@@ -21,7 +21,9 @@
 import com.actiontech.dble.route.RouteResultset;
 import com.actiontech.dble.route.RouteResultsetNode;
 import com.actiontech.dble.server.NonBlockingSession;
+import com.actiontech.dble.server.RequestScope;
 import com.actiontech.dble.server.parser.ServerParse;
+import com.actiontech.dble.server.variables.OutputStateEnum;
 import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
 import com.actiontech.dble.services.mysqlsharding.ShardingService;
 import com.actiontech.dble.singleton.TraceManager;
@@ -39,6 +41,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
 
+import static com.actiontech.dble.net.mysql.StatusFlags.SERVER_STATUS_CURSOR_EXISTS;
+
 /**
  * @author mycat
  */
@@ -61,6 +65,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
     private final boolean modifiedSQL;
     protected Set connRrns = new ConcurrentSkipListSet<>();
     private Map shardingNodePauseInfo; // only for debug
+    private RequestScope requestScope;
 
     public MultiNodeQueryHandler(RouteResultset rrs, NonBlockingSession session) {
         super(session);
@@ -77,6 +82,7 @@ public MultiNodeQueryHandler(RouteResultset rrs, NonBlockingSession session) {
         this.sessionAutocommit = session.getShardingService().isAutocommit();
         this.modifiedSQL = rrs.getNodes()[0].isModifySQL();
         initDebugInfo();
+        requestScope = session.getShardingService().getRequestScope();
     }
 
     @Override
@@ -281,6 +287,9 @@ public void okResponse(byte[] data, AbstractService service) {
         TraceManager.TraceObject traceObject = TraceManager.serviceTrace(service, "get-ok-response");
         TraceManager.finishSpan(service, traceObject);
         this.netOutBytes += data.length;
+        if (OutputStateEnum.PREPARE.equals(requestScope.getOutputState())) {
+            return;
+        }
         boolean executeResponse = ((MySQLResponseService) service).syncAndExecute();
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("received ok response ,executeResponse:" + executeResponse + " from " + service);
@@ -378,6 +387,7 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, AbstractService ser
         }
 
         this.netOutBytes += eof.length;
+
         if (errorResponse.get()) {
             return;
         }
@@ -391,12 +401,22 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, AbstractService ser
                 session.releaseConnectionIfSafe((MySQLResponseService) service, false);
             }
         }
+
         boolean zeroReached;
         lock.lock();
         try {
             unResponseRrns.remove(rNode);
             zeroReached = canResponse();
             if (zeroReached) {
+                if (OutputStateEnum.PREPARE.equals(requestScope.getOutputState())) {
+                    requestScope.getCurrentPreparedStatement().onPrepareOk(fieldCount);
+                    return;
+                }
+                if (requestScope.isUsingCursor()) {
+                    requestScope.getCurrentPreparedStatement().getCursorCache().done();
+                    session.getShardingService().writeDirectly(byteBuffer);
+                    return;
+                }
                 this.resultSize += eof.length;
                 if (!rrs.isCallStatement()) {
                     if (this.sessionAutocommit && !session.getShardingService().isTxStart() && !session.getShardingService().isLocked()) { // clear all connections
@@ -431,6 +451,9 @@ public void rowEofResponse(final byte[] eof, boolean isLeft, AbstractService ser
     @Override
     public boolean rowResponse(final byte[] row, RowDataPacket rowPacketNull, boolean isLeft, AbstractService service) {
         this.netOutBytes += row.length;
+        if (OutputStateEnum.PREPARE.equals(requestScope.getOutputState())) {
+            return false;
+        }
         if (errorResponse.get()) {
             // the connection has been closed or set to "txInterrupt" properly
             //in tryErrorFinished() method! If we close it here, it can
@@ -463,13 +486,19 @@ public boolean rowResponse(final byte[] row, RowDataPacket rowPacketNull, boolea
                 }
 
                 RowDataPacket rowDataPk = new RowDataPacket(fieldCount);
-                row[3] = (byte) session.getShardingService().nextPacketId();
+                if (!requestScope.isUsingCursor()) {
+                    row[3] = (byte) session.getShardingService().nextPacketId();
+                }
                 rowDataPk.read(row);
-                if (session.isPrepared()) {
-                    BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
-                    binRowDataPk.read(fieldPackets, rowDataPk);
-                    binRowDataPk.setPacketId(rowDataPk.getPacketId());
-                    byteBuffer = binRowDataPk.write(byteBuffer, session.getShardingService(), true);
+                if (requestScope.isPrepared()) {
+                    if (requestScope.isUsingCursor()) {
+                        requestScope.getCurrentPreparedStatement().getCursorCache().add(rowDataPk);
+                    } else {
+                        BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
+                        binRowDataPk.read(fieldPackets, rowDataPk);
+                        binRowDataPk.setPacketId(rowDataPk.getPacketId());
+                        byteBuffer = binRowDataPk.write(byteBuffer, session.getShardingService(), true);
+                    }
                 } else {
                     byteBuffer = rowDataPk.write(byteBuffer, session.getShardingService(), true);
                 }
@@ -529,6 +558,9 @@ private void writeEofResult(byte[] eof, ShardingService source) {
         if (byteBuffer == null) {
             return;
         }
+        if (requestScope.isUsingCursor()) {
+            return;
+        }
         EOFRowPacket eofRowPacket = new EOFRowPacket();
         eofRowPacket.read(eof);
         eofRowPacket.setPacketId((byte) session.getShardingService().nextPacketId());
@@ -562,7 +594,10 @@ private void executeFieldEof(byte[] header, List fields, byte[] eof) {
         }
         ShardingService service = session.getShardingService();
         fieldCount = fields.size();
-        header[3] = (byte) session.getShardingService().nextPacketId();
+        if (OutputStateEnum.PREPARE.equals(requestScope.getOutputState())) {
+            return;
+        }
+        header[3] = (byte) service.nextPacketId();
         byteBuffer = service.writeToBuffer(header, byteBuffer);
 
         if (!errorResponse.get()) {
@@ -584,7 +619,17 @@ private void executeFieldEof(byte[] header, List fields, byte[] eof) {
                 fieldPkg.setPacketId(session.getShardingService().nextPacketId());
                 byteBuffer = fieldPkg.write(byteBuffer, service, false);
             }
+            if (requestScope.isUsingCursor()) {
+                requestScope.getCurrentPreparedStatement().initCursor(session, this, fields.size(), fieldPackets);
+            }
+
             eof[3] = (byte) session.getShardingService().nextPacketId();
+            if (requestScope.isUsingCursor()) {
+                byte statusFlag = 0;
+                statusFlag |= service.getSession2().getShardingService().isAutocommit() ? 2 : 1;
+                statusFlag |= SERVER_STATUS_CURSOR_EXISTS;
+                eof[7] = statusFlag;
+            }
             byteBuffer = service.writeToBuffer(eof, byteBuffer);
         }
     }
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java
index b1709aa637..b2d046dc07 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java
@@ -5,8 +5,7 @@
 package com.actiontech.dble.backend.mysql.nio.handler;
 
 import com.actiontech.dble.DbleServer;
-import com.actiontech.dble.backend.mysql.nio.handler.builder.BaseHandlerBuilder;
-import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler;
+import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
 import com.actiontech.dble.backend.mysql.nio.handler.transaction.AutoTxOperation;
 import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap;
 import com.actiontech.dble.backend.mysql.nio.handler.util.HandlerTool;
@@ -21,6 +20,7 @@
 import com.actiontech.dble.plan.common.item.ItemField;
 import com.actiontech.dble.route.RouteResultset;
 import com.actiontech.dble.server.NonBlockingSession;
+import com.actiontech.dble.services.factorys.FinalHandlerFactory;
 import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
 import com.actiontech.dble.singleton.TraceManager;
 import org.slf4j.Logger;
@@ -40,14 +40,14 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
     private final int queueSize;
     private Map> queues;
     private RowDataComparator rowComparator;
-    private OutputHandler outputHandler;
+    private DMLResponseHandler outputHandler;
     private volatile boolean noNeedRows = false;
 
     public MultiNodeSelectHandler(RouteResultset rrs, NonBlockingSession session) {
         super(rrs, session);
         this.queueSize = SystemConfig.getInstance().getMergeQueueSize();
         this.queues = new ConcurrentHashMap<>();
-        outputHandler = new OutputHandler(BaseHandlerBuilder.getSequenceId(), session);
+        outputHandler = FinalHandlerFactory.createFinalHandler(session);
     }
 
     @Override
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java
index 5bc83d32fb..6c2bd5feb4 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java
@@ -20,6 +20,8 @@
 import com.actiontech.dble.route.RouteResultset;
 import com.actiontech.dble.route.RouteResultsetNode;
 import com.actiontech.dble.server.NonBlockingSession;
+import com.actiontech.dble.server.RequestScope;
+import com.actiontech.dble.server.variables.OutputStateEnum;
 import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
 import com.actiontech.dble.services.mysqlsharding.ShardingService;
 import com.actiontech.dble.singleton.TraceManager;
@@ -36,6 +38,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static com.actiontech.dble.net.mysql.StatusFlags.SERVER_STATUS_CURSOR_EXISTS;
+
 /**
  * @author mycat
  */
@@ -57,6 +61,7 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
     private volatile boolean connClosed = false;
     protected AtomicBoolean writeToClient = new AtomicBoolean(false);
 
+    private RequestScope requestScope;
 
     public SingleNodeHandler(RouteResultset rrs, NonBlockingSession session) {
         this.rrs = rrs;
@@ -68,6 +73,8 @@ public SingleNodeHandler(RouteResultset rrs, NonBlockingSession session) {
             throw new IllegalArgumentException("session is null!");
         }
         this.session = session;
+        requestScope = session.getShardingService().getRequestScope();
+
     }
 
 
@@ -244,6 +251,9 @@ public void okResponse(byte[] data, AbstractService service) {
         TraceManager.TraceObject traceObject = TraceManager.serviceTrace(service, "get-ok-packet");
         TraceManager.finishSpan(service, traceObject);
         this.netOutBytes += data.length;
+        if (OutputStateEnum.PREPARE.equals(requestScope.getOutputState())) {
+            return;
+        }
         boolean executeResponse = ((MySQLResponseService) service).syncAndExecute();
         if (executeResponse) {
             this.resultSize += data.length;
@@ -282,6 +292,11 @@ public void rowEofResponse(byte[] eof, boolean isLeft, AbstractService service)
         TraceManager.finishSpan(service, traceObject);
         this.netOutBytes += eof.length;
         this.resultSize += eof.length;
+        if (OutputStateEnum.PREPARE.equals(requestScope.getOutputState())) {
+            requestScope.getCurrentPreparedStatement().onPrepareOk(fieldCount);
+            writeToClient.compareAndSet(false, true);
+            return;
+        }
         // if it's call statement,it will not release connection
         if (!rrs.isCallStatement()) {
             session.releaseConnectionIfSafe((MySQLResponseService) service, false);
@@ -295,10 +310,16 @@ public void rowEofResponse(byte[] eof, boolean isLeft, AbstractService service)
         ShardingService shardingService = session.getShardingService();
         session.setResponseTime(true);
         doSqlStat();
+        if (requestScope.isUsingCursor()) {
+            requestScope.getCurrentPreparedStatement().getCursorCache().done();
+            session.getShardingService().writeDirectly(buffer);
+        }
         lock.lock();
         try {
             if (writeToClient.compareAndSet(false, true)) {
-                eofRowPacket.write(buffer, shardingService);
+                if (!requestScope.isUsingCursor()) {
+                    eofRowPacket.write(buffer, shardingService);
+                }
             }
         } finally {
             lock.unlock();
@@ -331,6 +352,10 @@ public void fieldEofResponse(byte[] header, List fields, List fields, List fieldsNull, List fieldsNull, List fieldsNull, List fieldPackets, byte[] eofNull, boolean isLeft, AbstractService service) {
+        this.fieldPackets = fieldPackets;
+    }
+
+    @Override
+    public boolean rowResponse(byte[] rowNull, RowDataPacket rowPacket, boolean isLeft, AbstractService service) {
+        return false;
+    }
+
+    @Override
+    public void rowEofResponse(byte[] data, boolean isLeft, AbstractService service) {
+        requestScope.getCurrentPreparedStatement().onPrepareOk(fieldPackets.size());
+        return;
+
+    }
+
+
+}
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java
index 532e17df12..1497bf2a8e 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java
@@ -24,6 +24,7 @@
 import com.actiontech.dble.plan.common.exception.MySQLOutPutException;
 import com.actiontech.dble.plan.common.field.Field;
 import com.actiontech.dble.plan.common.item.Item;
+import com.actiontech.dble.server.NonBlockingSession;
 import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
 import com.actiontech.dble.singleton.BufferPoolManager;
 import com.actiontech.dble.util.FairLinkedBlockingDeque;
@@ -78,6 +79,9 @@ public JoinHandler(long id, Session session, boolean isLeftJoin, List lef
         this.leftFieldPackets = new ArrayList<>();
         this.rightFieldPackets = new ArrayList<>();
         this.otherJoinOn = otherJoinOn;
+        if (session instanceof NonBlockingSession) {
+            ((NonBlockingSession) session).getShardingService().getRequestScope().setUsingJoin(true);
+        }
     }
 
     @Override
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/store/CursorCache.java b/src/main/java/com/actiontech/dble/backend/mysql/store/CursorCache.java
new file mode 100644
index 0000000000..273c3b044c
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/mysql/store/CursorCache.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+
+package com.actiontech.dble.backend.mysql.store;
+
+import com.actiontech.dble.net.mysql.RowDataPacket;
+
+import java.util.Iterator;
+
+/**
+ * @author dcy
+ * Create Date: 2020-12-31
+ */
+public interface CursorCache {
+    String CHARSET = "UTF-8";
+
+
+    void add(RowDataPacket row);
+
+
+    void done();
+
+    boolean isDone();
+
+    /**
+     * using Iterator to reduce resource usage
+     *
+     * @param expectRowNum
+     * @return
+     */
+    Iterator fetchBatch(long expectRowNum);
+
+
+    int getRowCount();
+
+
+    void close();
+}
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/store/CursorCacheForGeneral.java b/src/main/java/com/actiontech/dble/backend/mysql/store/CursorCacheForGeneral.java
new file mode 100644
index 0000000000..d0005be16c
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/backend/mysql/store/CursorCacheForGeneral.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+
+package com.actiontech.dble.backend.mysql.store;
+
+import com.actiontech.dble.config.model.SystemConfig;
+import com.actiontech.dble.net.mysql.RowDataPacket;
+import com.actiontech.dble.plan.common.external.ResultStore;
+import com.actiontech.dble.singleton.BufferPoolManager;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * @author dcy
+ * Create Date: 2020-12-22
+ */
+public class CursorCacheForGeneral implements CursorCache {
+    ResultStore localResult;
+    private volatile boolean complete = false;
+
+
+    public CursorCacheForGeneral(int fieldCount) {
+        final UnSortedLocalResult unSortedLocalResult = new UnSortedLocalResult(fieldCount, BufferPoolManager.getBufferPool(), CHARSET);
+        /*
+            max-memory before persist.
+         */
+        unSortedLocalResult.setMaxMemory(SystemConfig.getInstance().getMaxHeapTableSize());
+        /*
+            read buffer chunk size
+         */
+        unSortedLocalResult.setMaxReadMemorySize(SystemConfig.getInstance().getHeapTableBufferChunkSize());
+        this.localResult = unSortedLocalResult;
+
+    }
+
+
+    @Override
+    public void add(RowDataPacket row) {
+        localResult.add(row);
+    }
+
+
+    @Override
+    public void done() {
+        localResult.done();
+        complete = true;
+    }
+
+
+    @Override
+    public boolean isDone() {
+        return complete;
+    }
+
+
+    @Override
+    public Iterator fetchBatch(long expectRowNum) {
+        return new ScannerIterator(expectRowNum);
+    }
+
+    @Override
+    public int getRowCount() {
+        return localResult.getRowCount();
+    }
+
+
+    @Override
+    public void close() {
+        localResult.close();
+    }
+
+    private final class ScannerIterator implements Iterator {
+        private RowDataPacket nextPacket;
+        private boolean fetched = false;
+        long readCount = 0;
+        long limitCount;
+
+        private ScannerIterator(long limitCount) {
+            this.limitCount = limitCount;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (fetched) {
+                return true;
+            }
+            if (readCount >= limitCount) {
+                return false;
+            }
+            nextPacket = localResult.next();
+            fetched = true;
+            readCount++;
+            return nextPacket != null;
+        }
+
+        @Override
+        public RowDataPacket next() {
+            if (!fetched) {
+                throw new NoSuchElementException("please call hasNext()  before this.");
+            }
+            fetched = false;
+            return nextPacket;
+        }
+    }
+}
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/store/LocalResult.java b/src/main/java/com/actiontech/dble/backend/mysql/store/LocalResult.java
index 25d60d008e..aa9b8b3111 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/store/LocalResult.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/store/LocalResult.java
@@ -34,6 +34,7 @@ public abstract class LocalResult implements ResultStore {
     /* @bug 1208 */
     protected String charset = "UTF-8";
     protected MemSizeController bufferMC;
+    protected int maxReadMemorySize = -1;
 
     public LocalResult(int initialCapacity, int fieldsCount, BufferPool pool, String charset) {
         this.rows = new ArrayList<>(initialCapacity);
@@ -45,6 +46,10 @@ public LocalResult(int initialCapacity, int fieldsCount, BufferPool pool, String
         this.charset = charset;
     }
 
+    public void setMaxMemory(int maxMemory) {
+        this.maxMemory = maxMemory;
+    }
+
     /**
      * add a row into localresult
      *
@@ -221,4 +226,8 @@ public LocalResult setMemSizeController(MemSizeController memSizeController) {
         this.bufferMC = memSizeController;
         return this;
     }
+
+    public void setMaxReadMemorySize(int maxReadMemorySize) {
+        this.maxReadMemorySize = maxReadMemorySize;
+    }
 }
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/store/UnSortedLocalResult.java b/src/main/java/com/actiontech/dble/backend/mysql/store/UnSortedLocalResult.java
index 4c4c951e11..7bf4dd581a 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/store/UnSortedLocalResult.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/store/UnSortedLocalResult.java
@@ -21,7 +21,11 @@ public UnSortedLocalResult(int initialCapacity, int fieldsCount, BufferPool pool
 
     @Override
     protected ResultExternal makeExternal() {
-        return new UnSortedResultDiskBuffer(pool, fieldsCount);
+        if (maxReadMemorySize != -1) {
+            return new UnSortedResultDiskBuffer(pool, fieldsCount, maxReadMemorySize);
+        } else {
+            return new UnSortedResultDiskBuffer(pool, fieldsCount);
+        }
     }
 
     @Override
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/store/diskbuffer/ResultDiskBuffer.java b/src/main/java/com/actiontech/dble/backend/mysql/store/diskbuffer/ResultDiskBuffer.java
index 47f1141732..f98a3b4402 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/store/diskbuffer/ResultDiskBuffer.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/store/diskbuffer/ResultDiskBuffer.java
@@ -127,6 +127,13 @@ static class ResultDiskTape {
             this.readBuffer = pool.allocate();
         }
 
+        ResultDiskTape(BufferPool pool, FileStore file, int fieldCount, int maxReadMemorySize) {
+            this.pool = pool;
+            this.file = file;
+            this.fieldCount = fieldCount;
+            this.readBuffer = pool.allocate(maxReadMemorySize);
+        }
+
         public boolean isEnd() {
             return isReadAll();
         }
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/store/diskbuffer/UnSortedResultDiskBuffer.java b/src/main/java/com/actiontech/dble/backend/mysql/store/diskbuffer/UnSortedResultDiskBuffer.java
index 3277caaa1c..8f8de8f421 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/store/diskbuffer/UnSortedResultDiskBuffer.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/store/diskbuffer/UnSortedResultDiskBuffer.java
@@ -30,6 +30,11 @@ public UnSortedResultDiskBuffer(BufferPool pool, int columnCount) {
         mainTape = new ResultDiskTape(pool, file, columnCount);
     }
 
+    public UnSortedResultDiskBuffer(BufferPool pool, int columnCount, int maxReadMemorySize) {
+        super(pool, columnCount);
+        mainTape = new ResultDiskTape(pool, file, columnCount, maxReadMemorySize);
+    }
+
     @Override
     public int tapeCount() {
         return 1;
diff --git a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java
index cc02489b7b..618c97ee85 100644
--- a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java
+++ b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java
@@ -111,6 +111,9 @@ private SystemConfig() {
     private int bufferPoolPageSize = 1024 * 1024 * 2;
     //minimum allocation unit
     private short bufferPoolChunkSize = 4096;
+    //used for cursor temp result
+    private int maxHeapTableSize = 4096;
+    private int heapTableBufferChunkSize = -1;
     // buffer pool page number
     private short bufferPoolPageNumber = (short) (Platform.getMaxDirectMemory() * 0.8 / bufferPoolPageSize);
     private boolean useDefaultPageNumber = true;
@@ -1184,6 +1187,25 @@ public void setTraceEndPoint(String traceEndPoint) {
         this.traceEndPoint = traceEndPoint;
     }
 
+    public int getMaxHeapTableSize() {
+        return maxHeapTableSize;
+    }
+
+    public void setMaxHeapTableSize(int maxHeapTableSize) {
+        this.maxHeapTableSize = maxHeapTableSize;
+    }
+
+    public int getHeapTableBufferChunkSize() {
+        if (heapTableBufferChunkSize == -1) {
+            return bufferPoolChunkSize;
+        }
+        return heapTableBufferChunkSize;
+    }
+
+    public void setHeapTableBufferChunkSize(int heapTableBufferChunkSize) {
+        this.heapTableBufferChunkSize = heapTableBufferChunkSize;
+    }
+
     @Override
     public String toString() {
         return "SystemConfig [" +
@@ -1266,6 +1288,8 @@ public String toString() {
                 ", useOuterHa=" + useOuterHa +
                 ", fakeMySQLVersion=" + fakeMySQLVersion +
                 ", traceEndPoint=" + traceEndPoint +
+                ", maxHeapTableSize=" + maxHeapTableSize +
+                ", heapTableBufferChunkSize=" + heapTableBufferChunkSize +
                 "]";
     }
 }
diff --git a/src/main/java/com/actiontech/dble/net/handler/FrontendPrepareHandler.java b/src/main/java/com/actiontech/dble/net/handler/FrontendPrepareHandler.java
index 210083c933..3a3c8d4249 100644
--- a/src/main/java/com/actiontech/dble/net/handler/FrontendPrepareHandler.java
+++ b/src/main/java/com/actiontech/dble/net/handler/FrontendPrepareHandler.java
@@ -24,4 +24,6 @@ public interface FrontendPrepareHandler {
 
     void clear();
 
+    void fetch(byte[] data);
+
 }
diff --git a/src/main/java/com/actiontech/dble/net/mysql/CursorTypeFlags.java b/src/main/java/com/actiontech/dble/net/mysql/CursorTypeFlags.java
new file mode 100644
index 0000000000..c0d4f44bd8
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/net/mysql/CursorTypeFlags.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright (C) 2016-2020 ActionTech.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+
+package com.actiontech.dble.net.mysql;
+
+public final class CursorTypeFlags {
+    private CursorTypeFlags() {
+    }
+
+
+    public static final int CURSOR_TYPE_NO_CURSOR = 0;
+    public static final int CURSOR_TYPE_READ_ONLY = 1;
+    public static final int CURSOR_TYPE_FOR_UPDATE = 2;
+    public static final int CURSOR_TYPE_SCROLLABLE = 4;
+}
diff --git a/src/main/java/com/actiontech/dble/net/mysql/ExecutePacket.java b/src/main/java/com/actiontech/dble/net/mysql/ExecutePacket.java
index 33ef017ddb..67e669a163 100644
--- a/src/main/java/com/actiontech/dble/net/mysql/ExecutePacket.java
+++ b/src/main/java/com/actiontech/dble/net/mysql/ExecutePacket.java
@@ -68,6 +68,7 @@ public class ExecutePacket extends MySQLPacket {
 
     private BindValue[] values;
     private PreparedStatement preStmt;
+    private byte flag;
 
     public ExecutePacket(PreparedStatement preStmt) {
         this.preStmt = preStmt;
@@ -80,7 +81,7 @@ public void read(byte[] data, CharsetNames charset) throws UnsupportedEncodingEx
         packetId = mm.read();
         mm.read(); //[17] COM_STMT_EXECUTE
         mm.readUB4(); //stmt-id
-        mm.read(); // flags
+        flag = mm.read(); // flags
         mm.readUB4(); //iteration-count
 
         int parameterCount = values.length;
@@ -142,9 +143,14 @@ protected String getPacketInfo() {
     }
 
 
-
     public BindValue[] getValues() {
         return values;
     }
 
+    /**
+     * @return the flag
+     */
+    public byte getFlag() {
+        return flag;
+    }
 }
diff --git a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java
index 4cee3634f6..9852604d0d 100644
--- a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java
+++ b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java
@@ -83,7 +83,7 @@ public class NonBlockingSession extends Session {
     private SavePointHandler savePointHandler;
     private TransactionHandlerManager transactionManager;
 
-    private boolean prepared;
+
     private volatile boolean needWaitFinished = false;
 
     // kill query
@@ -934,13 +934,6 @@ public String getSessionXaID() {
         return transactionManager.getSessionXaID();
     }
 
-    public boolean isPrepared() {
-        return prepared;
-    }
-
-    public void setPrepared(boolean prepared) {
-        this.prepared = prepared;
-    }
 
     public MySQLResponseService freshConn(BackendConnection errConn, ResponseHandler queryHandler) {
         for (final RouteResultsetNode node : this.getTargetKeys()) {
diff --git a/src/main/java/com/actiontech/dble/server/RequestScope.java b/src/main/java/com/actiontech/dble/server/RequestScope.java
new file mode 100644
index 0000000000..b2c792e9d9
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/server/RequestScope.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+package com.actiontech.dble.server;
+
+import com.actiontech.dble.backend.mysql.PreparedStatement;
+import com.actiontech.dble.server.variables.OutputStateEnum;
+
+import java.io.Closeable;
+
+/**
+ * these variable only take effect in one request scope.
+ * When new request coming,all of those will re-init.
+ *
+ * @author dcy
+ */
+public class RequestScope implements Closeable {
+    private OutputStateEnum outputState = OutputStateEnum.NORMAL_QUERY;
+    private boolean usingCursor = false;
+    private boolean prepared = false;
+    private boolean usingJoin = false;
+    private PreparedStatement currentPreparedStatement;
+
+
+    public boolean isUsingJoin() {
+        return usingJoin;
+    }
+
+    public void setUsingJoin(boolean usingJoin) {
+        this.usingJoin = usingJoin;
+    }
+
+    public OutputStateEnum getOutputState() {
+        return outputState;
+    }
+
+    public void setOutputState(OutputStateEnum outputState) {
+        this.outputState = outputState;
+    }
+
+    public boolean isUsingCursor() {
+        return usingCursor;
+    }
+
+    public void setUsingCursor(boolean usingCursor) {
+        this.usingCursor = usingCursor;
+    }
+
+    public boolean isPrepared() {
+        return prepared;
+    }
+
+    public void setPrepared(boolean prepared) {
+        this.prepared = prepared;
+    }
+
+
+    public PreparedStatement getCurrentPreparedStatement() {
+        return currentPreparedStatement;
+    }
+
+    public void setCurrentPreparedStatement(PreparedStatement currentPreparedStatement) {
+        this.currentPreparedStatement = currentPreparedStatement;
+    }
+
+
+    @Override
+    public void close() {
+        //recycle disk resource if needed.
+    }
+
+
+}
diff --git a/src/main/java/com/actiontech/dble/server/handler/ServerPrepareHandler.java b/src/main/java/com/actiontech/dble/server/handler/ServerPrepareHandler.java
index 72e2f7b712..1543a708a6 100644
--- a/src/main/java/com/actiontech/dble/server/handler/ServerPrepareHandler.java
+++ b/src/main/java/com/actiontech/dble/server/handler/ServerPrepareHandler.java
@@ -8,16 +8,21 @@
 import com.actiontech.dble.backend.mysql.BindValue;
 import com.actiontech.dble.backend.mysql.ByteUtil;
 import com.actiontech.dble.backend.mysql.PreparedStatement;
+import com.actiontech.dble.backend.mysql.store.CursorCache;
 import com.actiontech.dble.config.ErrorCode;
 import com.actiontech.dble.config.Fields;
 import com.actiontech.dble.net.handler.FrontendPrepareHandler;
-import com.actiontech.dble.net.mysql.ExecutePacket;
-import com.actiontech.dble.net.mysql.LongDataPacket;
-import com.actiontech.dble.net.mysql.OkPacket;
-import com.actiontech.dble.net.mysql.ResetPacket;
+import com.actiontech.dble.net.mysql.*;
+import com.actiontech.dble.server.RequestScope;
+import com.actiontech.dble.server.parser.PrepareChangeVisitor;
+import com.actiontech.dble.server.parser.PrepareStatementCalculateVisitor;
 import com.actiontech.dble.server.response.PreparedStmtResponse;
+import com.actiontech.dble.server.variables.OutputStateEnum;
 import com.actiontech.dble.services.mysqlsharding.ShardingService;
 import com.actiontech.dble.util.HexFormatUtil;
+import com.alibaba.druid.sql.SQLUtils;
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
 import com.google.common.escape.Escaper;
 import com.google.common.escape.Escapers;
 import com.google.common.escape.Escapers.Builder;
@@ -27,9 +32,15 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
+import static com.actiontech.dble.net.mysql.StatusFlags.SERVER_STATUS_CURSOR_EXISTS;
+import static com.alibaba.druid.util.JdbcConstants.MYSQL;
+
 /**
  * @author mycat, CrazyPig, zhuam
  */
@@ -60,11 +71,40 @@ public void prepare(String sql) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("use server prepare, sql: " + sql);
         }
-        int columnCount = getColumnCount(sql);
-        int paramCount = getParamCount(sql);
-        PreparedStatement pStmt = new PreparedStatement(++pStmtId, sql, columnCount, paramCount);
+
+        final List statements = SQLUtils.parseStatements(sql, MYSQL, true);
+        if (statements.isEmpty()) {
+            service.writeErrMessage(ErrorCode.ERR_WRONG_USED, "can't parse sql into statement");
+            return;
+        }
+        if (statements.size() > 1) {
+            service.writeErrMessage(ErrorCode.ERR_WRONG_USED, "can't use more than one statement in prepare-statement");
+            return;
+        }
+        final SQLStatement sqlStatement = statements.get(0);
+
+        int paramCount = getParamCount(sqlStatement);
+        PreparedStatement pStmt = new PreparedStatement(++pStmtId, sql, paramCount);
+        final RequestScope requestScope = service.getRequestScope();
+        service.getRequestScope().setCurrentPreparedStatement(pStmt);
+        service.getRequestScope().setPrepared(true);
         pStmtForId.put(pStmt.getId(), pStmt);
-        PreparedStmtResponse.response(pStmt, service);
+        if (!(sqlStatement instanceof SQLSelectStatement)) {
+            //notSelect
+            PreparedStmtResponse.response(pStmt, service);
+        } else {
+            //isSelect,should calculate column count to support cursor if possible.
+            final PrepareChangeVisitor visitor = new PrepareChangeVisitor();
+            sqlStatement.accept(visitor);
+            requestScope.setOutputState(OutputStateEnum.PREPARE);
+            service.query(sqlStatement.toString());
+            requestScope.getCurrentPreparedStatement().setPrepareCallback((columnCount) -> {
+                pStmt.setColumnsNumber(columnCount);
+                PreparedStmtResponse.response(pStmt, service);
+            });
+        }
+
+
     }
 
     @Override
@@ -110,6 +150,8 @@ public void execute(byte[] data) {
         if ((pStmt = pStmtForId.get(statementId)) == null) {
             service.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, "Unknown pStmtId when executing.");
         } else {
+            service.getRequestScope().setCurrentPreparedStatement(pStmt);
+            service.getRequestScope().setPrepared(true);
             ExecutePacket packet = new ExecutePacket(pStmt);
             try {
                 packet.read(data, service.getCharset());
@@ -120,7 +162,10 @@ public void execute(byte[] data) {
             BindValue[] bindValues = packet.getValues();
             // reset the Parameter
             String sql = prepareStmtBindValue(pStmt, bindValues);
-            service.getSession2().setPrepared(true);
+
+            if (packet.getFlag() == CursorTypeFlags.CURSOR_TYPE_READ_ONLY) {
+                service.getRequestScope().setUsingCursor(true);
+            }
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug("execute prepare sql: " + sql);
             }
@@ -136,29 +181,40 @@ public void close(byte[] data) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("close prepare stmt, stmtId = " + psId);
         }
+        final PreparedStatement preparedStatement = pStmtForId.get(psId);
+        if (preparedStatement != null) {
+            try {
+                preparedStatement.close();
+            } catch (Exception e) {
+                LOGGER.error("", e);
+            }
+        }
         pStmtForId.remove(psId);
     }
 
     @Override
     public void clear() {
+        for (PreparedStatement preparedStatement : this.pStmtForId.values()) {
+            try {
+                preparedStatement.close();
+            } catch (Exception e) {
+                LOGGER.error("", e);
+            }
+
+        }
         this.pStmtForId.clear();
     }
 
-    // TODO:the size of columns of prepared statement
     private int getColumnCount(String sql) {
-        return 0;
+        throw new UnsupportedOperationException();
     }
 
+
     // the size of parameters of prepared statement
-    private int getParamCount(String sql) {
-        char[] cArr = sql.toCharArray();
-        int count = 0;
-        for (char aCArr : cArr) {
-            if (aCArr == '?') {
-                count++;
-            }
-        }
-        return count;
+    private int getParamCount(SQLStatement statement) {
+        final PrepareStatementCalculateVisitor visitor = new PrepareStatementCalculateVisitor();
+        statement.accept(visitor);
+        return visitor.getArgumentCount();
     }
 
     /**
@@ -243,4 +299,64 @@ private String prepareStmtBindValue(PreparedStatement pStmt, BindValue[] bindVal
         return sb.toString();
     }
 
+    @Override
+    public void fetch(byte[] data) {
+
+        long statementId = ByteUtil.readUB4(data, 4 + 1); //skip to read
+        PreparedStatement pStmt;
+        if ((pStmt = pStmtForId.get(statementId)) == null) {
+            service.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, "Unknown pStmtId when executing.");
+        } else {
+
+            service.getRequestScope().setCurrentPreparedStatement(pStmt);
+            service.getRequestScope().setPrepared(true);
+
+            long expectSize = ByteUtil.readUB4(data, 4 + 1 + 4);
+            final CursorCache cursorCache = pStmt.getCursorCache();
+            final List fieldPackets = service.getRequestScope().getCurrentPreparedStatement().getFieldPackets();
+            ByteBuffer buffer = service.getSession2().getSource().allocate();
+            try {
+                int packetId = 1;
+                final Iterator rowDataPacketIt = cursorCache.fetchBatch(expectSize);
+                while (rowDataPacketIt.hasNext()) {
+                    final RowDataPacket dataPacket = rowDataPacketIt.next();
+                    BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
+                    binRowDataPk.read(fieldPackets, dataPacket);
+                    binRowDataPk.setPacketId(packetId++);
+                    buffer = binRowDataPk.write(buffer, service, true);
+                }
+                if (packetId == 1) {
+                    /*
+                    no more rows
+                     */
+                    try {
+                        pStmt.close();
+                    } catch (Exception e) {
+                        LOGGER.error("", e);
+                    }
+                }
+
+
+                EOFPacket ok = new EOFPacket();
+                ok.setPacketId(packetId++);
+
+                //            ok.setAffectedRows(0);
+                //            ok.setInsertId(0);
+                int statusFlag = 0;
+                statusFlag |= service.getSession2().getShardingService().isAutocommit() ? 2 : 1;
+                statusFlag |= SERVER_STATUS_CURSOR_EXISTS;
+                ok.setStatus(statusFlag);
+                ok.setWarningCount(0);
+                ok.write(buffer, service, true);
+                service.writeDirectly(buffer);
+
+
+            } finally {
+                service.getSession2().getSource().recycle(buffer);
+            }
+
+        }
+    }
+
+
 }
diff --git a/src/main/java/com/actiontech/dble/server/parser/PrepareChangeVisitor.java b/src/main/java/com/actiontech/dble/server/parser/PrepareChangeVisitor.java
new file mode 100644
index 0000000000..5b0f9a8aad
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/server/parser/PrepareChangeVisitor.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+
+package com.actiontech.dble.server.parser;
+
+import com.alibaba.druid.sql.SQLUtils;
+import com.alibaba.druid.sql.ast.SQLExpr;
+import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator;
+import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
+import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter;
+
+/**
+ * @author dcy
+ * Create Date: 2020-12-25
+ */
+public class PrepareChangeVisitor extends MySqlASTVisitorAdapter {
+    @Override
+    public boolean visit(MySqlSelectQueryBlock x) {
+
+        /*
+            Change sql.
+            append the  '1!=1' condition.
+            So every sql  return field packets with zero rows.
+         */
+        final SQLExpr sqlExpr = SQLUtils.buildCondition(SQLBinaryOperator.BooleanAnd, SQLUtils.toSQLExpr("1 != 1"), true, x.getWhere());
+        x.setWhere(sqlExpr);
+        /*
+            in single node mysql ,one '1!=1' condition is enough.It always return zero rows.
+            because dble split complex query before send.So every query should append this condition.So...there need return true to access nested select.
+         */
+        return true;
+    }
+
+
+}
diff --git a/src/main/java/com/actiontech/dble/server/parser/PrepareStatementCalculateVisitor.java b/src/main/java/com/actiontech/dble/server/parser/PrepareStatementCalculateVisitor.java
new file mode 100644
index 0000000000..5f237edec6
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/server/parser/PrepareStatementCalculateVisitor.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+
+package com.actiontech.dble.server.parser;
+
+import com.alibaba.druid.sql.ast.SQLObject;
+import com.alibaba.druid.sql.ast.expr.SQLVariantRefExpr;
+import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
+import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter;
+
+/**
+ * @author dcy
+ * Create Date: 2020-12-24
+ */
+public class PrepareStatementCalculateVisitor extends MySqlASTVisitorAdapter {
+    private int argumentCount = 0;
+    private boolean selectStatement = false;
+
+    public PrepareStatementCalculateVisitor() {
+    }
+
+    @Override
+    public boolean visit(SQLVariantRefExpr x) {
+        argumentCount++;
+        return false;
+    }
+
+
+    @Override
+    public boolean visit(MySqlSelectQueryBlock x) {
+        selectStatement = true;
+        /*
+        access 'where' only
+         */
+        accept(x.getWhere());
+
+        return false;
+    }
+
+
+    public boolean isSelectStatement() {
+        return selectStatement;
+    }
+
+    public int getArgumentCount() {
+        return argumentCount;
+    }
+
+    //    private  void accept(List list) {
+    //        if (list != null) {
+    //            list.forEach(node -> node.accept(this));
+    //        }
+    //    }
+
+    private  void accept(T e) {
+        if (e != null) {
+            e.accept(this);
+        }
+    }
+
+
+}
diff --git a/src/main/java/com/actiontech/dble/server/variables/OutputStateEnum.java b/src/main/java/com/actiontech/dble/server/variables/OutputStateEnum.java
new file mode 100644
index 0000000000..fdd1448053
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/server/variables/OutputStateEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+
+package com.actiontech.dble.server.variables;
+
+/**
+ * @author dcy
+ * Create Date: 2020-12-24
+ */
+public enum OutputStateEnum {
+    /**
+     *
+     */
+    NORMAL_QUERY,
+    /**
+     * used for prepare
+     */
+    PREPARE,
+
+}
diff --git a/src/main/java/com/actiontech/dble/services/factorys/FinalHandlerFactory.java b/src/main/java/com/actiontech/dble/services/factorys/FinalHandlerFactory.java
new file mode 100644
index 0000000000..bfdb098962
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/services/factorys/FinalHandlerFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2016-2020 ActionTech.
+ * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
+ * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
+ */
+
+package com.actiontech.dble.services.factorys;
+
+import com.actiontech.dble.backend.mysql.nio.handler.builder.BaseHandlerBuilder;
+import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
+import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler;
+import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandlerForPrepare;
+import com.actiontech.dble.server.NonBlockingSession;
+import com.actiontech.dble.server.variables.OutputStateEnum;
+
+/**
+ * Created by szf on 2020/6/28.
+ */
+public final class FinalHandlerFactory {
+
+    private FinalHandlerFactory() {
+    }
+
+    public static DMLResponseHandler createFinalHandler(NonBlockingSession session) {
+        final OutputStateEnum outputState = session.getShardingService().getRequestScope().getOutputState();
+
+        switch (outputState) {
+            case NORMAL_QUERY:
+                return new OutputHandler(BaseHandlerBuilder.getSequenceId(), session);
+            case PREPARE:
+                return new OutputHandlerForPrepare(BaseHandlerBuilder.getSequenceId(), session);
+            default:
+                throw new UnsupportedOperationException("illegal outputState");
+        }
+
+
+    }
+
+
+}
diff --git a/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java b/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java
index da51569e00..bbb5f51637 100644
--- a/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java
+++ b/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java
@@ -17,6 +17,7 @@
 import com.actiontech.dble.net.service.ServiceTask;
 import com.actiontech.dble.route.RouteResultset;
 import com.actiontech.dble.server.NonBlockingSession;
+import com.actiontech.dble.server.RequestScope;
 import com.actiontech.dble.server.ServerQueryHandler;
 import com.actiontech.dble.server.ServerSptPrepare;
 import com.actiontech.dble.server.handler.ServerLoadDataInfileHandler;
@@ -84,6 +85,7 @@ public class ShardingService extends BusinessService {
     private final NonBlockingSession session;
     private boolean sessionReadOnly = false;
     private ServerSptPrepare sptprepare;
+    private volatile RequestScope requestScope;
 
     public ShardingService(AbstractConnection connection) {
         super(connection);
@@ -98,6 +100,10 @@ public ShardingService(AbstractConnection connection) {
         this.proto = new MySQLProtoHandlerImpl();
     }
 
+    public RequestScope getRequestScope() {
+        return requestScope;
+    }
+
     @Override
     public void handleVariable(MysqlVariable var) {
         String val = var.getValue();
@@ -207,77 +213,85 @@ protected void handleInnerData(byte[] data) {
             sc.changeUserAuthSwitch(data, changeUserPacket);
             return;
         }*/
-        switch (data[4]) {
-            case MySQLPacket.COM_INIT_DB:
-                commands.doInitDB();
-                protoLogicHandler.initDB(data);
-                break;
-            case MySQLPacket.COM_QUERY:
-                commands.doQuery();
-                protoLogicHandler.query(data);
-                break;
-            case MySQLPacket.COM_PING:
-                commands.doPing();
-                Ping.response(connection);
-                break;
-            case MySQLPacket.COM_HEARTBEAT:
-                commands.doHeartbeat();
-                Heartbeat.response(connection, data);
-                break;
-            case MySQLPacket.COM_QUIT:
-                commands.doQuit();
-                connection.close("quit cmd");
-                break;
-            case MySQLPacket.COM_STMT_PREPARE:
-                commands.doStmtPrepare();
-                String prepareSql = protoLogicHandler.stmtPrepare(data);
-                // record SQL
-                if (prepareSql != null) {
-                    this.setExecuteSql(prepareSql);
-                    prepareHandler.prepare(prepareSql);
-                }
-                break;
-            case MySQLPacket.COM_STMT_SEND_LONG_DATA:
-                commands.doStmtSendLongData();
-                blobDataQueue.offer(data);
-                break;
-            case MySQLPacket.COM_STMT_CLOSE:
-                commands.doStmtClose();
-                stmtClose(data);
-                break;
-            case MySQLPacket.COM_STMT_RESET:
-                commands.doStmtReset();
-                blobDataQueue.clear();
-                prepareHandler.reset(data);
-                break;
-            case MySQLPacket.COM_STMT_EXECUTE:
-                commands.doStmtExecute();
-                this.stmtExecute(data, blobDataQueue);
-                break;
-            case MySQLPacket.COM_SET_OPTION:
-                commands.doOther();
-                protoLogicHandler.setOption(data);
-                break;
-            case MySQLPacket.COM_CHANGE_USER:
-                commands.doOther();
-                /* changeUserPacket = new ChangeUserPacket(sc.getClientFlags(), CharsetUtil.getCollationIndex(sc.getCharset().getCollation()));
-                sc.changeUser(data, changeUserPacket, isAuthSwitch);*/
-                break;
-            case MySQLPacket.COM_RESET_CONNECTION:
-                commands.doOther();
-                protoLogicHandler.resetConnection();
-                break;
-            case MySQLPacket.COM_FIELD_LIST:
-                commands.doOther();
-                protoLogicHandler.fieldList(data);
-                break;
-            case MySQLPacket.COM_PROCESS_KILL:
-                commands.doKill();
-                writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
-                break;
-            default:
-                commands.doOther();
-                writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
+        try (RequestScope requestScope = new RequestScope()) {
+            this.requestScope = requestScope;
+            switch (data[4]) {
+                case MySQLPacket.COM_INIT_DB:
+                    commands.doInitDB();
+                    protoLogicHandler.initDB(data);
+                    break;
+                case MySQLPacket.COM_QUERY:
+                    commands.doQuery();
+                    protoLogicHandler.query(data);
+                    break;
+                case MySQLPacket.COM_PING:
+                    commands.doPing();
+                    Ping.response(connection);
+                    break;
+                case MySQLPacket.COM_HEARTBEAT:
+                    commands.doHeartbeat();
+                    Heartbeat.response(connection, data);
+                    break;
+                case MySQLPacket.COM_QUIT:
+                    commands.doQuit();
+                    connection.close("quit cmd");
+                    break;
+                case MySQLPacket.COM_STMT_PREPARE:
+                    commands.doStmtPrepare();
+                    String prepareSql = protoLogicHandler.stmtPrepare(data);
+                    // record SQL
+                    if (prepareSql != null) {
+                        this.setExecuteSql(prepareSql);
+                        prepareHandler.prepare(prepareSql);
+                    }
+                    break;
+                case MySQLPacket.COM_STMT_SEND_LONG_DATA:
+                    commands.doStmtSendLongData();
+                    blobDataQueue.offer(data);
+                    break;
+                case MySQLPacket.COM_STMT_CLOSE:
+                    commands.doStmtClose();
+                    stmtClose(data);
+                    break;
+                case MySQLPacket.COM_STMT_RESET:
+                    commands.doStmtReset();
+                    blobDataQueue.clear();
+                    prepareHandler.reset(data);
+                    break;
+                case MySQLPacket.COM_STMT_EXECUTE:
+                    commands.doStmtExecute();
+                    this.stmtExecute(data, blobDataQueue);
+                    break;
+                case MySQLPacket.COM_STMT_FETCH:
+                    commands.doStmtFetch();
+                    this.stmtFetch(data);
+                    break;
+                case MySQLPacket.COM_SET_OPTION:
+                    commands.doOther();
+                    protoLogicHandler.setOption(data);
+                    break;
+                case MySQLPacket.COM_CHANGE_USER:
+                    commands.doOther();
+                    /* changeUserPacket = new ChangeUserPacket(sc.getClientFlags(), CharsetUtil.getCollationIndex(sc.getCharset().getCollation()));
+                    sc.changeUser(data, changeUserPacket, isAuthSwitch);*/
+                    break;
+                case MySQLPacket.COM_RESET_CONNECTION:
+                    commands.doOther();
+                    protoLogicHandler.resetConnection();
+                    break;
+                case MySQLPacket.COM_FIELD_LIST:
+                    commands.doOther();
+                    protoLogicHandler.fieldList(data);
+                    break;
+                case MySQLPacket.COM_PROCESS_KILL:
+                    commands.doKill();
+                    writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
+                    break;
+                default:
+                    commands.doOther();
+                    writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
+            }
+
         }
     }
 
@@ -333,6 +347,14 @@ public void stmtExecute(byte[] data, Queue dataQueue) {
         }
     }
 
+    public void stmtFetch(byte[] data) {
+        if (prepareHandler != null) {
+            prepareHandler.fetch(data);
+        } else {
+            writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Prepare unsupported!");
+        }
+    }
+
     public void stmtSendLongData(byte[] data) {
         if (prepareHandler != null) {
             prepareHandler.sendLongData(data);
@@ -613,6 +635,9 @@ public void cleanup() {
         if (getLoadDataInfileHandler() != null) {
             getLoadDataInfileHandler().clear();
         }
+        if (prepareHandler != null) {
+            prepareHandler.clear();
+        }
     }
 
     protected void sessionStart() {
diff --git a/src/main/java/com/actiontech/dble/statistic/CommandCount.java b/src/main/java/com/actiontech/dble/statistic/CommandCount.java
index fb5d191ef4..bacad72fce 100644
--- a/src/main/java/com/actiontech/dble/statistic/CommandCount.java
+++ b/src/main/java/com/actiontech/dble/statistic/CommandCount.java
@@ -16,6 +16,7 @@ public class CommandCount {
     private long stmtSendLongData;
     private long stmtReset;
     private long stmtExecute;
+    private long stmtFetch;
     private long stmtClose;
     private long ping;
     private long kill;
@@ -75,6 +76,14 @@ public long stmtExecuteCount() {
         return stmtExecute;
     }
 
+    public void doStmtFetch() {
+        ++stmtFetch;
+    }
+
+    public long stmtFetchCount() {
+        return stmtFetch;
+    }
+
     public void doStmtClose() {
         ++stmtClose;
     }