Hive: Don't delete files when commit state is unknown (apache#2328)
RussellSpitzer authored and aokolnychyi committed Apr 24, 2021
1 parent cb2ec36 commit 08d4945
Showing 7 changed files with 394 additions and 22 deletions.
39 changes: 39 additions & 0 deletions api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.exceptions;

/**
* Exception for a failure to confirm either affirmatively or negatively that a commit was applied. The client
* cannot take any further action without possibly corrupting the table.
*/
public class CommitStateUnknownException extends RuntimeException {

  private static final String COMMON_INFO =
      "Cannot determine whether the commit was successful or not, the underlying data files may or " +
      "may not be needed. Manual intervention via the Remove Orphan Files Action can remove these " +
      "files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.\n" +
      "Please check to see whether or not your commit was successful before retrying this commit. Retrying " +
      "an already successful operation will result in duplicate records or unintentional modifications.\n" +
      "At this time no files will be deleted including possibly unused manifest lists.";

  public CommitStateUnknownException(Throwable cause) {
    super(cause.getMessage() + "\n" + COMMON_INFO, cause);
  }
}
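
The contract for callers is that this exception must never trigger cleanup of files written for the attempted commit. A minimal sketch of the calling pattern (the ops handle and the cleanupStagedFiles() helper are illustrative, not part of this commit):

try {
  ops.commit(base, updated);
} catch (CommitStateUnknownException e) {
  // The commit may have succeeded; deleting data files, manifests, or the new
  // metadata file here could corrupt a live table. Rethrow and leave files alone.
  throw e;
} catch (RuntimeException e) {
  // Any other exception means the commit definitely failed, so files staged
  // for it can be removed safely.
  cleanupStagedFiles();
  throw e;
}

This is the same ordering of catch clauses that SnapshotProducer adopts below.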
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -32,6 +32,7 @@
import java.util.function.Consumer;
import org.apache.iceberg.events.Listeners;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -299,6 +300,8 @@ public void commit() {
            taskOps.commit(base, updated.withUUID());
          });

    } catch (CommitStateUnknownException commitStateUnknownException) {
      throw commitStateUnknownException;
    } catch (RuntimeException e) {
      Exceptions.suppressAndThrow(e, this::cleanAll);
    }
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableOperations.java
@@ -52,6 +52,13 @@ public interface TableOperations {
   * Implementations must check that the base metadata is current to avoid overwriting updates.
   * Once the atomic commit operation succeeds, implementations must not perform any operations that
   * may fail because failure in this method cannot be distinguished from commit failure.
   * <p>
   * Implementations must throw a {@link org.apache.iceberg.exceptions.CommitStateUnknownException}
   * in cases where it cannot be determined whether the commit succeeded or failed.
   * For example, if a network partition causes the confirmation of the commit to be lost,
   * the implementation should throw a CommitStateUnknownException. This is important because downstream
   * users of this API need to know whether they can clean up the commit or not; if the state is unknown,
   * it is not safe to remove any files. All other exceptions will be treated as if the commit has failed.
   *
   * @param base table metadata on which changes were based
   * @param metadata new table metadata with updates
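
As a sketch of how a custom TableOperations implementation might honor this contract — the external client, its swapLocation call, and CasFailedException are hypothetical stand-ins, while writeNewMetadata, currentVersion, and currentMetadataLocation are assumed helpers in the style of BaseMetastoreTableOperations:

@Override
public void commit(TableMetadata base, TableMetadata metadata) {
  String newLocation = writeNewMetadata(metadata, currentVersion() + 1);
  try {
    // hypothetical atomic pointer swap in an external store
    client.swapLocation(tableId, currentMetadataLocation(), newLocation);
  } catch (CasFailedException e) {
    // the swap definitively did not happen: safe to report a plain failure,
    // and callers may clean up newLocation
    throw new CommitFailedException(e, "Concurrent update to table %s", tableId);
  } catch (IOException e) {
    // the request may or may not have reached the store: report unknown state
    // so that no files written for this commit are deleted
    throw new CommitStateUnknownException(e);
  }
}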
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -36,6 +36,9 @@ private TableProperties() {
  public static final String COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms";
  public static final int COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 1800000; // 30 minutes

  public static final String COMMIT_NUM_STATUS_CHECKS = "commit.num-status-checks";
  public static final int COMMIT_NUM_STATUS_CHECKS_DEFAULT = 3;

  public static final String MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes";
  public static final long MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 4 * 1024 * 1024; // 4 MB

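
The new commit.num-status-checks property can be tuned per table through the standard UpdateProperties API; for example (the table handle is assumed):

// allow up to 6 reconnect-and-check attempts when a commit's fate is ambiguous
table.updateProperties()
    .set(TableProperties.COMMIT_NUM_STATUS_CHECKS, "6")
    .commit();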
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -23,7 +23,6 @@
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
@@ -55,25 +54,33 @@
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT;

/**
* TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to
* avoid code duplication between this class and Metacat Tables.
*/
public class HiveTableOperations extends BaseMetastoreTableOperations {
  private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class);

  private static final int COMMIT_STATUS_CHECK_WAIT_MS = 1000;

  private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms";
  private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms";
  private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms";
@@ -93,6 +100,12 @@ private static class WaitingForLockException extends RuntimeException {
    }
  }

  private enum CommitStatus {
    FAILURE,
    SUCCESS,
    UNKNOWN
  }

  private final HiveClientPool metaClients;
  private final String fullName;
  private final String database;
@@ -158,12 +171,11 @@ protected void doRefresh() {
  @SuppressWarnings("checkstyle:CyclomaticComplexity")
  @Override
  protected void doCommit(TableMetadata base, TableMetadata metadata) {
    String newMetadataLocation = metadata.metadataFileLocation() == null ?
        writeNewMetadata(metadata, currentVersion() + 1) :
        metadata.metadataFileLocation();
    String newMetadataLocation = base == null && metadata.metadataFileLocation() != null ?
        metadata.metadataFileLocation() : writeNewMetadata(metadata, currentVersion() + 1);
    boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);

    boolean threw = true;
    CommitStatus commitStatus = CommitStatus.FAILURE;
    boolean updateHiveTable = false;
    Optional<Long> lockId = Optional.empty();
    try {
@@ -208,8 +220,23 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
          .orElseGet(ImmutableMap::of);
      setHmsTableParameters(newMetadataLocation, tbl, metadata.properties(), removedProps, hiveEngineEnabled, summary);

      persistTable(tbl, updateHiveTable);
      threw = false;
      try {
        persistTable(tbl, updateHiveTable);
        commitStatus = CommitStatus.SUCCESS;
      } catch (Throwable persistFailure) {
        LOG.error("Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
            database, tableName, persistFailure);
        commitStatus = checkCommitStatus(newMetadataLocation, metadata);
        switch (commitStatus) {
          case SUCCESS:
            break;
          case FAILURE:
            throw persistFailure;
          case UNKNOWN:
            throw new CommitStateUnknownException(persistFailure);
        }
      }

    } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
      throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName);

@@ -227,11 +254,56 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
throw new RuntimeException("Interrupted during commit", e);

} finally {
cleanupMetadataAndUnlock(threw, newMetadataLocation, lockId);
cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId);
}
}

  private void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException {
  /**
   * Attempt to load the table and see if any current or past metadata location matches the one we were attempting
   * to set. This is used as a last resort when we are dealing with exceptions that may indicate the commit has
   * failed but are not proof that this is the case. Past locations must also be searched on the chance that a second
   * committer was able to successfully commit on top of our commit.
   *
   * @param newMetadataLocation the path of the new commit file
   * @param config metadata to use for configuration
   * @return Commit Status of Success, Failure or Unknown
   */
  private CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
    int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS,
        COMMIT_NUM_STATUS_CHECKS_DEFAULT);

    AtomicReference<CommitStatus> status = new AtomicReference<>(CommitStatus.UNKNOWN);

    Tasks.foreach(newMetadataLocation)
        .retry(maxAttempts)
        .suppressFailureWhenFinished()
        .exponentialBackoff(COMMIT_STATUS_CHECK_WAIT_MS, COMMIT_STATUS_CHECK_WAIT_MS, Long.MAX_VALUE, 2.0)
        .onFailure((location, checkException) ->
            LOG.error("Cannot check if commit to {}.{} exists.", database, tableName, checkException))
        .run(location -> {
          TableMetadata metadata = refresh();
          String currentMetadataLocation = metadata.metadataFileLocation();
          boolean commitSuccess = currentMetadataLocation.equals(newMetadataLocation) ||
              metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation));
          if (commitSuccess) {
            LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation);
            status.set(CommitStatus.SUCCESS);
          } else {
            LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation);
            status.set(CommitStatus.FAILURE);
          }
        });

    if (status.get() == CommitStatus.UNKNOWN) {
      LOG.error("Cannot determine commit state to {}.{}. Failed during checking {} times. " +
          "Treating commit state as unknown.", database, tableName, maxAttempts);
    }
    return status.get();
  }

  @VisibleForTesting
  void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException {
    if (updateHiveTable) {
      metaClients.run(client -> {
        EnvironmentContext envContext = new EnvironmentContext(
@@ -286,12 +358,6 @@ private void setHmsTableParameters(String newMetadataLocation, Table tbl, Map<St
      parameters = new HashMap<>();
    }

    // unset existing PDT properties to make sure we pick new values
    List<String> existingPdtParams = parameters.keySet().stream()
        .filter(key -> key.startsWith("pdt.") || key.startsWith("spark.sql.sources."))
        .collect(Collectors.toList());
    existingPdtParams.forEach(parameters::remove);

    // push all Iceberg table properties into HMS
    icebergTableProps.forEach(parameters::put);

@@ -350,7 +416,8 @@ private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hive
    return storageDescriptor;
  }

  private long acquireLock() throws UnknownHostException, TException, InterruptedException {
  @VisibleForTesting
  long acquireLock() throws UnknownHostException, TException, InterruptedException {
    final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
    lockComponent.setTablename(tableName);
    final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent),
@@ -416,10 +483,10 @@ private long acquireLock() throws UnknownHostException, TException, InterruptedE
    return lockId;
  }

  private void cleanupMetadataAndUnlock(boolean errorThrown, String metadataLocation, Optional<Long> lockId) {
  private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional<Long> lockId) {
    try {
      if (errorThrown) {
        // if anything went wrong, clean up the uncommitted metadata file
      if (commitStatus == CommitStatus.FAILURE) {
        // If we are sure the commit failed, clean up the uncommitted metadata file
        io().deleteFile(metadataLocation);
      }
    } catch (RuntimeException e) {
@@ -440,8 +507,8 @@ private void unlock(Optional<Long> lockId) {
    }
  }

  // visible for testing
  protected void doUnlock(long lockId) throws TException, InterruptedException {
  @VisibleForTesting
  void doUnlock(long lockId) throws TException, InterruptedException {
    metaClients.run(client -> {
      client.unlock(lockId);
      return null;
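
A rough sketch of how the unknown-state path could be exercised with a Mockito spy — this assumes a test in the org.apache.iceberg.hive package (so the package-private persistTable is visible), a table handle from a live-metastore test harness, and base/updatedMetadata as two consecutive metadata versions:

HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations();
HiveTableOperations spyOps = Mockito.spy(ops);

// simulate a dropped connection during the metastore call...
Mockito.doThrow(new TException("datacenter on fire"))
    .when(spyOps).persistTable(Mockito.any(), Mockito.anyBoolean());
// ...and keep the follow-up status checks failing so the outcome stays unknown
Mockito.doThrow(new RuntimeException("metastore unreachable"))
    .when(spyOps).refresh();

// commit() should surface CommitStateUnknownException, and the new metadata
// file should be left in place rather than deleted
Assertions.assertThrows(CommitStateUnknownException.class,
    () -> spyOps.commit(base, updatedMetadata));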
