From 35b2325e285bbd8c6d5e140864a125fca3ed14b8 Mon Sep 17 00:00:00 2001 From: David Phillips Date: Sat, 7 Sep 2019 22:10:23 -0700 Subject: [PATCH] Cleanup config handling in PrestoS3ClientFactory --- .../hive/s3select/PrestoS3ClientFactory.java | 35 ++++++++++++------- .../hive/s3select/PrestoS3SelectClient.java | 6 ++-- .../s3select/S3SelectCsvRecordReader.java | 4 +-- .../s3select/S3SelectLineRecordReader.java | 5 +-- .../S3SelectRecordCursorProvider.java | 10 ++---- 5 files changed, 28 insertions(+), 32 deletions(-) diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/PrestoS3ClientFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/PrestoS3ClientFactory.java index 5f0cf83a3454..c5c431e58b03 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/PrestoS3ClientFactory.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/PrestoS3ClientFactory.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import javax.annotation.concurrent.GuardedBy; +import javax.inject.Inject; import java.net.URI; import java.util.Optional; @@ -67,28 +68,37 @@ public class PrestoS3ClientFactory { private static final String S3_SELECT_PUSHDOWN_MAX_CONNECTIONS = "hive.s3select-pushdown.max-connections"; - private static String s3UserAgentSuffix = "presto"; + + private final boolean enabled; + private final int defaultMaxConnections; @GuardedBy("this") private AmazonS3 s3Client; - synchronized AmazonS3 getS3Client(Configuration config, HiveConfig hiveConfig) + @Inject + public PrestoS3ClientFactory(HiveConfig config) { - if (s3Client != null) { - return s3Client; + this.enabled = config.isS3SelectPushdownEnabled(); + this.defaultMaxConnections = config.getS3SelectPushdownMaxConnections(); + } + + synchronized AmazonS3 getS3Client(Configuration config) + { + if (s3Client == null) { + s3Client = createS3Client(config); } + return s3Client; + } + private AmazonS3 createS3Client(Configuration config) + { HiveS3Config defaults = new HiveS3Config(); String userAgentPrefix = config.get(S3_USER_AGENT_PREFIX, defaults.getS3UserAgentPrefix()); int maxErrorRetries = config.getInt(S3_MAX_ERROR_RETRIES, defaults.getS3MaxErrorRetries()); boolean sslEnabled = config.getBoolean(S3_SSL_ENABLED, defaults.isS3SslEnabled()); Duration connectTimeout = Duration.valueOf(config.get(S3_CONNECT_TIMEOUT, defaults.getS3ConnectTimeout().toString())); Duration socketTimeout = Duration.valueOf(config.get(S3_SOCKET_TIMEOUT, defaults.getS3SocketTimeout().toString())); - int maxConnections = config.getInt(S3_SELECT_PUSHDOWN_MAX_CONNECTIONS, hiveConfig.getS3SelectPushdownMaxConnections()); - - if (hiveConfig.isS3SelectPushdownEnabled()) { - s3UserAgentSuffix = "presto-select"; - } + int maxConnections = config.getInt(S3_SELECT_PUSHDOWN_MAX_CONNECTIONS, defaultMaxConnections); ClientConfiguration clientConfiguration = new ClientConfiguration() .withMaxErrorRetry(maxErrorRetries) @@ -97,7 +107,7 @@ synchronized AmazonS3 getS3Client(Configuration config, HiveConfig hiveConfig) .withSocketTimeout(toIntExact(socketTimeout.toMillis())) .withMaxConnections(maxConnections) .withUserAgentPrefix(userAgentPrefix) - .withUserAgentSuffix(s3UserAgentSuffix); + .withUserAgentSuffix(enabled ? "presto-select" : "presto"); PrestoS3FileSystemStats stats = new PrestoS3FileSystemStats(); RequestMetricCollector metricCollector = new PrestoS3FileSystemMetricCollector(stats); @@ -134,11 +144,10 @@ synchronized AmazonS3 getS3Client(Configuration config, HiveConfig hiveConfig) clientBuilder.setForceGlobalBucketAccessEnabled(true); } - s3Client = clientBuilder.build(); - return s3Client; + return clientBuilder.build(); } - private AWSCredentialsProvider getAwsCredentialsProvider(Configuration conf, HiveS3Config defaults) + private static AWSCredentialsProvider getAwsCredentialsProvider(Configuration conf, HiveS3Config defaults) { Optional credentials = getAwsCredentials(conf); if (credentials.isPresent()) { diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/PrestoS3SelectClient.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/PrestoS3SelectClient.java index d4bc2e79b84c..e67a65d11241 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/PrestoS3SelectClient.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/PrestoS3SelectClient.java @@ -17,7 +17,6 @@ import com.amazonaws.services.s3.model.SelectObjectContentEventVisitor; import com.amazonaws.services.s3.model.SelectObjectContentRequest; import com.amazonaws.services.s3.model.SelectObjectContentResult; -import io.prestosql.plugin.hive.HiveConfig; import org.apache.hadoop.conf.Configuration; import java.io.Closeable; @@ -35,12 +34,11 @@ class PrestoS3SelectClient private SelectObjectContentRequest selectObjectRequest; private SelectObjectContentResult selectObjectContentResult; - public PrestoS3SelectClient(Configuration configuration, HiveConfig hiveConfig, PrestoS3ClientFactory s3ClientFactory) + public PrestoS3SelectClient(Configuration configuration, PrestoS3ClientFactory s3ClientFactory) { requireNonNull(configuration, "configuration is null"); - requireNonNull(hiveConfig, "hiveConfig is null"); requireNonNull(s3ClientFactory, "s3ClientFactory is null"); - this.s3Client = s3ClientFactory.getS3Client(configuration, hiveConfig); + this.s3Client = s3ClientFactory.getS3Client(configuration); } public InputStream getRecordsContent(SelectObjectContentRequest selectObjectRequest) diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectCsvRecordReader.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectCsvRecordReader.java index 20dac9f1b07f..d2108e2fff1f 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectCsvRecordReader.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectCsvRecordReader.java @@ -20,7 +20,6 @@ import com.amazonaws.services.s3.model.InputSerialization; import com.amazonaws.services.s3.model.OutputSerialization; import com.amazonaws.services.s3.model.SelectObjectContentRequest; -import io.prestosql.plugin.hive.HiveConfig; import io.prestosql.plugin.hive.s3.PrestoS3FileSystem; import io.prestosql.spi.PrestoException; import org.apache.hadoop.conf.Configuration; @@ -51,7 +50,6 @@ class S3SelectCsvRecordReader public S3SelectCsvRecordReader( Configuration configuration, - HiveConfig hiveConfig, Path path, long start, long length, @@ -59,7 +57,7 @@ public S3SelectCsvRecordReader( String ionSqlQuery, PrestoS3ClientFactory s3ClientFactory) { - super(configuration, hiveConfig, path, start, length, schema, ionSqlQuery, s3ClientFactory); + super(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory); } @Override diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectLineRecordReader.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectLineRecordReader.java index bfe93390d340..f7cca2a7e584 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectLineRecordReader.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectLineRecordReader.java @@ -18,7 +18,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Closer; import io.airlift.units.Duration; -import io.prestosql.plugin.hive.HiveConfig; import io.prestosql.plugin.hive.s3.HiveS3Config; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -75,7 +74,6 @@ public abstract class S3SelectLineRecordReader S3SelectLineRecordReader( Configuration configuration, - HiveConfig hiveConfig, Path path, long start, long length, @@ -84,7 +82,6 @@ public abstract class S3SelectLineRecordReader PrestoS3ClientFactory s3ClientFactory) { requireNonNull(configuration, "configuration is null"); - requireNonNull(hiveConfig, "hiveConfig is null"); requireNonNull(schema, "schema is null"); requireNonNull(path, "path is null"); requireNonNull(ionSqlQuery, "ionSqlQuery is null"); @@ -105,7 +102,7 @@ public abstract class S3SelectLineRecordReader this.maxBackoffTime = Duration.valueOf(configuration.get(S3_MAX_BACKOFF_TIME, defaults.getS3MaxBackoffTime().toString())); this.maxRetryTime = Duration.valueOf(configuration.get(S3_MAX_RETRY_TIME, defaults.getS3MaxRetryTime().toString())); - this.selectClient = new PrestoS3SelectClient(configuration, hiveConfig, s3ClientFactory); + this.selectClient = new PrestoS3SelectClient(configuration, s3ClientFactory); closer.register(selectClient); } diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectRecordCursorProvider.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectRecordCursorProvider.java index 4f84f541b64a..3cb2a42e25f0 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectRecordCursorProvider.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/s3select/S3SelectRecordCursorProvider.java @@ -16,7 +16,6 @@ import com.google.common.collect.ImmutableSet; import io.prestosql.plugin.hive.HdfsEnvironment; import io.prestosql.plugin.hive.HiveColumnHandle; -import io.prestosql.plugin.hive.HiveConfig; import io.prestosql.plugin.hive.HiveRecordCursorProvider; import io.prestosql.plugin.hive.IonSqlQueryBuilder; import io.prestosql.spi.PrestoException; @@ -46,17 +45,12 @@ public class S3SelectRecordCursorProvider { private static final Set CSV_SERDES = ImmutableSet.of(LazySimpleSerDe.class.getName()); private final HdfsEnvironment hdfsEnvironment; - private final HiveConfig hiveConfig; private final PrestoS3ClientFactory s3ClientFactory; @Inject - public S3SelectRecordCursorProvider( - HdfsEnvironment hdfsEnvironment, - HiveConfig hiveConfig, - PrestoS3ClientFactory s3ClientFactory) + public S3SelectRecordCursorProvider(HdfsEnvironment hdfsEnvironment, PrestoS3ClientFactory s3ClientFactory) { this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); - this.hiveConfig = requireNonNull(hiveConfig, "hiveConfig is null"); this.s3ClientFactory = requireNonNull(s3ClientFactory, "s3ClientFactory is null"); } @@ -90,7 +84,7 @@ public Optional createRecordCursor( if (CSV_SERDES.contains(serdeName)) { IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager); String ionSqlQuery = queryBuilder.buildSql(columns, effectivePredicate); - S3SelectLineRecordReader recordReader = new S3SelectCsvRecordReader(configuration, hiveConfig, path, start, length, schema, ionSqlQuery, s3ClientFactory); + S3SelectLineRecordReader recordReader = new S3SelectCsvRecordReader(configuration, path, start, length, schema, ionSqlQuery, s3ClientFactory); return Optional.of(new S3SelectRecordCursor<>(configuration, path, recordReader, length, schema, columns, hiveStorageTimeZone, typeManager)); }