Guard Hive's OPTIMIZE table procedure with session property #9761

Merged: 1 commit, merged on Oct 25, 2021
@@ -179,6 +179,7 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_VIEW_TRANSLATION_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
+import static io.trino.plugin.hive.HiveSessionProperties.NON_TRANSACTIONAL_OPTIMIZE_ENABLED;
import static io.trino.plugin.hive.HiveSessionProperties.getCompressionCodec;
import static io.trino.plugin.hive.HiveSessionProperties.getHiveStorageFormat;
import static io.trino.plugin.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior;
@@ -188,6 +189,7 @@
import static io.trino.plugin.hive.HiveSessionProperties.isCollectColumnStatisticsOnWrite;
import static io.trino.plugin.hive.HiveSessionProperties.isCreateEmptyBucketFiles;
import static io.trino.plugin.hive.HiveSessionProperties.isDelegateTransactionalManagedTableLocationToMetastore;
+import static io.trino.plugin.hive.HiveSessionProperties.isNonTransactionalOptimizeEnabled;
import static io.trino.plugin.hive.HiveSessionProperties.isOptimizedMismatchedBucketCount;
import static io.trino.plugin.hive.HiveSessionProperties.isParallelPartitionedBucketedWrites;
import static io.trino.plugin.hive.HiveSessionProperties.isProjectionPushdownEnabled;
@@ -1938,6 +1940,12 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(ConnectorS
private Optional<ConnectorTableExecuteHandle> getTableHandleForOptimize(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Object> executeProperties)
{
// TODO lots of this is copied from beginInsert; refactoring opportunity
+if (!isNonTransactionalOptimizeEnabled(session)) {
+// The OPTIMIZE procedure is disabled by default; even though the code is written to avoid data loss, calling the procedure is inherently
+// unsafe due to the non-transactional nature of committing changes to a Hive table. If Trino loses connectivity to the HDFS cluster while deleting
+// post-optimize data files, duplicate rows will be left in the table and manual cleanup by the user will be required.
+throw new TrinoException(NOT_SUPPORTED, "OPTIMIZE procedure must be explicitly enabled via " + NON_TRANSACTIONAL_OPTIMIZE_ENABLED + " session property");
+}

HiveIdentity identity = new HiveIdentity(session);
HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
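With this guard in place, OPTIMIZE must be opted into per session before it will run. A minimal sketch of the resulting workflow in Trino SQL, assuming the catalog is named "hive"; the table name is hypothetical:

-- opt in for the current session only; the setting does not persist
SET SESSION hive.non_transactional_optimize_enabled = true;

-- now the procedure runs, as exercised by the tests below
ALTER TABLE hive.default.orders EXECUTE optimize(file_size_threshold => '10kB');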
@@ -113,6 +113,7 @@ public final class HiveSessionProperties
private static final String LEGACY_HIVE_VIEW_TRANSLATION = "legacy_hive_view_translation";
public static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled";
public static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight";
+public static final String NON_TRANSACTIONAL_OPTIMIZE_ENABLED = "non_transactional_optimize_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -476,6 +477,11 @@ public HiveSessionProperties(
throw new TrinoException(INVALID_SESSION_PROPERTY, format("%s must be > 0 and <= 1.0: %s", MINIMUM_ASSIGNED_SPLIT_WEIGHT, value));
}
},
-false));
+false),
+booleanProperty(
+NON_TRANSACTIONAL_OPTIMIZE_ENABLED,
+"Enable OPTIMIZE table procedure",
+false,
+false));
}

@@ -793,4 +799,9 @@ public static double getMinimumAssignedSplitWeight(ConnectorSession session)
{
return session.getProperty(MINIMUM_ASSIGNED_SPLIT_WEIGHT, Double.class);
}

+public static boolean isNonTransactionalOptimizeEnabled(ConnectorSession session)
+{
+return session.getProperty(NON_TRANSACTIONAL_OPTIMIZE_ENABLED, Boolean.class);
+}
}
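Because the property defaults to false and is registered as a regular (non-hidden) catalog session property, a fresh session that calls the procedure fails during planning, before any data is touched. A sketch of the expected interaction, again with an assumed "hive" catalog and a hypothetical table:

-- without SET SESSION, the call is rejected up front
ALTER TABLE hive.default.orders EXECUTE optimize(file_size_threshold => '10kB');
-- error: OPTIMIZE procedure must be explicitly enabled via non_transactional_optimize_enabled session property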
@@ -7766,7 +7766,15 @@ public void testOptimize()
Set<String> initialFiles = getTableFiles(tableName);
assertThat(initialFiles).hasSize(10);

assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')");
// OPTIMIZE must be explicitly enabled
assertThatThrownBy(() -> computeActual("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"))
.hasMessage("OPTIMIZE procedure must be explicitly enabled via non_transactional_optimize_enabled session property");
assertNationNTimes(tableName, 10);
assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles);

Session optimizeEnabledSession = optimizeEnabledSession();

assertUpdate(optimizeEnabledSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')");
assertNationNTimes(tableName, 10);

Set<String> compactedFiles = getTableFiles(tableName);
@@ -7775,7 +7783,7 @@ public void testOptimize()
assertThat(intersection(initialFiles, compactedFiles)).isEmpty();

// compact with low threshold; nothing should change
assertUpdate("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10B')");
assertUpdate(optimizeEnabledSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10B')");

assertThat(getTableFiles(tableName)).hasSameElementsAs(compactedFiles);
}
@@ -7792,7 +7800,7 @@ public void testOptimizeWithWriterScaling()
Set<String> initialFiles = getTableFiles(tableName);
assertThat(initialFiles).hasSize(4);

-Session writerScalingSession = Session.builder(getSession())
+Session writerScalingSession = Session.builder(optimizeEnabledSession())
.setSystemProperty("scale_writers", "true")
.setSystemProperty("writer_min_size", "100GB")
.build();
@@ -7826,13 +7834,14 @@ public void testOptimizeWithPartitioning()
Set<String> initialFiles = getTableFiles(tableName);
assertThat(initialFiles).hasSize(insertCount * partitionsCount);

-Session writerScalingSession = Session.builder(getSession())
+Session optimizeEnabledSession = optimizeEnabledSession();
+Session writerScalingSession = Session.builder(optimizeEnabledSession)
.setSystemProperty("scale_writers", "true")
.setSystemProperty("writer_min_size", "100GB")
.build();

// optimize with unsupported WHERE
assertThatThrownBy(() -> computeActual("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB') WHERE nationkey = 1"))
assertThatThrownBy(() -> computeActual(optimizeEnabledSession, "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB') WHERE nationkey = 1"))
.hasMessageContaining("Unexpected FilterNode found in plan; probably connector was not able to handle provided WHERE expression");
assertNationNTimes(tableName, insertCount);
assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles);
@@ -7878,7 +7887,7 @@ public void testOptimizeWithBucketing()
assertNationNTimes(tableName, insertCount);
Set<String> initialFiles = getTableFiles(tableName);

assertThatThrownBy(() -> computeActual("ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"))
assertThatThrownBy(() -> computeActual(optimizeEnabledSession(), "ALTER TABLE " + tableName + " EXECUTE optimize(file_size_threshold => '10kB')"))
.hasMessageMatching("Optimizing bucketed Hive table .* is not supported");

assertThat(getTableFiles(tableName)).hasSameElementsAs(initialFiles);
@@ -7888,7 +7897,7 @@
@Test
public void testOptimizeHiveInformationSchema()
{
assertThatThrownBy(() -> computeActual("ALTER TABLE information_schema.tables EXECUTE optimize(file_size_threshold => '10kB')"))
assertThatThrownBy(() -> computeActual(optimizeEnabledSession(), "ALTER TABLE information_schema.tables EXECUTE optimize(file_size_threshold => '10kB')"))
.hasMessage("This connector does not support table procedures");
}

@@ -7900,10 +7909,17 @@ public void testOptimizeHiveSystemTable()

assertQuery("SELECT count(*) FROM " + tableName, "SELECT 0");

-assertThatThrownBy(() -> computeActual(format("ALTER TABLE \"%s$partitions\" EXECUTE optimize(file_size_threshold => '10kB')", tableName)))
+assertThatThrownBy(() -> computeActual(optimizeEnabledSession(), format("ALTER TABLE \"%s$partitions\" EXECUTE optimize(file_size_threshold => '10kB')", tableName)))
.hasMessage("This connector does not support table procedures");
}

+private Session optimizeEnabledSession()
+{
+return Session.builder(getSession())
+.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "non_transactional_optimize_enabled", "true")
+.build();
+}

private void insertNationNTimes(String tableName, int times)
{
assertUpdate("INSERT INTO " + tableName + "(nationkey, name, regionkey, comment) " + join(" UNION ALL ", nCopies(times, "SELECT * FROM tpch.sf1.nation")), times * 25);