Skip to content

Commit

Permalink
Move exchange setup to top-level DistributedQueryRunner
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Mar 2, 2022
1 parent e29fce5 commit a0d8caa
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.airlift.log.Logging;
import io.trino.Session;
import io.trino.metadata.QualifiedObjectName;
import io.trino.plugin.exchange.FileSystemExchangePlugin;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.MetastoreConfig;
Expand Down Expand Up @@ -100,7 +99,6 @@ public static class Builder<SELF extends Builder<?>>
{
private boolean skipTimezoneSetup;
private ImmutableMap.Builder<String, String> hiveProperties = ImmutableMap.builder();
private Map<String, String> exchangeManagerProperties = ImmutableMap.of();
private List<TpchTable<?>> initialTables = ImmutableList.of();
private Optional<String> initialSchemasLocationBase = Optional.empty();
private Function<Session, Session> initialTablesSessionMutator = Function.identity();
Expand Down Expand Up @@ -150,12 +148,6 @@ public SELF addHiveProperty(String key, String value)
return self();
}

public SELF setExchangeManagerProperties(Map<String, String> exchangeManagerProperties)
{
this.exchangeManagerProperties = ImmutableMap.copyOf(requireNonNull(exchangeManagerProperties, "exchangeManagerProperties is null"));
return self();
}

public SELF setInitialTables(Iterable<TpchTable<?>> initialTables)
{
this.initialTables = ImmutableList.copyOf(requireNonNull(initialTables, "initialTables is null"));
Expand Down Expand Up @@ -240,11 +232,6 @@ public DistributedQueryRunner build()
HiveMetastore metastore = this.metastore.apply(queryRunner);
queryRunner.installPlugin(new TestingHivePlugin(metastore, module, cachingDirectoryLister));

if (!exchangeManagerProperties.isEmpty()) {
queryRunner.installPlugin(new FileSystemExchangePlugin());
queryRunner.loadExchangeManager("filesystem", exchangeManagerProperties);
}

Map<String, String> hiveProperties = new HashMap<>();
if (!skipTimezoneSetup) {
assertEquals(DateTimeZone.getDefault(), TIME_ZONE, "Timezone not configured correctly. Add -Duser.timezone=America/Bahia_Banderas to your JVM arguments");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
import io.trino.plugin.exchange.FileSystemExchangePlugin;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.DistributedQueryRunner;
import io.trino.tpch.TpchTable;
Expand Down Expand Up @@ -91,7 +90,6 @@ public static class Builder
{
private Optional<File> metastoreDirectory = Optional.empty();
private ImmutableMap.Builder<String, String> icebergProperties = ImmutableMap.builder();
private Map<String, String> exchangeManagerProperties = ImmutableMap.of();
private List<TpchTable<?>> initialTables = ImmutableList.of();

protected Builder()
Expand Down Expand Up @@ -121,12 +119,6 @@ public Builder addIcebergProperty(String key, String value)
return self();
}

public Builder setExchangeManagerProperties(Map<String, String> exchangeManagerProperties)
{
this.exchangeManagerProperties = ImmutableMap.copyOf(requireNonNull(exchangeManagerProperties, "exchangeManagerProperties is null"));
return self();
}

public Builder setInitialTables(Iterable<TpchTable<?>> initialTables)
{
this.initialTables = ImmutableList.copyOf(requireNonNull(initialTables, "initialTables is null"));
Expand All @@ -142,11 +134,6 @@ public DistributedQueryRunner build()
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

if (!exchangeManagerProperties.isEmpty()) {
queryRunner.installPlugin(new FileSystemExchangePlugin());
queryRunner.loadExchangeManager("filesystem", exchangeManagerProperties);
}

Path dataDir = metastoreDirectory.map(File::toPath).orElseGet(() -> queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data"));

queryRunner.installPlugin(new IcebergPlugin());
Expand Down
5 changes: 5 additions & 0 deletions testing/trino-testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
<artifactId>trino-client</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-exchange</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-main</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.SessionPropertyManager;
import io.trino.metadata.SqlFunction;
import io.trino.plugin.exchange.FileSystemExchangePlugin;
import io.trino.server.BasicQueryInfo;
import io.trino.server.SessionPropertyDefaults;
import io.trino.server.testing.TestingTrinoServer;
Expand Down Expand Up @@ -625,6 +626,7 @@ public static class Builder<SELF extends Builder<?>>
private Map<String, String> extraProperties = new HashMap<>();
private Map<String, String> coordinatorProperties = ImmutableMap.of();
private Optional<Map<String, String>> backupCoordinatorProperties = Optional.empty();
private Map<String, String> exchangeManagerProperties = ImmutableMap.of();
private String environment = ENVIRONMENT;
private Module additionalModule = EMPTY_MODULE;
private Optional<Path> baseDataDir = Optional.empty();
Expand Down Expand Up @@ -673,6 +675,12 @@ public SELF setBackupCoordinatorProperties(Map<String, String> backupCoordinator
return self();
}

public SELF setExchangeManagerProperties(Map<String, String> exchangeManagerProperties)
{
this.exchangeManagerProperties = ImmutableMap.copyOf(requireNonNull(exchangeManagerProperties, "exchangeManagerProperties is null"));
return self();
}

/**
* Sets coordinator properties being equal to a map containing given key and value.
* Note, that calling this method OVERWRITES previously set property values.
Expand Down Expand Up @@ -744,7 +752,7 @@ protected SELF self()
public DistributedQueryRunner build()
throws Exception
{
return new DistributedQueryRunner(
DistributedQueryRunner queryRunner = new DistributedQueryRunner(
defaultSession,
nodeCount,
extraProperties,
Expand All @@ -755,6 +763,13 @@ public DistributedQueryRunner build()
baseDataDir,
systemAccessControls,
eventListeners);

if (!exchangeManagerProperties.isEmpty()) {
queryRunner.installPlugin(new FileSystemExchangePlugin());
queryRunner.loadExchangeManager("filesystem", exchangeManagerProperties);
}

return queryRunner;
}
}
}

0 comments on commit a0d8caa

Please sign in to comment.