Guard Hive's OPTIMIZE table procedure with session property
The OPTIMIZE procedure is disabled by default; even though the code is
written in a way to avoid data loss, calling the procedure is inherently
unsafe due to the non-transactional nature of committing changes to a
Hive table. If Trino loses connectivity to the HDFS cluster while
deleting post-optimize data files, duplicate rows will be left in the
table and manual cleanup by the user will be required.
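
For context, enabling the guard is a per-session opt-in. A minimal usage
sketch, assuming the Hive catalog is mounted as "hive" (the schema and
table names below are illustrative; the OPTIMIZE syntax matches the tests
in this commit):

    -- Opt in for this session only; the property defaults to false.
    SET SESSION hive.non_transactional_optimize_enabled = true;
    -- Compact data files smaller than the given threshold.
    ALTER TABLE hive.default.orders EXECUTE optimize(file_size_threshold => '10MB');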
losipiuk committed Oct 25, 2021
1 parent 8f767f2 commit 2568c61
Showing 3 changed files with 43 additions and 8 deletions.
@@ -179,6 +179,7 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_VIEW_TRANSLATION_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED;
import static io.trino.plugin.hive.HiveSessionProperties.getCompressionCodec;
import static io.trino.plugin.hive.HiveSessionProperties.getHiveStorageFormat;
import static io.trino.plugin.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior;
@@ -188,6 +189,7 @@
import static io.trino.plugin.hive.HiveSessionProperties.isCollectColumnStatisticsOnWrite;
import static io.trino.plugin.hive.HiveSessionProperties.isCreateEmptyBucketFiles;
import static io.trino.plugin.hive.HiveSessionProperties.isDelegateTransactionalManagedTableLocationToMetastore;
import static io.trino.plugin.hive.HiveSessionProperties.isNonTransactionalOptimizeEnabled;
import static io.trino.plugin.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount;
import static io.trino.plugin.hive.HiveSessionProperties.isParallelPartitionedBucketedWrites;
import static io.trino.plugin.hive.HiveSessionProperties.isProjectionPushdownEnabled;
@@ -1938,6 +1940,12 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorS
private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Object> executeProperties)
{
// TODO lots of that is copied from beginInsert; refactoring opportunity
if (!isNonTransactionalOptimizeEnabled(session)) {
// OPTIMIZE procedure is disabled by default; even though the code is written in a way to avoid data loss, calling the procedure is inherently
// unsafe due to the non-transactional nature of committing changes to a Hive table. If Trino loses connectivity to the HDFS cluster while
// deleting post-optimize data files, duplicate rows will be left in the table and manual cleanup by the user will be required.
throw new TrinoException(NOT_SUPPORTED, "OPTIMIZE procedure must be explicitly enabled via " + NON_TRANSACTIONAL_OPTIMIZE_ENABLED + " session property");
}

HiveIdentity identity = new HiveIdentity(session);
HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
@@ -113,6 +113,7 @@ public final class HiveSessionProperties
private static final String LEGACY_HIVE_VIEW_TRANSLATION = "legacy_hive_view_translation";
public static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled";
public static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
public static final String NON_TRANSACTIONAL_OPTIMIZE_ENABLED = "non_transactional_optimize_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -476,6 +477,11 @@ public HiveSessionProperties(
throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be > 0 and <= 1.0: %s", MINIMUM_ASSIGNED_SPLIT_WEIGHT, value));
}
},
false),
booleanProperty(
NON_TRANSACTIONAL_OPTIMIZE_ENABLED,
"Enable OPTIMIZE table procedure",
false,
false));
}

@@ -793,4 +799,9 @@ public static double getMinimumAssignedSplitWeight(ConnectorSession session)
{
return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class);
}

public static boolean isNonTransactionalOptimizeEnabled(ConnectorSession session)
{
return session.getProperty(NON_TRANSACTIONAL_OPTIMIZE_ENABLED, Boolean.class);
}
}
@@ -7766,7 +7766,15 @@ public void testOptimize()
Set<String> initialFiles = getTableFiles(tableName);
assertThat(initialFiles).hasSize(10);

assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')");
// OPTIMIZE must be explicitly enabled
assertThatThrownBy(() -> computeActual("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"))
.hasMessage("OPTIMIZE procedure must be explicitly enabled via non_transactional_optimize_enabled session property");
assertNationNTimes(tableName, 10);
assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles);

Session optimizeEnabledSession = optimizeEnabledSession();

assertUpdate(optimizeEnabledSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')");
assertNationNTimes(tableName, 10);

Set<String> compactedFiles = getTableFiles(tableName);
@@ -7775,7 +7783,7 @@ public void testOptimize()
assertThat(intersection(initialFiles, compactedFiles)).isEmpty();

// compact with low threshold; nothing should change
assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10B')");
assertUpdate(optimizeEnabledSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10B')");

assertThat(getTableFiles(tableName)).hasSameElementsAs(compactedFiles);
}
@@ -7792,7 +7800,7 @@ public void testOptimizeWithWriterScaling()
Set<String> initialFiles = getTableFiles(tableName);
assertThat(initialFiles).hasSize(4);

Session writerScalingSession = Session.builder(getSession())
Session writerScalingSession = Session.builder(optimizeEnabledSession())
.setSystemProperty("scale_writers", "true")
.setSystemProperty("writer_min_size", "100GB")
.build();
@@ -7826,13 +7834,14 @@ public void testOptimizeWithPartitioning()
Set<String> initialFiles = getTableFiles(tableName);
assertThat(initialFiles).hasSize(insertCount * partitionsCount);

Session writerScalingSession = Session.builder(getSession())
Session optimizeEnabledSession = optimizeEnabledSession();
Session writerScalingSession = Session.builder(optimizeEnabledSession)
.setSystemProperty("scale_writers", "true")
.setSystemProperty("writer_min_size", "100GB")
.build();

// optimize with unsupported WHERE
assertThatThrownBy(() -> computeActual("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB') WHERE nationkey = 1"))
assertThatThrownBy(() -> computeActual(optimizeEnabledSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB') WHERE nationkey = 1"))
.hasMessageContaining("Unexpected FilterNode found in plan; probably connector was not able to handle provided WHERE expression");
assertNationNTimes(tableName, insertCount);
assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles);
@@ -7878,7 +7887,7 @@ public void testOptimizeWithBucketing()
assertNationNTimes(tableName, insertCount);
Set<String> initialFiles = getTableFiles(tableName);

assertThatThrownBy(() -> computeActual("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"))
assertThatThrownBy(() -> computeActual(optimizeEnabledSession(), "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"))
.hasMessageMatching("Optimizing bucketed Hive table .* is not supported");

assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles);
@@ -7888,7 +7897,7 @@ public void testOptimizeWithBucketing()
@Test
public void testOptimizeHiveInformationSchema()
{
assertThatThrownBy(() -> computeActual("ALTER TABLE information_schema.tables EXECUTE optimize(file_size_threshold => '10kB')"))
assertThatThrownBy(() -> computeActual(optimizeEnabledSession(), "ALTER TABLE information_schema.tables EXECUTE optimize(file_size_threshold => '10kB')"))
.hasMessage("This connector does not support table procedures");
}

@@ -7900,10 +7909,17 @@ public void testOptimizeHiveSystemTable()

assertQuery("SELECT count(*) FROM " + tableName, "SELECT 0");

assertThatThrownBy(() -> computeActual(format("ALTER TABLE \"%s$partitions\" EXECUTE optimize(file_size_threshold => '10kB')", tableName)))
assertThatThrownBy(() -> computeActual(optimizeEnabledSession(), format("ALTER TABLE \"%s$partitions\" EXECUTE optimize(file_size_threshold => '10kB')", tableName)))
.hasMessage("This connector does not support table procedures");
}

private Session optimizeEnabledSession()
{
return Session.builder(getSession())
.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "non_transactional_optimize_enabled", "true")
.build();
}

private void insertNationNTimes(String tableName, int times)
{
assertUpdate("INSERT INTO " + tableName + "(nationkey, name, regionkey, comment) " + join(" UNION ALL ", nCopies(times, "SELECT * FROM tpch.sf1.nation")), times * 25);
