[CI only] Support CTAS for Hive transactional tables #4574

Merged 3 commits on Jul 25, 2020
Changes from all commits
HiveInsertTableHandle.java
@@ -44,6 +44,7 @@ public HiveInsertTableHandle(
                 locationHandle,
                 bucketProperty,
                 tableStorageFormat,
-                partitionStorageFormat);
+                partitionStorageFormat,
+                false);
     }
 }
HiveMetadata.java
@@ -198,6 +198,7 @@
 import static io.prestosql.plugin.hive.HiveTableProperties.getOrcBloomFilterFpp;
 import static io.prestosql.plugin.hive.HiveTableProperties.getPartitionedBy;
 import static io.prestosql.plugin.hive.HiveTableProperties.getSingleCharacterProperty;
+import static io.prestosql.plugin.hive.HiveTableProperties.isTransactional;
 import static io.prestosql.plugin.hive.HiveType.HIVE_STRING;
 import static io.prestosql.plugin.hive.HiveType.toHiveType;
 import static io.prestosql.plugin.hive.HiveWriterFactory.computeBucketedFileName;
@@ -245,6 +246,7 @@
 import static io.prestosql.spi.type.BigintType.BIGINT;
 import static io.prestosql.spi.type.TypeUtils.isFloatingPointNaN;
 import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
+import static java.lang.Boolean.parseBoolean;
 import static java.lang.String.format;
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
@@ -588,6 +590,12 @@ private ConnectorTableMetadata doGetTableMetadata(ConnectorSession session, Sche
             properties.put(SORTED_BY_PROPERTY, property.getSortedBy());
         });
 
+        // Transactional properties
+        String transactionalProperty = table.getParameters().get(HiveMetadata.TRANSACTIONAL);
+        if (parseBoolean(transactionalProperty)) {
+            properties.put(HiveTableProperties.TRANSACTIONAL, true);
+        }
+
         // ORC format specific properties
         String orcBloomFilterColumns = table.getParameters().get(ORC_BLOOM_FILTER_COLUMNS_KEY);
         if (orcBloomFilterColumns != null) {
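A side note on the null handling in the hunk above: Boolean.parseBoolean is null-safe and returns true only for a case-insensitive "true", so tables whose metastore parameters do not contain the transactional key simply never get the property added. A minimal standalone sketch (plain Java, not connector code) of that behavior:

    public class ParseBooleanSketch
    {
        public static void main(String[] args)
        {
            // parseBoolean(null) is false, so an absent metastore parameter means "not transactional"
            System.out.println(Boolean.parseBoolean(null));    // false
            System.out.println(Boolean.parseBoolean("TRUE"));  // true (case-insensitive)
            System.out.println(Boolean.parseBoolean("false")); // false
        }
    }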
@@ -883,7 +891,9 @@ private Map<String, String> getEmptyTableProperties(ConnectorTableMetadata table
         // When metastore is configured with metastore.create.as.acid=true, it will also change Presto-created tables
         // behind the scenes. In particular, this won't work with CTAS.
         // TODO (https://github.com/prestosql/presto/issues/1956) convert this into normal table property
-        tableProperties.put(TRANSACTIONAL, "false");
+
+        boolean transactional = HiveTableProperties.isTransactional(tableMetadata.getProperties()).orElse(false);
+        tableProperties.put(TRANSACTIONAL, String.valueOf(transactional));
 
         bucketProperty.ifPresent(hiveBucketProperty ->
                 tableProperties.put(BUCKETING_VERSION, Integer.toString(hiveBucketProperty.getBucketingVersion().getVersion())));
@@ -1274,6 +1284,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
         HiveStorageFormat tableStorageFormat = getHiveStorageFormat(tableMetadata.getProperties());
         List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
         Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
+        boolean transactional = isTransactional(tableMetadata.getProperties()).orElse(false);
 
         // get the root directory for the database
         SchemaTableName schemaTableName = tableMetadata.getTable();
@@ -1309,6 +1320,7 @@ public HiveOutputTableHandle beginCreateTable(ConnectorSession session, Connecto
                 bucketProperty,
                 session.getUser(),
                 tableProperties,
+                transactional,
                 externalLocation.isPresent());
 
         WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
@@ -1346,7 +1358,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
         partitionUpdates = PartitionUpdate.mergePartitionUpdates(partitionUpdates);
 
         if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
-            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, partitionUpdates);
+            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, true, partitionUpdates);
             // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
             partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
             for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
@@ -1404,6 +1416,7 @@ private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
             ConnectorSession session,
             HiveWritableTableHandle handle,
             Table table,
+            boolean isCreateTable,
             List<PartitionUpdate> partitionUpdates)
     {
         ImmutableList.Builder<PartitionUpdate> partitionUpdatesForMissingBucketsBuilder = ImmutableList.builder();
@@ -1417,6 +1430,7 @@ private List<PartitionUpdate> computePartitionUpdatesForMissingBuckets(
                     storageFormat,
                     partitionUpdate.getTargetPath(),
                     bucketCount,
+                    isCreateTable && handle.isTransactional(),
                     partitionUpdate);
             partitionUpdatesForMissingBucketsBuilder.add(new PartitionUpdate(
                     partitionUpdate.getName(),
@@ -1437,6 +1451,7 @@ private List<String> computeFileNamesForMissingBuckets(
             HiveStorageFormat storageFormat,
             Path targetPath,
             int bucketCount,
+            boolean transactionalCreateTable,
             PartitionUpdate partitionUpdate)
     {
         if (partitionUpdate.getFileNames().size() == bucketCount) {
@@ -1450,7 +1465,13 @@ private List<String> computeFileNamesForMissingBuckets(
         Set<String> fileNames = ImmutableSet.copyOf(partitionUpdate.getFileNames());
         ImmutableList.Builder<String> missingFileNamesBuilder = ImmutableList.builder();
         for (int i = 0; i < bucketCount; i++) {
-            String fileName = computeBucketedFileName(session.getQueryId(), i) + fileExtension;
+            String fileName;
+            if (transactionalCreateTable) {
+                fileName = computeBucketedFileName(Optional.empty(), i) + fileExtension;
+            }
+            else {
+                fileName = computeBucketedFileName(Optional.of(session.getQueryId()), i) + fileExtension;
+            }
             if (!fileNames.contains(fileName)) {
                 missingFileNamesBuilder.add(fileName);
             }
@@ -1562,7 +1583,7 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
         }
 
         if (handle.getBucketProperty().isPresent() && isCreateEmptyBucketFiles(session)) {
-            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, partitionUpdates);
+            List<PartitionUpdate> partitionUpdatesForMissingBuckets = computePartitionUpdatesForMissingBuckets(session, handle, table, false, partitionUpdates);
             // replace partitionUpdates before creating the empty files so that those files will be cleaned up if we end up rollback
             partitionUpdates = PartitionUpdate.mergePartitionUpdates(concat(partitionUpdates, partitionUpdatesForMissingBuckets));
             for (PartitionUpdate partitionUpdate : partitionUpdatesForMissingBuckets) {
@@ -2344,6 +2365,10 @@ public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Connector
         if (!isCollectColumnStatisticsOnWrite(session)) {
             return TableStatisticsMetadata.empty();
         }
+        if (isTransactional(tableMetadata.getProperties()).orElse(false)) {
+            // TODO(https://github.com/prestosql/presto/issues/1956) updating table statistics for transactional tables is not supported right now
+            return TableStatisticsMetadata.empty();
+        }
         List<String> partitionedBy = firstNonNull(getPartitionedBy(tableMetadata.getProperties()), ImmutableList.of());
         return getStatisticsCollectionMetadata(tableMetadata.getColumns(), partitionedBy, Optional.empty(), false);
     }
HiveOutputTableHandle.java
@@ -48,6 +48,7 @@ public HiveOutputTableHandle(
             @JsonProperty("bucketProperty") Optional<HiveBucketProperty> bucketProperty,
             @JsonProperty("tableOwner") String tableOwner,
             @JsonProperty("additionalTableParameters") Map<String, String> additionalTableParameters,
+            @JsonProperty("transactional") boolean transactional,
             @JsonProperty("external") boolean external)
     {
         super(
@@ -58,7 +59,8 @@ public HiveOutputTableHandle(
                 locationHandle,
                 bucketProperty,
                 tableStorageFormat,
-                partitionStorageFormat);
+                partitionStorageFormat,
+                transactional);
 
         this.partitionedBy = ImmutableList.copyOf(requireNonNull(partitionedBy, "partitionedBy is null"));
         this.tableOwner = requireNonNull(tableOwner, "tableOwner is null");
HivePageSinkProvider.java
@@ -135,6 +135,7 @@ private ConnectorPageSink createPageSink(HiveWritableTableHandle handle, boolean
                 handle.getSchemaName(),
                 handle.getTableName(),
                 isCreateTable,
+                handle.isTransactional(),
                 handle.getInputColumns(),
                 handle.getTableStorageFormat(),
                 handle.getPartitionStorageFormat(),
HiveTableProperties.java
@@ -36,6 +36,7 @@
 import static io.prestosql.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1;
 import static io.prestosql.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V2;
 import static io.prestosql.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
+import static io.prestosql.spi.session.PropertyMetadata.booleanProperty;
 import static io.prestosql.spi.session.PropertyMetadata.doubleProperty;
 import static io.prestosql.spi.session.PropertyMetadata.enumProperty;
 import static io.prestosql.spi.session.PropertyMetadata.integerProperty;
@@ -69,6 +70,7 @@ public class HiveTableProperties
     public static final String CSV_SEPARATOR = "csv_separator";
     public static final String CSV_QUOTE = "csv_quote";
     public static final String CSV_ESCAPE = "csv_escape";
+    public static final String TRANSACTIONAL = "transactional";
 
     private final List<PropertyMetadata<?>> tableProperties;
 
@@ -153,7 +155,8 @@ public HiveTableProperties(
                 stringProperty(NULL_FORMAT_PROPERTY, "Serialization format for NULL value", null, false),
                 stringProperty(CSV_SEPARATOR, "CSV separator character", null, false),
                 stringProperty(CSV_QUOTE, "CSV quote character", null, false),
-                stringProperty(CSV_ESCAPE, "CSV escape character", null, false));
+                stringProperty(CSV_ESCAPE, "CSV escape character", null, false),
+                booleanProperty(TRANSACTIONAL, "Table is transactional", null, false));
     }
 
     public List<PropertyMetadata<?>> getTableProperties()
@@ -291,4 +294,9 @@ private static String sortingColumnToString(SortingColumn column)
     {
         return column.getColumnName() + ((column.getOrder() == DESCENDING) ? " DESC" : "");
     }
+
+    public static Optional<Boolean> isTransactional(Map<String, Object> tableProperties)
+    {
+        return Optional.ofNullable((Boolean) tableProperties.get(TRANSACTIONAL));
+    }
 }
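The new isTransactional helper returns Optional.empty() when the property was not set, which is why the callers in HiveMetadata append .orElse(false). A small self-contained sketch (using a plain Map in place of the analyzed table properties; not the connector class itself) of how the lookup behaves:

    import java.util.Map;
    import java.util.Optional;

    public class IsTransactionalSketch
    {
        // Mirrors HiveTableProperties.isTransactional: a missing key yields Optional.empty()
        static Optional<Boolean> isTransactional(Map<String, Object> tableProperties)
        {
            return Optional.ofNullable((Boolean) tableProperties.get("transactional"));
        }

        public static void main(String[] args)
        {
            Map<String, Object> withProperty = Map.of("transactional", true);
            System.out.println(isTransactional(withProperty));           // Optional[true]
            System.out.println(isTransactional(Map.of()).orElse(false)); // false, the default used by callers
        }
    }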
HiveWritableTableHandle.java
@@ -34,6 +34,7 @@ public class HiveWritableTableHandle
     private final Optional<HiveBucketProperty> bucketProperty;
     private final HiveStorageFormat tableStorageFormat;
     private final HiveStorageFormat partitionStorageFormat;
+    private final boolean transactional;
 
     public HiveWritableTableHandle(
             String schemaName,
@@ -43,7 +44,8 @@ public HiveWritableTableHandle(
             LocationHandle locationHandle,
             Optional<HiveBucketProperty> bucketProperty,
             HiveStorageFormat tableStorageFormat,
-            HiveStorageFormat partitionStorageFormat)
+            HiveStorageFormat partitionStorageFormat,
+            boolean transactional)
     {
         this.schemaName = requireNonNull(schemaName, "schemaName is null");
         this.tableName = requireNonNull(tableName, "tableName is null");
@@ -53,6 +55,7 @@ public HiveWritableTableHandle(
         this.bucketProperty = requireNonNull(bucketProperty, "bucketProperty is null");
         this.tableStorageFormat = requireNonNull(tableStorageFormat, "tableStorageFormat is null");
         this.partitionStorageFormat = requireNonNull(partitionStorageFormat, "partitionStorageFormat is null");
+        this.transactional = transactional;
     }
 
     @JsonProperty
@@ -109,6 +112,12 @@ public HiveStorageFormat getPartitionStorageFormat()
         return partitionStorageFormat;
     }
 
+    @JsonProperty
+    public boolean isTransactional()
+    {
+        return transactional;
+    }
+
     @Override
     public String toString()
     {
HiveWriterFactory.java
@@ -64,6 +64,7 @@
 import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.Set;
+import java.util.UUID;
 import java.util.function.Consumer;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -117,6 +118,7 @@ public class HiveWriterFactory
     private final LocationHandle locationHandle;
     private final LocationService locationService;
     private final String queryId;
+    private final boolean isCreateTransactionalTable;
 
     private final HivePageSinkMetadataProvider pageSinkMetadataProvider;
     private final TypeManager typeManager;
@@ -145,6 +147,7 @@ public HiveWriterFactory(
             String schemaName,
             String tableName,
             boolean isCreateTable,
+            boolean isTransactional,
             List<HiveColumnHandle> inputColumns,
             HiveStorageFormat tableStorageFormat,
             HiveStorageFormat partitionStorageFormat,
@@ -210,6 +213,7 @@ public HiveWriterFactory(
         this.partitionColumnNames = partitionColumnNames.build();
         this.partitionColumnTypes = partitionColumnTypes.build();
         this.dataColumns = dataColumns.build();
+        this.isCreateTransactionalTable = isCreateTable && isTransactional;
 
         Path writePath;
         if (isCreateTable) {
@@ -268,13 +272,7 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
             checkArgument(bucketNumber.isEmpty(), "Bucket number provided by for table that is not bucketed");
         }
 
-        String fileName;
-        if (bucketNumber.isPresent()) {
-            fileName = computeBucketedFileName(queryId, bucketNumber.getAsInt());
-        }
-        else {
-            fileName = queryId + "_" + randomUUID();
-        }
+        String fileName = computeFileName(bucketNumber);
 
         List<String> partitionValues = createPartitionValues(partitionColumnTypes, partitionColumns, position);
 
@@ -595,10 +593,40 @@ private void validateSchema(Optional<String> partitionName, Properties schema)
         }
     }
 
-    public static String computeBucketedFileName(String queryId, int bucket)
+    private String computeFileName(OptionalInt bucketNumber)
     {
+        // Currently CTAS for transactional tables in Presto creates non-transactional ("original") files.
+        // Hive requires "original" files of transactional tables to conform to the following naming pattern:
+        //
+        // For bucketed tables we drop query id from file names and just leave <bucketId>_0
+        // For non bucketed tables we use 000000_<uuid_as_number>
+
+        if (bucketNumber.isPresent()) {
+            if (isCreateTransactionalTable) {
+                return computeBucketedFileName(Optional.empty(), bucketNumber.getAsInt());
+            }
+            return computeBucketedFileName(Optional.of(queryId), bucketNumber.getAsInt());
+        }
+
+        if (isCreateTransactionalTable) {
+            String paddedBucket = Strings.padStart("0", BUCKET_NUMBER_PADDING, '0');
+            UUID uuid = randomUUID();
+            return format("0%s_%s%s",
+                    paddedBucket,
+                    Long.toUnsignedString(uuid.getLeastSignificantBits()),
+                    Long.toUnsignedString(uuid.getMostSignificantBits()));
+        }
+
+        return queryId + "_" + randomUUID();
+    }
+
+    public static String computeBucketedFileName(Optional<String> queryId, int bucket)
+    {
         String paddedBucket = Strings.padStart(Integer.toString(bucket), BUCKET_NUMBER_PADDING, '0');
-        return format("0%s_0_%s", paddedBucket, queryId);
+        if (queryId.isPresent()) {
+            return format("0%s_0_%s", paddedBucket, queryId.get());
+        }
+        return format("0%s_0", paddedBucket);
     }
 
 public static String getFileExtension(JobConf conf, StorageFormat storageFormat)
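To make the naming convention described in the comment concrete, here is a standalone sketch (plain JDK only; the padding width of 5 is an assumption inferred from the "001234_0" expectations in TestHiveWriterFactory further down) that reproduces the file-name shapes computeFileName produces:

    import java.util.Optional;
    import java.util.UUID;

    public class FileNamingSketch
    {
        // Assumed padding width, matching the "001234_0..." values in the test below
        static String bucketedFileName(Optional<String> queryId, int bucket)
        {
            String paddedBucket = String.format("%05d", bucket);
            return queryId
                    .map(id -> String.format("0%s_0_%s", paddedBucket, id))
                    .orElseGet(() -> String.format("0%s_0", paddedBucket));
        }

        public static void main(String[] args)
        {
            // Bucketed, non-transactional CTAS: the query id stays in the name
            System.out.println(bucketedFileName(Optional.of("20180102_030405_00641_x1y2z"), 1234)); // 001234_0_20180102_030405_00641_x1y2z
            // Bucketed, transactional CTAS: the query id is dropped so Hive accepts the file as "original"
            System.out.println(bucketedFileName(Optional.empty(), 1234));                           // 001234_0
            // Non-bucketed, transactional CTAS: 000000_<uuid rendered as unsigned numbers>
            UUID uuid = UUID.randomUUID();
            System.out.println("000000_"
                    + Long.toUnsignedString(uuid.getLeastSignificantBits())
                    + Long.toUnsignedString(uuid.getMostSignificantBits()));
        }
    }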
TestHiveIntegrationSmokeTest.java
@@ -3259,7 +3259,8 @@ public void testShowCreateTable()
                 " orc_bloom_filter_columns = ARRAY['c1','c2'],\n" +
                 " orc_bloom_filter_fpp = 7E-1,\n" +
                 " partitioned_by = ARRAY['c5'],\n" +
-                " sorted_by = ARRAY['c1','c 2 DESC']\n" +
+                " sorted_by = ARRAY['c1','c 2 DESC'],\n" +
+                " transactional = true\n" +
                 ")",
                 getSession().getCatalog().get(),
                 getSession().getSchema().get(),
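For context, a usage sketch of what this change enables end to end, issued through the Presto JDBC driver. The coordinator URL, catalog, schema, and table name here are illustrative assumptions, not part of this PR; only the transactional table property itself comes from this change:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class TransactionalCtasExample
    {
        public static void main(String[] args) throws Exception
        {
            // Hypothetical coordinator URL and hive catalog/schema
            try (Connection connection = DriverManager.getConnection("jdbc:presto://localhost:8080/hive/default", "test", null);
                    Statement statement = connection.createStatement()) {
                // CTAS into a Hive transactional table, using the 'transactional' table property registered in HiveTableProperties
                statement.executeUpdate(
                        "CREATE TABLE ctas_transactional WITH (transactional = true) AS SELECT 42 AS answer");
            }
        }
    }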
TestHivePageSink.java
@@ -267,6 +267,7 @@ private static ConnectorPageSink createPageSink(HiveTransactionHandle transactio
                 Optional.empty(),
                 "test",
                 ImmutableMap.of(),
+                false,
                 false);
         JsonCodec<PartitionUpdate> partitionUpdateCodec = JsonCodec.jsonCodec(PartitionUpdate.class);
         HivePageSinkProvider provider = new HivePageSinkProvider(
TestHiveWriterFactory.java
@@ -15,6 +15,8 @@
 
 import org.testng.annotations.Test;
 
+import java.util.Optional;
+
 import static io.prestosql.plugin.hive.HiveWriterFactory.computeBucketedFileName;
 import static org.apache.hadoop.hive.ql.exec.Utilities.getBucketIdFromFile;
 import static org.testng.Assert.assertEquals;
@@ -24,8 +26,12 @@ public class TestHiveWriterFactory
     @Test
     public void testComputeBucketedFileName()
     {
-        String name = computeBucketedFileName("20180102_030405_00641_x1y2z", 1234);
+        String name = computeBucketedFileName(Optional.of("20180102_030405_00641_x1y2z"), 1234);
         assertEquals(name, "001234_0_20180102_030405_00641_x1y2z");
         assertEquals(getBucketIdFromFile(name), 1234);
+
+        name = computeBucketedFileName(Optional.empty(), 1234);
+        assertEquals(name, "001234_0");
+        assertEquals(getBucketIdFromFile(name), 1234);
     }
 }