diff --git a/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java b/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java new file mode 100644 index 000000000000..d6e36f5031c2 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/exceptions/CommitStateUnknownException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.exceptions; + +/** + * Exception for a failure to confirm either affirmatively or negatively that a commit was applied. The client + * cannot take any further action without possibly corrupting the table. + */ +public class CommitStateUnknownException extends RuntimeException { + + private static final String COMMON_INFO = + "Cannot determine whether the commit was successful or not, the underlying data files may or " + + "may not be needed. Manual intervention via the Remove Orphan Files Action can remove these " + + "files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.\n" + + "Please check to see whether or not your commit was successful before retrying this commit. 
Retrying " + + "an already successful operation will result in duplicate records or unintentional modifications.\n" + + "At this time no files will be deleted including possibly unused manifest lists."; + + public CommitStateUnknownException(Throwable cause) { + super(cause.getMessage() + "\n" + COMMON_INFO, cause); + } +} diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 791871b810dc..3a179f99fbe4 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -32,6 +32,7 @@ import java.util.function.Consumer; import org.apache.iceberg.events.Listeners; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -299,6 +300,8 @@ public void commit() { taskOps.commit(base, updated.withUUID()); }); + } catch (CommitStateUnknownException commitStateUnknownException) { + throw commitStateUnknownException; } catch (RuntimeException e) { Exceptions.suppressAndThrow(e, this::cleanAll); } diff --git a/core/src/main/java/org/apache/iceberg/TableOperations.java b/core/src/main/java/org/apache/iceberg/TableOperations.java index 98184231f631..3648b1211ba3 100644 --- a/core/src/main/java/org/apache/iceberg/TableOperations.java +++ b/core/src/main/java/org/apache/iceberg/TableOperations.java @@ -52,6 +52,13 @@ public interface TableOperations { * Implementations must check that the base metadata is current to avoid overwriting updates. * Once the atomic commit operation succeeds, implementations must not perform any operations that * may fail because failure in this method cannot be distinguished from commit failure. + *

+ * Implementations must throw a {@link org.apache.iceberg.exceptions.CommitStateUnknownException} + * in cases where it cannot be determined if the commit succeeded or failed. + * For example if a network partition causes the confirmation of the commit to be lost, + * the implementation should throw a CommitStateUnknownException. This is important because downstream users of + * this API need to know whether they can clean up the commit or not, if the state is unknown then it is not safe + * to remove any files. All other exceptions will be treated as if the commit has failed. * * @param base table metadata on which changes were based * @param metadata new table metadata with updates diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 07e066c28c78..bdc632b89602 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -36,6 +36,9 @@ private TableProperties() { public static final String COMMIT_TOTAL_RETRY_TIME_MS = "commit.retry.total-timeout-ms"; public static final int COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT = 1800000; // 30 minutes + public static final String COMMIT_NUM_STATUS_CHECKS = "commit.num-status-checks"; + public static final int COMMIT_NUM_STATUS_CHECKS_DEFAULT = 3; + public static final String MANIFEST_TARGET_SIZE_BYTES = "commit.manifest.target-size-bytes"; public static final long MANIFEST_TARGET_SIZE_BYTES_DEFAULT = 8388608; // 8 MB diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index 2a820061064f..ce05a895cd27 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -54,17 +54,23 @@ import org.apache.iceberg.common.DynMethods; import 
org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchIcebergTableException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hadoop.ConfigProperties; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_STATUS_CHECKS_DEFAULT; + /** * TODO we should be able to extract some more commonalities to BaseMetastoreTableOperations to * avoid code duplication between this class and Metacat Tables. 
@@ -72,6 +78,8 @@ public class HiveTableOperations extends BaseMetastoreTableOperations { private static final Logger LOG = LoggerFactory.getLogger(HiveTableOperations.class); + private static final int COMMIT_STATUS_CHECK_WAIT_MS = 1000; + private static final String HIVE_ACQUIRE_LOCK_TIMEOUT_MS = "iceberg.hive.lock-timeout-ms"; private static final String HIVE_LOCK_CHECK_MIN_WAIT_MS = "iceberg.hive.lock-check-min-wait-ms"; private static final String HIVE_LOCK_CHECK_MAX_WAIT_MS = "iceberg.hive.lock-check-max-wait-ms"; @@ -91,6 +99,12 @@ private static class WaitingForLockException extends RuntimeException { } } + private enum CommitStatus { + FAILURE, + SUCCESS, + UNKNOWN + } + private final HiveClientPool metaClients; private final String fullName; private final String database; @@ -153,12 +167,13 @@ protected void doRefresh() { refreshFromMetadataLocation(metadataLocation); } + @SuppressWarnings("checkstyle:CyclomaticComplexity") @Override protected void doCommit(TableMetadata base, TableMetadata metadata) { String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf); - boolean threw = true; + CommitStatus commitStatus = CommitStatus.FAILURE; boolean updateHiveTable = false; Optional lockId = Optional.empty(); try { @@ -203,8 +218,23 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { .orElseGet(ImmutableMap::of); setHmsTableParameters(newMetadataLocation, tbl, metadata.properties(), removedProps, hiveEngineEnabled, summary); - persistTable(tbl, updateHiveTable); - threw = false; + try { + persistTable(tbl, updateHiveTable); + commitStatus = CommitStatus.SUCCESS; + } catch (Throwable persistFailure) { + LOG.error("Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.", + database, tableName, persistFailure); + commitStatus = checkCommitStatus(newMetadataLocation, metadata); + switch (commitStatus) { + case SUCCESS: + break; + case 
FAILURE: + throw persistFailure; + case UNKNOWN: + throw new CommitStateUnknownException(persistFailure); + } + } + } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) { throw new AlreadyExistsException("Table already exists: %s.%s", database, tableName); @@ -222,11 +252,56 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { throw new RuntimeException("Interrupted during commit", e); } finally { - cleanupMetadataAndUnlock(threw, newMetadataLocation, lockId); + cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lockId); + } + } + + /** + * Attempt to load the table and see if any current or past metadata location matches the one we were attempting + * to set. This is used as a last resort when we are dealing with exceptions that may indicate the commit has + * failed but are not proof that this is the case. Past locations must also be searched on the chance that a second + * committer was able to successfully commit on top of our commit. + * + * @param newMetadataLocation the path of the new commit file + * @param config metadata to use for configuration + * @return Commit Status of Success, Failure or Unknown + */ + private CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) { + int maxAttempts = PropertyUtil.propertyAsInt(config.properties(), COMMIT_NUM_STATUS_CHECKS, + COMMIT_NUM_STATUS_CHECKS_DEFAULT); + + AtomicReference status = new AtomicReference<>(CommitStatus.UNKNOWN); + + Tasks.foreach(newMetadataLocation) + .retry(maxAttempts) + .suppressFailureWhenFinished() + .exponentialBackoff(COMMIT_STATUS_CHECK_WAIT_MS, COMMIT_STATUS_CHECK_WAIT_MS, Long.MAX_VALUE, 2.0) + .onFailure((location, checkException) -> + LOG.error("Cannot check if commit to {}.{} exists.", database, tableName, checkException)) + .run(location -> { + TableMetadata metadata = refresh(); + String currentMetadataLocation = metadata.metadataFileLocation(); + boolean commitSuccess = 
currentMetadataLocation.equals(newMetadataLocation) || + metadata.previousFiles().stream().anyMatch(log -> log.file().equals(newMetadataLocation)); + if (commitSuccess) { + LOG.info("Commit status check: Commit to {}.{} of {} succeeded", database, tableName, newMetadataLocation); + status.set(CommitStatus.SUCCESS); + } else { + LOG.info("Commit status check: Commit to {}.{} of {} failed", database, tableName, newMetadataLocation); + status.set(CommitStatus.FAILURE); + } + }); + + if (status.get() == CommitStatus.UNKNOWN) { + LOG.error("Cannot determine commit state to {}.{}. Failed during checking {} times. " + + "Treating commit state as unknown.", + database, tableName, maxAttempts); } + return status.get(); } - private void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException { + @VisibleForTesting + void persistTable(Table hmsTable, boolean updateHiveTable) throws TException, InterruptedException { if (updateHiveTable) { metaClients.run(client -> { EnvironmentContext envContext = new EnvironmentContext( @@ -335,7 +410,8 @@ private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hive return storageDescriptor; } - private long acquireLock() throws UnknownHostException, TException, InterruptedException { + @VisibleForTesting + long acquireLock() throws UnknownHostException, TException, InterruptedException { final LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database); lockComponent.setTablename(tableName); final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent), @@ -401,10 +477,10 @@ private long acquireLock() throws UnknownHostException, TException, InterruptedE return lockId; } - private void cleanupMetadataAndUnlock(boolean errorThrown, String metadataLocation, Optional lockId) { + private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation, Optional lockId) { try { - if (errorThrown) { - // if anything 
went wrong, clean up the uncommitted metadata file + if (commitStatus == CommitStatus.FAILURE) { + // If we are sure the commit failed, clean up the uncommitted metadata file io().deleteFile(metadataLocation); } } catch (RuntimeException e) { @@ -425,8 +501,8 @@ private void unlock(Optional lockId) { } } - // visible for testing - protected void doUnlock(long lockId) throws TException, InterruptedException { + @VisibleForTesting + void doUnlock(long lockId) throws TException, InterruptedException { metaClients.run(client -> { client.unlock(lockId); return null; diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java index 81ac5043ae4f..bb1b0332e120 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCommits.java @@ -19,17 +19,26 @@ package org.apache.iceberg.hive; +import java.io.File; +import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.iceberg.AssertHelpers; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.types.Types; import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; public class TestHiveCommits extends HiveTableBaseTest { @@ -66,4 +75,248 @@ public void testSuppressUnlockExceptions() throws TException, InterruptedExcepti // the commit must succeed Assert.assertEquals(1, ops.current().schema().columns().size()); } + + /** + 
* Pretends we throw an error while persisting that actually fails to commit serverside + */ + @Test + public void testThriftExceptionFailureOnCommit() throws TException, InterruptedException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + + table.updateSchema() + .addColumn("n", Types.IntegerType.get()) + .commit(); + + ops.refresh(); + + TableMetadata metadataV2 = ops.current(); + + Assert.assertEquals(2, ops.current().schema().columns().size()); + + HiveTableOperations spyOps = spy(ops); + + failCommitAndThrowException(spyOps); + + AssertHelpers.assertThrows("We should rethrow generic runtime errors if the " + + "commit actually doesn't succeed", RuntimeException.class, "Metastore operation failed", + () -> spyOps.commit(metadataV2, metadataV1)); + + ops.refresh(); + Assert.assertEquals("Current metadata should not have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata should still exist", metadataFileExists(metadataV2)); + Assert.assertEquals("No new metadata files should exist", 2, metadataFileCount(ops.current())); + } + + /** + * Pretends we throw an error while persisting that actually does commit serverside + */ + @Test + public void testThriftExceptionSuccessOnCommit() throws TException, InterruptedException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + + table.updateSchema() + .addColumn("n", Types.IntegerType.get()) + .commit(); + + ops.refresh(); + + TableMetadata metadataV2 = ops.current(); + + Assert.assertEquals(2, ops.current().schema().columns().size()); + + HiveTableOperations spyOps = spy(ops); + + // Simulate a communication error after a successful commit + commitAndThrowException(ops, spyOps); + + // Shouldn't throw because 
the commit actually succeeds even though persistTable throws an exception + spyOps.commit(metadataV2, metadataV1); + + ops.refresh(); + Assert.assertNotEquals("Current metadata should have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current())); + Assert.assertEquals("Commit should have been successful and new metadata file should be made", + 3, metadataFileCount(ops.current())); + } + + /** + * Pretends we throw an exception while persisting and don't know what happened, can't check to find out, + * but in reality the commit failed + */ + @Test + public void testThriftExceptionUnknownFailedCommit() throws TException, InterruptedException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + + table.updateSchema() + .addColumn("n", Types.IntegerType.get()) + .commit(); + + ops.refresh(); + + TableMetadata metadataV2 = ops.current(); + + Assert.assertEquals(2, ops.current().schema().columns().size()); + + HiveTableOperations spyOps = spy(ops); + + failCommitAndThrowException(spyOps); + breakFallbackCatalogCommitCheck(spyOps); + + AssertHelpers.assertThrows("Should throw CommitStateUnknownException since the catalog check was blocked", + CommitStateUnknownException.class, "Datacenter on fire", + () -> spyOps.commit(metadataV2, metadataV1)); + + ops.refresh(); + + Assert.assertEquals("Current metadata should not have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current())); + Assert.assertEquals("Client could not determine outcome so new metadata file should also exist", + 3, metadataFileCount(ops.current())); + } + + /** + * Pretends we throw an exception while persisting and don't know what happened, can't check to find out, + * but in reality the commit succeeded + */ 
+ @Test + public void testThriftExceptionsUnknownSuccessCommit() throws TException, InterruptedException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + + table.updateSchema() + .addColumn("n", Types.IntegerType.get()) + .commit(); + + ops.refresh(); + + TableMetadata metadataV2 = ops.current(); + + Assert.assertEquals(2, ops.current().schema().columns().size()); + + HiveTableOperations spyOps = spy(ops); + + commitAndThrowException(ops, spyOps); + breakFallbackCatalogCommitCheck(spyOps); + + AssertHelpers.assertThrows("Should throw CommitStateUnknownException since the catalog check was blocked", + CommitStateUnknownException.class, "Datacenter on fire", + () -> spyOps.commit(metadataV2, metadataV1)); + + ops.refresh(); + + Assert.assertFalse("Current metadata should have changed", ops.current().equals(metadataV2)); + Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current())); + } + + /** + * Pretends we threw an exception while persisting, the commit succeeded, the lock expired, + * and a second committer placed a commit on top of ours before the first committer was able to check + * if their commit succeeded or not + * + * Timeline: + * Client 1 commits which throws an exception but succeeded + * Client 1's lock expires while waiting to do the recheck for commit success + * Client 2 acquires a lock, commits successfully on top of client 1's commit and releases the lock + * Client 1 checks to see if their commit was successful + * + * This tests to make sure a disconnected client 1 doesn't think their commit failed just because it isn't the + * current one during the recheck phase. 
+ */ + @Test + public void testThriftExceptionConcurrentCommit() throws TException, InterruptedException, UnknownHostException { + Table table = catalog.loadTable(TABLE_IDENTIFIER); + HiveTableOperations ops = (HiveTableOperations) ((HasTableOperations) table).operations(); + + TableMetadata metadataV1 = ops.current(); + + table.updateSchema() + .addColumn("n", Types.IntegerType.get()) + .commit(); + + ops.refresh(); + + TableMetadata metadataV2 = ops.current(); + + Assert.assertEquals(2, ops.current().schema().columns().size()); + + HiveTableOperations spyOps = spy(ops); + + AtomicLong lockId = new AtomicLong(); + doAnswer(i -> { + lockId.set(ops.acquireLock()); + return lockId.get(); + }).when(spyOps).acquireLock(); + + concurrentCommitAndThrowException(ops, spyOps, table, lockId); + + /* + This commit and our concurrent commit should succeed even though this commit throws an exception + after the persist operation succeeds + */ + spyOps.commit(metadataV2, metadataV1); + + ops.refresh(); + Assert.assertNotEquals("Current metadata should have changed", metadataV2, ops.current()); + Assert.assertTrue("Current metadata file should still exist", metadataFileExists(ops.current())); + Assert.assertEquals("The column addition from the concurrent commit should have been successful", + 2, ops.current().schema().columns().size()); + } + + private void commitAndThrowException(HiveTableOperations realOperations, HiveTableOperations spyOperations) + throws TException, InterruptedException { + // Simulate a communication error after a successful commit + doAnswer(i -> { + org.apache.hadoop.hive.metastore.api.Table tbl = + i.getArgumentAt(0, org.apache.hadoop.hive.metastore.api.Table.class); + realOperations.persistTable(tbl, true); + throw new TException("Datacenter on fire"); + }).when(spyOperations).persistTable(any(), anyBoolean()); + } + + private void concurrentCommitAndThrowException(HiveTableOperations realOperations, HiveTableOperations spyOperations, + Table table, 
AtomicLong lockId) + throws TException, InterruptedException { + // Simulate a communication error after a successful commit + doAnswer(i -> { + org.apache.hadoop.hive.metastore.api.Table tbl = + i.getArgumentAt(0, org.apache.hadoop.hive.metastore.api.Table.class); + realOperations.persistTable(tbl, true); + // Simulate lock expiration or removal + realOperations.doUnlock(lockId.get()); + table.refresh(); + table.updateSchema().addColumn("newCol", Types.IntegerType.get()).commit(); + throw new TException("Datacenter on fire"); + }).when(spyOperations).persistTable(any(), anyBoolean()); + } + + private void failCommitAndThrowException(HiveTableOperations spyOperations) throws TException, InterruptedException { + doThrow(new TException("Datacenter on fire")) + .when(spyOperations) + .persistTable(any(), anyBoolean()); + } + + private void breakFallbackCatalogCommitCheck(HiveTableOperations spyOperations) { + when(spyOperations.refresh()) + .thenThrow(new RuntimeException("Still on fire")); // Failure on commit check + } + + private boolean metadataFileExists(TableMetadata metadata) { + return new File(metadata.metadataFileLocation().replace("file:", "")).exists(); + } + + private int metadataFileCount(TableMetadata metadata) { + return new File(metadata.metadataFileLocation().replace("file:", "")).getParentFile() + .listFiles(file -> file.getName().endsWith("metadata.json")).length; + } }