Skip to content

Commit

Permalink
[fix](cloud) Fix cloud meet e-230 not retry query from observer (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng authored and seawinde committed Jul 17, 2024
1 parent ce0f672 commit 3ebbf93
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -758,8 +758,7 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
UUID uuid = UUID.randomUUID();
queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
}

executor.execute(queryId);
executor.queryRetry(queryId);
} catch (IOException e) {
// Client failed.
LOG.warn("Process one query failed because IOException: ", e);
Expand Down
10 changes: 9 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,15 @@ public boolean isHandleQueryInFe() {
/**
 * Executes the statement under a freshly generated query id.
 *
 * <p>Builds a {@code TUniqueId} from the two halves of a random {@link UUID},
 * logs the statement text when {@code Config.enable_print_request_before_execution}
 * is set, and then delegates to {@code queryRetry(queryId)}.
 *
 * @throws Exception whatever {@code queryRetry} throws
 */
public void execute() throws Exception {
    UUID uuid = UUID.randomUUID();
    TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
    if (Config.enable_print_request_before_execution) {
        // originStmt may be null, so guard before dereferencing it for the log line.
        LOG.info("begin to execute query {} {}", queryId, originStmt == null ? "null" : originStmt.originStmt);
    }
    // queryRetry keeps its own copy of the first query id, so none is kept here.
    queryRetry(queryId);
}

public void queryRetry(TUniqueId queryId) throws Exception {
TUniqueId firstQueryId = queryId;
UUID uuid;
int retryTime = Config.max_query_retry_time;
retryTime = retryTime <= 0 ? 1 : retryTime + 1;
for (int i = 1; i <= retryTime; i++) {
Expand Down Expand Up @@ -751,6 +756,9 @@ private void executeByNereids(TUniqueId queryId) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Command({}) process failed.", originStmt.originStmt, e);
}
if (Config.isCloudMode() && e.getDetailMessage().contains(FeConstants.CLOUD_RETRY_E230)) {
throw e;
}
context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
throw new NereidsException("Command (" + originStmt.originStmt + ") process failed",
new AnalysisException(e.getMessage(), e));
Expand Down
205 changes: 106 additions & 99 deletions regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -24,120 +24,127 @@ suite("test_retry_e-230") {
}
def options = new ClusterOptions()
options.enableDebugPoints()
options.setFeNum(1)
// one master, one observer
options.setFeNum(2)
options.feConfigs.add('max_query_retry_time=100')
options.feConfigs.add('sys_log_verbose_modules=org')
options.setBeNum(1)
options.cloudMode = true
docker(options) {
def tbl = 'test_retry_e_230_tbl'
def tbl1 = 'table_1'
def tbl2 = 'table_2'
sql """ DROP TABLE IF EXISTS ${tbl} """
sql """ DROP TABLE IF EXISTS ${tbl1} """
sql """ DROP TABLE IF EXISTS ${tbl2} """
try {
sql """set global experimental_enable_pipeline_x_engine=false"""
cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])
// 1. connect to master
options.connectToFollower = false
for (def j = 0; j < 2; j++) {
docker(options) {
def tbl = 'test_retry_e_230_tbl'
def tbl1 = 'table_1'
def tbl2 = 'table_2'
sql """ DROP TABLE IF EXISTS ${tbl} """
sql """ DROP TABLE IF EXISTS ${tbl1} """
sql """ DROP TABLE IF EXISTS ${tbl2} """
try {
sql """set global experimental_enable_pipeline_x_engine=false"""
cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])

sql """
CREATE TABLE ${tbl} (
`k1` int(11) NULL,
`k2` int(11) NULL
)
DUPLICATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num"="1"
);
"""
for (def i = 1; i <= 5; i++) {
sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
}
sql """
CREATE TABLE ${tbl} (
`k1` int(11) NULL,
`k2` int(11) NULL
)
DUPLICATE KEY(`k1`, `k2`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_num"="1"
);
"""
for (def i = 1; i <= 5; i++) {
sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
}

cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null])
def futrue1 = thread {
Thread.sleep(3000)
cluster.clearBackendDebugPoints()
}
cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null])
def futrue1 = thread {
Thread.sleep(3000)
cluster.clearBackendDebugPoints()
}

def begin = System.currentTimeMillis();
def futrue2 = thread {
def result = try_sql """select * from ${tbl}"""
}
def begin = System.currentTimeMillis();
def futrue2 = thread {
def result = try_sql """select * from ${tbl}"""
}

futrue2.get()
def cost = System.currentTimeMillis() - begin;
log.info("time cost: {}", cost)
futrue1.get()
// fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
assertTrue(cost > 3000 && cost < 100000)
futrue2.get()
def cost = System.currentTimeMillis() - begin;
log.info("time cost: {}", cost)
futrue1.get()
// fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
assertTrue(cost > 3000 && cost < 100000)

sql """
CREATE TABLE IF NOT EXISTS ${tbl1} (
`siteid` int(11) NOT NULL COMMENT "",
`citycode` int(11) NOT NULL COMMENT "",
`userid` int(11) NOT NULL COMMENT "",
`pv` int(11) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`siteid`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
"""
sql """
CREATE TABLE IF NOT EXISTS ${tbl1} (
`siteid` int(11) NOT NULL COMMENT "",
`citycode` int(11) NOT NULL COMMENT "",
`userid` int(11) NOT NULL COMMENT "",
`pv` int(11) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`siteid`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
"""

sql """
CREATE TABLE IF NOT EXISTS ${tbl2} (
`siteid` int(11) NOT NULL COMMENT "",
`citycode` int(11) NOT NULL COMMENT "",
`userid` int(11) NOT NULL COMMENT "",
`pv` int(11) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`siteid`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
"""
sql """
CREATE TABLE IF NOT EXISTS ${tbl2} (
`siteid` int(11) NOT NULL COMMENT "",
`citycode` int(11) NOT NULL COMMENT "",
`userid` int(11) NOT NULL COMMENT "",
`pv` int(11) NOT NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`siteid`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`siteid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
"""

sql """
insert into ${tbl1} values (9,10,11,12), (1,2,3,4)
"""
sql """
insert into ${tbl1} values (9,10,11,12), (1,2,3,4)
"""

// dp again
cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])
// dp again
cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])

def futrue3 = thread {
Thread.sleep(4000)
cluster.clearBackendDebugPoints()
}
def futrue3 = thread {
Thread.sleep(4000)
cluster.clearBackendDebugPoints()
}

begin = System.currentTimeMillis();
def futrue4 = thread {
def result = try_sql """insert into ${tbl2} select * from ${tbl1}"""
}
begin = System.currentTimeMillis();
def futrue4 = thread {
def result = try_sql """insert into ${tbl2} select * from ${tbl1}"""
}

futrue4.get()
cost = System.currentTimeMillis() - begin;
log.info("time cost insert into select : {}", cost)
futrue3.get()
// fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
assertTrue(cost > 4000 && cost < 100000)
futrue4.get()
cost = System.currentTimeMillis() - begin;
log.info("time cost insert into select : {}", cost)
futrue3.get()
// fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
assertTrue(cost > 4000 && cost < 100000)

} finally {
cluster.clearFrontendDebugPoints()
cluster.clearBackendDebugPoints()
sql """ DROP TABLE IF EXISTS ${tbl} """
sql """ DROP TABLE IF EXISTS ${tbl1} """
sql """ DROP TABLE IF EXISTS ${tbl2} """
}
} finally {
cluster.clearFrontendDebugPoints()
cluster.clearBackendDebugPoints()
sql """ DROP TABLE IF EXISTS ${tbl} """
sql """ DROP TABLE IF EXISTS ${tbl1} """
sql """ DROP TABLE IF EXISTS ${tbl2} """
}
}
// 2. connect to follower
options.connectToFollower = true
}
}

0 comments on commit 3ebbf93

Please sign in to comment.