From f3d8ffc9355512af39ff8bae4197e2effc1a7ca6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 21 Jul 2020 14:01:11 +0200 Subject: [PATCH 1/3] Correct parameter name --- .../prestosql/tests/hive/BucketingType.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/presto-product-tests/src/main/java/io/prestosql/tests/hive/BucketingType.java b/presto-product-tests/src/main/java/io/prestosql/tests/hive/BucketingType.java index ce8a223d6424..d8c9ff5ab498 100644 --- a/presto-product-tests/src/main/java/io/prestosql/tests/hive/BucketingType.java +++ b/presto-product-tests/src/main/java/io/prestosql/tests/hive/BucketingType.java @@ -24,7 +24,7 @@ public enum BucketingType { NONE { @Override - public String getHiveClustering(String columnNames, int buckets) + public String getHiveClustering(String columnName, int buckets) { return ""; } @@ -38,9 +38,9 @@ public List getHiveTableProperties() BUCKETED_DEFAULT { @Override - public String getHiveClustering(String columnNames, int buckets) + public String getHiveClustering(String columnName, int buckets) { - return defaultHiveClustering(columnNames, buckets); + return defaultHiveClustering(columnName, buckets); } @Override @@ -52,9 +52,9 @@ public List getHiveTableProperties() BUCKETED_V1 { @Override - public String getHiveClustering(String columnNames, int buckets) + public String getHiveClustering(String columnName, int buckets) { - return defaultHiveClustering(columnNames, buckets); + return defaultHiveClustering(columnName, buckets); } @Override @@ -66,9 +66,9 @@ public List getHiveTableProperties() BUCKETED_V2 { @Override - public String getHiveClustering(String columnNames, int buckets) + public String getHiveClustering(String columnName, int buckets) { - return defaultHiveClustering(columnNames, buckets); + return defaultHiveClustering(columnName, buckets); } @Override @@ -83,9 +83,9 @@ public List getHiveTableProperties() public abstract List getHiveTableProperties(); - private static String defaultHiveClustering(String columnNames, int buckets) + private static String defaultHiveClustering(String columnName, int buckets) { - requireNonNull(columnNames, "columnNames is null"); - return format("CLUSTERED BY(%s) INTO %s BUCKETS", columnNames, buckets); + requireNonNull(columnName, "columnName is null"); + return format("CLUSTERED BY(%s) INTO %s BUCKETS", columnName, buckets); } } From 8cea82bf2cd31d18aec793e930a60570d1325611 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Tue, 21 Jul 2020 20:42:17 +0200 Subject: [PATCH 2/3] Improve exception message --- .../io/prestosql/tests/hive/TestHiveTransactionalTable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java b/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java index 655411bec06e..afa632b5891e 100644 --- a/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java +++ b/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java @@ -100,7 +100,7 @@ public void testReadFullAcidBucketedV2() private void doTestReadFullAcid(boolean isPartitioned, BucketingType bucketingType) { if (getHiveVersionMajor() < 3) { - throw new SkipException("Presto Hive transactional tables are supported with Hive version 3 or above"); + throw new SkipException("Hive transactional tables are supported with Hive version 3 or above"); } try (TemporaryHiveTable table = TemporaryHiveTable.temporaryHiveTable(tableName("read_full_acid", isPartitioned, bucketingType))) { @@ -147,7 +147,7 @@ private void doTestReadFullAcid(boolean isPartitioned, BucketingType bucketingTy public void testReadInsertOnly(boolean isPartitioned, BucketingType bucketingType) { if (getHiveVersionMajor() < 3) { - throw new SkipException("Presto Hive transactional tables are supported with Hive version 3 or above"); + throw new SkipException("Hive transactional tables are supported with Hive version 3 or above"); } try (TemporaryHiveTable table = TemporaryHiveTable.temporaryHiveTable(tableName("insert_only", isPartitioned, bucketingType))) { From b1050f02cccbb46ef72d42f332e0b8e6e2da445e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Mon, 20 Jul 2020 12:15:39 +0200 Subject: [PATCH 3/3] Support CTAS for Hive transactional tables --- .../plugin/hive/HiveInsertTableHandle.java | 3 +- .../prestosql/plugin/hive/HiveMetadata.java | 33 ++++++++-- .../plugin/hive/HiveOutputTableHandle.java | 4 +- .../plugin/hive/HivePageSinkProvider.java | 1 + .../plugin/hive/HiveTableProperties.java | 10 ++- .../plugin/hive/HiveWritableTableHandle.java | 11 +++- .../plugin/hive/HiveWriterFactory.java | 46 +++++++++++--- .../hive/TestHiveIntegrationSmokeTest.java | 3 +- .../plugin/hive/TestHivePageSink.java | 1 + .../plugin/hive/TestHiveWriterFactory.java | 8 ++- .../prestosql/tests/hive/BucketingType.java | 34 ++++++++++ .../hive/TestHiveTransactionalTable.java | 63 +++++++++++++++++++ .../tests/hive/TransactionalTableType.java | 14 +++++ 13 files changed, 212 insertions(+), 19 deletions(-) diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveInsertTableHandle.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveInsertTableHandle.java index 49603e4d7584..7d4985b537de 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveInsertTableHandle.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveInsertTableHandle.java @@ -44,6 +44,7 @@ public HiveInsertTableHandle( locationHandle, bucketProperty, tableStorageFormat, - partitionStorageFormat); + partitionStorageFormat, + false); } } diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java index 4febe2e8edf7..1a8b7ee42ea2 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveMetadata.java @@ -198,6 +198,7 @@ import static io.prestosql.plugin.hive.HiveTableProperties.getOrcBloomFilterFpp; import static io.prestosql.plugin.hive.HiveTableProperties.getPartitionedBy; import static io.prestosql.plugin.hive.HiveTableProperties.getSingleCharacterProperty; +import static io.prestosql.plugin.hive.HiveTableProperties.isTransactional; import static io.prestosql.plugin.hive.HiveType.HIVE_STRING; import static io.prestosql.plugin.hive.HiveType.toHiveType; import static io.prestosql.plugin.hive.HiveWriterFactory.computeBucketedFileName; @@ -245,6 +246,7 @@ import static io.prestosql.spi.type.BigintType.BIGINT; import static io.prestosql.spi.type.TypeUtils.isFloatingPointNaN; import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType; +import static java.lang.Boolean.parseBoolean; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; @@ -588,6 +590,12 @@ private ConnectorTableMetadata doGetTableMetadata(ConnectorSession session, Sche properties.put(SORTED_BY_PROPERTY, property.getSortedBy()); }); + // Transactional properties + String transactionalProperty = table.getParameters().get(HiveMetadata.TRANSACTIONAL); + if (parseBoolean(transactionalProperty)) { + properties.put(HiveTableProperties.TRANSACTIONAL, true); + } + // ORC format specific properties String orcBloomFilterColumns = table.getParameters().get(ORC_BLOOM_FILTER_COLUMNS_KEY); if (orcBloomFilterColumns != null) { @@ -883,7 +891,9 @@ private Map getEmptyTableProperties(ConnectorTableMetadata table // When metastore is configured with metastore.create.as.acid=true, it will also change Presto-created tables // behind the scenes. In particular, this won't work with CTAS. // TODO (https://github.com/prestosql/presto/issues/1956) convert this into normal table property - tableProperties.put(TRANSACTIONAL, "false"); + + boolean transactional = HiveTableProperties.isTransactional(tableMetadata.getProperties()).orElse(false); + tableProperties.put(TRANSACTIONAL, String.valueOf(transactional)); bucketProperty.ifPresent(hiveBucketProperty -> tableProperties.put(BUCKETING_VERSION, Integer.toString(hiveBucketProperty.getBucketingVersion().getVersion()))); @@ -1274,6 +1284,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto HiveStorageFormat tableStorageFormat = getHiveStorageFormat(tableMetadata.getProperties()); List partitionedBy = getPartitionedBy(tableMetadata.getProperties()); Optional bucketProperty = getBucketProperty(tableMetadata.getProperties()); + boolean transactional = isTransactional(tableMetadata.getProperties()).orElse(false); // get the root directory for the database SchemaTableName schemaTableName = tableMetadata.getTable(); @@ -1309,6 +1320,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto bucketProperty, session.getUser(), tableProperties, + transactional, externalLocation.isPresent()); WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle); @@ -1346,7 +1358,7 @@ public Optional finishCreateTable(ConnectorSession sess partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates); if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) { - List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, partitionUpdates); + List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, true, partitionUpdates); // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) { @@ -1404,6 +1416,7 @@ private List computePartitionUpdatesForMissingBuckets( ConnectorSession session, HiveWritableTableHandle handle, Table table, + boolean isCreateTable, List partitionUpdates) { ImmutableList.Builder partitionUpdatesForMissingBucketsBuilder = ImmutableList.builder(); @@ -1417,6 +1430,7 @@ private List computePartitionUpdatesForMissingBuckets( storageFormat, partitionUpdate.getTargetPath(), bucketCount, + isCreateTable && handle.isTransactional(), partitionUpdate); partitionUpdatesForMissingBucketsBuilder.add(new PartitionUpdate( partitionUpdate.getName(), @@ -1437,6 +1451,7 @@ private List computeFileNamesForMissingBuckets( HiveStorageFormat storageFormat, Path targetPath, int bucketCount, + boolean transactionalCreateTable, PartitionUpdate partitionUpdate) { if (partitionUpdate.getFileNames().size() == bucketCount) { @@ -1450,7 +1465,13 @@ private List computeFileNamesForMissingBuckets( Set fileNames = ImmutableSet.copyOf(partitionUpdate.getFileNames()); ImmutableList.Builder missingFileNamesBuilder = ImmutableList.builder(); for (int i = 0; i < bucketCount; i++) { - String fileName = computeBucketedFileName(session.getQueryId(), i) + fileExtension; + String fileName; + if (transactionalCreateTable) { + fileName = computeBucketedFileName(Optional.empty(), i) + fileExtension; + } + else { + fileName = computeBucketedFileName(Optional.of(session.getQueryId()), i) + fileExtension; + } if (!fileNames.contains(fileName)) { missingFileNamesBuilder.add(fileName); } @@ -1562,7 +1583,7 @@ public Optional finishInsert(ConnectorSession session, } if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) { - List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, partitionUpdates); + List partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, false, partitionUpdates); // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets)); for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) { @@ -2344,6 +2365,10 @@ public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Connector if (!isCollectColumnStatisticsOnWrite(session)) { return TableStatisticsMetadata.empty(); } + if (isTransactional(tableMetadata.getProperties()).orElse(false)) { + // TODO(https://github.com/prestosql/presto/issues/1956) updating table statistics for trasactional not supported right now. + return TableStatisticsMetadata.empty(); + } List partitionedBy = firstNonNull(getPartitionedBy(tableMetadata.getProperties()), ImmutableList.of()); return getStatisticsCollectionMetadata(tableMetadata.getColumns(), partitionedBy, Optional.empty(), false); } diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveOutputTableHandle.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveOutputTableHandle.java index 949999b0d79f..b8e6e870c4fe 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveOutputTableHandle.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveOutputTableHandle.java @@ -48,6 +48,7 @@ public HiveOutputTableHandle( @JsonProperty("bucketProperty") Optional bucketProperty, @JsonProperty("tableOwner") String tableOwner, @JsonProperty("additionalTableParameters") Map additionalTableParameters, + @JsonProperty("transactional") boolean transactional, @JsonProperty("external") boolean external) { super( @@ -58,7 +59,8 @@ public HiveOutputTableHandle( locationHandle, bucketProperty, tableStorageFormat, - partitionStorageFormat); + partitionStorageFormat, + transactional); this.partitionedBy = ImmutableList.copyOf(requireNonNull(partitionedBy, "partitionedBy is null")); this.tableOwner = requireNonNull(tableOwner, "tableOwner is null"); diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSinkProvider.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSinkProvider.java index 706b9ea6c1df..52b44fd985b7 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSinkProvider.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HivePageSinkProvider.java @@ -135,6 +135,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean handle.getSchemaName(), handle.getTableName(), isCreateTable, + handle.isTransactional(), handle.getInputColumns(), handle.getTableStorageFormat(), handle.getPartitionStorageFormat(), diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTableProperties.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTableProperties.java index a6251cebe837..a7176a1d98be 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTableProperties.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveTableProperties.java @@ -36,6 +36,7 @@ import static io.prestosql.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1; import static io.prestosql.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V2; import static io.prestosql.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; +import static io.prestosql.spi.session.PropertyMetadata.booleanProperty; import static io.prestosql.spi.session.PropertyMetadata.doubleProperty; import static io.prestosql.spi.session.PropertyMetadata.enumProperty; import static io.prestosql.spi.session.PropertyMetadata.integerProperty; @@ -69,6 +70,7 @@ public class HiveTableProperties public static final String CSV_SEPARATOR = "csv_separator"; public static final String CSV_QUOTE = "csv_quote"; public static final String CSV_ESCAPE = "csv_escape"; + public static final String TRANSACTIONAL = "transactional"; private final List> tableProperties; @@ -153,7 +155,8 @@ public HiveTableProperties( stringProperty(NULL_FORMAT_PROPERTY, "Serialization format for NULL value", null, false), stringProperty(CSV_SEPARATOR, "CSV separator character", null, false), stringProperty(CSV_QUOTE, "CSV quote character", null, false), - stringProperty(CSV_ESCAPE, "CSV escape character", null, false)); + stringProperty(CSV_ESCAPE, "CSV escape character", null, false), + booleanProperty(TRANSACTIONAL, "Table is transactional", null, false)); } public List> getTableProperties() @@ -291,4 +294,9 @@ private static String sortingColumnToString(SortingColumn column) { return column.getColumnName() + ((column.getOrder() == DESCENDING) ? " DESC" : ""); } + + public static Optional isTransactional(Map tableProperties) + { + return Optional.ofNullable((Boolean) tableProperties.get(TRANSACTIONAL)); + } } diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWritableTableHandle.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWritableTableHandle.java index 4443495c3a75..67e8f491a33a 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWritableTableHandle.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWritableTableHandle.java @@ -34,6 +34,7 @@ public class HiveWritableTableHandle private final Optional bucketProperty; private final HiveStorageFormat tableStorageFormat; private final HiveStorageFormat partitionStorageFormat; + private final boolean transactional; public HiveWritableTableHandle( String schemaName, @@ -43,7 +44,8 @@ public HiveWritableTableHandle( LocationHandle locationHandle, Optional bucketProperty, HiveStorageFormat tableStorageFormat, - HiveStorageFormat partitionStorageFormat) + HiveStorageFormat partitionStorageFormat, + boolean transactional) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); @@ -53,6 +55,7 @@ public HiveWritableTableHandle( this.bucketProperty = requireNonNull(bucketProperty, "bucketProperty is null"); this.tableStorageFormat = requireNonNull(tableStorageFormat, "tableStorageFormat is null"); this.partitionStorageFormat = requireNonNull(partitionStorageFormat, "partitionStorageFormat is null"); + this.transactional = transactional; } @JsonProperty @@ -109,6 +112,12 @@ public HiveStorageFormat getPartitionStorageFormat() return partitionStorageFormat; } + @JsonProperty + public boolean isTransactional() + { + return transactional; + } + @Override public String toString() { diff --git a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java index 2ab1ba57b974..58885a3d9d29 100644 --- a/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java +++ b/presto-hive/src/main/java/io/prestosql/plugin/hive/HiveWriterFactory.java @@ -64,6 +64,7 @@ import java.util.OptionalInt; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.function.Consumer; import static com.google.common.base.Preconditions.checkArgument; @@ -117,6 +118,7 @@ public class HiveWriterFactory private final LocationHandle locationHandle; private final LocationService locationService; private final String queryId; + private final boolean isCreateTransactionalTable; private final HivePageSinkMetadataProvider pageSinkMetadataProvider; private final TypeManager typeManager; @@ -145,6 +147,7 @@ public HiveWriterFactory( String schemaName, String tableName, boolean isCreateTable, + boolean isTransactional, List inputColumns, HiveStorageFormat tableStorageFormat, HiveStorageFormat partitionStorageFormat, @@ -210,6 +213,7 @@ public HiveWriterFactory( this.partitionColumnNames = partitionColumnNames.build(); this.partitionColumnTypes = partitionColumnTypes.build(); this.dataColumns = dataColumns.build(); + this.isCreateTransactionalTable = isCreateTable && isTransactional; Path writePath; if (isCreateTable) { @@ -268,13 +272,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt checkArgument(bucketNumber.isEmpty(), "Bucket number provided by for table that is not bucketed"); } - String fileName; - if (bucketNumber.isPresent()) { - fileName = computeBucketedFileName(queryId, bucketNumber.getAsInt()); - } - else { - fileName = queryId + "_" + randomUUID(); - } + String fileName = computeFileName(bucketNumber); List partitionValues = createPartitionValues(partitionColumnTypes, partitionColumns, position); @@ -595,10 +593,40 @@ private void validateSchema(Optional partitionName, Properties schema) } } - public static String computeBucketedFileName(String queryId, int bucket) + private String computeFileName(OptionalInt bucketNumber) + { + // Currently CTAS for transactional tables in Presto creates non-transactional ("original") files. + // Hive requires "original" files of transactional tables to conform to the following naming pattern: + // + // For bucketed tables we drop query id from file names and just leave _0 + // For non bucketed tables we use 000000_ + + if (bucketNumber.isPresent()) { + if (isCreateTransactionalTable) { + return computeBucketedFileName(Optional.empty(), bucketNumber.getAsInt()); + } + return computeBucketedFileName(Optional.of(queryId), bucketNumber.getAsInt()); + } + + if (isCreateTransactionalTable) { + String paddedBucket = Strings.padStart("0", BUCKET_NUMBER_PADDING, '0'); + UUID uuid = randomUUID(); + return format("0%s_%s%s", + paddedBucket, + Long.toUnsignedString(uuid.getLeastSignificantBits()), + Long.toUnsignedString(uuid.getMostSignificantBits())); + } + + return queryId + "_" + randomUUID(); + } + + public static String computeBucketedFileName(Optional queryId, int bucket) { String paddedBucket = Strings.padStart(Integer.toString(bucket), BUCKET_NUMBER_PADDING, '0'); - return format("0%s_0_%s", paddedBucket, queryId); + if (queryId.isPresent()) { + return format("0%s_0_%s", paddedBucket, queryId.get()); + } + return format("0%s_0", paddedBucket); } public static String getFileExtension(JobConf conf, StorageFormat storageFormat) diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java index 2cf5aba0dd48..a2a8b9772f68 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveIntegrationSmokeTest.java @@ -3259,7 +3259,8 @@ public void testShowCreateTable() " orc_bloom_filter_columns = ARRAY['c1','c2'],\n" + " orc_bloom_filter_fpp = 7E-1,\n" + " partitioned_by = ARRAY['c5'],\n" + - " sorted_by = ARRAY['c1','c 2 DESC']\n" + + " sorted_by = ARRAY['c1','c 2 DESC'],\n" + + " transactional = true\n" + ")", getSession().getCatalog().get(), getSession().getSchema().get(), diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHivePageSink.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHivePageSink.java index a8c41906ebe6..f4f2e716e8d5 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHivePageSink.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHivePageSink.java @@ -267,6 +267,7 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio Optional.empty(), "test", ImmutableMap.of(), + false, false); JsonCodec partitionUpdateCodec = JsonCodec.jsonCodec(PartitionUpdate.class); HivePageSinkProvider provider = new HivePageSinkProvider( diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveWriterFactory.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveWriterFactory.java index 65b230c76e91..0166a757f265 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveWriterFactory.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveWriterFactory.java @@ -15,6 +15,8 @@ import org.testng.annotations.Test; +import java.util.Optional; + import static io.prestosql.plugin.hive.HiveWriterFactory.computeBucketedFileName; import static org.apache.hadoop.hive.ql.exec.Utilities.getBucketIdFromFile; import static org.testng.Assert.assertEquals; @@ -24,8 +26,12 @@ public class TestHiveWriterFactory @Test public void testComputeBucketedFileName() { - String name = computeBucketedFileName("20180102_030405_00641_x1y2z", 1234); + String name = computeBucketedFileName(Optional.of("20180102_030405_00641_x1y2z"), 1234); assertEquals(name, "001234_0_20180102_030405_00641_x1y2z"); assertEquals(getBucketIdFromFile(name), 1234); + + name = computeBucketedFileName(Optional.empty(), 1234); + assertEquals(name, "001234_0"); + assertEquals(getBucketIdFromFile(name), 1234); } } diff --git a/presto-product-tests/src/main/java/io/prestosql/tests/hive/BucketingType.java b/presto-product-tests/src/main/java/io/prestosql/tests/hive/BucketingType.java index d8c9ff5ab498..125c7d800de8 100644 --- a/presto-product-tests/src/main/java/io/prestosql/tests/hive/BucketingType.java +++ b/presto-product-tests/src/main/java/io/prestosql/tests/hive/BucketingType.java @@ -34,6 +34,12 @@ public List getHiveTableProperties() { return ImmutableList.of(); } + + @Override + public List getPrestoTableProperties(String columnName, int buckets) + { + return ImmutableList.of(); + } }, BUCKETED_DEFAULT { @@ -48,6 +54,14 @@ public List getHiveTableProperties() { return ImmutableList.of(); } + + @Override + public List getPrestoTableProperties(String columnName, int buckets) + { + return ImmutableList.of( + "bucketed_by = ARRAY['" + columnName + "']", + "bucket_count = " + buckets); + } }, BUCKETED_V1 { @@ -62,6 +76,15 @@ public List getHiveTableProperties() { return ImmutableList.of("'bucketing_version'='1'"); } + + @Override + public List getPrestoTableProperties(String columnName, int buckets) + { + return ImmutableList.of( + "bucketing_version = 1", + "bucketed_by = ARRAY['" + columnName + "']", + "bucket_count = " + buckets); + } }, BUCKETED_V2 { @@ -76,6 +99,15 @@ public List getHiveTableProperties() { return ImmutableList.of("'bucketing_version'='2'"); } + + @Override + public List getPrestoTableProperties(String columnName, int buckets) + { + return ImmutableList.of( + "bucketing_version = 2", + "bucketed_by = ARRAY['" + columnName + "']", + "bucket_count = " + buckets); + } }, /**/; @@ -83,6 +115,8 @@ public List getHiveTableProperties() public abstract List getHiveTableProperties(); + public abstract List getPrestoTableProperties(String columnName, int buckets); + private static String defaultHiveClustering(String columnName, int buckets) { requireNonNull(columnName, "columnName is null"); diff --git a/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java b/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java index afa632b5891e..e71916cd5f65 100644 --- a/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java +++ b/presto-product-tests/src/main/java/io/prestosql/tests/hive/TestHiveTransactionalTable.java @@ -296,6 +296,58 @@ public Object[][] partitioningAndBucketingTypeDataProvider() }; } + @Test(groups = HIVE_TRANSACTIONAL, dataProvider = "testCreateAcidTableDataProvider") + public void testCtasAcidTable(boolean isPartitioned, BucketingType bucketingType) + { + if (getHiveVersionMajor() < 3) { + throw new SkipException("Hive transactional tables are supported with Hive version 3 or above"); + } + + try (TemporaryHiveTable table = TemporaryHiveTable.temporaryHiveTable(format("ctas_transactional_%s", randomTableSuffix()))) { + String tableName = table.getName(); + query("CREATE TABLE " + tableName + " " + + prestoTableProperties(ACID, isPartitioned, bucketingType) + + " AS SELECT * FROM (VALUES (21, 1, 1), (22, 1, 2), (23, 2, 2)) t(col, fcol, partcol)"); + + // can we query from Presto + assertThat(query("SELECT col, fcol FROM " + tableName + " WHERE partcol = 2 ORDER BY col")) + .containsOnly(row(22, 1), row(23, 2)); + + // can we query from Hive + assertThat(onHive().executeQuery("SELECT col, fcol FROM " + tableName + " WHERE partcol = 2 ORDER BY col")) + .containsOnly(row(22, 1), row(23, 2)); + } + } + + @Test(groups = HIVE_TRANSACTIONAL, dataProvider = "testCreateAcidTableDataProvider") + public void testCreateAcidTable(boolean isPartitioned, BucketingType bucketingType) + { + if (getHiveVersionMajor() < 3) { + throw new SkipException("Hive transactional tables are supported with Hive version 3 or above"); + } + + try (TemporaryHiveTable table = TemporaryHiveTable.temporaryHiveTable(format("create_transactional_%s", randomTableSuffix()))) { + String tableName = table.getName(); + query("CREATE TABLE " + tableName + " (col INTEGER, fcol INTEGER, partcol INTEGER)" + + prestoTableProperties(ACID, isPartitioned, bucketingType)); + + assertThat(() -> query("INSERT INTO " + tableName + " VALUES (1,2,3)")).failsWithMessageMatching(".*Writes to Hive transactional tables are not supported.*"); + } + } + + @DataProvider + public Object[][] testCreateAcidTableDataProvider() + { + return new Object[][] { + {false, BucketingType.NONE}, + {false, BucketingType.BUCKETED_DEFAULT}, + {false, BucketingType.BUCKETED_V1}, + {false, BucketingType.BUCKETED_V2}, + {true, BucketingType.NONE}, + {true, BucketingType.BUCKETED_DEFAULT}, + }; + } + private static String hiveTableProperties(TransactionalTableType transactionalTableType, BucketingType bucketingType) { ImmutableList.Builder tableProperties = ImmutableList.builder(); @@ -305,6 +357,17 @@ private static String hiveTableProperties(TransactionalTableType transactionalTa return tableProperties.build().stream().collect(joining(",", "TBLPROPERTIES (", ")")); } + private static String prestoTableProperties(TransactionalTableType transactionalTableType, boolean isPartitioned, BucketingType bucketingType) + { + ImmutableList.Builder tableProperties = ImmutableList.builder(); + tableProperties.addAll(transactionalTableType.getPrestoTableProperties()); + tableProperties.addAll(bucketingType.getPrestoTableProperties("fcol", 4)); + if (isPartitioned) { + tableProperties.add("partitioned_by = ARRAY['partcol']"); + } + return tableProperties.build().stream().collect(joining(",", "WITH (", ")")); + } + private static void compactTableAndWait(CompactionMode compactMode, String tableName, String partitionString, Duration timeout) { log.info("Running %s compaction on %s", compactMode, tableName); diff --git a/presto-product-tests/src/main/java/io/prestosql/tests/hive/TransactionalTableType.java b/presto-product-tests/src/main/java/io/prestosql/tests/hive/TransactionalTableType.java index ba2d8035761d..442d6e9a1318 100644 --- a/presto-product-tests/src/main/java/io/prestosql/tests/hive/TransactionalTableType.java +++ b/presto-product-tests/src/main/java/io/prestosql/tests/hive/TransactionalTableType.java @@ -25,6 +25,12 @@ List getHiveTableProperties() { return ImmutableList.of("'transactional'='true'"); } + + @Override + List getPrestoTableProperties() + { + return ImmutableList.of("transactional = true"); + } }, INSERT_ONLY { @Override @@ -32,8 +38,16 @@ List getHiveTableProperties() { return ImmutableList.of("'transactional'='true'", "'transactional_properties'='insert_only'"); } + + @Override + List getPrestoTableProperties() + { + throw new RuntimeException("insert_only tables not supported"); + } }, /**/; abstract List getHiveTableProperties(); + + abstract List getPrestoTableProperties(); }