Skip to content

Commit

Permalink
Allow overriding the binding of the HDFS filesystem used for caching
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Aug 6, 2020
1 parent 4b47fff commit ff9f050
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,9 @@
*/
package io.prestosql.plugin.hive.rubix;

import javax.inject.Qualifier;
import io.prestosql.plugin.hive.ConfigurationInitializer;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForRubix
public interface RubixHdfsInitializer
extends ConfigurationInitializer
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.qubole.rubix.core.CachingFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoAdlFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoAzureBlobFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoDistributedFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoGoogleHadoopFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoNativeAzureFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoS3FileSystem;
Expand All @@ -32,7 +31,6 @@
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.prestosql.plugin.base.CatalogName;
import io.prestosql.plugin.hive.ConfigurationInitializer;
import io.prestosql.plugin.hive.HdfsConfigurationInitializer;
import io.prestosql.plugin.hive.util.RetryDriver;
import io.prestosql.spi.HostAddress;
Expand Down Expand Up @@ -91,8 +89,6 @@ public class RubixInitializer

private static final String RUBIX_GS_FS_CLASS_NAME = CachingPrestoGoogleHadoopFileSystem.class.getName();

private static final String RUBIX_DISTRIBUTED_FS_CLASS_NAME = CachingPrestoDistributedFileSystem.class.getName();

private static final RetryDriver DEFAULT_COORDINATOR_RETRY_DRIVER = retry()
// unlimited attempts
.maxAttempts(MAX_VALUE)
Expand All @@ -116,7 +112,7 @@ public class RubixInitializer
private final NodeManager nodeManager;
private final CatalogName catalogName;
private final HdfsConfigurationInitializer hdfsConfigurationInitializer;
private final Optional<ConfigurationInitializer> extraConfigInitializer;
private final RubixHdfsInitializer rubixHdfsInitializer;

private volatile boolean cacheReady;
@Nullable
Expand All @@ -130,9 +126,9 @@ public RubixInitializer(
NodeManager nodeManager,
CatalogName catalogName,
HdfsConfigurationInitializer hdfsConfigurationInitializer,
@ForRubix Optional<ConfigurationInitializer> extraConfigInitializer)
RubixHdfsInitializer rubixHdfsInitializer)
{
this(DEFAULT_COORDINATOR_RETRY_DRIVER, rubixConfig, nodeManager, catalogName, hdfsConfigurationInitializer, extraConfigInitializer);
this(DEFAULT_COORDINATOR_RETRY_DRIVER, rubixConfig, nodeManager, catalogName, hdfsConfigurationInitializer, rubixHdfsInitializer);
}

@VisibleForTesting
Expand All @@ -142,7 +138,7 @@ public RubixInitializer(
NodeManager nodeManager,
CatalogName catalogName,
HdfsConfigurationInitializer hdfsConfigurationInitializer,
Optional<ConfigurationInitializer> extraConfigInitializer)
RubixHdfsInitializer rubixHdfsInitializer)
{
this.coordinatorRetryDriver = coordinatorRetryDriver;
this.startServerOnCoordinator = rubixConfig.isStartServerOnCoordinator();
Expand All @@ -155,7 +151,7 @@ public RubixInitializer(
this.nodeManager = nodeManager;
this.catalogName = catalogName;
this.hdfsConfigurationInitializer = hdfsConfigurationInitializer;
this.extraConfigInitializer = extraConfigInitializer;
this.rubixHdfsInitializer = rubixHdfsInitializer;
}

void initializeRubix()
Expand Down Expand Up @@ -312,8 +308,6 @@ private void updateRubixConfiguration(Configuration config)

config.set("fs.gs.impl", RUBIX_GS_FS_CLASS_NAME);

config.set("fs.hdfs.impl", RUBIX_DISTRIBUTED_FS_CLASS_NAME);

extraConfigInitializer.ifPresent(initializer -> initializer.initializeConfiguration(config));
rubixHdfsInitializer.initializeConfiguration(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
*/
package io.prestosql.plugin.hive.rubix;

import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.prestosql.plugin.hive.ConfigurationInitializer;
import com.qubole.rubix.prestosql.CachingPrestoDistributedFileSystem;
import io.prestosql.plugin.hive.DynamicConfigurationProvider;
import org.apache.hadoop.conf.Configuration;

import java.util.Set;

Expand All @@ -31,6 +32,8 @@
public class RubixModule
implements Module
{
private static final String RUBIX_DISTRIBUTED_FS_CLASS_NAME = CachingPrestoDistributedFileSystem.class.getName();

@Override
public void configure(Binder binder)
{
Expand All @@ -43,7 +46,8 @@ public void configure(Binder binder)
// dependency for many objects) whenever initialization error happens
// (Guice doesn't fail-fast)
binder.bind(RubixStarter.class).asEagerSingleton();
newOptionalBinder(binder, Key.get(ConfigurationInitializer.class, ForRubix.class));
newOptionalBinder(binder, RubixHdfsInitializer.class)
.setDefault().to(DefaultRubixHdfsInitializer.class).in(Scopes.SINGLETON);
newSetBinder(binder, DynamicConfigurationProvider.class).addBinding().to(RubixConfigurationInitializer.class).in(Scopes.SINGLETON);
}

Expand All @@ -56,4 +60,15 @@ private RubixStarter(RubixInitializer rubixInitializer, Set<DynamicConfiguration
rubixInitializer.initializeRubix();
}
}

@VisibleForTesting
static class DefaultRubixHdfsInitializer
implements RubixHdfsInitializer
{
@Override
public void initializeConfiguration(Configuration config)
{
config.set("fs.hdfs.impl", RUBIX_DISTRIBUTED_FS_CLASS_NAME);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.prestosql.plugin.hive.authentication.NoHdfsAuthentication;
import io.prestosql.plugin.hive.orc.OrcReaderConfig;
import io.prestosql.plugin.hive.rubix.RubixConfig.ReadMode;
import io.prestosql.plugin.hive.rubix.RubixModule.DefaultRubixHdfsInitializer;
import io.prestosql.spi.Node;
import io.prestosql.spi.session.PropertyMetadata;
import io.prestosql.testing.TestingConnectorSession;
Expand All @@ -63,7 +64,6 @@
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -187,7 +187,7 @@ private void initializeRubix(RubixConfig rubixConfig, List<Node> nodes)
nodeManager,
new CatalogName("catalog"),
configurationInitializer,
Optional.empty());
new DefaultRubixHdfsInitializer());
rubixConfigInitializer = new RubixConfigurationInitializer(rubixInitializer);
rubixInitializer.initializeRubix();
retry().run("wait for rubix to startup", () -> {
Expand Down Expand Up @@ -303,7 +303,7 @@ public void testCoordinatorNotJoining()
new TestingNodeManager(ImmutableList.of(workerNode)),
new CatalogName("catalog"),
configurationInitializer,
Optional.empty());
new DefaultRubixHdfsInitializer());
assertThatThrownBy(rubixInitializer::initializeRubix)
.hasMessage("No coordinator node available");
}
Expand Down

0 comments on commit ff9f050

Please sign in to comment.