Skip to content

Commit

Permalink
Cleanup config handling in PrestoS3ClientFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Sep 23, 2019
1 parent 7524a44 commit 35b2325
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.conf.Configuration;

import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;

import java.net.URI;
import java.util.Optional;
Expand Down Expand Up @@ -67,28 +68,37 @@
public class PrestoS3ClientFactory
{
private static final String S3_SELECT_PUSHDOWN_MAX_CONNECTIONS = "hive.s3select-pushdown.max-connections";
private static String s3UserAgentSuffix = "presto";

private final boolean enabled;
private final int defaultMaxConnections;

@GuardedBy("this")
private AmazonS3 s3Client;

synchronized AmazonS3 getS3Client(Configuration config, HiveConfig hiveConfig)
@Inject
public PrestoS3ClientFactory(HiveConfig config)
{
if (s3Client != null) {
return s3Client;
this.enabled = config.isS3SelectPushdownEnabled();
this.defaultMaxConnections = config.getS3SelectPushdownMaxConnections();
}

synchronized AmazonS3 getS3Client(Configuration config)
{
if (s3Client == null) {
s3Client = createS3Client(config);
}
return s3Client;
}

private AmazonS3 createS3Client(Configuration config)
{
HiveS3Config defaults = new HiveS3Config();
String userAgentPrefix = config.get(S3_USER_AGENT_PREFIX, defaults.getS3UserAgentPrefix());
int maxErrorRetries = config.getInt(S3_MAX_ERROR_RETRIES, defaults.getS3MaxErrorRetries());
boolean sslEnabled = config.getBoolean(S3_SSL_ENABLED, defaults.isS3SslEnabled());
Duration connectTimeout = Duration.valueOf(config.get(S3_CONNECT_TIMEOUT, defaults.getS3ConnectTimeout().toString()));
Duration socketTimeout = Duration.valueOf(config.get(S3_SOCKET_TIMEOUT, defaults.getS3SocketTimeout().toString()));
int maxConnections = config.getInt(S3_SELECT_PUSHDOWN_MAX_CONNECTIONS, hiveConfig.getS3SelectPushdownMaxConnections());

if (hiveConfig.isS3SelectPushdownEnabled()) {
s3UserAgentSuffix = "presto-select";
}
int maxConnections = config.getInt(S3_SELECT_PUSHDOWN_MAX_CONNECTIONS, defaultMaxConnections);

ClientConfiguration clientConfiguration = new ClientConfiguration()
.withMaxErrorRetry(maxErrorRetries)
Expand All @@ -97,7 +107,7 @@ synchronized AmazonS3 getS3Client(Configuration config, HiveConfig hiveConfig)
.withSocketTimeout(toIntExact(socketTimeout.toMillis()))
.withMaxConnections(maxConnections)
.withUserAgentPrefix(userAgentPrefix)
.withUserAgentSuffix(s3UserAgentSuffix);
.withUserAgentSuffix(enabled ? "presto-select" : "presto");

PrestoS3FileSystemStats stats = new PrestoS3FileSystemStats();
RequestMetricCollector metricCollector = new PrestoS3FileSystemMetricCollector(stats);
Expand Down Expand Up @@ -134,11 +144,10 @@ synchronized AmazonS3 getS3Client(Configuration config, HiveConfig hiveConfig)
clientBuilder.setForceGlobalBucketAccessEnabled(true);
}

s3Client = clientBuilder.build();
return s3Client;
return clientBuilder.build();
}

private AWSCredentialsProvider getAwsCredentialsProvider(Configuration conf, HiveS3Config defaults)
private static AWSCredentialsProvider getAwsCredentialsProvider(Configuration conf, HiveS3Config defaults)
{
Optional<AWSCredentials> credentials = getAwsCredentials(conf);
if (credentials.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.amazonaws.services.s3.model.SelectObjectContentEventVisitor;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import com.amazonaws.services.s3.model.SelectObjectContentResult;
import io.prestosql.plugin.hive.HiveConfig;
import org.apache.hadoop.conf.Configuration;

import java.io.Closeable;
Expand All @@ -35,12 +34,11 @@ class PrestoS3SelectClient
private SelectObjectContentRequest selectObjectRequest;
private SelectObjectContentResult selectObjectContentResult;

public PrestoS3SelectClient(Configuration configuration, HiveConfig hiveConfig, PrestoS3ClientFactory s3ClientFactory)
public PrestoS3SelectClient(Configuration configuration, PrestoS3ClientFactory s3ClientFactory)
{
requireNonNull(configuration, "configuration is null");
requireNonNull(hiveConfig, "hiveConfig is null");
requireNonNull(s3ClientFactory, "s3ClientFactory is null");
this.s3Client = s3ClientFactory.getS3Client(configuration, hiveConfig);
this.s3Client = s3ClientFactory.getS3Client(configuration);
}

public InputStream getRecordsContent(SelectObjectContentRequest selectObjectRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.amazonaws.services.s3.model.InputSerialization;
import com.amazonaws.services.s3.model.OutputSerialization;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import io.prestosql.plugin.hive.HiveConfig;
import io.prestosql.plugin.hive.s3.PrestoS3FileSystem;
import io.prestosql.spi.PrestoException;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -51,15 +50,14 @@ class S3SelectCsvRecordReader

public S3SelectCsvRecordReader(
Configuration configuration,
HiveConfig hiveConfig,
Path path,
long start,
long length,
Properties schema,
String ionSqlQuery,
PrestoS3ClientFactory s3ClientFactory)
{
super(configuration, hiveConfig, path, start, length, schema, ionSqlQuery, s3ClientFactory);
super(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Closer;
import io.airlift.units.Duration;
import io.prestosql.plugin.hive.HiveConfig;
import io.prestosql.plugin.hive.s3.HiveS3Config;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -75,7 +74,6 @@ public abstract class S3SelectLineRecordReader

S3SelectLineRecordReader(
Configuration configuration,
HiveConfig hiveConfig,
Path path,
long start,
long length,
Expand All @@ -84,7 +82,6 @@ public abstract class S3SelectLineRecordReader
PrestoS3ClientFactory s3ClientFactory)
{
requireNonNull(configuration, "configuration is null");
requireNonNull(hiveConfig, "hiveConfig is null");
requireNonNull(schema, "schema is null");
requireNonNull(path, "path is null");
requireNonNull(ionSqlQuery, "ionSqlQuery is null");
Expand All @@ -105,7 +102,7 @@ public abstract class S3SelectLineRecordReader
this.maxBackoffTime = Duration.valueOf(configuration.get(S3_MAX_BACKOFF_TIME, defaults.getS3MaxBackoffTime().toString()));
this.maxRetryTime = Duration.valueOf(configuration.get(S3_MAX_RETRY_TIME, defaults.getS3MaxRetryTime().toString()));

this.selectClient = new PrestoS3SelectClient(configuration, hiveConfig, s3ClientFactory);
this.selectClient = new PrestoS3SelectClient(configuration, s3ClientFactory);
closer.register(selectClient);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.google.common.collect.ImmutableSet;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HiveConfig;
import io.prestosql.plugin.hive.HiveRecordCursorProvider;
import io.prestosql.plugin.hive.IonSqlQueryBuilder;
import io.prestosql.spi.PrestoException;
Expand Down Expand Up @@ -46,17 +45,12 @@ public class S3SelectRecordCursorProvider
{
private static final Set<String> CSV_SERDES = ImmutableSet.of(LazySimpleSerDe.class.getName());
private final HdfsEnvironment hdfsEnvironment;
private final HiveConfig hiveConfig;
private final PrestoS3ClientFactory s3ClientFactory;

@Inject
public S3SelectRecordCursorProvider(
HdfsEnvironment hdfsEnvironment,
HiveConfig hiveConfig,
PrestoS3ClientFactory s3ClientFactory)
public S3SelectRecordCursorProvider(HdfsEnvironment hdfsEnvironment, PrestoS3ClientFactory s3ClientFactory)
{
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hiveConfig = requireNonNull(hiveConfig, "hiveConfig is null");
this.s3ClientFactory = requireNonNull(s3ClientFactory, "s3ClientFactory is null");
}

Expand Down Expand Up @@ -90,7 +84,7 @@ public Optional<RecordCursor> createRecordCursor(
if (CSV_SERDES.contains(serdeName)) {
IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager);
String ionSqlQuery = queryBuilder.buildSql(columns, effectivePredicate);
S3SelectLineRecordReader recordReader = new S3SelectCsvRecordReader(configuration, hiveConfig, path, start, length, schema, ionSqlQuery, s3ClientFactory);
S3SelectLineRecordReader recordReader = new S3SelectCsvRecordReader(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory);
return Optional.of(new S3SelectRecordCursor<>(configuration, path, recordReader, length, schema, columns, hiveStorageTimeZone, typeManager));
}

Expand Down

0 comments on commit 35b2325

Please sign in to comment.