Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement concurrent tests in Kudu #12995

Merged
merged 1 commit into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -510,17 +510,12 @@ protected String errorMessageForInsertNegativeDate(String date)
}

@Override
public void testInsertRowConcurrently()
protected TestTable createTableWithOneIntegerColumn(String namePrefix)
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
{
// TODO Support these test once kudu connector can create tables with default partitions
throw new SkipException("TODO");
}

@Override
public void testAddColumnConcurrently()
{
// TODO Support these test once kudu connector can create tables with default partitions
throw new SkipException("TODO");
// TODO Remove this overriding method once kudu connector can create tables with default partitions
return new TestTable(getQueryRunner()::execute, namePrefix,
"(col integer WITH (primary_key=true)) " +
"WITH (partition_by_hash_columns = ARRAY['col'], partition_by_hash_buckets = 2)");
}

@Test
Expand All @@ -534,8 +529,24 @@ public void testWrittenStats()
@Override
public void testReadMetadataWithRelationsConcurrentModifications()
{
// TODO Support these test once kudu connector can create tables with default partitions
throw new SkipException("TODO");
try {
super.testReadMetadataWithRelationsConcurrentModifications();
}
catch (Exception expected) {
// The test failure is not guaranteed
// TODO (https://github.com/trinodb/trino/issues/12974): shouldn't fail
assertThat(expected)
.hasMessageMatching(".* table .* was deleted: Table deleted at .* UTC");
throw new SkipException("to be fixed");
}
}

@Override
protected String createTableSqlTemplateForConcurrentModifications()
{
// TODO Remove this overriding method once kudu connector can create tables with default partitions
return "CREATE TABLE %s(a integer WITH (primary_key=true)) " +
"WITH (partition_by_hash_columns = ARRAY['a'], partition_by_hash_buckets = 2)";
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1416,7 +1416,7 @@ protected void testReadMetadataWithRelationsConcurrentModifications(int readIter
Runnable writeInitialized = writeTasksInitialized::countDown;
Supplier<Boolean> done = () -> incompleteReadTasks.get() == 0;
List<Callable<Void>> writeTasks = new ArrayList<>();
writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_table", "CREATE TABLE %s(a integer)", "DROP TABLE %s"));
writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_table", createTableSqlTemplateForConcurrentModifications(), "DROP TABLE %s"));
if (hasBehavior(SUPPORTS_CREATE_VIEW)) {
writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_view", "CREATE VIEW %s AS SELECT 1 a", "DROP VIEW %s"));
}
Expand Down Expand Up @@ -1451,6 +1451,12 @@ protected void testReadMetadataWithRelationsConcurrentModifications(int readIter
assertTrue(executor.awaitTermination(10, SECONDS));
}

@Language("SQL")
protected String createTableSqlTemplateForConcurrentModifications()
{
return "CREATE TABLE %s(a integer)";
}

/**
* Run {@code sql} query at least {@code minIterations} times and keep running until other tasks complete.
* {@code incompleteReadTasks} is used for orchestrating end of execution.
Expand Down Expand Up @@ -3008,7 +3014,7 @@ public void testInsertRowConcurrently()
int threads = 4;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_insert", "(col integer)")) {
try (TestTable table = createTableWithOneIntegerColumn("test_insert")) {
String tableName = table.getName();

List<Future<OptionalInt>> futures = IntStream.range(0, threads)
Expand Down Expand Up @@ -3076,7 +3082,7 @@ public void testAddColumnConcurrently()
int threads = 4;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column", "(col integer)")) {
try (TestTable table = createTableWithOneIntegerColumn("test_add_column")) {
String tableName = table.getName();

List<Future<Optional<String>>> futures = IntStream.range(0, threads)
Expand Down Expand Up @@ -3128,6 +3134,11 @@ protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
throw new AssertionError("Unexpected concurrent add column failure", e);
}

protected TestTable createTableWithOneIntegerColumn(String namePrefix)
{
return new TestTable(getQueryRunner()::execute, namePrefix, "(col integer)");
}

@Test
public void testUpdateWithPredicates()
{
Expand Down