Skip to content

Commit

Permalink
[HUDI-4354] Add --force-empty-sync flag to deltastreamer (apache#6027)
Browse files (browse the repository at this point in the history)
  • Loading branch information
qjqqyy authored and fengjian committed Apr 5, 2023
1 parent 805e769 commit 890e144
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());
}

if (!isEmpty) {
if (!isEmpty || cfg.forceEmptyMetaSync) {
runMetaSync();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ public static class Config implements Serializable {
@Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
public Boolean enableMetaSync = false;

@Parameter(names = {"--force-empty-sync"}, description = "Force syncing meta even on empty commit")
public Boolean forceEmptyMetaSync = false;

@Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
public String syncClientToolClassNames = HiveSyncTool.class.getName();

Expand Down Expand Up @@ -443,6 +446,7 @@ public boolean equals(Object o) {
&& Objects.equals(filterDupes, config.filterDupes)
&& Objects.equals(enableHiveSync, config.enableHiveSync)
&& Objects.equals(enableMetaSync, config.enableMetaSync)
&& Objects.equals(forceEmptyMetaSync, config.forceEmptyMetaSync)
&& Objects.equals(syncClientToolClassNames, config.syncClientToolClassNames)
&& Objects.equals(maxPendingCompactions, config.maxPendingCompactions)
&& Objects.equals(maxPendingClustering, config.maxPendingClustering)
Expand All @@ -468,7 +472,7 @@ public int hashCode() {
baseFileFormat, propsFilePath, configs, sourceClassName,
sourceOrderingField, payloadClassName, schemaProviderClassName,
transformerClassNames, sourceLimit, operation, filterDupes,
enableHiveSync, enableMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
enableHiveSync, enableMetaSync, forceEmptyMetaSync, syncClientToolClassNames, maxPendingCompactions, maxPendingClustering,
continuousMode, minSyncIntervalSeconds, sparkMaster, commitOnErrors,
deltaSyncSchedulingWeight, compactSchedulingWeight, clusterSchedulingWeight, deltaSyncSchedulingMinShare,
compactSchedulingMinShare, clusterSchedulingMinShare, forceDisableCompaction, checkpoint,
Expand All @@ -494,6 +498,7 @@ public String toString() {
+ ", filterDupes=" + filterDupes
+ ", enableHiveSync=" + enableHiveSync
+ ", enableMetaSync=" + enableMetaSync
+ ", forceEmptyMetaSync=" + forceEmptyMetaSync
+ ", syncClientToolClassNames=" + syncClientToolClassNames
+ ", maxPendingCompactions=" + maxPendingCompactions
+ ", maxPendingClustering=" + maxPendingClustering
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2169,6 +2169,27 @@ public void testDropPartitionColumns() throws Exception {
assertFalse(tableFields.contains("partition_path"));
}

/**
 * Runs a single delta streamer sync with {@code enableMetaSync} and {@code forceEmptyMetaSync}
 * both switched on and a source limit of zero, then checks that the table holds no records
 * and that the Hive table nevertheless exists afterwards.
 */
@Test
public void testForceEmptyMetaSync() throws Exception {
  String basePath = dfsBasePath + "/test_force_empty_meta_sync";

  // Build a bulk-insert config; the zero source limit is expected to yield an empty commit
  // (the record-count assertion below checks that nothing was written).
  HoodieDeltaStreamer.Config streamerConfig = TestHelpers.makeConfig(basePath, WriteOperationType.BULK_INSERT);
  streamerConfig.sourceLimit = 0;
  streamerConfig.allowCommitOnNoCheckpointChange = true;
  // Meta sync is enabled, and additionally forced for the empty-commit case under test.
  streamerConfig.enableMetaSync = true;
  streamerConfig.forceEmptyMetaSync = true;

  HoodieDeltaStreamer streamer = new HoodieDeltaStreamer(streamerConfig, jsc, dfs, hiveServer.getHiveConf());
  streamer.sync();
  TestHelpers.assertRecordCount(0, basePath, sqlContext);

  // The Hive table must have been created even though no records were ingested.
  HiveSyncConfig syncConfig = getHiveSyncConfig(basePath, "hive_trips");
  syncConfig.setHadoopConf(hiveServer.getHiveConf());
  HoodieHiveSyncClient syncClient = new HoodieHiveSyncClient(syncConfig);
  final String syncedTable = syncConfig.getString(META_SYNC_TABLE_NAME);
  assertTrue(syncClient.tableExists(syncedTable), "Table " + syncedTable + " should exist");
}

class TestDeltaSync extends DeltaSync {

public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,
Expand Down

0 comments on commit 890e144

Please sign in to comment.