From 2568c61857946248f58b8fa0d385c3d7ee635980 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?=
Date: Mon, 25 Oct 2021 16:03:41 +0200
Subject: [PATCH] Guard Hive's OPTIMIZE table procedure with session property

The OPTIMIZE procedure is disabled by default; even though the code is
written in a way that avoids data loss, calling the procedure is inherently
unsafe due to the non-transactional nature of committing changes to a Hive
table. If Trino loses connectivity to the HDFS cluster while deleting
post-optimize data files, duplicate rows will be left in the table and
manual cleanup by the user will be required.
---
 .../io/trino/plugin/hive/HiveMetadata.java    |  8 +++++
 .../plugin/hive/HiveSessionProperties.java    | 11 +++++++
 .../plugin/hive/TestHiveConnectorTest.java    | 32 ++++++++++++++-----
 3 files changed, 43 insertions(+), 8 deletions(-)

diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
index 703cd3ba24d9..80de7b20f717 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
@@ -179,6 +179,7 @@
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_VIEW_TRANSLATION_ERROR;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
 import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
+import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED;
 import static io.trino.plugin.hive.HiveSessionProperties.getCompressionCodec;
 import static io.trino.plugin.hive.HiveSessionProperties.getHiveStorageFormat;
 import static io.trino.plugin.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior;
@@ -188,6 +189,7 @@
 import static io.trino.plugin.hive.HiveSessionProperties.isCollectColumnStatisticsOnWrite;
 import static io.trino.plugin.hive.HiveSessionProperties.isCreateEmptyBucketFiles;
 import static io.trino.plugin.hive.HiveSessionProperties.isDelegateTransactionalManagedTableLocationToMetastore;
+import static io.trino.plugin.hive.HiveSessionProperties.isNonTransactionalOptimizeEnabled;
 import static io.trino.plugin.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount;
 import static io.trino.plugin.hive.HiveSessionProperties.isParallelPartitionedBucketedWrites;
 import static io.trino.plugin.hive.HiveSessionProperties.isProjectionPushdownEnabled;
@@ -1938,6 +1940,12 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorS
     private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Object> executeProperties)
     {
         // TODO lots of that is copied from beginInsert; refactoring opportunity
+        if (!isNonTransactionalOptimizeEnabled(session)) {
+            // OPTIMIZE procedure is disabled by default; even though the code is written in a way that avoids data loss, calling the procedure is inherently
+            // unsafe due to the non-transactional nature of committing changes to a Hive table. If Trino loses connectivity to the HDFS cluster while deleting
+            // post-optimize data files, duplicate rows will be left in the table and manual cleanup by the user will be required.
+            throw new TrinoException(NOT_SUPPORTED, "OPTIMIZE procedure must be explicitly enabled via " + NON_TRANSACTIONAL_OPTIMIZE_ENABLED + " session property");
+        }
         HiveIdentity identity = new HiveIdentity(session);
         HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
 
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java
index f1c83bcdbb15..33dc135e4640 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java
@@ -113,6 +113,7 @@ public final class HiveSessionProperties
     private static final String LEGACY_HIVE_VIEW_TRANSLATION = "legacy_hive_view_translation";
     public static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled";
     public static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
+    public static final String NON_TRANSACTIONAL_OPTIMIZE_ENABLED = "non_transactional_optimize_enabled";
 
     private final List<PropertyMetadata<?>> sessionProperties;
 
@@ -476,6 +477,11 @@ public HiveSessionProperties(
                                 throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be > 0 and <= 1.0: %s", MINIMUM_ASSIGNED_SPLIT_WEIGHT, value));
                             }
                         },
+                        false),
+                booleanProperty(
+                        NON_TRANSACTIONAL_OPTIMIZE_ENABLED,
+                        "Enable OPTIMIZE table procedure",
+                        false,
                         false));
     }
 
@@ -793,4 +799,9 @@ public static double getMinimumAssignedSplitWeight(ConnectorSession session)
     {
         return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class);
     }
+
+    public static boolean isNonTransactionalOptimizeEnabled(ConnectorSession session)
+    {
+        return session.getProperty(NON_TRANSACTIONAL_OPTIMIZE_ENABLED, Boolean.class);
+    }
 }
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java
index 5e46902caea9..cc47471580c4 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConnectorTest.java
@@ -7766,7 +7766,15 @@ public void testOptimize()
         Set<String> initialFiles = getTableFiles(tableName);
         assertThat(initialFiles).hasSize(10);
 
-        assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')");
+        // OPTIMIZE must be explicitly enabled
+        assertThatThrownBy(() -> computeActual("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"))
+                .hasMessage("OPTIMIZE procedure must be explicitly enabled via non_transactional_optimize_enabled session property");
+        assertNationNTimes(tableName, 10);
+        assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles);
+
+        Session optimizeEnabledSession = optimizeEnabledSession();
+
+        assertUpdate(optimizeEnabledSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')");
         assertNationNTimes(tableName, 10);
 
         Set<String> compactedFiles = getTableFiles(tableName);
@@ -7775,7 +7783,7 @@ public void testOptimize()
         assertThat(intersection(initialFiles, compactedFiles)).isEmpty();
 
         // compact with low threshold; nothing should change
-        assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10B')");
+        assertUpdate(optimizeEnabledSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10B')");
 
         assertThat(getTableFiles(tableName)).hasSameElementsAs(compactedFiles);
     }
@@ -7792,7 +7800,7 @@ public void testOptimizeWithWriterScaling()
         Set<String> initialFiles = getTableFiles(tableName);
         assertThat(initialFiles).hasSize(4);
 
-        Session writerScalingSession = Session.builder(getSession())
+        Session writerScalingSession = Session.builder(optimizeEnabledSession())
                 .setSystemProperty("scale_writers", "true")
                 .setSystemProperty("writer_min_size", "100GB")
                 .build();
@@ -7826,13 +7834,14 @@ public void testOptimizeWithPartitioning()
         Set<String> initialFiles = getTableFiles(tableName);
         assertThat(initialFiles).hasSize(insertCount * partitionsCount);
 
-        Session writerScalingSession = Session.builder(getSession())
+        Session optimizeEnabledSession = optimizeEnabledSession();
+        Session writerScalingSession = Session.builder(optimizeEnabledSession)
                 .setSystemProperty("scale_writers", "true")
                 .setSystemProperty("writer_min_size", "100GB")
                 .build();
 
         // optimize with unsupported WHERE
-        assertThatThrownBy(() -> computeActual("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB') WHERE nationkey = 1"))
+        assertThatThrownBy(() -> computeActual(optimizeEnabledSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB') WHERE nationkey = 1"))
                 .hasMessageContaining("Unexpected FilterNode found in plan; probably connector was not able to handle provided WHERE expression");
         assertNationNTimes(tableName, insertCount);
         assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles);
@@ -7878,7 +7887,7 @@ public void testOptimizeWithBucketing()
         assertNationNTimes(tableName, insertCount);
 
         Set<String> initialFiles = getTableFiles(tableName);
-        assertThatThrownBy(() -> computeActual("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"))
+        assertThatThrownBy(() -> computeActual(optimizeEnabledSession(), "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"))
                 .hasMessageMatching("Optimizing bucketed Hive table .* is not supported");
 
         assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles);
@@ -7888,7 +7897,7 @@ public void testOptimizeWithBucketing()
     @Test
     public void testOptimizeHiveInformationSchema()
     {
-        assertThatThrownBy(() -> computeActual("ALTER TABLE information_schema.tables EXECUTE optimize(file_size_threshold => '10kB')"))
+        assertThatThrownBy(() -> computeActual(optimizeEnabledSession(), "ALTER TABLE information_schema.tables EXECUTE optimize(file_size_threshold => '10kB')"))
                 .hasMessage("This connector does not support table procedures");
     }
 
@@ -7900,10 +7909,17 @@ public void testOptimizeHiveSystemTable()
 
         assertQuery("SELECT count(*) FROM " + tableName, "SELECT 0");
 
-        assertThatThrownBy(() -> computeActual(format("ALTER TABLE \"%s$partitions\" EXECUTE optimize(file_size_threshold => '10kB')", tableName)))
+        assertThatThrownBy(() -> computeActual(optimizeEnabledSession(), format("ALTER TABLE \"%s$partitions\" EXECUTE optimize(file_size_threshold => '10kB')", tableName)))
                 .hasMessage("This connector does not support table procedures");
     }
 
+    private Session optimizeEnabledSession()
+    {
+        return Session.builder(getSession())
+                .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "non_transactional_optimize_enabled", "true")
+                .build();
+    }
+
     private void insertNationNTimes(String tableName, int times)
     {
         assertUpdate("INSERT INTO " + tableName + "(nationkey, name, regionkey, comment) " + join(" UNION ALL ", nCopies(times, "SELECT * FROM tpch.sf1.nation")), times * 25);
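
Usage sketch for the guard introduced above. The session property name and the optimize(file_size_threshold => ...) syntax come from the patch; the catalog name "hive", the schema/table "hive.default.orders_small_files", and the '10MB' threshold are illustrative assumptions, not part of the patch:

    -- Opt in for this session only; the property defaults to false because the
    -- post-optimize file deletion is not transactional (illustrative catalog name "hive").
    SET SESSION hive.non_transactional_optimize_enabled = true;

    -- Compact files below the threshold in a placeholder table.
    ALTER TABLE hive.default.orders_small_files EXECUTE optimize(file_size_threshold => '10MB');

Keeping the default at false matches the commit message's reasoning: the procedure rewrites data and then deletes the old files without a transaction, so enabling it is a deliberate, per-session choice rather than the connector default.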