Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CTAS for Hive transactional tables #4516

Closed
wants to merge 13 commits into from
128 changes: 116 additions & 12 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/AcidInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import org.apache.hadoop.fs.Path;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -33,15 +37,32 @@ public class AcidInfo
{
private final String partitionLocation;
private final List<DeleteDeltaInfo> deleteDeltas;
private final List<OriginalFileInfo> originalFiles;
private final int bucketId;

@JsonCreator
public AcidInfo(
@JsonProperty("partitionLocation") String partitionLocation,
@JsonProperty("deleteDeltas") List<DeleteDeltaInfo> deleteDeltas)
@JsonProperty("deleteDeltas") List<DeleteDeltaInfo> deleteDeltas,
@JsonProperty("originalFiles") List<OriginalFileInfo> originalFiles,
@JsonProperty("bucketId") int bucketId)
{
this.partitionLocation = requireNonNull(partitionLocation, "partitionLocation is null");
this.deleteDeltas = ImmutableList.copyOf(requireNonNull(deleteDeltas, "deleteDeltas is null"));
checkArgument(!deleteDeltas.isEmpty(), "deleteDeltas is empty");
this.originalFiles = ImmutableList.copyOf(requireNonNull(originalFiles, "originalFiles is null"));
this.bucketId = bucketId;
}

/**
 * Returns the original files attached to this {@code AcidInfo}.
 * The list is the immutable copy taken by the constructor.
 */
@JsonProperty
public List<OriginalFileInfo> getOriginalFiles()
{
    return this.originalFiles;
}

/**
 * Returns the bucket id this {@code AcidInfo} was created with.
 */
@JsonProperty
public int getBucketId()
{
    return this.bucketId;
}

@JsonProperty
Expand All @@ -62,20 +83,20 @@ public boolean equals(Object o)
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

AcidInfo that = (AcidInfo) o;
return partitionLocation.equals(that.partitionLocation) &&
deleteDeltas.equals(that.deleteDeltas);
return bucketId == that.bucketId &&
Objects.equals(partitionLocation, that.partitionLocation) &&
Objects.equals(deleteDeltas, that.deleteDeltas) &&
Objects.equals(originalFiles, that.originalFiles);
}

@Override
public int hashCode()
{
return Objects.hash(partitionLocation, deleteDeltas);
return Objects.hash(partitionLocation, deleteDeltas, originalFiles, bucketId);
}

@Override
Expand All @@ -84,6 +105,8 @@ public String toString()
return toStringHelper(this)
.add("partitionLocation", partitionLocation)
.add("deleteDeltas", deleteDeltas)
.add("originalFiles", originalFiles)
.add("bucketId", bucketId)
.toString();
}

Expand Down Expand Up @@ -156,6 +179,62 @@ public String toString()
}
}

/**
 * Immutable, JSON-serializable description of one original file: its file name and its size.
 */
public static class OriginalFileInfo
{
    private final String name;
    private final long fileSize;

    /**
     * @param name file name; must not be null
     * @param fileSize size recorded for the file
     */
    @JsonCreator
    public OriginalFileInfo(
            @JsonProperty("name") String name,
            @JsonProperty("fileSize") long fileSize)
    {
        requireNonNull(name, "name is null");
        this.name = name;
        this.fileSize = fileSize;
    }

    /** Returns the file name. */
    @JsonProperty
    public String getName()
    {
        return this.name;
    }

    /** Returns the recorded file size. */
    @JsonProperty
    public long getFileSize()
    {
        return this.fileSize;
    }

    /** Two instances are equal when they have the same exact class, name and file size. */
    @Override
    public boolean equals(Object o)
    {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }
        OriginalFileInfo other = (OriginalFileInfo) o;
        return name.equals(other.name) && fileSize == other.fileSize;
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(name, fileSize);
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("name", name)
                .add("fileSize", fileSize)
                .toString();
    }
}

public static Builder builder(Path partitionPath)
{
return new Builder(partitionPath);
Expand All @@ -169,7 +248,8 @@ public static Builder builder(AcidInfo acidInfo)
public static class Builder
{
private final Path partitionLocation;
private final ImmutableList.Builder<DeleteDeltaInfo> deleteDeltaInfoBuilder = ImmutableList.builder();
private final List<DeleteDeltaInfo> deleteDeltaInfos = new ArrayList<>();
private final ListMultimap<Integer, OriginalFileInfo> bucketIdToOriginalFileInfoMap = ArrayListMultimap.create();

private Builder(Path partitionPath)
{
Expand All @@ -179,7 +259,7 @@ private Builder(Path partitionPath)
private Builder(AcidInfo acidInfo)
{
partitionLocation = new Path(acidInfo.getPartitionLocation());
deleteDeltaInfoBuilder.addAll(acidInfo.deleteDeltas);
deleteDeltaInfos.addAll(acidInfo.deleteDeltas);
}

public Builder addDeleteDelta(Path deleteDeltaPath, long minWriteId, long maxWriteId, int statementId)
Expand All @@ -192,17 +272,41 @@ public Builder addDeleteDelta(Path deleteDeltaPath, long minWriteId, long maxWri
deleteDeltaPath.getParent().toString(),
partitionLocation);

deleteDeltaInfoBuilder.add(new DeleteDeltaInfo(minWriteId, maxWriteId, statementId));
deleteDeltaInfos.add(new DeleteDeltaInfo(minWriteId, maxWriteId, statementId));
return this;
}

/**
 * Registers an original file under the given bucket id.
 * Only the file name and length are stored; the file's parent directory must have the same
 * URI path as this builder's partition location.
 *
 * @param originalFilePath full path of the original file
 * @param originalFileLength length recorded for the file
 * @param bucketId bucket id the file is grouped under
 * @return this builder
 * @throws NullPointerException if {@code originalFilePath} is null
 * @throws IllegalArgumentException if the parent directory does not match the partition location
 */
public Builder addOriginalFile(Path originalFilePath, long originalFileLength, int bucketId)
{
    requireNonNull(originalFilePath, "originalFilePath is null");
    Path parentDirectory = originalFilePath.getParent();

    // The file path may carry a scheme prefix (scheme://<path>), so only the path components of the URIs are compared.
    String expectedPath = partitionLocation.toUri().getPath();
    String actualPath = parentDirectory.toUri().getPath();
    checkArgument(
            expectedPath.equals(actualPath),
            "Partition location in OriginalFile '%s' does not match stored location '%s'",
            parentDirectory.toString(),
            partitionLocation);

    OriginalFileInfo fileInfo = new OriginalFileInfo(originalFilePath.getName(), originalFileLength);
    bucketIdToOriginalFileInfoMap.put(bucketId, fileInfo);
    return this;
}

/**
 * Builds an {@code AcidInfo} carrying every delete delta added so far together with the
 * original files registered under {@code bucketId}.
 *
 * @param bucketId non-negative bucket id that has at least one registered original file
 * @return an always-present {@code Optional} holding the new {@code AcidInfo}
 * @throws IllegalStateException if {@code bucketId} is negative or has no registered original file
 */
public Optional<AcidInfo> buildWithRequiredOriginalFiles(int bucketId)
{
    boolean bucketIsRegistered = bucketId > -1 && bucketIdToOriginalFileInfoMap.containsKey(bucketId);
    checkState(bucketIsRegistered, "Bucket Id to OriginalFileInfo map should have entry for requested bucket Id");

    // Snapshot the delete deltas and pick only the original files that share the requested bucket id.
    List<DeleteDeltaInfo> deleteDeltas = ImmutableList.copyOf(deleteDeltaInfos);
    List<OriginalFileInfo> originalFilesOfBucket = bucketIdToOriginalFileInfoMap.get(bucketId);
    AcidInfo acidInfo = new AcidInfo(partitionLocation.toString(), deleteDeltas, originalFilesOfBucket, bucketId);
    return Optional.of(acidInfo);
}

public Optional<AcidInfo> build()
{
List<DeleteDeltaInfo> deleteDeltas = deleteDeltaInfoBuilder.build();
List<DeleteDeltaInfo> deleteDeltas = ImmutableList.copyOf(deleteDeltaInfos);
if (deleteDeltas.isEmpty()) {
return Optional.empty();
}
return Optional.of(new AcidInfo(partitionLocation.toString(), deleteDeltas));
return Optional.of(new AcidInfo(partitionLocation.toString(), deleteDeltas, ImmutableList.of(), -1));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.prestosql.plugin.hive.util.HiveBucketing.BucketingVersion;
import io.prestosql.plugin.hive.util.HiveBucketing.HiveBucketFilter;
import io.prestosql.plugin.hive.util.HiveFileIterator;
import io.prestosql.plugin.hive.util.HiveFileIterator.NestedDirectoryNotAllowedException;
import io.prestosql.plugin.hive.util.InternalHiveSplitFactory;
import io.prestosql.plugin.hive.util.ResumableTask;
import io.prestosql.plugin.hive.util.ResumableTasks;
Expand All @@ -46,6 +45,7 @@
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
Expand Down Expand Up @@ -435,7 +435,8 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
}

List<Path> readPaths;
Optional<AcidInfo> acidInfo;
List<HadoopShims.HdfsFileStatusWithId> fileStatusOriginalFiles = ImmutableList.of();
AcidInfo.Builder acidInfoBuilder = AcidInfo.builder(path);
if (AcidUtils.isTransactionalTable(table.getParameters())) {
AcidUtils.Directory directory = hdfsEnvironment.doAs(hdfsContext.getIdentity().getUser(), () -> AcidUtils.getAcidState(
path,
Expand All @@ -457,10 +458,6 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)

readPaths = new ArrayList<>();

if (!directory.getOriginalFiles().isEmpty()) {
throw new PrestoException(NOT_SUPPORTED, "Original non-ACID files in transactional tables are not supported");
}

// base
if (directory.getBaseDirectory() != null) {
readPaths.add(directory.getBaseDirectory());
Expand All @@ -474,18 +471,29 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
}

// Create a registry of delete_delta directories for the partition
AcidInfo.Builder acidInfoBuilder = AcidInfo.builder(path);
for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
if (delta.isDeleteDelta()) {
acidInfoBuilder.addDeleteDelta(delta.getPath(), delta.getMinWriteId(), delta.getMaxWriteId(), delta.getStatementId());
}
}

acidInfo = acidInfoBuilder.build();
// initialize original files status list if present
fileStatusOriginalFiles = directory.getOriginalFiles();

for (HadoopShims.HdfsFileStatusWithId hdfsFileStatusWithId : fileStatusOriginalFiles) {
Path originalFilePath = hdfsFileStatusWithId.getFileStatus().getPath();
long originalFileLength = hdfsFileStatusWithId.getFileStatus().getLen();

if (originalFileLength == 0) {
continue;
}

int bucketId = getBucketNumber(originalFilePath.getName()).getAsInt();
acidInfoBuilder.addOriginalFile(originalFilePath, originalFileLength, bucketId);
}
}
else {
readPaths = ImmutableList.of(path);
acidInfo = Optional.empty();
}

// S3 Select pushdown works at the granularity of individual S3 objects,
Expand All @@ -496,18 +504,69 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
if (tableBucketInfo.isPresent()) {
ListenableFuture<?> lastResult = immediateFuture(null); // TODO document in addToQueue() that it is sufficient to hold on to last returned future
for (Path readPath : readPaths) {
lastResult = hiveSplitSource.addToQueue(getBucketedSplits(readPath, fs, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfo));
int tableBucketCount = tableBucketInfo.get().getTableBucketCount();

// list all files in the partition
List<LocatedFileStatus> files = new ArrayList<>();
try {
Iterators.addAll(files, new HiveFileIterator(table, readPath, fs, directoryLister, namenodeStats, FAIL, ignoreAbsentPartitions));
}
catch (HiveFileIterator.NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
throw new PrestoException(
HIVE_INVALID_BUCKET_FILES,
format("Hive table '%s' is corrupt. Found sub-directory in bucket directory for partition: %s",
table.getSchemaTableName(),
splitFactory.getPartitionName()));
}
lastResult = hiveSplitSource.addToQueue(getBucketedSplits(files, splitFactory, tableBucketInfo.get(), bucketConversion, splittable, acidInfoBuilder.build()));
}

for (HadoopShims.HdfsFileStatusWithId hdfsFileStatusWithId : fileStatusOriginalFiles) {
List<LocatedFileStatus> locatedFileStatuses = ImmutableList.of((LocatedFileStatus) hdfsFileStatusWithId.getFileStatus());
lastResult = hiveSplitSource.addToQueue(getBucketedSplits(locatedFileStatuses, splitFactory, tableBucketInfo.get(), bucketConversion, splittable,
acidInfoBuilder
.buildWithRequiredOriginalFiles(getBucketNumber(hdfsFileStatusWithId.getFileStatus().getPath().getName()).getAsInt())));
}

return lastResult;
}

for (Path readPath : readPaths) {
fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfo));
fileIterators.addLast(createInternalHiveSplitIterator(readPath, fs, splitFactory, splittable, acidInfoBuilder.build()));
}

if (!fileStatusOriginalFiles.isEmpty()) {
generateOriginalFilesSplits(fs, splitFactory, fileStatusOriginalFiles, acidInfoBuilder);
}

return COMPLETED_FUTURE;
}

/**
 * Appends to {@code fileIterators} a lazily evaluated iterator that yields one split per
 * non-empty original file.
 *
 * <p>For every file, block locations are fetched as the session user and the split is created
 * with the {@code AcidInfo} built for the bucket number parsed from the file name.
 *
 * <p>Zero-length files are skipped. The caller registers only non-empty original files with
 * {@code acidInfoBuilder}, so asking the builder for a bucket that holds nothing but empty files
 * would fail its state check; an empty file has no data to read anyway.
 *
 * @param fs file system used to look up block locations
 * @param splitFactory factory that turns a file status into an internal split
 * @param originalFileLocations original files found in the partition
 * @param acidInfoBuilder builder that already holds the partition's delete deltas and original files
 * @throws PrestoException with {@code HIVE_BAD_DATA} (raised while the iterator is consumed)
 *         if fetching block locations fails with an {@code IOException}
 */
private void generateOriginalFilesSplits(
        FileSystem fs,
        InternalHiveSplitFactory splitFactory,
        List<HadoopShims.HdfsFileStatusWithId> originalFileLocations,
        AcidInfo.Builder acidInfoBuilder)
{
    fileIterators.addLast(
            originalFileLocations.stream()
                    .map(HadoopShims.HdfsFileStatusWithId::getFileStatus)
                    // Keep in sync with the registration step, which ignores zero-length files.
                    .filter(fileStatus -> fileStatus.getLen() > 0)
                    .map(fileStatus -> {
                        try {
                            return splitFactory.createInternalHiveSplit(
                                    fileStatus,
                                    hdfsEnvironment.doAs(hdfsContext.getIdentity().getUser(), () -> fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen())),
                                    acidInfoBuilder.buildWithRequiredOriginalFiles(getBucketNumber(fileStatus.getPath().getName()).getAsInt()));
                        }
                        catch (IOException e) {
                            throw new PrestoException(HIVE_BAD_DATA, e);
                        }
                    })
                    // NOTE(review): this throws if the factory returns an empty Optional; confirm the factory never filters these files out.
                    .map(Optional::orElseThrow)
                    .iterator());
}

private ListenableFuture<?> addSplitsToSource(InputSplit[] targetSplits, InternalHiveSplitFactory splitFactory)
throws IOException
{
Expand Down Expand Up @@ -541,27 +600,13 @@ private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, F
.iterator();
}

private List<InternalHiveSplit> getBucketedSplits(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, BucketSplitInfo bucketSplitInfo, Optional<BucketConversion> bucketConversion, boolean splittable, Optional<AcidInfo> acidInfo)
private List<InternalHiveSplit> getBucketedSplits(List<LocatedFileStatus> files, InternalHiveSplitFactory splitFactory, BucketSplitInfo bucketSplitInfo, Optional<BucketConversion> bucketConversion, boolean splittable, Optional<AcidInfo> acidInfo)
{
int readBucketCount = bucketSplitInfo.getReadBucketCount();
int tableBucketCount = bucketSplitInfo.getTableBucketCount();
int partitionBucketCount = bucketConversion.map(BucketConversion::getPartitionBucketCount).orElse(tableBucketCount);
int bucketCount = max(readBucketCount, partitionBucketCount);

// list all files in the partition
List<LocatedFileStatus> files = new ArrayList<>(partitionBucketCount);
try {
Iterators.addAll(files, new HiveFileIterator(table, path, fileSystem, directoryLister, namenodeStats, FAIL, ignoreAbsentPartitions));
}
catch (NestedDirectoryNotAllowedException e) {
// Fail here to be on the safe side. This seems to be the same as what Hive does
throw new PrestoException(
HIVE_INVALID_BUCKET_FILES,
format("Hive table '%s' is corrupt. Found sub-directory in bucket directory for partition: %s",
table.getSchemaTableName(),
splitFactory.getPartitionName()));
}

// build mapping of file name to bucket
ListMultimap<Integer, LocatedFileStatus> bucketFiles = ArrayListMultimap.create();
for (LocatedFileStatus file : files) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public HiveInsertTableHandle(
locationHandle,
bucketProperty,
tableStorageFormat,
partitionStorageFormat);
partitionStorageFormat,
false);
}
}
Loading