diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index fb13633319bdae3..cf1573810c9419a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -758,8 +758,7 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException UUID uuid = UUID.randomUUID(); queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); } - - executor.execute(queryId); + executor.queryRetry(queryId); } catch (IOException e) { // Client failed. LOG.warn("Process one query failed because IOException: ", e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index fcd0d25a1303206..783b6b954905633 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -528,10 +528,15 @@ public boolean isHandleQueryInFe() { public void execute() throws Exception { UUID uuid = UUID.randomUUID(); TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - TUniqueId firstQueryId = queryId; if (Config.enable_print_request_before_execution) { LOG.info("begin to execute query {} {}", queryId, originStmt == null ? "null" : originStmt.originStmt); } + queryRetry(queryId); + } + + public void queryRetry(TUniqueId queryId) throws Exception { + TUniqueId firstQueryId = queryId; + UUID uuid; int retryTime = Config.max_query_retry_time; retryTime = retryTime <= 0 ? 1 : retryTime + 1; for (int i = 1; i <= retryTime; i++) { @@ -751,6 +756,9 @@ private void executeByNereids(TUniqueId queryId) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug("Command({}) process failed.", originStmt.originStmt, e); } + if (Config.isCloudMode() && e.getDetailMessage().contains(FeConstants.CLOUD_RETRY_E230)) { + throw e; + } context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); throw new NereidsException("Command (" + originStmt.originStmt + ") process failed", new AnalysisException(e.getMessage(), e)); diff --git a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy index 8f96d1fd9d265d8..2d8ca3f529674d5 100644 --- a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy +++ b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy @@ -24,120 +24,127 @@ suite("test_retry_e-230") { } def options = new ClusterOptions() options.enableDebugPoints() - options.setFeNum(1) + // one master, one observer + options.setFeNum(2) options.feConfigs.add('max_query_retry_time=100') options.feConfigs.add('sys_log_verbose_modules=org') options.setBeNum(1) options.cloudMode = true - docker(options) { - def tbl = 'test_retry_e_230_tbl' - def tbl1 = 'table_1' - def tbl2 = 'table_2' - sql """ DROP TABLE IF EXISTS ${tbl} """ - sql """ DROP TABLE IF EXISTS ${tbl1} """ - sql """ DROP TABLE IF EXISTS ${tbl2} """ - try { - sql """set global experimental_enable_pipeline_x_engine=false""" - cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null]) + // 1. connect to master + options.connectToFollower = false + for (def j = 0; j < 2; j++) { + docker(options) { + def tbl = 'test_retry_e_230_tbl' + def tbl1 = 'table_1' + def tbl2 = 'table_2' + sql """ DROP TABLE IF EXISTS ${tbl} """ + sql """ DROP TABLE IF EXISTS ${tbl1} """ + sql """ DROP TABLE IF EXISTS ${tbl2} """ + try { + sql """set global experimental_enable_pipeline_x_engine=false""" + cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null]) - sql """ - CREATE TABLE ${tbl} ( - `k1` int(11) NULL, - `k2` int(11) NULL - ) - DUPLICATE KEY(`k1`, `k2`) - COMMENT 'OLAP' - DISTRIBUTED BY HASH(`k1`) BUCKETS 1 - PROPERTIES ( - "replication_num"="1" - ); - """ - for (def i = 1; i <= 5; i++) { - sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})" - } + sql """ + CREATE TABLE ${tbl} ( + `k1` int(11) NULL, + `k2` int(11) NULL + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num"="1" + ); + """ + for (def i = 1; i <= 5; i++) { + sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})" + } - cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null]) - def futrue1 = thread { - Thread.sleep(3000) - cluster.clearBackendDebugPoints() - } + cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null]) + def futrue1 = thread { + Thread.sleep(3000) + cluster.clearBackendDebugPoints() + } - def begin = System.currentTimeMillis(); - def futrue2 = thread { - def result = try_sql """select * from ${tbl}""" - } + def begin = System.currentTimeMillis(); + def futrue2 = thread { + def result = try_sql """select * from ${tbl}""" + } - futrue2.get() - def cost = System.currentTimeMillis() - begin; - log.info("time cost: {}", cost) - futrue1.get() - // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s - assertTrue(cost > 3000 && cost < 100000) + futrue2.get() + def cost = System.currentTimeMillis() - begin; + log.info("time cost: {}", cost) + futrue1.get() + // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s + assertTrue(cost > 3000 && cost < 100000) - sql """ - CREATE TABLE IF NOT EXISTS ${tbl1} ( - `siteid` int(11) NOT NULL COMMENT "", - `citycode` int(11) NOT NULL COMMENT "", - `userid` int(11) NOT NULL COMMENT "", - `pv` int(11) NOT NULL COMMENT "" - ) ENGINE=OLAP - DUPLICATE KEY(`siteid`) - COMMENT "OLAP" - DISTRIBUTED BY HASH(`siteid`) BUCKETS 1 - PROPERTIES ( - "replication_allocation" = "tag.location.default: 1", - "in_memory" = "false", - "storage_format" = "V2" - ) - """ + sql """ + CREATE TABLE IF NOT EXISTS ${tbl1} ( + `siteid` int(11) NOT NULL COMMENT "", + `citycode` int(11) NOT NULL COMMENT "", + `userid` int(11) NOT NULL COMMENT "", + `pv` int(11) NOT NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`siteid`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`siteid`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ) + """ - sql """ - CREATE TABLE IF NOT EXISTS ${tbl2} ( - `siteid` int(11) NOT NULL COMMENT "", - `citycode` int(11) NOT NULL COMMENT "", - `userid` int(11) NOT NULL COMMENT "", - `pv` int(11) NOT NULL COMMENT "" - ) ENGINE=OLAP - DUPLICATE KEY(`siteid`) - COMMENT "OLAP" - DISTRIBUTED BY HASH(`siteid`) BUCKETS 1 - PROPERTIES ( - "replication_allocation" = "tag.location.default: 1", - "in_memory" = "false", - "storage_format" = "V2" - ) - """ + sql """ + CREATE TABLE IF NOT EXISTS ${tbl2} ( + `siteid` int(11) NOT NULL COMMENT "", + `citycode` int(11) NOT NULL COMMENT "", + `userid` int(11) NOT NULL COMMENT "", + `pv` int(11) NOT NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`siteid`) + COMMENT "OLAP" + DISTRIBUTED BY HASH(`siteid`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ) + """ - sql """ - insert into ${tbl1} values (9,10,11,12), (1,2,3,4) - """ + sql """ + insert into ${tbl1} values (9,10,11,12), (1,2,3,4) + """ - // dp again - cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null]) + // dp again + cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null]) - def futrue3 = thread { - Thread.sleep(4000) - cluster.clearBackendDebugPoints() - } + def futrue3 = thread { + Thread.sleep(4000) + cluster.clearBackendDebugPoints() + } - begin = System.currentTimeMillis(); - def futrue4 = thread { - def result = try_sql """insert into ${tbl2} select * from ${tbl1}""" - } + begin = System.currentTimeMillis(); + def futrue4 = thread { + def result = try_sql """insert into ${tbl2} select * from ${tbl1}""" + } - futrue4.get() - cost = System.currentTimeMillis() - begin; - log.info("time cost insert into select : {}", cost) - futrue3.get() - // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s - assertTrue(cost > 4000 && cost < 100000) + futrue4.get() + cost = System.currentTimeMillis() - begin; + log.info("time cost insert into select : {}", cost) + futrue3.get() + // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s + assertTrue(cost > 4000 && cost < 100000) - } finally { - cluster.clearFrontendDebugPoints() - cluster.clearBackendDebugPoints() - sql """ DROP TABLE IF EXISTS ${tbl} """ - sql """ DROP TABLE IF EXISTS ${tbl1} """ - sql """ DROP TABLE IF EXISTS ${tbl2} """ - } + } finally { + cluster.clearFrontendDebugPoints() + cluster.clearBackendDebugPoints() + sql """ DROP TABLE IF EXISTS ${tbl} """ + sql """ DROP TABLE IF EXISTS ${tbl1} """ + sql """ DROP TABLE IF EXISTS ${tbl2} """ + } + } + // 2. connect to follower + options.connectToFollower = true } }