Skip to content

Commit

Permalink
Put S3 Select for CSV files behind an experimental flag
Browse files Browse the repository at this point in the history
S3 Select queries on CSV files are shown to have correctness
problems. JSON files can still be enabled/disabled using the
existing config and session properties.
  • Loading branch information
alexjo2144 committed Jul 21, 2023
1 parent 3e73bca commit eeda594
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 9 deletions.
6 changes: 5 additions & 1 deletion docs/src/main/sphinx/connector/hive-s3.rst
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ workload:
Considerations and limitations
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* Only objects stored in CSV and JSON format are supported. Objects can be uncompressed,
* Only objects stored in JSON format are supported. Objects can be uncompressed,
or optionally compressed with gzip or bzip2.
* The "AllowQuotedRecordDelimiters" property is not supported. If this property
is specified, the query fails.
Expand All @@ -416,6 +416,10 @@ pushed down to S3 Select. Changes in the Hive connector :ref:`performance tuning
configuration properties <hive-performance-tuning-configuration>` are likely to impact
S3 Select pushdown performance.

S3 Select can be enabled for TEXTFILE data using the
``hive.s3select-pushdown.experimental-textfile-pushdown-enabled`` configuration property;
however, this has been shown to produce incorrect results. For more information, see
`the GitHub issue <https://github.com/trinodb/trino/issues/17775>`_.

Understanding and tuning the maximum connections
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Expand Down
5 changes: 4 additions & 1 deletion docs/src/main/sphinx/connector/hive.rst
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,10 @@ Hive connector documentation.
`Table Statistics <#table-statistics>`__ for details.
- ``true``
* - ``hive.s3select-pushdown.enabled``
- Enable query pushdown to AWS S3 Select service.
- Enable query pushdown to JSON files using the AWS S3 Select service.
- ``false``
* - ``hive.s3select-pushdown.experimental-textfile-pushdown-enabled``
- Enable query pushdown to TEXTFILE tables using the AWS S3 Select service.
- ``false``
* - ``hive.s3select-pushdown.max-connections``
- Maximum number of simultaneously open connections to S3 for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public class HiveConfig
private boolean collectColumnStatisticsOnWrite = true;

private boolean s3SelectPushdownEnabled;
private boolean s3SelectExperimentalPushdownEnabled;
private int s3SelectPushdownMaxConnections = 500;

private boolean isTemporaryStagingDirectoryEnabled = true;
Expand Down Expand Up @@ -1006,13 +1007,26 @@ public boolean isS3SelectPushdownEnabled()
}

@Config("hive.s3select-pushdown.enabled")
@ConfigDescription("Enable query pushdown to AWS S3 Select service")
@ConfigDescription("Enable query pushdown to JSON files using the AWS S3 Select service")
public HiveConfig setS3SelectPushdownEnabled(boolean s3SelectPushdownEnabled)
{
this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
return this;
}

/**
 * Reports the current value of the experimental S3 Select pushdown flag
 * held by this configuration object.
 *
 * @return the stored flag value; {@code false} unless the setter was called with {@code true}
 */
public boolean isS3SelectExperimentalPushdownEnabled()
{
    return this.s3SelectExperimentalPushdownEnabled;
}

/**
 * Stores the experimental S3 Select pushdown flag, bound to the
 * {@code hive.s3select-pushdown.experimental-textfile-pushdown-enabled} configuration property.
 *
 * @param s3SelectExperimentalPushdownEnabled the new flag value
 * @return this configuration object, so that setter calls can be chained
 */
@Config("hive.s3select-pushdown.experimental-textfile-pushdown-enabled")
@ConfigDescription("Enable query pushdown to TEXTFILE tables using the AWS S3 Select service")
public HiveConfig setS3SelectExperimentalPushdownEnabled(boolean s3SelectExperimentalPushdownEnabled)
{
this.s3SelectExperimentalPushdownEnabled = s3SelectExperimentalPushdownEnabled;
return this;
}

@Min(1)
public int getS3SelectPushdownMaxConnections()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.filesystem.Location;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.HiveConfig;
import io.trino.plugin.hive.HiveRecordCursorProvider;
import io.trino.plugin.hive.ReaderColumns;
import io.trino.plugin.hive.s3select.csv.S3SelectCsvRecordReader;
Expand All @@ -43,6 +44,7 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
import static io.trino.plugin.hive.s3select.S3SelectDataType.CSV;
import static io.trino.plugin.hive.type.TypeInfoUtils.getTypeInfosFromTypeString;
import static io.trino.plugin.hive.util.HiveUtil.getDeserializerClassName;
import static io.trino.plugin.hive.util.SerdeConstants.COLUMN_NAME_DELIMITER;
Expand All @@ -55,12 +57,14 @@ public class S3SelectRecordCursorProvider
{
private final HdfsEnvironment hdfsEnvironment;
private final TrinoS3ClientFactory s3ClientFactory;
private final boolean experimentalPushdownEnabled;

@Inject
public S3SelectRecordCursorProvider(HdfsEnvironment hdfsEnvironment, TrinoS3ClientFactory s3ClientFactory)
public S3SelectRecordCursorProvider(HdfsEnvironment hdfsEnvironment, TrinoS3ClientFactory s3ClientFactory, HiveConfig hiveConfig)
{
    // All three collaborators are mandatory; fail fast with a message naming the missing one.
    this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
    this.s3ClientFactory = requireNonNull(s3ClientFactory, "s3ClientFactory is null");
    // Only the experimental-pushdown flag is read from the config; it is captured once at
    // construction time rather than keeping a reference to the whole HiveConfig.
    this.experimentalPushdownEnabled = requireNonNull(hiveConfig, "hiveConfig is null").isS3SelectExperimentalPushdownEnabled();
}

@Override
Expand Down Expand Up @@ -106,9 +110,12 @@ public Optional<ReaderRecordCursorWithProjections> createRecordCursor(

if (s3SelectDataTypeOptional.isPresent()) {
S3SelectDataType s3SelectDataType = s3SelectDataTypeOptional.get();
if (s3SelectDataType == CSV && !experimentalPushdownEnabled) {
return Optional.empty();
}

Optional<String> nullCharacterEncoding = Optional.empty();
if (s3SelectDataType == S3SelectDataType.CSV) {
if (s3SelectDataType == CSV) {
nullCharacterEncoding = S3SelectCsvRecordReader.nullCharacterEncoding(schema);
}
IonSqlQueryBuilder queryBuilder = new IonSqlQueryBuilder(typeManager, s3SelectDataType, nullCharacterEncoding);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ protected QueryRunner createQueryRunner()
.put("hive.s3.streaming.part-size", HIVE_S3_STREAMING_PART_SIZE.toString())
// This is required to enable AWS Athena partition projection
.put("hive.partition-projection-enabled", "true")
.put("hive.s3select-pushdown.experimental-textfile-pushdown-enabled", "true")
.buildOrThrow())
.build();
}
Expand Down Expand Up @@ -1916,6 +1917,70 @@ public void testJsonS3SelectPushdownWithSpecialCharacters()
}
}

@Test
public void testS3SelectExperimentalPushdown()
{
    // Pins down the known-wrong results produced when TEXTFILE tables are read through the
    // CSV support in S3 Select; these are the reason that path sits behind a separate
    // "experimental" flag.
    // TODO: https://github.com/trinodb/trino/issues/17775
    Session appendingInserts = Session.builder(getSession())
            .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "APPEND")
            .build();
    Session pushdownOff = Session.builder(getSession())
            .setCatalogSessionProperty("hive", "json_native_reader_enabled", "false")
            .setCatalogSessionProperty("hive", "text_file_native_reader_enabled", "false")
            .build();
    Session pushdownOn = Session.builder(getSession())
            .setCatalogSessionProperty("hive", "s3_select_pushdown_enabled", "true")
            .setCatalogSessionProperty("hive", "json_native_reader_enabled", "false")
            .setCatalogSessionProperty("hive", "text_file_native_reader_enabled", "false")
            .build();

    List<String> typedRows = ImmutableList.of(
            "1, true, 11",
            "2, true, 22",
            "3, NULL, NULL",
            "4, false, 44");
    try (TestTable typedTable = new TestTable(
            sql -> getQueryRunner().execute(appendingInserts, sql),
            "hive.%s.test_s3_select_pushdown_experimental_features".formatted(HIVE_TEST_SCHEMA),
            "(id INT, bool_t BOOLEAN, int_t INT) WITH (format = 'TEXTFILE')",
            typedRows)) {
        // NULL filter: the regular reader finds row 3, the pushdown path finds nothing
        String selectNullInt = "SELECT id FROM " + typedTable.getName() + " WHERE int_t IS NULL";
        assertQuery(pushdownOff, selectNullInt, "VALUES 3");
        assertThat(query(pushdownOn, selectNullInt)).returnsEmptyResult();

        // Boolean filter: the pushdown path fails outright
        String selectTrueBool = "SELECT id FROM " + typedTable.getName() + " WHERE bool_t = true";
        assertQueryFails(pushdownOn, selectTrueBool, "S3 returned an error: Error casting:.*");
    }

    List<String> specialCharacterRows = ImmutableList.of(
            "1, 'a,comma'",
            "2, 'a|pipe'",
            "3, 'an''escaped quote'",
            "4, 'a~null encoding'");
    try (TestTable specialTable = new TestTable(
            sql -> getQueryRunner().execute(appendingInserts, sql),
            "hive.%s.test_s3_select_pushdown_special_characters".formatted(HIVE_TEST_SCHEMA),
            "(id INT, string_t VARCHAR) WITH (format = 'TEXTFILE', textfile_field_separator=',', textfile_field_separator_escape='|', null_format='~')",
            specialCharacterRows)) {
        String selectPrefix = "SELECT id FROM " + specialTable.getName();

        // These two should return a result, but incorrectly return nothing
        String commaQuery = selectPrefix + " WHERE string_t ='a,comma'";
        assertQuery(pushdownOff, commaQuery, "VALUES 1");
        assertThat(query(pushdownOn, commaQuery)).returnsEmptyResult();

        String pipeQuery = selectPrefix + " WHERE string_t ='a|pipe'";
        assertQuery(pushdownOff, pipeQuery, "VALUES 2");
        assertThat(query(pushdownOn, pipeQuery)).returnsEmptyResult();

        // These two are actually correct
        assertS3SelectQuery(selectPrefix + " WHERE string_t ='an''escaped quote'", "VALUES 3");
        assertS3SelectQuery(selectPrefix + " WHERE string_t ='a~null encoding'", "VALUES 4");
    }
}

private void assertS3SelectQuery(@Language("SQL") String query, @Language("SQL") String expectedValues)
{
Session withS3SelectPushdown = Session.builder(getSession())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public static Set<HivePageSourceFactory> getDefaultHivePageSourceFactories(HdfsE

public static Set<HiveRecordCursorProvider> getDefaultHiveRecordCursorProviders(HiveConfig hiveConfig, HdfsEnvironment hdfsEnvironment)
{
return ImmutableSet.of(new S3SelectRecordCursorProvider(hdfsEnvironment, new TrinoS3ClientFactory(hiveConfig)));
return ImmutableSet.of(new S3SelectRecordCursorProvider(hdfsEnvironment, new TrinoS3ClientFactory(hiveConfig), hiveConfig));
}

public static Set<HiveFileWriterFactory> getDefaultHiveFileWriterFactories(HiveConfig hiveConfig, HdfsEnvironment hdfsEnvironment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public void testDefaults()
.setIgnoreCorruptedStatistics(false)
.setCollectColumnStatisticsOnWrite(true)
.setS3SelectPushdownEnabled(false)
.setS3SelectExperimentalPushdownEnabled(false)
.setS3SelectPushdownMaxConnections(500)
.setTemporaryStagingDirectoryEnabled(true)
.setTemporaryStagingDirectoryPath("/tmp/presto-${USER}")
Expand Down Expand Up @@ -178,6 +179,7 @@ public void testExplicitPropertyMappings()
.put("hive.ignore-corrupted-statistics", "true")
.put("hive.collect-column-statistics-on-write", "false")
.put("hive.s3select-pushdown.enabled", "true")
.put("hive.s3select-pushdown.experimental-textfile-pushdown-enabled", "true")
.put("hive.s3select-pushdown.max-connections", "1234")
.put("hive.temporary-staging-directory-enabled", "false")
.put("hive.temporary-staging-directory-path", "updated")
Expand Down Expand Up @@ -261,6 +263,7 @@ public void testExplicitPropertyMappings()
.setIgnoreCorruptedStatistics(true)
.setCollectColumnStatisticsOnWrite(false)
.setS3SelectPushdownEnabled(true)
.setS3SelectExperimentalPushdownEnabled(true)
.setS3SelectPushdownMaxConnections(1234)
.setTemporaryStagingDirectoryEnabled(false)
.setTemporaryStagingDirectoryPath("updated")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive.s3;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.plugin.hive.containers.HiveHadoop;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.sql.TestTable;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.List;

import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static java.lang.String.format;

/**
 * Checks that TEXTFILE queries return correct results on a {@link HiveMinioDataLake}-backed
 * Hive catalog when the {@code s3_select_pushdown_enabled} session property is turned on but
 * the catalog does not set {@code hive.s3select-pushdown.experimental-textfile-pushdown-enabled}.
 */
public class TestMinioS3SelectQueries
        extends AbstractTestQueryFramework
{
    private static final String HIVE_TEST_SCHEMA = "hive_datalake";
    private static final DataSize HIVE_S3_STREAMING_PART_SIZE = DataSize.of(5, MEGABYTE);

    // Assigned in createQueryRunner() and read again in setUp() to build the schema location
    private String bucketName;

    /**
     * Starts the data lake container and builds a query runner against it. The experimental
     * TEXTFILE pushdown property is deliberately not part of the Hive properties set here.
     */
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        this.bucketName = "test-hive-insert-overwrite-" + randomNameSuffix();
        HiveMinioDataLake hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(bucketName, HiveHadoop.HIVE3_IMAGE));
        hiveMinioDataLake.start();
        return S3HiveQueryRunner.builder(hiveMinioDataLake)
                .setHiveProperties(
                        ImmutableMap.<String, String>builder()
                                .put("hive.non-managed-table-writes-enabled", "true")
                                .put("hive.metastore-cache-ttl", "1d")
                                .put("hive.metastore-refresh-interval", "1d")
                                .put("hive.s3.streaming.part-size", HIVE_S3_STREAMING_PART_SIZE.toString())
                                .buildOrThrow())
                .build();
    }

    /**
     * Creates the test schema inside the bucket chosen by {@link #createQueryRunner()}.
     */
    @BeforeClass
    public void setUp()
    {
        computeActual(format(
                "CREATE SCHEMA hive.%1$s WITH (location='s3a://%2$s/%1$s')",
                HIVE_TEST_SCHEMA,
                bucketName));
    }

    @Test
    public void testTextfileQueries()
    {
        // Pushdown for TEXTFILE through the CSV support in S3 Select has known correctness
        // issues and sits behind a separate "experimental" flag, which this catalog leaves
        // unset. Every query below must therefore return the correct result whether or not
        // the s3_select_pushdown_enabled session property is turned on.
        // TODO: https://github.com/trinodb/trino/issues/17775
        List<String> values = ImmutableList.of(
                "1, true, 11",
                "2, true, 22",
                "3, NULL, NULL",
                "4, false, 44");
        Session withS3SelectPushdown = Session.builder(getSession())
                .setCatalogSessionProperty("hive", "s3_select_pushdown_enabled", "true")
                .setCatalogSessionProperty("hive", "json_native_reader_enabled", "false")
                .setCatalogSessionProperty("hive", "text_file_native_reader_enabled", "false")
                .build();
        Session withoutS3SelectPushdown = Session.builder(getSession())
                .setCatalogSessionProperty("hive", "json_native_reader_enabled", "false")
                .setCatalogSessionProperty("hive", "text_file_native_reader_enabled", "false")
                .build();
        try (TestTable table = new TestTable(
                getQueryRunner()::execute,
                "hive.%s.test_textfile_queries".formatted(HIVE_TEST_SCHEMA),
                "(id INT, bool_t BOOLEAN, int_t INT) WITH (format = 'TEXTFILE')",
                values)) {
            // Run each query without pushdown first so a failure can be attributed to the
            // pushdown session rather than to the table contents.
            String selectNullInt = "SELECT id FROM " + table.getName() + " WHERE int_t IS NULL";
            assertQuery(withoutS3SelectPushdown, selectNullInt, "VALUES 3");
            assertQuery(withS3SelectPushdown, selectNullInt, "VALUES 3");

            String selectTrueBool = "SELECT id FROM " + table.getName() + " WHERE bool_t = true";
            assertQuery(withoutS3SelectPushdown, selectTrueBool, "VALUES 1, 2");
            assertQuery(withS3SelectPushdown, selectTrueBool, "VALUES 1, 2");
        }

        List<String> specialCharacterValues = ImmutableList.of(
                "1, 'a,comma'",
                "2, 'a|pipe'",
                "3, 'an''escaped quote'",
                "4, 'a~null encoding'");
        try (TestTable table = new TestTable(
                getQueryRunner()::execute,
                "hive.%s.test_s3_select_pushdown_special_characters".formatted(HIVE_TEST_SCHEMA),
                "(id INT, string_t VARCHAR) WITH (format = 'TEXTFILE', textfile_field_separator=',', textfile_field_separator_escape='|', null_format='~')",
                specialCharacterValues)) {
            // Values containing the field separator, the escape character, a quote and the
            // null-format marker, respectively
            String selectWithComma = "SELECT id FROM " + table.getName() + " WHERE string_t = 'a,comma'";
            assertQuery(withoutS3SelectPushdown, selectWithComma, "VALUES 1");
            assertQuery(withS3SelectPushdown, selectWithComma, "VALUES 1");

            String selectWithPipe = "SELECT id FROM " + table.getName() + " WHERE string_t = 'a|pipe'";
            assertQuery(withoutS3SelectPushdown, selectWithPipe, "VALUES 2");
            assertQuery(withS3SelectPushdown, selectWithPipe, "VALUES 2");

            String selectWithQuote = "SELECT id FROM " + table.getName() + " WHERE string_t = 'an''escaped quote'";
            assertQuery(withoutS3SelectPushdown, selectWithQuote, "VALUES 3");
            assertQuery(withS3SelectPushdown, selectWithQuote, "VALUES 3");

            String selectWithNullFormatEncoding = "SELECT id FROM " + table.getName() + " WHERE string_t = 'a~null encoding'";
            assertQuery(withoutS3SelectPushdown, selectWithNullFormatEncoding, "VALUES 4");
            assertQuery(withS3SelectPushdown, selectWithNullFormatEncoding, "VALUES 4");
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ protected QueryRunner createQueryRunner()
ImmutableMap.Builder<String, String> hiveProperties = ImmutableMap.builder();
hiveProperties.put("hive.s3.endpoint", bucketEndpoint);
hiveProperties.put("hive.non-managed-table-writes-enabled", "true");
hiveProperties.put("hive.s3select-pushdown.experimental-textfile-pushdown-enabled", "true");
return HiveQueryRunner.builder()
.setHiveProperties(hiveProperties.buildOrThrow())
.setInitialTables(ImmutableList.of())
Expand Down
Loading

0 comments on commit eeda594

Please sign in to comment.