diff --git a/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java b/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java index 2e50db92d09d..8e4df6fb39d6 100644 --- a/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java +++ b/core/trino-main/src/main/java/io/trino/exchange/ExchangeManagerRegistry.java @@ -14,14 +14,12 @@ package io.trino.exchange; import io.airlift.log.Logger; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.spi.TrinoException; import io.trino.spi.classloader.ThreadContextClassLoader; import io.trino.spi.exchange.ExchangeManager; import io.trino.spi.exchange.ExchangeManagerFactory; import javax.annotation.PreDestroy; -import javax.inject.Inject; import java.io.File; import java.io.IOException; @@ -45,18 +43,10 @@ public class ExchangeManagerRegistry private static final File CONFIG_FILE = new File("etc/exchange-manager.properties"); private static final String EXCHANGE_MANAGER_NAME_PROPERTY = "exchange-manager.name"; - private final ExchangeHandleResolver handleResolver; - private final Map exchangeManagerFactories = new ConcurrentHashMap<>(); private volatile ExchangeManager exchangeManager; - @Inject - public ExchangeManagerRegistry(ExchangeHandleResolver handleResolver) - { - this.handleResolver = requireNonNull(handleResolver, "handleResolver is null"); - } - public void addExchangeManagerFactory(ExchangeManagerFactory factory) { requireNonNull(factory, "factory is null"); @@ -91,7 +81,6 @@ public synchronized void loadExchangeManager(String name, Map pr try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(factory.getClass().getClassLoader())) { exchangeManager = factory.create(properties); } - handleResolver.setExchangeManagerHandleResolver(factory.getHandleResolver()); log.info("-- Loaded exchange manager %s --", name); diff --git a/core/trino-main/src/main/java/io/trino/metadata/ExchangeHandleResolver.java b/core/trino-main/src/main/java/io/trino/metadata/ExchangeHandleResolver.java deleted file mode 100644 index 26f457e06ed7..000000000000 --- a/core/trino-main/src/main/java/io/trino/metadata/ExchangeHandleResolver.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.metadata; - -import io.trino.spi.exchange.ExchangeManagerHandleResolver; -import io.trino.spi.exchange.ExchangeSinkInstanceHandle; -import io.trino.spi.exchange.ExchangeSourceHandle; - -import java.util.concurrent.atomic.AtomicReference; - -import static com.google.common.base.Preconditions.checkState; - -public class ExchangeHandleResolver -{ - private final AtomicReference exchangeManagerHandleResolver = new AtomicReference<>(); - - public void setExchangeManagerHandleResolver(ExchangeManagerHandleResolver resolver) - { - checkState(exchangeManagerHandleResolver.compareAndSet(null, resolver), "ExchangeManagerHandleResolver is already set"); - } - - public Class getExchangeSinkInstanceHandleClass() - { - ExchangeManagerHandleResolver resolver = exchangeManagerHandleResolver.get(); - checkState(resolver != null, "ExchangeManagerHandleResolver is not set"); - return resolver.getExchangeSinkInstanceHandleClass(); - } - - public Class getExchangeSourceHandleClass() - { - ExchangeManagerHandleResolver resolver = exchangeManagerHandleResolver.get(); - checkState(resolver != null, "ExchangeManagerHandleResolver is not set"); - return resolver.getExchangeSourceHandleHandleClass(); - } -} diff --git a/core/trino-main/src/main/java/io/trino/metadata/HandleJsonModule.java b/core/trino-main/src/main/java/io/trino/metadata/HandleJsonModule.java index e199110a1b87..2a1e4753a8a2 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/HandleJsonModule.java +++ b/core/trino-main/src/main/java/io/trino/metadata/HandleJsonModule.java @@ -37,7 +37,6 @@ public class HandleJsonModule public void configure(Binder binder) { binder.bind(HandleResolver.class).in(Scopes.SINGLETON); - binder.bind(ExchangeHandleResolver.class).in(Scopes.SINGLETON); } @ProvidesIntoSet @@ -101,14 +100,14 @@ public static com.fasterxml.jackson.databind.Module partitioningHandleModule(Han } @ProvidesIntoSet - public static com.fasterxml.jackson.databind.Module exchangeSinkInstanceHandleModule(ExchangeHandleResolver resolver) + public static com.fasterxml.jackson.databind.Module exchangeSinkInstanceHandleModule(HandleResolver resolver) { - return new AbstractTypedJacksonModule<>(ExchangeSinkInstanceHandle.class, ignored -> "ExchangeSinkInstance", ignored -> resolver.getExchangeSinkInstanceHandleClass()) {}; + return new AbstractTypedJacksonModule<>(ExchangeSinkInstanceHandle.class, resolver::getId, resolver::getHandleClass) {}; } @ProvidesIntoSet - public static com.fasterxml.jackson.databind.Module exchangeSourceHandleModule(ExchangeHandleResolver resolver) + public static com.fasterxml.jackson.databind.Module exchangeSourceHandleModule(HandleResolver resolver) { - return new AbstractTypedJacksonModule<>(ExchangeSourceHandle.class, ignored -> "ExchangeSource", ignored -> resolver.getExchangeSourceHandleClass()) {}; + return new AbstractTypedJacksonModule<>(ExchangeSourceHandle.class, resolver::getId, resolver::getHandleClass) {}; } } diff --git a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java index 2a8c60bc1d65..94d7700d23a6 100644 --- a/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java +++ b/core/trino-main/src/main/java/io/trino/testing/LocalQueryRunner.java @@ -78,7 +78,6 @@ import io.trino.metadata.CatalogManager; import io.trino.metadata.ColumnPropertyManager; import io.trino.metadata.DisabledSystemSecurityMetadata; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.metadata.FunctionBundle; import io.trino.metadata.FunctionManager; import io.trino.metadata.GlobalFunctionCatalog; @@ -462,7 +461,7 @@ private LocalQueryRunner( new TransactionsSystemTable(typeManager, transactionManager)), ImmutableSet.of()); - exchangeManagerRegistry = new ExchangeManagerRegistry(new ExchangeHandleResolver()); + exchangeManagerRegistry = new ExchangeManagerRegistry(); this.pluginManager = new PluginManager( (loader, createClassLoader) -> {}, catalogFactory, diff --git a/core/trino-main/src/test/java/io/trino/exchange/TestLazyExchangeDataSource.java b/core/trino-main/src/test/java/io/trino/exchange/TestLazyExchangeDataSource.java index 4270d6925325..97a6a41814c4 100644 --- a/core/trino-main/src/test/java/io/trino/exchange/TestLazyExchangeDataSource.java +++ b/core/trino-main/src/test/java/io/trino/exchange/TestLazyExchangeDataSource.java @@ -15,7 +15,6 @@ import com.google.common.util.concurrent.ListenableFuture; import io.trino.memory.context.SimpleLocalMemoryContext; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.operator.RetryPolicy; import io.trino.spi.QueryId; import io.trino.spi.exchange.ExchangeId; @@ -40,7 +39,7 @@ public void testIsBlockedCancellationIsolationInInitializationPhase() throw new UnsupportedOperationException(); }, RetryPolicy.NONE, - new ExchangeManagerRegistry(new ExchangeHandleResolver()))) { + new ExchangeManagerRegistry())) { ListenableFuture first = source.isBlocked(); ListenableFuture second = source.isBlocked(); assertThat(first) diff --git a/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java b/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java index 0c5eea56dd06..822ea8ddf653 100644 --- a/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java @@ -36,7 +36,6 @@ import io.trino.memory.MemoryPool; import io.trino.memory.QueryContext; import io.trino.memory.context.SimpleLocalMemoryContext; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.metadata.InternalNode; import io.trino.metadata.Split; import io.trino.operator.TaskContext; @@ -213,7 +212,7 @@ public MockRemoteTask( DataSize.ofBytes(1), () -> new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), "test"), () -> {}, - new ExchangeManagerRegistry(new ExchangeHandleResolver())); + new ExchangeManagerRegistry()); this.fragment = requireNonNull(fragment, "fragment is null"); this.nodeId = requireNonNull(nodeId, "nodeId is null"); diff --git a/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java b/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java index cc50e5fcb89a..897bd188ae21 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java +++ b/core/trino-main/src/test/java/io/trino/execution/TaskTestUtils.java @@ -29,7 +29,6 @@ import io.trino.execution.scheduler.NodeSchedulerConfig; import io.trino.execution.scheduler.UniformNodeSelectorFactory; import io.trino.index.IndexManager; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.metadata.InMemoryNodeManager; import io.trino.metadata.Split; import io.trino.operator.PagesIndex; @@ -174,7 +173,7 @@ public static LocalExecutionPlanner createTestingPlanner() new DynamicFilterConfig(), blockTypeOperators, new TableExecuteContextManager(), - new ExchangeManagerRegistry(new ExchangeHandleResolver())); + new ExchangeManagerRegistry()); } public static TaskInfo updateTask(SqlTask sqlTask, List splitAssignments, OutputBuffers outputBuffers) diff --git a/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java b/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java index 00c292320f04..03637a5151d8 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java @@ -27,7 +27,6 @@ import io.trino.memory.MemoryPool; import io.trino.memory.QueryContext; import io.trino.memory.context.LocalMemoryContext; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.operator.DriverContext; import io.trino.operator.OperatorContext; import io.trino.operator.PipelineContext; @@ -268,7 +267,7 @@ private SqlTask newSqlTask(QueryId queryId) sqlTask -> {}, DataSize.of(32, MEGABYTE), DataSize.of(200, MEGABYTE), - new ExchangeManagerRegistry(new ExchangeHandleResolver()), + new ExchangeManagerRegistry(), new CounterStat()); } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java index 4726074514ca..ff1b052d9d76 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java @@ -31,7 +31,6 @@ import io.trino.execution.executor.TaskExecutor; import io.trino.memory.MemoryPool; import io.trino.memory.QueryContext; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.operator.TaskContext; import io.trino.spi.QueryId; import io.trino.spi.predicate.Domain; @@ -405,7 +404,7 @@ private SqlTask createInitialTask() sqlTask -> {}, DataSize.of(32, MEGABYTE), DataSize.of(200, MEGABYTE), - new ExchangeManagerRegistry(new ExchangeHandleResolver()), + new ExchangeManagerRegistry(), new CounterStat()); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java index 724689abba36..cfeb71077d5b 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskManager.java @@ -42,7 +42,6 @@ import io.trino.memory.NodeMemoryConfig; import io.trino.memory.QueryContext; import io.trino.memory.context.LocalMemoryContext; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.metadata.InternalNode; import io.trino.operator.DirectExchangeClient; import io.trino.operator.DirectExchangeClientSupplier; @@ -373,7 +372,7 @@ private SqlTaskManager createSqlTaskManager(TaskManagerConfig taskManagerConfig, localSpillManager, new NodeSpillConfig(), new TestingGcMonitor(), - new ExchangeManagerRegistry(new ExchangeHandleResolver())); + new ExchangeManagerRegistry()); } private SqlTaskManager createSqlTaskManager( @@ -397,7 +396,7 @@ private SqlTaskManager createSqlTaskManager( localSpillManager, new NodeSpillConfig(), new TestingGcMonitor(), - new ExchangeManagerRegistry(new ExchangeHandleResolver()), + new ExchangeManagerRegistry(), stuckSplitStackTracePredicate); } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java index 8ffef4daafe5..62f21b60a2ad 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java @@ -25,7 +25,6 @@ import io.trino.exchange.ExchangeManagerRegistry; import io.trino.execution.StageId; import io.trino.execution.TaskId; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.plugin.exchange.filesystem.FileSystemExchangeManagerFactory; import io.trino.spi.QueryId; import io.trino.spi.TrinoException; @@ -64,7 +63,7 @@ public class TestDeduplicatingDirectExchangeBuffer @BeforeClass public void beforeClass() { - exchangeManagerRegistry = new ExchangeManagerRegistry(new ExchangeHandleResolver()); + exchangeManagerRegistry = new ExchangeManagerRegistry(); exchangeManagerRegistry.addExchangeManagerFactory(new FileSystemExchangeManagerFactory()); exchangeManagerRegistry.loadExchangeManager("filesystem", ImmutableMap.of( "exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager")); @@ -614,7 +613,7 @@ public void testExchangeManagerNotConfigured() directExecutor(), DataSize.of(100, BYTE), RetryPolicy.QUERY, - new ExchangeManagerRegistry(new ExchangeHandleResolver()), + new ExchangeManagerRegistry(), new QueryId("query"), createRandomExchangeId())) { TaskId task = createTaskId(0, 0); @@ -637,7 +636,7 @@ public void testExchangeManagerNotConfigured() directExecutor(), DataSize.of(100, BYTE), RetryPolicy.QUERY, - new ExchangeManagerRegistry(new ExchangeHandleResolver()), + new ExchangeManagerRegistry(), new QueryId("query"), createRandomExchangeId())) { TaskId task = createTaskId(0, 0); diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClient.java b/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClient.java index bd3a665a2760..c4a95cc3a212 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClient.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDirectExchangeClient.java @@ -35,7 +35,6 @@ import io.trino.execution.TaskId; import io.trino.execution.buffer.PagesSerde; import io.trino.memory.context.SimpleLocalMemoryContext; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.spi.Page; import io.trino.spi.QueryId; import io.trino.spi.TrinoException; @@ -475,7 +474,7 @@ public void testDeduplicationTaskFailure() scheduler, DataSize.of(1, Unit.MEGABYTE), RetryPolicy.QUERY, - new ExchangeManagerRegistry(new ExchangeHandleResolver()), + new ExchangeManagerRegistry(), new QueryId("query"), createRandomExchangeId()); @@ -535,7 +534,7 @@ public void testDeduplication() scheduler, DataSize.of(1, Unit.KILOBYTE), RetryPolicy.QUERY, - new ExchangeManagerRegistry(new ExchangeHandleResolver()), + new ExchangeManagerRegistry(), new QueryId("query"), createRandomExchangeId()), maxResponseSize, diff --git a/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java index afd4d1495774..e15126b9a296 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestExchangeOperator.java @@ -28,7 +28,6 @@ import io.trino.execution.TaskId; import io.trino.execution.buffer.PagesSerdeFactory; import io.trino.execution.buffer.TestingPagesSerdeFactory; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.metadata.Split; import io.trino.operator.ExchangeOperator.ExchangeOperatorFactory; import io.trino.spi.Page; @@ -264,7 +263,7 @@ private SourceOperator createExchangeOperator() directExchangeClientSupplier, SERDE_FACTORY, RetryPolicy.NONE, - new ExchangeManagerRegistry(new ExchangeHandleResolver())); + new ExchangeManagerRegistry()); DriverContext driverContext = createTaskContext(scheduler, scheduledExecutor, TEST_SESSION) .addPipelineContext(0, true, true, false) diff --git a/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java index 0bc234015c90..4a7959a23c26 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestMergeOperator.java @@ -27,7 +27,6 @@ import io.trino.execution.TaskId; import io.trino.execution.buffer.PagesSerdeFactory; import io.trino.execution.buffer.TestingPagesSerdeFactory; -import io.trino.metadata.ExchangeHandleResolver; import io.trino.metadata.Split; import io.trino.spi.Page; import io.trino.spi.connector.SortOrder; @@ -95,7 +94,7 @@ public void setUp() new DirectExchangeClientConfig(), httpClient, executor, - new ExchangeManagerRegistry(new ExchangeHandleResolver())); + new ExchangeManagerRegistry()); orderingCompiler = new OrderingCompiler(new TypeOperators()); } diff --git a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManagerFactory.java b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManagerFactory.java index 8848834e7691..e8b724bb5658 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManagerFactory.java +++ b/core/trino-spi/src/main/java/io/trino/spi/exchange/ExchangeManagerFactory.java @@ -23,6 +23,4 @@ public interface ExchangeManagerFactory String getName(); ExchangeManager create(Map config); - - ExchangeManagerHandleResolver getHandleResolver(); } diff --git a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java index 2eaa0204a8b3..45af9f205faf 100644 --- a/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java +++ b/plugin/trino-exchange-filesystem/src/main/java/io/trino/plugin/exchange/filesystem/FileSystemExchangeManagerFactory.java @@ -19,9 +19,6 @@ import io.trino.plugin.base.jmx.PrefixObjectNameGeneratorModule; import io.trino.spi.exchange.ExchangeManager; import io.trino.spi.exchange.ExchangeManagerFactory; -import io.trino.spi.exchange.ExchangeManagerHandleResolver; -import io.trino.spi.exchange.ExchangeSinkInstanceHandle; -import io.trino.spi.exchange.ExchangeSourceHandle; import org.weakref.jmx.guice.MBeanModule; import java.util.Map; @@ -55,23 +52,4 @@ public ExchangeManager create(Map config) return injector.getInstance(FileSystemExchangeManager.class); } - - @Override - public ExchangeManagerHandleResolver getHandleResolver() - { - return new ExchangeManagerHandleResolver() - { - @Override - public Class getExchangeSinkInstanceHandleClass() - { - return FileSystemExchangeSinkInstanceHandle.class; - } - - @Override - public Class getExchangeSourceHandleHandleClass() - { - return FileSystemExchangeSourceHandle.class; - } - }; - } }