Skip to content

Commit

Permalink
[regression-test][framework] support cases that can only run in non-c…
Browse files Browse the repository at this point in the history
…oncurrent-mode. (apache#26487)
  • Loading branch information
shuke987 authored and seawinde committed Nov 12, 2023
1 parent 05ee284 commit 6d3c2b0
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ class RegressionTest {
static GroovyShell shell
static ExecutorService scriptExecutors
static ExecutorService suiteExecutors
static ExecutorService singleSuiteExecutors
static ExecutorService actionExecutors
static ThreadLocal<Integer> threadLoadedClassNum = new ThreadLocal<>()
static final int cleanLoadedClassesThreshold = 20
static String nonConcurrentTestGroup = "nonConcurrent"

static void main(String[] args) {
CommandLine cmd = ConfigOptions.initCommands(args)
Expand All @@ -70,6 +72,7 @@ class RegressionTest {
}
actionExecutors.shutdown()
suiteExecutors.shutdown()
singleSuiteExecutors.shutdown()
scriptExecutors.shutdown()
log.info("Test finished")
if (!success) {
Expand All @@ -96,6 +99,12 @@ class RegressionTest {
.build();
suiteExecutors = Executors.newFixedThreadPool(config.suiteParallel, suiteFactory)

BasicThreadFactory singleSuiteFactory = new BasicThreadFactory.Builder()
.namingPattern("non-concurrent-thread-%d")
.priority(Thread.MAX_PRIORITY)
.build();
singleSuiteExecutors = Executors.newFixedThreadPool(1, singleSuiteFactory)

BasicThreadFactory actionFactory = new BasicThreadFactory.Builder()
.namingPattern("action-thread-%d")
.priority(Thread.MAX_PRIORITY)
Expand Down Expand Up @@ -131,9 +140,9 @@ class RegressionTest {
return sources
}

static void runScript(Config config, ScriptSource source, Recorder recorder) {
static void runScript(Config config, ScriptSource source, Recorder recorder, boolean isSingleThreadScript) {
def suiteFilter = { String suiteName, String groupName ->
canRun(config, suiteName, groupName)
canRun(config, suiteName, groupName, isSingleThreadScript)
}
def file = source.getFile()
int failureLimit = Integer.valueOf(config.otherConfigs.getOrDefault("max_failure_num", "-1").toString());
Expand All @@ -144,7 +153,14 @@ class RegressionTest {
return;
}
def eventListeners = getEventListeners(config, recorder)
new ScriptContext(file, suiteExecutors, actionExecutors,
ExecutorService executors = null
if (isSingleThreadScript) {
executors = singleSuiteExecutors
} else {
executors = suiteExecutors
}

new ScriptContext(file, executors, actionExecutors,
config, eventListeners, suiteFilter).start { scriptContext ->
try {
SuiteScript suiteScript = source.toScript(scriptContext, shell)
Expand All @@ -168,7 +184,26 @@ class RegressionTest {
scriptSources.eachWithIndex { source, i ->
// log.info("Prepare scripts [${i + 1}/${totalFile}]".toString())
def future = scriptExecutors.submit {
runScript(config, source, recorder)
runScript(config, source, recorder, false)
}
futures.add(future)
}

// wait all scripts
for (Future future : futures) {
try {
future.get()
} catch (Throwable t) {
// do nothing, because already save to Recorder
}
}

log.info('Start to run single scripts')
futures.clear()
scriptSources.eachWithIndex { source, i ->
// log.info("Prepare scripts [${i + 1}/${totalFile}]".toString())
def future = scriptExecutors.submit {
runScript(config, source, recorder, true)
}
futures.add(future)
}
Expand All @@ -192,8 +227,9 @@ class RegressionTest {
{ fileName -> fileName.substring(0, fileName.lastIndexOf(".")) == "load" })
}
log.info('Start to run scripts')
runScripts(config, recorder, directoryFilter,
runScripts(config, recorder, directoryFilter,
{ fileName -> fileName.substring(0, fileName.lastIndexOf(".")) != "load" })

return recorder
}

Expand Down Expand Up @@ -228,7 +264,18 @@ class RegressionTest {
return true
}

static boolean canRun(Config config, String suiteName, String group) {
static boolean canRun(Config config, String suiteName, String group, boolean isSingleThreadScript) {
Set<String> suiteGroups = group.split(',').collect { g -> g.trim() }.toSet();
if (isSingleThreadScript) {
if (!suiteGroups.contains(nonConcurrentTestGroup)) {
return false
}
} else {
if (suiteGroups.contains(nonConcurrentTestGroup)) {
return false
}
}

return filterGroups(config, group) && filterSuites(config, suiteName)
}

Expand Down
2 changes: 1 addition & 1 deletion regression-test/pipeline/p0/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ testDirectories = ""
excludeGroups = ""
// this suites will not be executed

excludeSuites = "test_sql_block_rule,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_jdbc_query_mysql"
excludeSuites = "test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_jdbc_query_mysql"

// this directories will not be executed
excludeDirectories = "workload_manager_p1,fault_injection_p0"
Expand Down
2 changes: 1 addition & 1 deletion regression-test/pipeline/p1/conf/regression-conf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ testSuites = ""
// this suites will not be executed
excludeSuites = "test_big_pad,test_profile,test_broker_load,test_spark_load,test_analyze_stats_p1,test_refresh_mtmv,test_bitmap_filter"
// this dir will not be executed
excludeDirectories = "workload_manager_p1"
excludeDirectories = "workload_manager_p1,fault_injection_p0"
cacheDataPath="/data/regression/"

s3Endpoint = "cos.ap-hongkong.myqcloud.com"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_index_failure_injection", "p0") {
suite("test_index_failure_injection", "p0", "nonConcurrent") {
// define a sql table
def testTable_dup = "httplogs_dup"
def testTable_unique = "httplogs_unique"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def create_table_sql = """
"""
def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"

suite("test_segcompaction_correctness") {
suite("test_segcompaction_correctness", "nonConcurrent") {
def runLoadWithSegcompaction = {
String ak = getS3AK()
String sk = getS3SK()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def create_table_sql = """
"""
def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"

suite("test_too_many_segments") { // the epic -238 case
suite("test_too_many_segments", "nonConcurrent") { // the epic -238 case
def runLoadWithTooManySegments = {
String ak = getS3AK()
String sk = getS3SK()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_sql_block_rule") {
suite("test_sql_block_rule", "nonConcurrent") {

sql """
DROP SQL_BLOCK_RULE if exists test_rule_partition
Expand Down

0 comments on commit 6d3c2b0

Please sign in to comment.