Skip to content

Commit

Permalink
Fix handling failures while creating catalogs
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrrzysko authored and hashhar committed Oct 14, 2024
1 parent b4f8248 commit dd3d895
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ public void createCatalog(CatalogName catalogName, ConnectorName connectorName,
CatalogConnector catalog = allCatalogs.computeIfAbsent(
catalogProperties.catalogHandle(),
handle -> catalogFactory.createCatalog(catalogProperties));
activeCatalogs.put(catalogName, catalog.getCatalog());
catalogStore.addOrReplaceCatalog(catalogProperties);
activeCatalogs.put(catalogName, catalog.getCatalog());

log.debug("Added catalog: %s", catalog.getCatalogHandle());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,26 @@
package io.trino.execution;

import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.trino.client.NodeVersion;
import io.trino.connector.CatalogStoreManager;
import io.trino.connector.InMemoryCatalogStore;
import io.trino.connector.MockConnectorPlugin;
import io.trino.execution.warnings.WarningCollector;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.spi.TrinoException;
import io.trino.spi.catalog.CatalogName;
import io.trino.spi.catalog.CatalogProperties;
import io.trino.spi.catalog.CatalogStore;
import io.trino.spi.catalog.CatalogStoreFactory;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;
import io.trino.spi.connector.ConnectorName;
import io.trino.spi.resourcegroups.ResourceGroupId;
import io.trino.sql.tree.CreateCatalog;
import io.trino.sql.tree.Identifier;
Expand All @@ -38,6 +49,7 @@
import org.junit.jupiter.api.TestInstance;

import java.net.URI;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

Expand All @@ -47,8 +59,10 @@
import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector;
import static java.util.Collections.emptyList;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@TestInstance(TestInstance.Lifecycle.PER_METHOD)
public class TestCreateCatalogTask
Expand Down Expand Up @@ -149,6 +163,51 @@ public void failCreateCatalog()
.withMessageContaining("TEST create catalog fail: " + TEST_CATALOG);
}

@Test
public void testAddOrReplaceCatalogFail()
{
try (QueryRunner queryRunner = new StandaloneQueryRunner(
TEST_SESSION,
builder -> builder
.setAdditionalModule(new FailingAddOrReplaceCatalogStoreModule())
.addProperty("catalog.store", "failing_add_or_replace"))) {
queryRunner.installPlugin(new TpchPlugin());
Map<Class<? extends Statement>, DataDefinitionTask<?>> tasks = queryRunner.getCoordinator().getInstance(new Key<>() {});
CreateCatalogTask task = (CreateCatalogTask) tasks.get(CreateCatalog.class);
QueryStateMachine queryStateMachine = QueryStateMachine.begin(
Optional.empty(),
"test",
Optional.empty(),
queryRunner.getDefaultSession(),
URI.create("fake://uri"),
new ResourceGroupId("test"),
false,
queryRunner.getTransactionManager(),
queryRunner.getAccessControl(),
directExecutor(),
queryRunner.getPlannerContext().getMetadata(),
WarningCollector.NOOP,
createPlanOptimizersStatsCollector(),
Optional.empty(),
true,
new NodeVersion("test"));

CreateCatalog statement = new CreateCatalog(
new NodeLocation(1, 1),
new Identifier(TEST_CATALOG),
true,
new Identifier("tpch"),
TPCH_PROPERTIES,
Optional.empty(),
Optional.empty());

assertThatThrownBy(() -> getFutureValue(task.execute(statement, queryStateMachine, emptyList(), WarningCollector.NOOP)))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Add or replace catalog failed");
assertThat(queryRunner.getPlannerContext().getMetadata().catalogExists(queryStateMachine.getSession(), TEST_CATALOG)).isFalse();
}
}

private static class FailConnectorFactory
implements ConnectorFactory
{
Expand All @@ -164,4 +223,71 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
throw new IllegalArgumentException("TEST create catalog fail: " + catalogName);
}
}

private static class FailingAddOrReplaceCatalogStoreModule
implements Module
{
@Override
public void configure(Binder binder) {}

@Provides
@Singleton
public CatalogStoreFactory createCatalogStoreFactory(CatalogStoreManager catalogStoreManager)
{
FailingAddOrReplaceCatalogStoreFactory factory = new FailingAddOrReplaceCatalogStoreFactory();
catalogStoreManager.addCatalogStoreFactory(factory);
return factory;
}
}

private static class FailingAddOrReplaceCatalogStoreFactory
implements CatalogStoreFactory
{
@Override
public String getName()
{
return "failing_add_or_replace";
}

@Override
public CatalogStore create(Map<String, String> config)
{
return new FailingAddOrReplaceCatalogStore(new InMemoryCatalogStore());
}
}

private static class FailingAddOrReplaceCatalogStore
implements CatalogStore
{
private final CatalogStore delegate;

FailingAddOrReplaceCatalogStore(CatalogStore delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
}

@Override
public Collection<StoredCatalog> getCatalogs()
{
return delegate.getCatalogs();
}

@Override
public CatalogProperties createCatalogProperties(CatalogName catalogName, ConnectorName connectorName, Map<String, String> properties)
{
return delegate.createCatalogProperties(catalogName, connectorName, properties);
}

@Override
public void addOrReplaceCatalog(CatalogProperties catalogProperties)
{
throw new RuntimeException("Add or replace catalog failed");
}

@Override
public void removeCatalog(CatalogName catalogName)
{
delegate.removeCatalog(catalogName);
}
}
}

0 comments on commit dd3d895

Please sign in to comment.