Skip to content

Commit

Permalink
Replace RaptorConnectorId with CatalogName
Browse files Browse the repository at this point in the history
We used to use entities like *ConnectorId as there was a plan to
dynamically add and remove catalogs. In such case catalog name was
not unique enough. However that plan was abandoned long time ago.
  • Loading branch information
kokosing committed Oct 8, 2021
1 parent a007051 commit 81bdf7c
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Connector create(String catalogName, Map<String, String> config, Connecto
metadataModule,
new BackupModule(backupProviders),
new StorageModule(),
new RaptorModule(catalogName),
new RaptorModule(),
new RaptorSecurityModule());

Injector injector = app
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,13 @@

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.trino.plugin.raptor.legacy.metadata.SchemaDaoUtil.createTablesWithRetry;
import static java.util.Objects.requireNonNull;

public class RaptorModule
implements Module
{
private final String connectorId;

public RaptorModule(String connectorId)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
}

@Override
public void configure(Binder binder)
{
binder.bind(RaptorConnectorId.class).toInstance(new RaptorConnectorId(connectorId));
binder.bind(RaptorConnector.class).in(Scopes.SINGLETON);
binder.bind(RaptorMetadataFactory.class).in(Scopes.SINGLETON);
binder.bind(RaptorSplitManager.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.raptor.legacy;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.raptor.legacy.backup.BackupService;
import io.trino.plugin.raptor.legacy.metadata.BucketShards;
import io.trino.plugin.raptor.legacy.metadata.ShardManager;
Expand Down Expand Up @@ -71,17 +72,17 @@ public class RaptorSplitManager
private final ExecutorService executor;

@Inject
public RaptorSplitManager(RaptorConnectorId connectorId, NodeSupplier nodeSupplier, ShardManager shardManager, BackupService backupService)
public RaptorSplitManager(CatalogName catalogName, NodeSupplier nodeSupplier, ShardManager shardManager, BackupService backupService)
{
this(connectorId, nodeSupplier, shardManager, requireNonNull(backupService, "backupService is null").isBackupAvailable());
this(catalogName, nodeSupplier, shardManager, requireNonNull(backupService, "backupService is null").isBackupAvailable());
}

public RaptorSplitManager(RaptorConnectorId connectorId, NodeSupplier nodeSupplier, ShardManager shardManager, boolean backupAvailable)
public RaptorSplitManager(CatalogName catalogName, NodeSupplier nodeSupplier, ShardManager shardManager, boolean backupAvailable)
{
this.nodeSupplier = requireNonNull(nodeSupplier, "nodeSupplier is null");
this.shardManager = requireNonNull(shardManager, "shardManager is null");
this.backupAvailable = backupAvailable;
this.executor = newCachedThreadPool(daemonThreadsNamed("raptor-split-" + connectorId + "-%s"));
this.executor = newCachedThreadPool(daemonThreadsNamed("raptor-split-" + catalogName + "-%s"));
}

@PreDestroy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.configuration.ConfigurationAwareModule;
import io.trino.plugin.raptor.legacy.RaptorConnectorId;
import io.trino.plugin.base.CatalogName;
import org.weakref.jmx.MBeanExporter;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -77,7 +77,7 @@ private static Optional<BackupStore> createBackupStore(
@Nullable BackupStore store,
LifeCycleManager lifeCycleManager,
MBeanExporter exporter,
RaptorConnectorId connectorId,
CatalogName catalogName,
BackupConfig config)
{
if (store == null) {
Expand All @@ -86,14 +86,14 @@ private static Optional<BackupStore> createBackupStore(

BackupStore proxy = new TimeoutBackupStore(
store,
connectorId.toString(),
catalogName.toString(),
config.getTimeout(),
config.getTimeoutThreads());

lifeCycleManager.addInstance(proxy);

BackupStore managed = new ManagedBackupStore(proxy);
exporter.exportWithGeneratedName(managed, BackupStore.class, connectorId.toString());
exporter.exportWithGeneratedName(managed, BackupStore.class, catalogName.toString());

return Optional.of(managed);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.units.Duration;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.raptor.legacy.NodeSupplier;
import io.trino.plugin.raptor.legacy.RaptorConnectorId;
import io.trino.plugin.raptor.legacy.backup.BackupService;
import io.trino.plugin.raptor.legacy.metadata.BucketNode;
import io.trino.plugin.raptor.legacy.metadata.Distribution;
Expand Down Expand Up @@ -95,15 +95,15 @@ public BucketBalancer(
ShardManager shardManager,
BucketBalancerConfig config,
BackupService backupService,
RaptorConnectorId connectorId)
CatalogName catalogName)
{
this(nodeSupplier,
shardManager,
config.isBalancerEnabled(),
config.getBalancerInterval(),
backupService.isBackupAvailable(),
nodeManager.getCurrentNode().isCoordinator(),
connectorId.toString());
catalogName.toString());
}

public BucketBalancer(
Expand All @@ -113,15 +113,15 @@ public BucketBalancer(
Duration interval,
boolean backupAvailable,
boolean coordinator,
String connectorId)
String catalogName)
{
this.nodeSupplier = requireNonNull(nodeSupplier, "nodeSupplier is null");
this.shardManager = requireNonNull(shardManager, "shardManager is null");
this.enabled = enabled;
this.interval = requireNonNull(interval, "interval is null");
this.backupAvailable = backupAvailable;
this.coordinator = coordinator;
this.executor = newSingleThreadScheduledExecutor(daemonThreadsNamed("bucket-balancer-" + connectorId));
this.executor = newSingleThreadScheduledExecutor(daemonThreadsNamed("bucket-balancer-" + catalogName));
}

@PostConstruct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import io.trino.orc.metadata.ColumnMetadata;
import io.trino.orc.metadata.OrcColumnId;
import io.trino.orc.metadata.OrcType;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.raptor.legacy.RaptorColumnHandle;
import io.trino.plugin.raptor.legacy.RaptorConnectorId;
import io.trino.plugin.raptor.legacy.backup.BackupManager;
import io.trino.plugin.raptor.legacy.backup.BackupStore;
import io.trino.plugin.raptor.legacy.metadata.ColumnInfo;
Expand Down Expand Up @@ -162,7 +162,7 @@ public RaptorStorageManager(
StorageService storageService,
Optional<BackupStore> backupStore,
StorageManagerConfig config,
RaptorConnectorId connectorId,
CatalogName catalogName,
BackupManager backgroundBackupManager,
ShardRecoveryManager recoveryManager,
ShardRecorder shardRecorder,
Expand All @@ -176,7 +176,7 @@ public RaptorStorageManager(
recoveryManager,
shardRecorder,
typeManager,
connectorId.toString(),
catalogName.toString(),
config.getDeletionThreads(),
config.getShardRecoveryTimeout(),
config.getMaxShardRows(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.units.Duration;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.raptor.legacy.NodeSupplier;
import io.trino.plugin.raptor.legacy.RaptorConnectorId;
import io.trino.plugin.raptor.legacy.backup.BackupStore;
import io.trino.plugin.raptor.legacy.metadata.ShardManager;
import io.trino.plugin.raptor.legacy.metadata.ShardMetadata;
Expand Down Expand Up @@ -85,15 +85,15 @@ public ShardEjector(
StorageService storageService,
StorageManagerConfig config,
Optional<BackupStore> backupStore,
RaptorConnectorId connectorId)
CatalogName catalogName)
{
this(nodeManager.getCurrentNode().getNodeIdentifier(),
nodeSupplier,
shardManager,
storageService,
config.getShardEjectorInterval(),
backupStore,
connectorId.toString());
catalogName.toString());
}

public ShardEjector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airlift.slice.Slice;
import io.trino.PagesIndexPageSorter;
import io.trino.operator.PagesIndex;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.base.security.AllowAllAccessControl;
import io.trino.plugin.raptor.legacy.metadata.MetadataDao;
import io.trino.plugin.raptor.legacy.metadata.ShardManager;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void setup()
createTablesWithRetry(dbi);
dataDir = Files.createTempDir();

RaptorConnectorId connectorId = new RaptorConnectorId("test");
CatalogName connectorId = new CatalogName("test");
NodeManager nodeManager = new TestingNodeManager();
NodeSupplier nodeSupplier = nodeManager::getWorkerNodes;
ShardManager shardManager = createShardManager(dbi);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import io.airlift.units.Duration;
import io.trino.client.NodeVersion;
import io.trino.metadata.InternalNode;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.raptor.legacy.NodeSupplier;
import io.trino.plugin.raptor.legacy.RaptorColumnHandle;
import io.trino.plugin.raptor.legacy.RaptorConnectorId;
import io.trino.plugin.raptor.legacy.RaptorMetadata;
import io.trino.plugin.raptor.legacy.RaptorSplitManager;
import io.trino.plugin.raptor.legacy.RaptorTableHandle;
Expand Down Expand Up @@ -107,7 +107,7 @@ public void setup()
String nodeName = UUID.randomUUID().toString();
nodeManager.addNode(new InternalNode(nodeName, new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN, false));

RaptorConnectorId connectorId = new RaptorConnectorId("raptor");
CatalogName connectorId = new CatalogName("raptor");
metadata = new RaptorMetadata(dbi, shardManager);

metadata.createTable(SESSION, TEST_TABLE, false);
Expand Down Expand Up @@ -166,7 +166,7 @@ public void testAssignRandomNodeWhenBackupAvailable()
throws URISyntaxException
{
TestingNodeManager nodeManager = new TestingNodeManager();
RaptorConnectorId connectorId = new RaptorConnectorId("raptor");
CatalogName connectorId = new CatalogName("raptor");
NodeSupplier nodeSupplier = nodeManager::getWorkerNodes;
InternalNode node = new InternalNode(UUID.randomUUID().toString(), new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN, false);
nodeManager.addNode(node);
Expand All @@ -184,7 +184,7 @@ public void testNoNodes()
{
deleteShardNodes();

RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), ImmutableSet::of, shardManager, true);
RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new CatalogName("fbraptor"), ImmutableSet::of, shardManager, true);
ConnectorSplitSource splitSource = getSplits(raptorSplitManagerWithBackup, tableHandle);
getSplits(splitSource, 1000);
}
Expand Down

0 comments on commit 81bdf7c

Please sign in to comment.