Skip to content

Commit

Permalink
Add concurrent adding columns test to BaseConnectorTest
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed May 24, 2022
1 parent 239d847 commit 7c69c7d
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,13 @@ protected String errorMessageForInsertIntoNotNullColumn(String columnName)
return format("NULL not allowed for column \"%s\"(?s).*", columnName.toUpperCase(ENGLISH));
}

@Override
public void testAddColumnConcurrently()
{
// TODO: Difficult to determine whether the exception is concurrent issue or not from the error message
throw new SkipException("TODO: Enable this test after finding the failure cause");
}

@Override
protected JdbcSqlExecutor onRemoteDatabase()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ public void testDropColumn()
assertQueryFails("ALTER TABLE " + tableName + " DROP COLUMN a", "(?s).* Missing columns: 'a' while processing query: 'a', required columns: 'a' 'a'.*");
}

@Override
public void testAddColumnConcurrently()
{
// TODO: Default storage engine doesn't support adding new columns
throw new SkipException("TODO: test not implemented yet");
}

@Override
public void testAddColumn()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ protected void verifyConcurrentUpdateFailurePermissible(Exception e)
assertThat(e).hasMessageContaining("Failed to commit Iceberg update to table");
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessageContaining("Cannot update Iceberg table: supplied previous location does not match current location");
}

@Test
public void testDeleteOnV1Table()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,13 @@ public void testInsertRowConcurrently()
throw new SkipException("TODO");
}

@Override
public void testAddColumnConcurrently()
{
// TODO Support these test once kudu connector can create tables with default partitions
throw new SkipException("TODO");
}

@Test
@Override
public void testDelete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,13 @@ public void testLimitPushdown()
assertThat(query("SELECT name FROM nation LIMIT 2147483648")).isNotFullyPushedDown(LimitNode.class);
}

@Override
public void testAddColumnConcurrently()
{
// TODO: Enable after supporting multi-document transaction https://www.mongodb.com/docs/manual/core/transactions/
throw new SkipException("TODO");
}

private void assertOneNotNullResult(String query)
{
MaterializedResult results = getQueryRunner().execute(getSession(), query).toTestTypes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,13 @@ protected String errorMessageForInsertIntoNotNullColumn(String columnName)
return format("ORA-01400: cannot insert NULL into \\(.*\"%s\"\\)\n", columnName.toUpperCase(ENGLISH));
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessage("ORA-14411: The DDL cannot be run concurrently with other DDLs\n");
}

private void predicatePushdownTest(String oracleType, String oracleLiteral, String operator, String filterLiteral)
{
String tableName = ("test_pdown_" + oracleType.replaceAll("[^a-zA-Z0-9]", ""))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,13 @@ protected TestTable createTableWithDoubleAndRealColumns(String name, List<String
return new TestTable(onRemoteDatabase(), name, "(t_double double primary key, u_double double, v_real float, w_real float)", rows);
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessageContaining("Concurrent modification to table");
}

@Override
protected SqlExecutor onRemoteDatabase()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,13 @@ protected TestTable createTableWithDoubleAndRealColumns(String name, List<String
return new TestTable(onRemoteDatabase(), name, "(t_double double primary key, u_double double, v_real float, w_real float)", rows);
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessageContaining("Concurrent modification to table");
}

@Override
protected SqlExecutor onRemoteDatabase()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import static java.util.Arrays.asList;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -880,4 +881,16 @@ public void testAlterTable()

assertUpdate("DROP TABLE test_alter_table");
}

@Override
protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
assertThat(e)
.hasMessageContaining("Failed to perform metadata operation")
.getCause()
.hasMessageMatching(
"(?s).*SQLIntegrityConstraintViolationException.*" +
"|.*Unique index or primary key violation.*" +
"|.*Deadlock found when trying to get lock; try restarting transaction.*");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2847,6 +2847,71 @@ protected void verifyConcurrentInsertFailurePermissible(Exception e)
throw new AssertionError("Unexpected concurrent insert failure", e);
}

// Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic.
@Test(timeOut = 60_000, invocationCount = 4)
public void testAddColumnConcurrently()
throws Exception
{
if (!hasBehavior(SUPPORTS_ADD_COLUMN)) {
// Covered by testAddColumn
return;
}

int threads = 4;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_add_column", "(col integer)")) {
String tableName = table.getName();

List<Future<Optional<String>>> futures = IntStream.range(0, threads)
.mapToObj(threadNumber -> executor.submit(() -> {
barrier.await(30, SECONDS);
try {
String columnName = "col" + threadNumber;
getQueryRunner().execute("ALTER TABLE " + tableName + " ADD COLUMN " + columnName + " integer");
return Optional.of(columnName);
}
catch (Exception e) {
RuntimeException trinoException = getTrinoExceptionCause(e);
try {
verifyConcurrentAddColumnFailurePermissible(trinoException);
}
catch (Throwable verifyFailure) {
if (verifyFailure != e) {
verifyFailure.addSuppressed(e);
}
throw verifyFailure;
}
return Optional.<String>empty();
}
}))
.collect(toImmutableList());

List<String> addedColumns = futures.stream()
.map(future -> tryGetFutureValue(future, 30, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out")))
.filter(Optional::isPresent)
.map(Optional::get)
.collect(toImmutableList());

assertThat(query("DESCRIBE " + tableName))
.projected(0)
.skippingTypesCheck()
.matches(Stream.concat(Stream.of("col"), addedColumns.stream())
.map(value -> format("'%s'", value))
.collect(joining(",", "VALUES ", "")));
}
finally {
executor.shutdownNow();
executor.awaitTermination(30, SECONDS);
}
}

protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
{
// By default, do not expect ALTER TABLE ADD COLUMN to fail in case of concurrent inserts
throw new AssertionError("Unexpected concurrent add column failure", e);
}

@Test
public void testUpdateWithPredicates()
{
Expand Down

0 comments on commit 7c69c7d

Please sign in to comment.