diff --git a/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java b/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java index 3cd2c4238df1..b0ed344b26b3 100644 --- a/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java +++ b/core/trino-main/src/main/java/io/trino/connector/CoordinatorDynamicCatalogManager.java @@ -276,8 +276,8 @@ public void createCatalog(CatalogName catalogName, ConnectorName connectorName, CatalogConnector catalog = allCatalogs.computeIfAbsent( catalogProperties.catalogHandle(), handle -> catalogFactory.createCatalog(catalogProperties)); - activeCatalogs.put(catalogName, catalog.getCatalog()); catalogStore.addOrReplaceCatalog(catalogProperties); + activeCatalogs.put(catalogName, catalog.getCatalog()); log.debug("Added catalog: %s", catalog.getCatalogHandle()); } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestCreateCatalogTask.java b/core/trino-main/src/test/java/io/trino/execution/TestCreateCatalogTask.java index 5fbf889d2d27..fc502fdb3eba 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestCreateCatalogTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestCreateCatalogTask.java @@ -14,15 +14,26 @@ package io.trino.execution; import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; import io.trino.client.NodeVersion; +import io.trino.connector.CatalogStoreManager; +import io.trino.connector.InMemoryCatalogStore; import io.trino.connector.MockConnectorPlugin; import io.trino.execution.warnings.WarningCollector; import io.trino.plugin.tpch.TpchPlugin; import io.trino.spi.TrinoException; +import io.trino.spi.catalog.CatalogName; +import io.trino.spi.catalog.CatalogProperties; +import io.trino.spi.catalog.CatalogStore; +import io.trino.spi.catalog.CatalogStoreFactory; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; import io.trino.spi.connector.ConnectorFactory; +import io.trino.spi.connector.ConnectorName; import io.trino.spi.resourcegroups.ResourceGroupId; import io.trino.sql.tree.CreateCatalog; import io.trino.sql.tree.Identifier; @@ -38,6 +49,7 @@ import org.junit.jupiter.api.TestInstance; import java.net.URI; +import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -47,8 +59,10 @@ import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector; import static java.util.Collections.emptyList; import static java.util.Locale.ENGLISH; +import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatThrownBy; @TestInstance(TestInstance.Lifecycle.PER_METHOD) public class TestCreateCatalogTask @@ -149,6 +163,51 @@ public void failCreateCatalog() .withMessageContaining("TEST create catalog fail: " + TEST_CATALOG); } + @Test + public void testAddOrReplaceCatalogFail() + { + try (QueryRunner queryRunner = new StandaloneQueryRunner( + TEST_SESSION, + builder -> builder + .setAdditionalModule(new FailingAddOrReplaceCatalogStoreModule()) + .addProperty("catalog.store", "failing_add_or_replace"))) { + queryRunner.installPlugin(new TpchPlugin()); + Map, DataDefinitionTask> tasks = queryRunner.getCoordinator().getInstance(new Key<>() {}); + CreateCatalogTask task = (CreateCatalogTask) tasks.get(CreateCatalog.class); + QueryStateMachine queryStateMachine = QueryStateMachine.begin( + Optional.empty(), + "test", + Optional.empty(), + queryRunner.getDefaultSession(), + URI.create("fake://uri"), + new ResourceGroupId("test"), + false, + queryRunner.getTransactionManager(), + queryRunner.getAccessControl(), + directExecutor(), + queryRunner.getPlannerContext().getMetadata(), + WarningCollector.NOOP, + createPlanOptimizersStatsCollector(), + Optional.empty(), + true, + new NodeVersion("test")); + + CreateCatalog statement = new CreateCatalog( + new NodeLocation(1, 1), + new Identifier(TEST_CATALOG), + true, + new Identifier("tpch"), + TPCH_PROPERTIES, + Optional.empty(), + Optional.empty()); + + assertThatThrownBy(() -> getFutureValue(task.execute(statement, queryStateMachine, emptyList(), WarningCollector.NOOP))) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Add or replace catalog failed"); + assertThat(queryRunner.getPlannerContext().getMetadata().catalogExists(queryStateMachine.getSession(), TEST_CATALOG)).isFalse(); + } + } + private static class FailConnectorFactory implements ConnectorFactory { @@ -164,4 +223,71 @@ public Connector create(String catalogName, Map config, Connecto throw new IllegalArgumentException("TEST create catalog fail: " + catalogName); } } + + private static class FailingAddOrReplaceCatalogStoreModule + implements Module + { + @Override + public void configure(Binder binder) {} + + @Provides + @Singleton + public CatalogStoreFactory createCatalogStoreFactory(CatalogStoreManager catalogStoreManager) + { + FailingAddOrReplaceCatalogStoreFactory factory = new FailingAddOrReplaceCatalogStoreFactory(); + catalogStoreManager.addCatalogStoreFactory(factory); + return factory; + } + } + + private static class FailingAddOrReplaceCatalogStoreFactory + implements CatalogStoreFactory + { + @Override + public String getName() + { + return "failing_add_or_replace"; + } + + @Override + public CatalogStore create(Map config) + { + return new FailingAddOrReplaceCatalogStore(new InMemoryCatalogStore()); + } + } + + private static class FailingAddOrReplaceCatalogStore + implements CatalogStore + { + private final CatalogStore delegate; + + FailingAddOrReplaceCatalogStore(CatalogStore delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public Collection getCatalogs() + { + return delegate.getCatalogs(); + } + + @Override + public CatalogProperties createCatalogProperties(CatalogName catalogName, ConnectorName connectorName, Map properties) + { + return delegate.createCatalogProperties(catalogName, connectorName, properties); + } + + @Override + public void addOrReplaceCatalog(CatalogProperties catalogProperties) + { + throw new RuntimeException("Add or replace catalog failed"); + } + + @Override + public void removeCatalog(CatalogName catalogName) + { + delegate.removeCatalog(catalogName); + } + } }